現如今市面上注冊中心的輪子很多,我實際使用過的就有三款:Eureka、Gsched、Nacos,由于當前參與 Nacos 集群的維護和開發工作,期間也參與了 Nacos 社區的一些開發和 Bug Fix 工作,過程中對 Nacos 原理有了一定的積累,今天給大家分享一下 Nacos 動態服務發現的原理。
01 什么是動態服務發現?
服務發現是指使用一個注冊中心來記錄分布式系統中的全部服務的信息,以便其他服務能夠快速的找到這些已注冊的服務。
在單體應用中,DNS+Nginx 可以滿足服務發現的要求,此時服務的IP列表配置在 nginx 上。在微服務架構中,由于服務粒度變的更細,服務的上下線更加頻繁,我們需要一款注冊中心來動態感知服務的上下線,并且推送IP列表變化給服務消費者,架構如下圖。
基于 Spring Boot + MyBatis Plus + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
項目地址:https://github.com/YunaiV/ruoyi-vue-pro
視頻教程:https://doc.iocoder.cn/video/
02 Nacos 實現動態服務發現的原理
Nacos實現動態服務發現的核心原理如下圖,我們接下來的內容將圍繞這個圖來進行。
2.1 通訊協議
整個服務注冊與發現過程,都離不開通訊協議,在1.x的 Nacos 版本中服務端只支持 http 協議,后來為了提升性能在2.x版本引入了谷歌的 grpc,grpc 是一款長連接協議,極大的減少了 http 請求頻繁的連接創建和銷毀過程,能大幅度提升性能,節約資源。
據官方測試,Nacos服務端 grpc 版本,相比 http 版本的性能提升了9倍以上。
2.2 Nacos 服務注冊
簡單來講,服務注冊的目的就是客戶端將自己的ip端口等信息上報給 Nacos 服務端,過程如下:
創建長連接:Nacos SDK 通過Nacos服務端域名解析出服務端ip列表,選擇其中一個ip創建 grpc 連接,并定時檢查連接狀態,當連接斷開,則自動選擇服務端ip列表中的下一個ip進行重連。
健康檢查請求:在正式發起注冊之前,Nacos SDK 向服務端發送一個空請求,服務端回應一個空請求,若Nacos SDK 未收到服務端回應,則認為服務端不健康,并進行一定次數重試,如果都未收到回應,則注冊失敗。
發起注冊:當你查看Nacos java SDK的注冊方法時,你會發現沒有返回值,這是因為Nacos SDK做了補償機制,在真實給服務端上報數據之前,會先往緩存中插入一條記錄表示開始注冊,注冊成功之后再從緩存中標記這條記錄為注冊成功,當注冊失敗時,緩存中這條記錄是未注冊成功的狀態,Nacos SDK開啟了一個定時任務,定時查詢異常的緩存數據,重新發起注冊。
Nacos SDK注冊失敗時的自動補償機制時序圖。
相關源碼如下:
@Override publicvoidregisterService(StringserviceName,StringgroupName,Instanceinstance)throwsNacosException{ NAMING_LOGGER.info("[REGISTER-SERVICE]{}registeringservice{}withinstance{}",namespaceId,serviceName, instance); //添加redo日志 redoService.cacheInstanceForRedo(serviceName,groupName,instance); doRegisterService(serviceName,groupName,instance); } publicvoiddoRegisterService(StringserviceName,StringgroupName,Instanceinstance)throwsNacosException{ //向服務端發起注冊 InstanceRequestrequest=newInstanceRequest(namespaceId,serviceName,groupName, NamingRemoteConstants.REGISTER_INSTANCE,instance); requestToServer(request,Response.class); //標記注冊成功 redoService.instanceRegistered(serviceName,groupName); }
執行補償定時任務RedoScheduledTask。
@Override publicvoidrun(){ if(!redoService.isConnected()){ LogUtils.NAMING_LOGGER.warn("GrpcConnectionisdisconnect,skipcurrentredotask"); return; } try{ redoForInstances(); redoForSubscribes(); }catch(Exceptione){ LogUtils.NAMING_LOGGER.warn("Redotaskrunwithunexpectedexception:",e); } } privatevoidredoForInstances(){ for(InstanceRedoDataeach:redoService.findInstanceRedoData()){ try{ redoForInstance(each); }catch(NacosExceptione){ LogUtils.NAMING_LOGGER.error("Redoinstanceoperation{}for{}@@{}failed.",each.getRedoType(), each.getGroupName(),each.getServiceName(),e); } } }
服務端數據同步(Distro協議):Nacos SDK只會與服務端某個節點建立長連接,當服務端接受到客戶端注冊的實例數據后,還需要將實例數據同步給其他節點。Nacos自己實現了一個一致性協議名為Distro,服務注冊的時候會觸發Distro一次同步,每個Nacos節點之間會定時互相發送Distro數據,以此保證數據最終一致。
服務實例上線推送:Nacos服務端收到服務實例數據后會將服務的最新實例列表通過grpc推送給該服務的所有訂閱者。
服務注冊過程源碼時序圖:整理了一下服務注冊過程整體時序圖,對源碼實現感興趣的可以按照根據這個時序圖view一下源碼。
2.3 Nacos 心跳機制
目前主流的注冊中心,比如Consul、Eureka、Zk包括我們公司自研的Gsched,都是通過心跳機制來感知服務的下線。Nacos也是通過心跳機制來實現的。
Nacos目前SDK維護了兩個分支的版本(1.x、2.x),這兩個版本心跳機制的實現不一樣。其中1.x版本的SDK通過http協議來定時向服務端發送心跳維持自己的健康狀態,2.x版本的SDK則通過grpc自身的心跳機制來保活,當Nacos服務端接受不到服務實例的心跳,會認為實例下線。如下圖:
grpc監測到連接斷開事件,發送ClientDisconnectEvent。
publicclassConnectionBasedClientManagerextendsClientConnectionEventListenerimplementsClientManager{ //連接斷開,發送連接斷開事件 publicbooleanclientDisconnected(StringclientId){ Loggers.SRV_LOG.info("Clientconnection{}disconnect,removeinstancesandsubscribers",clientId); ConnectionBasedClientclient=clients.remove(clientId); if(null==client){ returntrue; } client.release(); NotifyCenter.publishEvent(newClientEvent.ClientDisconnectEvent(client)); returntrue; } }
移除客戶端注冊的服務實例
publicclassClientServiceIndexesManagerextendsSmartSubscriber{ @Override publicvoidonEvent(Eventevent){ //接收失去連接事件 if(eventinstanceofClientEvent.ClientDisconnectEvent){ handleClientDisconnect((ClientEvent.ClientDisconnectEvent)event); }elseif(eventinstanceofClientOperationEvent){ handleClientOperation((ClientOperationEvent)event); } } privatevoidhandleClientDisconnect(ClientEvent.ClientDisconnectEventevent){ Clientclient=event.getClient(); for(Serviceeach:client.getAllSubscribeService()){ removeSubscriberIndexes(each,client.getClientId()); } //移除客戶端注冊的服務實例 for(Serviceeach:client.getAllPublishedService()){ removePublisherIndexes(each,client.getClientId()); } } //移除客戶端注冊的服務實例 privatevoidremovePublisherIndexes(Serviceservice,StringclientId){ if(!publisherIndexes.containsKey(service)){ return; } publisherIndexes.get(service).remove(clientId); NotifyCenter.publishEvent(newServiceEvent.ServiceChangedEvent(service,true)); } }
2.4 Nacos 服務訂閱
當一個服務發生上下線,Nacos如何知道要推送給哪些客戶端?
Nacos SDK 提供了訂閱和取消訂閱方法,當客戶端向服務端發起訂閱請求,服務端會記錄發起調用的客戶端為該服務的訂閱者,同時將服務的最新實例列表返回。當客戶端發起了取消訂閱,服務端就會從該服務的訂閱者列表中把當前客戶端移除。
當客戶端發起訂閱時,服務端除了會同步返回最新的服務實例列表,還會異步的通過grpc推送給該訂閱者最新的服務實例列表,這樣做的目的是為了異步更新客戶端本地緩存的服務數據。
當客戶端訂閱的服務上下線,該服務所有的訂閱者會立刻收到最新的服務列表并且將服務最新的實例數據更新到內存。
我們也看一下相關源碼,服務端接收到訂閱數據,首先保存到內存中。
@Override publicvoidsubscribeService(Serviceservice,Subscribersubscriber,StringclientId){ Servicesingleton=ServiceManager.getInstance().getSingletonIfExist(service).orElse(service); Clientclient=clientManager.getClient(clientId); //校驗長連接是否正常 if(!clientIsLegal(client,clientId)){ return; } //保存訂閱數據 client.addServiceSubscriber(singleton,subscriber); client.setLastUpdatedTime(); //發送訂閱事件 NotifyCenter.publishEvent(newClientOperationEvent.ClientSubscribeServiceEvent(singleton,clientId)); } privatevoidhandleClientOperation(ClientOperationEventevent){ Serviceservice=event.getService(); StringclientId=event.getClientId(); if(eventinstanceofClientOperationEvent.ClientRegisterServiceEvent){ addPublisherIndexes(service,clientId); }elseif(eventinstanceofClientOperationEvent.ClientDeregisterServiceEvent){ removePublisherIndexes(service,clientId); }elseif(eventinstanceofClientOperationEvent.ClientSubscribeServiceEvent){ //處理訂閱操作 addSubscriberIndexes(service,clientId); }elseif(eventinstanceofClientOperationEvent.ClientUnsubscribeServiceEvent){ removeSubscriberIndexes(service,clientId); } }
然后發布訂閱事件。
privatevoidaddSubscriberIndexes(Serviceservice,StringclientId){ //保存訂閱數據 subscriberIndexes.computeIfAbsent(service,(key)->newConcurrentHashSet<>()); //Fix#5404,Onlyfirsttimeaddneednotifyevent. if(subscriberIndexes.get(service).add(clientId)){ //發布訂閱事件 NotifyCenter.publishEvent(newServiceEvent.ServiceSubscribedEvent(service,clientId)); } }
服務端自己消費訂閱事件,并且推送給訂閱的客戶端最新的服務實例數據。
@Override publicvoidonEvent(Eventevent){ if(!upgradeJudgement.isUseGrpcFeatures()){ return; } if(eventinstanceofServiceEvent.ServiceChangedEvent){ //Ifservicechanged,pushtoallsubscribers. ServiceEvent.ServiceChangedEventserviceChangedEvent=(ServiceEvent.ServiceChangedEvent)event; Serviceservice=serviceChangedEvent.getService(); delayTaskEngine.addTask(service,newPushDelayTask(service,PushConfig.getInstance().getPushTaskDelay())); }elseif(eventinstanceofServiceEvent.ServiceSubscribedEvent){ //Ifserviceissubscribedbyoneclient,onlypushthisclient. ServiceEvent.ServiceSubscribedEventsubscribedEvent=(ServiceEvent.ServiceSubscribedEvent)event; Serviceservice=subscribedEvent.getService(); delayTaskEngine.addTask(service,newPushDelayTask(service,PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId())); } }
2.5 Nacos 推送
推送方式
前面說了服務的注冊和訂閱都會發生推送(服務端->客戶端),那推送到底是如何實現的呢?
在早期的Nacos版本,當服務實例變化,服務端會通過udp協議將最新的數據發送給客戶端,后來發現udp推送有一定的丟包率,于是新版本的Nacos支持了grpc推送。Nacos服務端會自動判斷客戶端的版本來選擇哪種方式來進行推送,如果你使用1.4.2以前的SDK進行注冊,那Nacos服務端會使用udp協議來進行推送,反之則使用grpc。
推送失敗重試
當發送推送時,客戶端可能正在重啟,或者連接不穩定導致推送失敗,這個時候Nacos會進行重試。Nacos將每個推送都封裝成一個任務對象,放入到隊列中,再開啟一個線程不停的從隊列取出任務執行,執行之前會先刪除該任務,如果執行失敗則將任務重新添加到隊列,該線程會記錄任務執行的時間,如果超過1秒,則會記錄到日志。
推送部分源碼
添加推送任務到執行隊列中。
privatestaticclassPushDelayTaskProcessorimplementsNacosTaskProcessor{ privatefinalPushDelayTaskExecuteEngineexecuteEngine; publicPushDelayTaskProcessor(PushDelayTaskExecuteEngineexecuteEngine){ this.executeEngine=executeEngine; } @Override publicbooleanprocess(NacosTasktask){ PushDelayTaskpushDelayTask=(PushDelayTask)task; Serviceservice=pushDelayTask.getService(); NamingExecuteTaskDispatcher.getInstance() .dispatchAndExecuteTask(service,newPushExecuteTask(service,executeEngine,pushDelayTask)); returntrue; } }
推送任務PushExecuteTask 的執行。
publicclassPushExecuteTaskextendsAbstractExecuteTask{ //..省略 @Override publicvoidrun(){ try{ //封裝要推送的服務實例數據 PushDataWrapperwrapper=generatePushData(); ClientManagerclientManager=delayTaskEngine.getClientManager(); //如果是服務上下線導致的推送,獲取所有訂閱者 //如果是訂閱導致的推送,獲取訂閱者 for(Stringeach:getTargetClientIds()){ Clientclient=clientManager.getClient(each); if(null==client){ //meansthisclienthasdisconnect continue; } Subscribersubscriber=clientManager.getClient(each).getSubscriber(service); //推送給訂閱者 delayTaskEngine.getPushExecutor().doPushWithCallback(each,subscriber,wrapper, newNamingPushCallback(each,subscriber,wrapper.getOriginalData(),delayTask.isPushToAll())); } }catch(Exceptione){ Loggers.PUSH.error("Pushtaskforservice"+service.getGroupedServiceName()+"executefailed",e); //當推送發生異常,重新將推送任務放入執行隊列 delayTaskEngine.addTask(service,newPushDelayTask(service,1000L)); } } //如果是服務上下線導致的推送,獲取所有訂閱者 //如果是訂閱導致的推送,獲取訂閱者 privateCollectiongetTargetClientIds(){ returndelayTask.isPushToAll()?delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service) :delayTask.getTargetClients(); }
執行推送任務線程InnerWorker 的執行。
/** *Innerexecuteworker. */ privateclassInnerWorkerextendsThread{ InnerWorker(Stringname){ setDaemon(false); setName(name); } @Override publicvoidrun(){ while(!closed.get()){ try{ //從隊列中取出任務PushExecuteTask Runnabletask=queue.take(); longbegin=System.currentTimeMillis(); //執行PushExecuteTask task.run(); longduration=System.currentTimeMillis()-begin; if(duration>1000L){ log.warn("task{}takes{}ms",task,duration); } }catch(Throwablee){ log.error("[TASK-FAILED]"+e.toString(),e); } } } }
2.6 Nacos SDK 查詢服務實例
服務消費者首先需要調用Nacos SDK的接口來獲取最新的服務實例,然后才能從獲取到的實例列表中以加權輪詢的方式選擇出一個實例(包含ip,port等信息),最后再發起調用。
前面已經提到Nacos服務發生上下線、訂閱的時候都會推送最新的服務實例列表到當客戶端,客戶端再更新本地內存中的緩沖數據,所以調用Nacos SDK提供的查詢實例列表的接口時,不會直接請求服務端獲取數據,而是會優先使用內存中的服務數據,只有內存中查不到的情況下才會發起訂閱請求服務端數據。
Nacos SDK內存中的數據除了接受來自服務端的推送更新之外,自己本地也會有一個定時任務定時去獲取服務端數據來進行兜底。Nacos SDK在查詢的時候也了容災機制,即從磁盤獲取服務數據,而這個磁盤的數據其實也是來自于內存,有一個定時任務定時從內存緩存中獲取然后加載到磁盤。Nacos SDK的容災機制默認關閉,可通過設置環境變量failover-mode=true來開啟。
架構圖
用戶查詢流程
查詢服務實例部分源碼
privatefinalConcurrentMapserviceInfoMap; @Override publicList getAllInstances(StringserviceName,StringgroupName,List clusters, booleansubscribe)throwsNacosException{ ServiceInfoserviceInfo; StringclusterString=StringUtils.join(clusters,","); //這里默認傳過來是true if(subscribe){ //從本地內存獲取服務數據,如果獲取不到則從磁盤獲取 serviceInfo=serviceInfoHolder.getServiceInfo(serviceName,groupName,clusterString); if(null==serviceInfo||!clientProxy.isSubscribed(serviceName,groupName,clusterString)){ //如果從本地獲取不到數據,則調用訂閱方法 serviceInfo=clientProxy.subscribe(serviceName,groupName,clusterString); } }else{ //適用于不走訂閱,直接從服務端獲取數據的情況 serviceInfo=clientProxy.queryInstancesOfService(serviceName,groupName,clusterString,0,false); } List list; if(serviceInfo==null||CollectionUtils.isEmpty(list=serviceInfo.getHosts())){ returnnewArrayList (); } returnlist; } } //從本地內存獲取服務數據,如果開啟了故障轉移則直接從磁盤獲取,因為當服務端掛了,本地啟動時內存中也沒有數據 publicServiceInfogetServiceInfo(finalStringserviceName,finalStringgroupName,finalStringclusters){ NAMING_LOGGER.debug("failover-mode:{}",failoverReactor.isFailoverSwitch()); StringgroupedServiceName=NamingUtils.getGroupedName(serviceName,groupName); Stringkey=ServiceInfo.getKey(groupedServiceName,clusters); //故障轉移則直接從磁盤獲取 if(failoverReactor.isFailoverSwitch()){ returnfailoverReactor.getService(key); } //返回內存中數據 returnserviceInfoMap.get(key); }
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
項目地址:https://github.com/YunaiV/yudao-cloud
視頻教程:https://doc.iocoder.cn/video/
03 結語
本篇文章向大家介紹 Nacos 服務發現的基本概念和核心能力以及實現的原理,旨在讓大家對 Nacos 的服務注冊與發現功能有更多的了解,做到心中有數。
-
管理系統
+關注
關注
1文章
2573瀏覽量
36030 -
nacos
+關注
關注
0文章
10瀏覽量
219
原文標題:4 個維度搞懂 Nacos 注冊中心
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論