項(xiàng)目背景
最近公司某物聯(lián)網(wǎng)項(xiàng)目需要使用socket長(zhǎng)連接進(jìn)行消息通訊,搗鼓了一版代碼上線(xiàn),結(jié)果BUG不斷,本猿寢食難安,于是求助度娘,數(shù)日未眠項(xiàng)目終于平穩(wěn)運(yùn)行了,本著開(kāi)源共享的精神,本猿把項(xiàng)目代碼提煉成了一個(gè)demo項(xiàng)目,盡量摒棄了其中丑陋的業(yè)務(wù)部分,希望與同學(xué)們共同學(xué)習(xí)進(jìn)步。
基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶(hù)小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶(hù)、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
正文
一、項(xiàng)目架構(gòu)
本項(xiàng)目使用了netty、redis以及springboot2.2.0
二、項(xiàng)目模塊
本項(xiàng)目目錄結(jié)構(gòu)如下圖:
netty-tcp-core是公共模塊,主要是工具類(lèi)。netty-tcp-server是netty服務(wù)端,服務(wù)端僅作測(cè)試使用,實(shí)際項(xiàng)目中我們只使用了客戶(hù)端。netty-tcp-client是客戶(hù)端,也是本文的重點(diǎn)。
三、業(yè)務(wù)流程
我們實(shí)際項(xiàng)目中使用RocketMQ作為消息隊(duì)列,本項(xiàng)目由于是demo項(xiàng)目于是改為了BlockingQueue。數(shù)據(jù)流為:
生產(chǎn)者->消息隊(duì)列->消費(fèi)者(客戶(hù)端)->tcp通道->服務(wù)端->tcp通道->客戶(hù)端。
當(dāng)消費(fèi)者接收到某設(shè)備發(fā)送的消息后,將判斷緩存中是否存在該設(shè)備與服務(wù)端的連接,如果存在并且通道活躍則使用該通道發(fā)送消息,如果不存在則創(chuàng)建通道并在通道激活后立即發(fā)送消息,當(dāng)客戶(hù)端收到來(lái)自服務(wù)端的消息時(shí)進(jìn)行響應(yīng)的業(yè)務(wù)處理。
四、代碼詳解
1.消息隊(duì)列
由于本demo項(xiàng)目移除了消息中間件,于是需要自己創(chuàng)建一個(gè)本地隊(duì)列模擬真實(shí)使用場(chǎng)景
packageorg.example.client; importorg.example.client.model.NettyMsgModel; importjava.util.concurrent.ArrayBlockingQueue; /** *本項(xiàng)目為演示使用本地隊(duì)列實(shí)際生產(chǎn)中應(yīng)該使用消息中間件代替(rocketmq或rabbitmq) * *@authorReWind00 *@date2023/2/1511:20 */ publicclassQueueHolder{ privatestaticfinalArrayBlockingQueuequeue=newArrayBlockingQueue<>(100); publicstaticArrayBlockingQueue get(){ returnqueue; } }
使用一個(gè)類(lèi)保存隊(duì)列的靜態(tài)實(shí)例以便在任何類(lèi)中都可以快速引用。接下來(lái)我們需要啟動(dòng)一個(gè)線(xiàn)程去監(jiān)聽(tīng)隊(duì)列中的消息,一但消息投遞到隊(duì)列中,我們就取出消息然后異步多線(xiàn)程處理該消息。
publicclassLoopThreadimplementsRunnable{ @Override publicvoidrun(){ for(inti=0;i{ while(true){ //取走BlockingQueue里排在首位的對(duì)象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到 try{ NettyMsgModelnettyMsgModel=QueueHolder.get().take(); messageProcessor.process(nettyMsgModel); }catch(InterruptedExceptione){ log.error(e.getMessage(),e); } } }); } } }
使用take方法會(huì)使該線(xiàn)程一直阻塞直到隊(duì)列收到消息后進(jìn)入下一次循環(huán)。
2.執(zhí)行類(lèi)
process方法來(lái)自于MessageProcessor類(lèi),該類(lèi)為單例,但是會(huì)有多線(xiàn)程同時(shí)執(zhí)行。
publicvoidprocess(NettyMsgModelnettyMsgModel){ Stringimei=nettyMsgModel.getImei(); try{ synchronized(this){//為避免收到同一臺(tái)設(shè)備多條消息后重復(fù)創(chuàng)建客戶(hù)端,必須加鎖 if(redisCache.hasKey(NETTY_QUEUE_LOCK+imei)){//上一條消息處理中 log.info("imei={}消息處理中,重新入列",imei); //放回隊(duì)列重新等待消費(fèi)延遲x秒(實(shí)際項(xiàng)目中應(yīng)該使用rocketmq或者rabbitmq實(shí)現(xiàn)延遲消費(fèi)) newTimer().schedule(newTimerTask(){ @Override publicvoidrun(){ QueueHolder.get().offer(nettyMsgModel); } },2000); log.info("imei={}消息處理中,重新入列完成",imei); return; }else{ //如果沒(méi)有在連接中的直接加鎖 redisCache.setCacheObject(NETTY_QUEUE_LOCK+imei,"1",120,TimeUnit.SECONDS); } } //緩存中存在則發(fā)送消息 if(NettyClientHolder.get().containsKey(imei)){ NettyClientnettyClient=NettyClientHolder.get().get(imei); if(null!=nettyClient.getChannelFuture()&&nettyClient.getChannelFuture().channel().isActive()){//通道活躍直接發(fā)送消息 if(!nettyClient.getChannelFuture().channel().isWritable()){ log.warn("警告,通道不可寫(xiě),imei={},channelId={}",nettyClient.getImei(), nettyClient.getChannelFuture().channel().id()); } nettyClient.send(nettyMsgModel.getMsg()); }else{ log.info("clientimei={},通道不活躍,主動(dòng)關(guān)閉",nettyClient.getImei()); nettyClient.close(); //重新創(chuàng)建客戶(hù)端發(fā)送 this.createClientAndSend(nettyMsgModel); } }else{//緩存中不存在則創(chuàng)建新的客戶(hù)端 this.createClientAndSend(nettyMsgModel); } }catch(Exceptione){ log.error(e.getMessage(),e); }finally{ //執(zhí)行完后解鎖 redisCache.deleteObject(NETTY_QUEUE_LOCK+imei); } }
其中imei是我們?cè)O(shè)備的唯一標(biāo)識(shí),我們可以用imei作為緩存的key來(lái)確認(rèn)是否已創(chuàng)建過(guò)連接。由于我們消息的并發(fā)量可能會(huì)很大,所以存在當(dāng)某設(shè)備的連接正在創(chuàng)建的過(guò)程中,另一個(gè)線(xiàn)程收到該設(shè)備消息也開(kāi)始創(chuàng)建連接的情況,所以我們使用synchronized 代碼塊以及redis分布式鎖來(lái)避免此情況的發(fā)生。當(dāng)一條消息獲得鎖后,在鎖釋放前,后續(xù)消息將會(huì)被重新放回消息隊(duì)列并延遲消費(fèi)。
獲取鎖的線(xiàn)程會(huì)根據(jù)imei判斷緩存是否存在連接,如果存在直接發(fā)送消息,如果不存在則進(jìn)入創(chuàng)建客戶(hù)端的方法。
privatevoidcreateClientAndSend(NettyMsgModelnettyMsgModel){ log.info("創(chuàng)建客戶(hù)端執(zhí)行中imei={}",nettyMsgModel.getImei()); //此處的DemoClientHandler可以根據(jù)自己的業(yè)務(wù)定義 NettyClientnettyClient=SpringUtils.getBean(NettyClient.class,nettyMsgModel.getImei(),nettyMsgModel.getBizData(), this.createDefaultWorkGroup(this.workerThread),DemoClientHandler.class); executor.execute(nettyClient);//執(zhí)行客戶(hù)端初始化 try{ //利用鎖等待客戶(hù)端激活 synchronized(nettyClient){ longc1=System.currentTimeMillis(); nettyClient.wait(5000);//最多阻塞5秒5秒后客戶(hù)端仍然未激活則自動(dòng)解鎖 longc2=System.currentTimeMillis(); log.info("創(chuàng)建客戶(hù)端wait耗時(shí)={}ms",c2-c1); } if(null!=nettyClient.getChannelFuture()&&nettyClient.getChannelFuture().channel().isActive()){//連接成功 //存入緩存 NettyClientHolder.get().put(nettyMsgModel.getImei(),nettyClient); //客戶(hù)端激活后發(fā)送消息 nettyClient.send(nettyMsgModel.getMsg()); }else{//連接失敗 log.warn("客戶(hù)端創(chuàng)建失敗,imei={}",nettyMsgModel.getImei()); nettyClient.close(); //可以把消息重新入列處理 } }catch(Exceptione){ log.error("客戶(hù)端初始化發(fā)送消息異常===>{}",e.getMessage(),e); } }
當(dāng)netty客戶(hù)端實(shí)例創(chuàng)建后使用線(xiàn)程池執(zhí)行初始化,由于是異步執(zhí)行,我們此時(shí)立刻發(fā)送消息很可能客戶(hù)端還沒(méi)有完成連接,因此必須加鎖等待。進(jìn)入synchronized 代碼塊,使用wait方法等待客戶(hù)端激活后解鎖,參數(shù)5000為自動(dòng)解鎖的毫秒數(shù),意思是如果客戶(hù)端出現(xiàn)異常情況遲遲未能連接成功并激活通道、解鎖,則最多5000毫秒后該鎖自動(dòng)解開(kāi)。
這參數(shù)在實(shí)際使用時(shí)可以視情況調(diào)整,在并發(fā)量很大的情況下,5秒的阻塞可能會(huì)導(dǎo)致線(xiàn)程池耗盡,或內(nèi)存溢出。待客戶(hù)端創(chuàng)建成功并激活后則立即發(fā)送消息。
3.客戶(hù)端
packageorg.example.client; importio.netty.bootstrap.Bootstrap; importio.netty.buffer.Unpooled; importio.netty.channel.*; importio.netty.channel.socket.SocketChannel; importio.netty.channel.socket.nio.NioSocketChannel; importio.netty.handler.codec.DelimiterBasedFrameDecoder; importio.netty.handler.codec.string.StringDecoder; importio.netty.handler.codec.string.StringEncoder; importio.netty.handler.timeout.IdleStateHandler; importio.netty.util.CharsetUtil; importlombok.Getter; importlombok.NoArgsConstructor; importlombok.extern.slf4j.Slf4j; importorg.example.client.handler.BaseClientHandler; importorg.example.core.util.SpringUtils; importorg.springframework.beans.factory.annotation.Value; importorg.springframework.context.annotation.Scope; importorg.springframework.stereotype.Component; importorg.springframework.util.StringUtils; importjava.util.Map; importjava.util.concurrent.TimeUnit; importjava.util.concurrent.atomic.AtomicBoolean; importjava.util.concurrent.atomic.AtomicInteger; /** *@authorReWind00 *@date2023/2/159:59 */ @Slf4j @Component @Scope("prototype") @Getter @NoArgsConstructor publicclassNettyClientimplementsRunnable{ @Value("${netty.server.port}") privateintport; @Value("${netty.server.host}") privateStringhost; //客戶(hù)端唯一標(biāo)識(shí) privateStringimei; //自定義業(yè)務(wù)數(shù)據(jù) privateMapbizData; privateEventLoopGroupworkGroup; privateClass clientHandlerClass; privateChannelFuturechannelFuture; publicNettyClient(Stringimei,Map bizData,EventLoopGroupworkGroup,Class clientHandlerClass){ this.imei=imei; this.bizData=bizData; this.workGroup=workGroup; this.clientHandlerClass=clientHandlerClass; } @Override publicvoidrun(){ try{ this.init(); log.info("客戶(hù)端啟動(dòng)imei={}",imei); }catch(Exceptione){ log.error("客戶(hù)端啟動(dòng)失敗:{}",e.getMessage(),e); } } publicvoidclose(){ if(null!=this.channelFuture){ this.channelFuture.channel().close(); } NettyClientHolder.get().remove(this.imei); } publicvoidsend(Stringmessage){ try{ if(!this.channelFuture.channel().isActive()){ log.info("通道不活躍imei={}",this.imei); return; } if(!StringUtils.isEmpty(message)){ log.info("隊(duì)列消息發(fā)送===>{}",message); this.channelFuture.channel().writeAndFlush(message); } }catch(Exceptione){ log.error(e.getMessage(),e); } } privatevoidinit()throwsException{ //將本實(shí)例傳遞到handler BaseClientHandlerclientHandler=SpringUtils.getBean(clientHandlerClass,this); Bootstrapb=newBootstrap(); //2通過(guò)輔助類(lèi)去構(gòu)造server/client b.group(workGroup) .channel(NioSocketChannel.class) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000) .option(ChannelOption.SO_RCVBUF,1024*32) .option(ChannelOption.SO_SNDBUF,1024*32) .handler(newChannelInitializer (){ @Override protectedvoidinitChannel(SocketChannelch)throwsException{ ch.pipeline().addLast(newDelimiterBasedFrameDecoder(1024*1024,Unpooled.copiedBuffer(" ".getBytes()))); ch.pipeline().addLast(newStringEncoder(CharsetUtil.UTF_8));//String解碼。 ch.pipeline().addLast(newStringDecoder(CharsetUtil.UTF_8));//String解碼。 ////心跳設(shè)置 ch.pipeline().addLast(newIdleStateHandler(0,0,600,TimeUnit.SECONDS)); ch.pipeline().addLast(clientHandler); } }); this.connect(b); } privatevoidconnect(Bootstrapb)throwsInterruptedException{ longc1=System.currentTimeMillis(); finalintmaxRetries=2;//重連2次 finalAtomicIntegercount=newAtomicInteger(); finalAtomicBooleanflag=newAtomicBoolean(false); try{ this.channelFuture=b.connect(host,port).addListener( newChannelFutureListener(){ publicvoidoperationComplete(ChannelFuturefuture)throwsException{ if(!future.isSuccess()){ if(count.incrementAndGet()>maxRetries){ log.warn("imei={}重連超過(guò){}次",imei,maxRetries); }else{ log.info("imei={}重連第{}次",imei,count); b.connect(host,port).addListener(this); } }else{ log.info("imei={}連接成功,連接IP:{}連接端口:{}",imei,host,port); flag.set(true); } } }).sync();//同步連接 }catch(Exceptione){ log.error(e.getMessage(),e); } log.info("設(shè)備imei={},channelId={}連接耗時(shí)={}ms",imei,channelFuture.channel().id(),System.currentTimeMillis()-c1); if(flag.get()){ channelFuture.channel().closeFuture().sync();//連接成功后將持續(xù)阻塞該線(xiàn)程 } } }
netty客戶(hù)端為多實(shí)例,每個(gè)實(shí)例綁定一個(gè)線(xiàn)程,持續(xù)阻塞到客戶(hù)端關(guān)閉為止,每個(gè)客戶(hù)端中可以保存自己的業(yè)務(wù)數(shù)據(jù),以便在后續(xù)與服務(wù)端交互時(shí)處理業(yè)務(wù)使用??蛻?hù)端執(zhí)行連接時(shí),給了2次重試的機(jī)會(huì),如果3次都沒(méi)連接成功則放棄。后續(xù)可以選擇將該消息重新入列消費(fèi)。我們實(shí)際項(xiàng)目中,此處還應(yīng)該預(yù)先給服務(wù)端發(fā)送一條登錄消息,待服務(wù)端確認(rèn)后才能執(zhí)行后續(xù)通訊,這需要視實(shí)際情況進(jìn)行調(diào)整。
另一個(gè)需要注意的點(diǎn)是EventLoopGroup是從構(gòu)造函數(shù)傳入的,而不是在客戶(hù)端中創(chuàng)建的,因?yàn)楫?dāng)客戶(hù)端數(shù)量非常多時(shí),每個(gè)客戶(hù)端都創(chuàng)建自己的線(xiàn)程組會(huì)極大的消耗服務(wù)器資源,因此我們?cè)趯?shí)際使用中是按業(yè)務(wù)去創(chuàng)建統(tǒng)一的線(xiàn)程組給該業(yè)務(wù)下的所有客戶(hù)端共同使用的,線(xiàn)程組的大小需要根據(jù)業(yè)務(wù)需求靈活配置。
在init方法中,我們給客戶(hù)端加上了一個(gè)handler來(lái)處理與服務(wù)端的交互,下面來(lái)看一下具體實(shí)現(xiàn)。
packageorg.example.client.handler; importio.netty.channel.ChannelHandlerContext; importio.netty.handler.timeout.IdleState; importio.netty.handler.timeout.IdleStateEvent; importlombok.extern.slf4j.Slf4j; importorg.example.client.NettyClient; importorg.springframework.context.annotation.Scope; importorg.springframework.stereotype.Component; importjava.util.Map; /** *@authorReWind00 *@date2023/2/1510:09 */ @Slf4j @Component @Scope("prototype") publicclassDemoClientHandlerextendsBaseClientHandler{ privatefinalStringimei; privatefinalMapbizData; privatefinalNettyClientnettyClient; privateintallIdleCounter=0; privatestaticfinalintMAX_IDLE_TIMES=3; publicDemoClientHandler(NettyClientnettyClient){ this.nettyClient=nettyClient; this.imei=nettyClient.getImei(); this.bizData=nettyClient.getBizData(); } @Override publicvoidchannelActive(ChannelHandlerContextctx)throwsException{ log.info("客戶(hù)端imei={},通道激活成功",this.imei); synchronized(this.nettyClient){//當(dāng)通道激活后解鎖隊(duì)列線(xiàn)程,然后再發(fā)送消息 this.nettyClient.notify(); } } @Override publicvoidchannelInactive(ChannelHandlerContextctx)throwsException{ log.warn("客戶(hù)端imei={},通道斷開(kāi)連接",this.imei); } @Override publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{ log.info("客戶(hù)端imei={},收到消息:{}",this.imei,msg); //處理業(yè)務(wù)... if("shutdown".equals(msg)){ this.nettyClient.close(); } } @Override publicvoiduserEventTriggered(ChannelHandlerContextctx,Objectevt)throwsException{ if(evtinstanceofIdleStateEvent){ IdleStateEvente=(IdleStateEvent)evt; booleanflag=false; if(e.state()==IdleState.ALL_IDLE){ this.allIdleCounter++; log.info("客戶(hù)端imei={}觸發(fā)閑讀或?qū)懙趝}次",this.imei,this.allIdleCounter); if(this.allIdleCounter>=MAX_IDLE_TIMES){ flag=true; } } if(flag){ log.warn("讀寫(xiě)超時(shí)達(dá)到{}次,主動(dòng)斷開(kāi)連接",MAX_IDLE_TIMES); ctx.channel().close(); } } } @Override publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{ log.error("客戶(hù)端imei={},連接異常{}",imei,cause.getMessage(),cause); } }
DemoClientHandler也是多實(shí)例bean,每個(gè)實(shí)例持有自己的NettyClient引用,以便在后續(xù)處理具體業(yè)務(wù)。在channelActive方法中,我們可以看到執(zhí)行了客戶(hù)端實(shí)例的notify方法,此處就是在客戶(hù)端創(chuàng)建成功并且通道激活后解除wait鎖的地方。channelRead方法就是我們處理服務(wù)端發(fā)送過(guò)來(lái)的消息的方法,我們的具體業(yè)務(wù)應(yīng)該在該方法執(zhí)行,當(dāng)然不建議長(zhǎng)時(shí)間阻塞客戶(hù)端的工作線(xiàn)程,可以考慮異步處理。
最后我們看一下客戶(hù)端緩存類(lèi)。
packageorg.example.client; importjava.util.concurrent.ConcurrentHashMap; /** *@authorReWind00 *@date2023/2/1511:01 */ publicclassNettyClientHolder{ privatestaticfinalConcurrentHashMapclientMap=newConcurrentHashMap<>(); publicstaticConcurrentHashMap get(){ returnclientMap; } }
由于netty的通道無(wú)法序列化,因此不能存入redis,只能緩存在本地內(nèi)存中,其本質(zhì)就是一個(gè)ConcurrentHashMap。
五、測(cè)試
packageorg.example.client.controller; importorg.example.client.QueueHolder; importorg.example.client.model.NettyMsgModel; importorg.springframework.web.bind.annotation.GetMapping; importorg.springframework.web.bind.annotation.RequestMapping; importorg.springframework.web.bind.annotation.RequestParam; importorg.springframework.web.bind.annotation.RestController; /** *@authorReWind00 *@date2023/2/1513:48 */ @RestController @RequestMapping("/demo") publicclassDemoController{ /** *間隔發(fā)送兩條消息 */ @GetMapping("testOne") publicvoidtestOne(){ QueueHolder.get().offer(NettyMsgModel.create("87654321","HelloWorld!")); try{ Thread.sleep(5000); }catch(InterruptedExceptione){ e.printStackTrace(); } QueueHolder.get().offer(NettyMsgModel.create("87654321","HelloWorldToo!")); } /** *任意發(fā)送消息 * *@paramimei *@parammsg */ @GetMapping("testTwo") publicvoidtestTwo(@RequestParamStringimei,@RequestParamStringmsg){ QueueHolder.get().offer(NettyMsgModel.create(imei,msg)); } /** *連續(xù)發(fā)送兩條消息第二條由于redis鎖將會(huì)重新放回隊(duì)列延遲消費(fèi) */ @GetMapping("testThree") publicvoidtestThree(){ QueueHolder.get().offer(NettyMsgModel.create("12345678","HelloWorld!")); QueueHolder.get().offer(NettyMsgModel.create("12345678","HelloWorldToo!")); } }
測(cè)試接口代碼如上,調(diào)用testOne,日志如下:
可以看到第一條消息觸發(fā)了客戶(hù)端創(chuàng)建流程,創(chuàng)建后發(fā)送了消息,而5秒后的第二條消息直接通過(guò)已有通道發(fā)送了。
測(cè)試接口代碼如上,調(diào)用testTwo,日志如下:
發(fā)送shutdown可以主動(dòng)斷開(kāi)已有連接。
測(cè)試接口代碼如上,調(diào)用testThree,日志如下:
可以看到第二條消息重新入列并被延遲消費(fèi)了。
六、源碼
https://gitee.com/jaster/netty-tcp-demo
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶(hù)小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶(hù)、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
后記
本demo項(xiàng)目?jī)H作學(xué)習(xí)交流使用,如果要應(yīng)用到生產(chǎn)環(huán)境還有些許不足,有問(wèn)題的同學(xué)可以留言交流。
-
通訊
+關(guān)注
關(guān)注
9文章
911瀏覽量
35012 -
TCP
+關(guān)注
關(guān)注
8文章
1377瀏覽量
79183 -
開(kāi)源
+關(guān)注
關(guān)注
3文章
3394瀏覽量
42628 -
spring
+關(guān)注
關(guān)注
0文章
340瀏覽量
14366 -
服務(wù)端
+關(guān)注
關(guān)注
0文章
66瀏覽量
7027 -
SpringBoot
+關(guān)注
關(guān)注
0文章
174瀏覽量
189
原文標(biāo)題:使用 Netty+SpringBoot 打造的 TCP 長(zhǎng)連接通訊方案
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論