欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

RocketMQ源碼中如何實(shí)現(xiàn)注冊(cè)服務(wù)器

這篇文章給大家分享的是有關(guān)RocketMQ源碼中如何實(shí)現(xiàn)注冊(cè)服務(wù)器的內(nèi)容。小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過(guò)來(lái)看看吧。

成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、外貿(mào)網(wǎng)站建設(shè),成都做網(wǎng)站公司-創(chuàng)新互聯(lián)已向數(shù)千家企業(yè)提供了,網(wǎng)站設(shè)計(jì),網(wǎng)站制作,網(wǎng)絡(luò)營(yíng)銷等服務(wù)!設(shè)計(jì)與技術(shù)結(jié)合,多年網(wǎng)站推廣經(jīng)驗(yàn),合理的價(jià)格為您打造企業(yè)品質(zhì)網(wǎng)站。

NamesrvStartup

該類用于啟動(dòng)注冊(cè)服務(wù)器。其main方法委托了main0方法,該方法的執(zhí)行邏輯如下:

  1. 調(diào)用方法NamesrvStartup#createNamesrvController創(chuàng)建一個(gè)NamesrvController實(shí)例,聲明為controller

  2. 調(diào)用方法NamesrvStartup#start將這個(gè)controller啟動(dòng)。

那么下面就分別來(lái)看下兩個(gè)方法的具體內(nèi)容。

createNamesrvController

這個(gè)方法最重要的就是用構(gòu)造方法創(chuàng)建了NamesrvController對(duì)象。而在調(diào)用構(gòu)造方法之前有較多的代碼是用于解析命令行對(duì)象,以及可能的情況下讀取文件中的配置信息、打印當(dāng)前的整體配置信息。

這些額外配置不存在的時(shí)候,默認(rèn)配置下,注冊(cè)服務(wù)器是監(jiān)聽于9876端口。

start

該方法的作用是啟動(dòng)入?yún)⒌?code>NamesrvController實(shí)例。具體來(lái)說(shuō),流程如下:

  1. 執(zhí)行方法NamesrvController#initialize進(jìn)行初始化。

  2. 為運(yùn)行時(shí)添加一個(gè)hook,在JVM關(guān)閉的時(shí)候,執(zhí)行方法NamesrvController#shutdown對(duì)注冊(cè)服務(wù)器執(zhí)行優(yōu)雅關(guān)閉。

  3. 執(zhí)行方法NamesrvController#start啟動(dòng)注冊(cè)服務(wù)器。

NamesrvController

這個(gè)類用于控制注冊(cè)服務(wù)器。

構(gòu)造方法

構(gòu)造方法中主要是為了幾個(gè)重要屬性進(jìn)行賦值操作。比如初始化kvConfigManagerrouteInfoManager這兩個(gè)重要的屬性。

initialize

該方法用于初始化注冊(cè)服務(wù)器,執(zhí)行邏輯如下:

  1. 執(zhí)行方法kvconfig.KVConfigManager#load加載配置信息。默認(rèn)情況下,加載 ${user.home}/namesrv/kvConfig.json 文件的內(nèi)容到屬性kvconfig.KVConfigManager#configTable中。

  2. 新建一個(gè)NettyRemotingServer對(duì)象,為屬性NamesrvController#remotingServer賦值。這個(gè)新建的對(duì)象,使用了BrokerHousekeepingService作為入?yún)?。?code>BrokerHousekeepingService的作用就是在發(fā)生通道關(guān)閉、異常、空閑等情況時(shí),將該通道從路由信息里刪除。

  3. 創(chuàng)建一個(gè)線程池,賦值給屬性NamesrvController#remotingExecutor,用于注冊(cè)服務(wù)器在Netty中的業(yè)務(wù)執(zhí)行。

  4. 調(diào)用方法NamesrvController#registerProcessor將業(yè)務(wù)處理器注冊(cè)到RemotingServer中。使用的線程池就是步驟3創(chuàng)建的線程池。

  5. 創(chuàng)建一個(gè)間隔時(shí)間為10秒的周期性任務(wù),任務(wù)內(nèi)容是調(diào)用方法RouteInfoManager#scanNotActiveBroker掃描非激活模式的Broker。

start

該方法沒(méi)有更多內(nèi)容,只是簡(jiǎn)單了啟動(dòng)了RemotingServer。在這個(gè)方法之后,就可以開始監(jiān)聽Broker上送的注冊(cè)請(qǐng)求。

KVConfigManager

該類是注冊(cè)服務(wù)器的配置存儲(chǔ)類。會(huì)將配置信息存儲(chǔ)在文件 ${user.home}/namesrv/kvConfig.json 。內(nèi)部用來(lái)存儲(chǔ)配置信息的是一個(gè)HashMap<String, HashMap<String, String>> 結(jié)構(gòu),也就是兩級(jí)結(jié)構(gòu)。

第一級(jí)是命名空間,第二集是KV對(duì),都是字符串形式。

該類的load方法可以從文件中加載數(shù)據(jù)到內(nèi)存里,persist方法可以將內(nèi)存中的數(shù)據(jù)再寫入到文件中。

DefaultRequestProcessor

這個(gè)類是 rocketmq-namesrv 這個(gè)包下面,代碼量最多的類了。因?yàn)闃I(yè)務(wù)處理都實(shí)現(xiàn)在了這個(gè)類上面。

按照NettyRequestProcessor接口的實(shí)現(xiàn)套路,業(yè)務(wù)請(qǐng)求的分流都是在processRequest方法中,這里也是,接下來(lái)就一個(gè)個(gè)看這個(gè)類支持的命令。

PUT_KV_CONFIG

該命令沒(méi)有請(qǐng)求體,請(qǐng)求頭中有namespace、keyvalue字段,調(diào)用方法kvconfig.KVConfigManager#putKVConfig將配置項(xiàng)放入到配置管理器中即可。

GET_KV_CONFIG

該命令沒(méi)有請(qǐng)求體,請(qǐng)求頭中有namespacekey字段,調(diào)用方法kvconfig.KVConfigManager#getKVConfig獲取對(duì)應(yīng)配置項(xiàng)。

如果配置項(xiàng)存在,返回成功響應(yīng)。如果配置信息不存在,返回失敗響應(yīng),響應(yīng)碼為QUERY_NOT_FOUND。

DELETE_KV_CONFIG

該命令沒(méi)有請(qǐng)求體,請(qǐng)求頭中有namespace、key字段,調(diào)用方法kvconfig.KVConfigManager#deleteKVConfig刪除對(duì)應(yīng)配置項(xiàng)。

QUERY_DATA_VERSION

該命令用于查詢注冊(cè)服務(wù)器上Broker的數(shù)據(jù)版本號(hào)。具體執(zhí)行邏輯如下:

  1. 從命令的內(nèi)容體解析出DataVersion對(duì)象,從請(qǐng)求頭中解析出BrokerAddr數(shù)據(jù)。使用這兩個(gè)作為入?yún)ⅲ{(diào)用方法RouteInfoManager#isBrokerTopicConfigChanged判斷與服務(wù)器上該BrokerAddr的版本號(hào)是否一致,將結(jié)果聲明為changed。

  2. 如果changedfalse,表明版本號(hào)沒(méi)有變化,那么服務(wù)器上的數(shù)據(jù)在當(dāng)前時(shí)間還是有效的,調(diào)用方法RouteInfoManager#updateBrokerInfoUpdateTimestamp更新這個(gè)數(shù)據(jù)的有效時(shí)間。

  3. 調(diào)用方法RouteInfoManager#queryBrokerTopicConfig查詢服務(wù)器上BrokerAddr對(duì)應(yīng)的版本號(hào),聲明為nameSeverDataVersion。

  4. 構(gòu)建命令響應(yīng)對(duì)象,如果nameSeverDataVersion不為null,則編碼后設(shè)置到內(nèi)容體。在響應(yīng)頭中設(shè)置changed屬性,值為步驟1產(chǎn)生的聲明對(duì)象。

REGISTER_BROKER

該命令用于Broker信息的注冊(cè)。首先獲取請(qǐng)求頭中MQ的版本號(hào),如果版本號(hào)大于等于3.0.11,則調(diào)用方法processor.DefaultRequestProcessor#registerBrokerWithFilterServer進(jìn)行信息注冊(cè);否則調(diào)用方法processor.DefaultRequestProcessor#registerBroker進(jìn)行信息注冊(cè)。

registerBrokerWithFilterServer

方法的執(zhí)行邏輯如下:

  1. 對(duì)請(qǐng)求命令進(jìn)行解碼工作,創(chuàng)建出RegisterBrokerRequestHeader對(duì)象。使用該對(duì)象對(duì)象和請(qǐng)求中的body字段執(zhí)行crc校驗(yàn),如果校驗(yàn)失敗,返回系統(tǒng)錯(cuò)誤響應(yīng)。否則,繼續(xù)后續(xù)流程。

  2. 如果命令請(qǐng)求對(duì)象中包含內(nèi)容體,則解碼出RegisterBrokerBody對(duì)象,聲明為registerBrokerBody。如果命令請(qǐng)求對(duì)象不包含內(nèi)容體,則手動(dòng)創(chuàng)建RegisterBrokerBody對(duì)象,并且將其DataVersion的版本號(hào)設(shè)置為0,時(shí)間戳設(shè)置為0.

  3. 調(diào)用方法RouteInfoManager#registerBroker注冊(cè)路由信息,將結(jié)果聲明為result。

  4. 創(chuàng)建類型為RegisterBrokerResponseHeader的響應(yīng)頭對(duì)象,聲明為responseHeader。將resultmasterAddrHaServerAddr屬性設(shè)置到響應(yīng)頭對(duì)象中。

  5. 從配置管理器中以ORDER_TOPIC_CONFIG作為命名空間,取出該命名空間下面的配置數(shù)據(jù)對(duì)象,編碼后將二進(jìn)制設(shè)置為響應(yīng)的內(nèi)容體。

  6. 返回響應(yīng)對(duì)象。

registerBroker

registerBrokerWithFilterServer方法的流程基本一致,只不過(guò)在調(diào)用方法RouteInfoManager#registerBroker的時(shí)候,入?yún)⒌?code>filterServerList為null。

UNREGISTER_BROKER

該命令用于注銷 Broker 的注冊(cè)。調(diào)用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#unregisterBroker完成,而該方法內(nèi)部則是委托給了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker

GET_ROUTEINFO_BY_TOPIC

該命名用于查詢主題的路由信息,調(diào)用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic。

該方法用于在路由管理器中根據(jù)主題名稱獲取全量的路由信息,具體流程如下:

  1. 使用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData根據(jù)請(qǐng)求的主題名稱得到類型為TopicRouteData的結(jié)果,聲明為topicRouteData。

  2. 如果topicRouteData不為null,則執(zhí)行如下子流程。

    1. 如果配置org.apache.rocketmq.common.namesrv.NamesrvConfig#orderMessageEnable開啟,則從命名空間ORDER_TOPIC_CONFIG下面,獲取入?yún)⒅黝}名稱的配置信息,聲明為orderTopicConf。將orderTopicConf設(shè)置到屬性org.apache.rocketmq.common.protocol.route.TopicRouteData#orderTopicConf

    2. topicRouteData進(jìn)行編碼,設(shè)置為響應(yīng)的內(nèi)容體,返回響應(yīng)對(duì)象。

  3. 如果topicRouteData為null,則返回TOPIC_NOT_EXIST響應(yīng)。

GET_BROKER_CLUSTER_INFO

該命令調(diào)用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getBrokerClusterInfo。該方法的邏輯就是調(diào)用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllClusterInfo得到一個(gè)編碼后的內(nèi)容體,將這個(gè)內(nèi)容體設(shè)置為響應(yīng)的內(nèi)容體,返回響應(yīng)對(duì)象即可。

編碼的內(nèi)容體數(shù)據(jù)結(jié)構(gòu)類是ClusterInfo,其屬性如下

HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

WIPE_WRITE_PERM_OF_BROKER

該命令用于擦除Broker的寫權(quán)限,也就說(shuō)所有在該Broker上的主題都沒(méi)有寫入權(quán)限了。調(diào)用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#wipeWritePermOfBroker實(shí)現(xiàn),該方法的邏輯如下:

  1. 調(diào)用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#wipeWritePermOfBrokerByLock擦除入?yún)?code>Broker的寫權(quán)限,方法的返回值為擦除的隊(duì)列信息個(gè)數(shù)。將結(jié)果聲明為wipeTopicCnt

  2. wipeTopicCnt設(shè)置到響應(yīng)頭的對(duì)應(yīng)屬性,返回響應(yīng)。

GET_ALL_TOPIC_LIST_FROM_NAMESERVER

該命令用于獲取注冊(cè)服務(wù)器上全量的主題信息,調(diào)用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getAllTopicListFromNameserver實(shí)現(xiàn)。

該方法內(nèi)部調(diào)用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllTopicList獲取所有的主題名稱形成的列表,并且編碼為二進(jìn)制數(shù)組,設(shè)置為響應(yīng)的內(nèi)容體,將響應(yīng)返回。

DELETE_TOPIC_IN_NAMESRV

該命令用于刪除服務(wù)器上的主題信息,通過(guò)方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#deleteTopicInNamesrv實(shí)現(xiàn)。方法實(shí)現(xiàn)也簡(jiǎn)單,直接從topicQueueTable中刪除對(duì)應(yīng)的主題名稱即可。

GET_KVLIST_BY_NAMESPACE

該命令用于獲取服務(wù)器上特定命名空間下的配置信息。通過(guò)方法org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#getKVListByNamespace獲取到對(duì)應(yīng)的配置信息,并且編碼為二進(jìn)制數(shù)組。

如果數(shù)組存在,則設(shè)置到響應(yīng)的內(nèi)容體中,返回成功響應(yīng)。

如果數(shù)組不存在,則返回QUERY_NOT_FOUND響應(yīng)。

GET_TOPICS_BY_CLUSTER

該命令用于獲取集群下所有的主題名稱,調(diào)用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getTopicsByCluster完成。該方法內(nèi)部調(diào)用org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getTopicsByCluster獲取集群下的所有主題名稱的編碼結(jié)果,將編碼結(jié)果的二進(jìn)制數(shù)組設(shè)置到響應(yīng)的內(nèi)容體中,返回成功響應(yīng)。

GET_SYSTEM_TOPIC_LIST_FROM_NS

這個(gè)命令有點(diǎn)奇怪,看命令名稱是獲取系統(tǒng)主題列表。但是從方法實(shí)現(xiàn)上,內(nèi)部的內(nèi)容整體是混亂的。這個(gè)命令暫且放下,等看到相關(guān)聯(lián)的請(qǐng)求查詢的時(shí)候在處理。

GET_UNIT_TOPIC_LIST

該命令用于獲取集群下,有unit標(biāo)識(shí)的主題名稱集合。通過(guò)方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getUnitTopicList實(shí)現(xiàn),該方法內(nèi)部調(diào)用了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getUnitTopics來(lái)返回具備unit標(biāo)識(shí)的主題名稱集合的編碼后二進(jìn)制數(shù)組。將這個(gè)數(shù)組設(shè)置為響應(yīng)的內(nèi)容體,并且返回。

GET_HAS_UNIT_SUB_TOPIC_LIST

該命令用于獲取集群下,有unit_sub標(biāo)識(shí)的主題名稱集合。做法上與GET_UNIT_TOPIC_LIST命令是相同的,只不過(guò)用的標(biāo)識(shí)不同。

GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST

該命令用于獲取集群下,同時(shí)有unitunit_sub標(biāo)識(shí)的主題名稱集合。做法上與上述的一致,只不過(guò)用的標(biāo)識(shí)不同。

UPDATE_NAMESRV_CONFIG

這個(gè)命令是用于管理端直接發(fā)送配置的文本到注冊(cè)服務(wù),用于更新注冊(cè)服務(wù)自身的配置,而后將配置信息持久化到磁盤文件。

GET_NAMESRV_CONFIG

這個(gè)命令用于獲取注冊(cè)服務(wù)的配置信息,將配置信息設(shè)置到響應(yīng)的內(nèi)容體中。

RouteInfoManager

該類是路由信息的管理器,其中使用了多個(gè)類來(lái)抽象各種路由信息。下面先看下這些定義類。

QueueData

該類保存了Broker中的隊(duì)列信息。有如下屬性:

  • brokerName,Broker的名稱,默認(rèn)情況下是Broker所在機(jī)器的域名,可以由配置定義。

  • readQueueNums,用于讀取的隊(duì)列數(shù)量。

  • writeQueueNums,用于寫入的隊(duì)列數(shù)量。

  • perm,該Broker的權(quán)限信息,權(quán)限指的是是否可讀、是否可寫。

  • topicSynFlag,主題同步標(biāo)識(shí)。

BrokerData

該類保存了Broker集群的地址信息,有如下屬性:

  • cluster,集群標(biāo)識(shí)。

  • brokerName,Broker名稱。

  • brokerAddrs,brokerId和BrokerAddr的映射表。該屬性存儲(chǔ)了同一個(gè)Broker名稱下id和地址的映射關(guān)系。

BrokerLiveInfo

該類保存了具體某個(gè)Broker的存活信息,有如下屬性

  • lastUpdateTimestamp,最近一次數(shù)據(jù)更新時(shí)間。

  • dataVersion,該Broker的主題配置信息的版本號(hào)。

  • channel,Netty的Channel對(duì)象,該對(duì)象即是Broker與服務(wù)器之間的鏈接對(duì)象。

  • haServerAddr,高可用主節(jié)點(diǎn)地址。格式為${ip}:${port} 。

存儲(chǔ)屬性

RouteInfoManager內(nèi)部管理著5個(gè)Map結(jié)構(gòu),用于存儲(chǔ)路由相關(guān)信息,這些信息用代碼來(lái)看會(huì)更清晰一些,如下:

HashMap<String/* topic */, List<QueueData>> topicQueueTable;
HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

registerBroker

該方法用于實(shí)現(xiàn)Broker信息注冊(cè)到路由管理器上,具體方法流程如下:

  1. clusterAddrTable中以入?yún)⒌?code>clusterName獲取集群下所有Broker的名稱,聲明為brokerNames。

  2. 如果brokerNames為null,則為其賦值一個(gè)空的HashSet<String>。并且在clusterAddrTable放入這個(gè)clusterNamebrokerNames兩個(gè)值。

  3. brokerNames中添加本次注冊(cè)上來(lái)的Broker的名稱。

  4. brokerAddrTablebrokerName獲取BrokerData對(duì)象,如果不存在則新建一個(gè)并且放入到brokerAddrTable中。

  5. 取出步驟4中brokerData中的brokerAddrs映射,遍歷其中的元素,如果值與入?yún)⒌?code>brokerAddr相等,鍵與入?yún)⒌?code>brokerId不等,則刪除這個(gè)這一鍵值對(duì)。這種情況說(shuō)明此時(shí)該IP對(duì)應(yīng)的Broker信息已經(jīng)發(fā)生了變化。

  6. 將入?yún)⒌?code>brokerId和brokerAddr放入到brokerAddrs中。

  7. 如果brokerId為0也就是主節(jié)點(diǎn),并且入?yún)⒌?code>topicConfigWrapper不為null,也就是說(shuō)Broker發(fā)送的注冊(cè)命令是包含了請(qǐng)求體,那么執(zhí)行子流程。否則繼續(xù)后續(xù)流程。

    1. brokerLiveTable查詢?cè)?code>broker的版本號(hào),與topicConfigWrapper的版本號(hào)對(duì)比,確認(rèn)是否有變化。如果有變化,或者該Broker是新注冊(cè)的(brokerName第一次注冊(cè)或者brokerId第一次注冊(cè)),那么就很有可能本次攜帶了新的主題配置信息。則需要更更新注冊(cè)服務(wù)器上主題配置信息。也就執(zhí)行后續(xù)流程。否則結(jié)束子流程,繼續(xù)執(zhí)行步驟8.

    2. 遍歷屬性TopicConfigSerializeWrapper#topicConfigTable,對(duì)集合中每一個(gè)元素調(diào)用方法RouteInfoManager#createAndUpdateQueueData,更新主題對(duì)應(yīng)的隊(duì)列信息。

  8. 構(gòu)建BrokerLiveInfo對(duì)象,放入brokerLiveTable中。

  9. 如果入?yún)⒌?code>filterServerList不為null,則放入filterServerTable。

  10. 如果brokerId不為0,也就是當(dāng)前是從節(jié)點(diǎn)在注冊(cè)自己,則從brokerAddrs獲取主節(jié)點(diǎn)的地址。如果主節(jié)點(diǎn)地址存在,則進(jìn)一步獲取其 HaServer 地址。將這兩個(gè)數(shù)據(jù)設(shè)置到返回的結(jié)果對(duì)象result中。

  11. 返回結(jié)果對(duì)象result。從代碼可以看出,如果當(dāng)前注冊(cè)不是從節(jié)點(diǎn),或者對(duì)應(yīng)的主節(jié)點(diǎn)不存在,則result是一個(gè)空對(duì)象。

createAndUpdateQueueData

該方法是用于創(chuàng)建或更新 topicQueueTable 中的QueueData對(duì)象的。具體流程如下:

  1. 構(gòu)建一個(gè)QueuData對(duì)象,里面的屬性來(lái)自brokerName、topicConfig 對(duì)象。

  2. 從 topicQueueTable 中獲取topicConfig 主題對(duì)應(yīng)的 queueDataList 對(duì)象。

  3. 如果 queueDataList 不存在,意味著該主題是第一次出現(xiàn)在注冊(cè)服務(wù)器中。構(gòu)建一個(gè)新的linkedList對(duì)象,添加queueData對(duì)象到其中,并且將queueDataList放入到topicQueueTable中。流程結(jié)束。

  4. 如果queueDataList存在,則對(duì)其元素遍歷,執(zhí)行如下子操作。

    1. 元素的brokerName屬性與入?yún)⒌?code>brokerName值相同,則繼續(xù)執(zhí)行后續(xù)流程,否則進(jìn)入下一次循環(huán)迭代。

    2. 判斷元素與步驟1構(gòu)建的對(duì)象是否相同,如果相同,不做操作;如果不同,意味著數(shù)據(jù)有變化,將元素從集合中刪除。

  5. 如果步驟4中有元素被刪除,則將步驟1的對(duì)象,添加到queueDataList中。

unregisterBroker

該方法用于在刪除路由管理器中某一個(gè)Broker的信息。具體流程如下:

  1. brokerLiveTable中刪除該Broker信息。

  2. filterServerTable刪除該Broker的信息。

  3. 聲明一個(gè)局部變量removeBrokerName。從brokerAddrTable獲取該BrokerName對(duì)應(yīng)的brokerData。如果其不為空,則執(zhí)行子流程。

    1. brokerDatabrokerAddrs刪除該brokerId對(duì)應(yīng)的映射。

    2. 如果brokerAddrs集合為空,則從brokerAddrTable刪除該brokerName對(duì)應(yīng)的映射。為removeBrokerName賦值true

  4. 如果removeBrokerName為真,則執(zhí)行子流程,否則流程結(jié)束。

    1. clusterAddrTable獲取該clusterName對(duì)應(yīng)的brokerName的集合,聲明為nameSet。

    2. nameSet不為null的情況下,從nameSet刪除本次的brokerName。如果刪除后nameSet為空,則從clusterAddrTable刪除該brokerName的映射。

    3. 調(diào)用方法removeTopicByBrokerName刪除brokerName對(duì)應(yīng)的主題信息。

removeTopicByBrokerName

該方法用于刪除brokerName對(duì)應(yīng)的主題配置信息,具體執(zhí)行邏輯如下:

  1. 遍歷topicQueueTable,為每一個(gè)元素執(zhí)行后續(xù)邏輯。

  2. 針對(duì)每一個(gè)元素,取出其QueueData列表,遍歷該對(duì)象。執(zhí)行子流程。

    1. 遍歷QueueData列表,如果元素QueueDatabrokerName與入?yún)?code>brokerName相同,則從列表中刪除該元素。

    2. 遍歷完畢后,如果列表為空,則從topicQueueTable中刪除該映射。

pickupTopicRouteData

首先來(lái)看下數(shù)據(jù)結(jié)構(gòu)對(duì)象TopicRouteData的定義,其屬性如下

String orderTopicConf;
List<QueueData> queueDatas;
List<BrokerData> brokerDatas;
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable

從數(shù)據(jù)結(jié)構(gòu)對(duì)象也可以簡(jiǎn)單的推測(cè)出pickupTopicRouteData方法的實(shí)現(xiàn)邏輯。大致來(lái)說(shuō)分為幾個(gè)步驟:

  1. topicQueueTable按照主題名稱查詢queueDatas。

  2. 根據(jù)queueDatas中每一個(gè)元素QueueDatabrokerName屬性從brokerAddrTable取得brokerData對(duì)象,組成成一個(gè)List,也就是brokerDatas。

  3. 根據(jù)步驟2的brokerDatasfilterServerTable查詢到對(duì)應(yīng)的filterServer列表,組裝為映射。

  4. 將步驟1到3的值組裝為TopicRouteData對(duì)象返回給調(diào)用者。

wipeWritePermOfBrokerByLock

該方法會(huì)以可中斷的方式獲取寫鎖,獲取成功后調(diào)用方法wipeWritePermOfBroker。如果獲取失敗則返回0,獲取成功則執(zhí)行方法wipeWritePermOfBroker執(zhí)行擦除工作。

wipeWritePermOfBroker方法的內(nèi)容也很簡(jiǎn)單,遍歷topicQueueTable,針對(duì)每一個(gè)元素,在遍歷其QueueData,如果brokerName與入?yún)⒌?code>brokerName相同就意味著找到對(duì)應(yīng)的QueueData。將這個(gè)里面的perm屬性重新設(shè)置值,去掉代表寫權(quán)限的標(biāo)志位即可。

getUnitTopics

該方法用于獲取具備unit標(biāo)識(shí)的主題名稱集合。具體流程如下:

  1. 以可中斷的方式獲取讀鎖。遍歷topicQueueTable元素。

  2. 如果鍵值對(duì)中的QueueData列表的首個(gè)元素的topicSynFlag屬性值包含了unit標(biāo)識(shí),將這個(gè)鍵值對(duì)的key,也即是主題名稱加入到臨時(shí)集合中。

  3. 遍歷完后后,返回臨時(shí)集合編碼的二進(jìn)制數(shù)組。

onChannelDestroy

當(dāng)一個(gè)Broker的通道關(guān)閉的時(shí)候,會(huì)觸發(fā)到這個(gè)方法。這個(gè)方法的代碼雖然比較多,但是方法思路很簡(jiǎn)單,首先通過(guò)Channel在brokerLiveTable中找到對(duì)應(yīng)的BrokerLiveInfo對(duì)象。并且依靠這個(gè)對(duì)象的信息,在路由管理器中刪除所有相關(guān)的信息接口。

感謝各位的閱讀!關(guān)于“RocketMQ源碼中如何實(shí)現(xiàn)注冊(cè)服務(wù)器”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!

網(wǎng)站標(biāo)題:RocketMQ源碼中如何實(shí)現(xiàn)注冊(cè)服務(wù)器
文章轉(zhuǎn)載:http://aaarwkj.com/article42/pdhphc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站排名、網(wǎng)站建設(shè)ChatGPT、標(biāo)簽優(yōu)化、企業(yè)網(wǎng)站制作、網(wǎng)站內(nèi)鏈

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)
91精品日日躁夜夜躁欧美| 91色综合久久久久婷婷| 91国产性感美女视频| 黄片免费在线播放欧美| 把熟睡的少妇弄到高潮| 精品熟妇人妻一区二区三区| 亚洲男人天堂在线视频| 偷拍福利视频一区二区三区| 放荡成熟人妻中文字幕| 精品在线免费视频观看| 国产一区免费二区三区四区| 精品熟女少妇av免费观看| 久久亚洲精品中文字幕馆| 囗交囗爆吞精在线视频| 91精品人妻互换一区二区 | 国产在线精品专区第一页 | 男人天堂手机视频在线| 夜夜高潮夜夜爽免费观看| 男人自拍天堂在线视频| 国产一级r内射视频播放| 亚洲日本成人av在线观看| 97资源视频在线播放| 国产av高清视频在线| 91无人区一区二区三乱码| 中文字幕女同系列av厨房| 福利在线午夜绝顶三级| 欧美欧成人一区二区三区a∨| 久久久久久国产精品亚洲| 日日躁夜夜躁狠狠躁黑人| 天天躁日日躁夜夜躁夜夜| 久久国产精品午夜视频| 91性感视频在线播放| 亚洲人成网站在线免费看| 在线看黄色片播放器日韩| 亚洲国产欧美日韩国产| 日本不卡一区二区在线视频| 精品蜜臀国产av一区二区| 国产在线播放精品视频| 中文字幕日韩精品在线看| 厕所偷拍视频一区二区三区| 中文字幕日韩乱码一级在线|