欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

怎么使用RocketMQ事務(wù)消息解決分布式事務(wù)

本篇文章為大家展示了怎么使用RocketMQ事務(wù)消息解決分布式事務(wù),內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

在瀘溪等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都做網(wǎng)站、成都網(wǎng)站建設(shè)、成都外貿(mào)網(wǎng)站建設(shè) 網(wǎng)站設(shè)計(jì)制作按需網(wǎng)站設(shè)計(jì),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),成都全網(wǎng)營(yíng)銷,外貿(mào)營(yíng)銷網(wǎng)站建設(shè),瀘溪網(wǎng)站建設(shè)費(fèi)用合理。

初步認(rèn)識(shí)RocketMQ的核心模塊

怎么使用RocketMQ事務(wù)消息解決分布式事務(wù)

rocketmq模塊

rocketmq-broker:接受生產(chǎn)者發(fā)來(lái)的消息并存儲(chǔ)(通過(guò)調(diào)用rocketmq-store),消費(fèi)者從這里取得消息。

rocketmq-client:提供發(fā)送、接受消息的客戶端API。

rocketmq-namesrv:NameServer,類似于Zookeeper,這里保存著消息的TopicName,隊(duì)列等運(yùn)行時(shí)的元信息。(有點(diǎn)NameNode的味道)

rocketmq-common:通用的一些類,方法,數(shù)據(jù)結(jié)構(gòu)等

rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定義二進(jìn)制協(xié)議

rocketmq-store:消息、索引存儲(chǔ)等

rocketmq-filtersrv:消息過(guò)濾器Server,需要注意的是,要實(shí)現(xiàn)這種過(guò)濾,需要上傳代碼到MQ!【一般而言,我們利用Tag足以滿足大部分的過(guò)濾需求,如果更靈活更復(fù)雜的過(guò)濾需求,可以考慮filtersrv組件】

rocketmq-tools:命令行工具

分布式消息隊(duì)列RocketMQ—事務(wù)消息—解決分布式事務(wù)

說(shuō)到分布式事務(wù),就會(huì)談到那個(gè)經(jīng)典的”賬號(hào)轉(zhuǎn)賬”問(wèn)題:2個(gè)賬號(hào),分布處于2個(gè)不同的DB,或者說(shuō)2個(gè)不同的子系統(tǒng)里面,A要扣錢,B要加錢,如何保證原子性?

一般的思路都是通過(guò)消息中間件來(lái)實(shí)現(xiàn)“最終一致性”:A系統(tǒng)扣錢,然后發(fā)條消息給中間件,B系統(tǒng)接收此消息,進(jìn)行加錢。

但這里面有個(gè)問(wèn)題:A是先update DB,后發(fā)送消息呢? 還是先發(fā)送消息,后update DB?

假設(shè)先update DB成功,發(fā)送消息網(wǎng)絡(luò)失敗,重發(fā)又失敗,怎么辦?
假設(shè)先發(fā)送消息成功,update DB失敗。消息已經(jīng)發(fā)出去了,又不能撤回,怎么辦?

所以,這里下個(gè)結(jié)論: 只要發(fā)送消息和update DB這2個(gè)操作不是原子的,無(wú)論誰(shuí)先誰(shuí)后,都是有問(wèn)題的。

那這個(gè)問(wèn)題怎么解決呢?

錯(cuò)誤的方案0

有人可能想到了,我可以把“發(fā)送消息”這個(gè)網(wǎng)絡(luò)調(diào)用和update DB放在同1個(gè)事務(wù)里面,如果發(fā)送消息失敗,update DB自動(dòng)回滾。這樣不就保證2個(gè)操作的原子性了嗎?

這個(gè)方案看似正確,其實(shí)是錯(cuò)誤的,原因有2:

(1)網(wǎng)絡(luò)的2將軍問(wèn)題:發(fā)送消息失敗,發(fā)送方并不知道是消息中間件真的沒(méi)有收到消息呢?還是消息已經(jīng)收到了,只是返回response的時(shí)候失敗了?

     如果是已經(jīng)收到消息了,而發(fā)送端認(rèn)為沒(méi)有收到,執(zhí)行update db的回滾操作。則會(huì)導(dǎo)致A賬號(hào)的錢沒(méi)有扣,B賬號(hào)的錢卻加了。

(2)把網(wǎng)絡(luò)調(diào)用放在DB事務(wù)里面,可能會(huì)因?yàn)榫W(wǎng)絡(luò)的延時(shí),導(dǎo)致DB長(zhǎng)事務(wù)。嚴(yán)重的,會(huì)block整個(gè)DB。這個(gè)風(fēng)險(xiǎn)很大。

     基于以上分析,我們知道,這個(gè)方案其實(shí)是錯(cuò)誤的!

方案1–業(yè)務(wù)方自己實(shí)現(xiàn)

假設(shè)消息中間件沒(méi)有提供“事務(wù)消息”功能,比如你用的是Kafka。那如何解決這個(gè)問(wèn)題呢?

解決方案如下:
(1)Producer端準(zhǔn)備1張消息表,把update DB和insert message這2個(gè)操作,放在一個(gè)DB事務(wù)里面。

(2)準(zhǔn)備一個(gè)后臺(tái)程序,源源不斷的把消息表中的message傳送給消息中間件。失敗了,不斷重試重傳。允許消息重復(fù),但消息不會(huì)丟,順序也不會(huì)打亂。

(3)Consumer端準(zhǔn)備一個(gè)判重表。處理過(guò)的消息,記在判重表里面。實(shí)現(xiàn)業(yè)務(wù)的冪等。但這里又涉及一個(gè)原子性問(wèn)題:如果保證消息消費(fèi) + insert message到判重表這2個(gè)操作的原子性?

消費(fèi)成功,但insert判重表失敗,怎么辦?關(guān)于這個(gè),在Kafka的源碼分析系列,第1篇, exactly once問(wèn)題的時(shí)候,有過(guò)討論。

通過(guò)上面3步,我們基本就解決了這里update db和發(fā)送網(wǎng)絡(luò)消息這2個(gè)操作的原子性問(wèn)題。

但這個(gè)方案的一個(gè)缺點(diǎn)就是:需要設(shè)計(jì)DB消息表,同時(shí)還需要一個(gè)后臺(tái)任務(wù),不斷掃描本地消息。導(dǎo)致消息的處理和業(yè)務(wù)邏輯耦合額外增加業(yè)務(wù)方的負(fù)擔(dān)。

方案2 – RocketMQ 事務(wù)消息

為了能解決該問(wèn)題,同時(shí)又不和業(yè)務(wù)耦合,RocketMQ提出了“事務(wù)消息”的概念。

具體來(lái)說(shuō),就是把消息的發(fā)送分成了2個(gè)階段:Prepare階段和確認(rèn)階段。

具體來(lái)說(shuō),上面的2個(gè)步驟,被分解成3個(gè)步驟:
(1) 發(fā)送Prepared消息
(2) update DB
(3) 根據(jù)update DB結(jié)果成功或失敗,Confirm或者取消Prepared消息。

可能有人會(huì)問(wèn)了,前2步執(zhí)行成功了,最后1步失敗了怎么辦?這里就涉及到了RocketMQ的關(guān)鍵點(diǎn):RocketMQ會(huì)定期(默認(rèn)是1分鐘)掃描所有的Prepared消息,詢問(wèn)發(fā)送方,到底是要確認(rèn)這條消息發(fā)出去?還是取消此條消息?

具體代碼實(shí)現(xiàn)如下:

也就是定義了一個(gè)checkListener,RocketMQ會(huì)回調(diào)此Listener,從而實(shí)現(xiàn)上面所說(shuō)的方案。

// 也就是上文所說(shuō)的,當(dāng)RocketMQ發(fā)現(xiàn)`Prepared消息`時(shí),會(huì)根據(jù)這個(gè)Listener實(shí)現(xiàn)的策略來(lái)決斷事務(wù)
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構(gòu)造事務(wù)消息的生產(chǎn)者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 設(shè)置事務(wù)決斷處理類
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事務(wù)的處理邏輯,相當(dāng)于示例中檢查Bob賬戶并扣錢的邏輯
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 構(gòu)造MSG,省略構(gòu)造參數(shù)
Message msg = new Message(......);
// 發(fā)送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();

然后執(zhí)行本地事務(wù),具體代碼如下

public TransactionSendResult sendMessageInTransaction(.....)  {
    // 邏輯代碼,非實(shí)際代碼
    // 1.發(fā)送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.如果消息發(fā)送成功,處理與消息關(guān)聯(lián)的本地事務(wù)單元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.結(jié)束事務(wù)
    this.endTransaction(sendResult, localTransactionState, localException);
}

怎么使用RocketMQ事務(wù)消息解決分布式事務(wù)

上面所說(shuō)的消息中間件上注冊(cè)的listener,超時(shí)以后會(huì)回調(diào)producer的接口以確定事務(wù)執(zhí)行情況

怎么使用RocketMQ事務(wù)消息解決分布式事務(wù)

總結(jié):對(duì)比方案2和方案1,RocketMQ最大的改變,其實(shí)就是把“掃描消息表”這個(gè)事情,不讓業(yè)務(wù)方做,而是消息中間件幫著做了。

至于消息表,其實(shí)還是沒(méi)有省掉。因?yàn)橄⒅虚g件要詢問(wèn)發(fā)送方,事物是否執(zhí)行成功,還是需要一個(gè)“變相的本地消息表”,記錄事物執(zhí)行狀態(tài)。

人工介入

可能有人又要說(shuō)了,無(wú)論方案1,還是方案2,發(fā)送端把消息成功放入了隊(duì)列,但消費(fèi)端消費(fèi)失敗怎么辦?

消費(fèi)失敗了,重試,還一直失敗怎么辦?是不是要自動(dòng)回滾整個(gè)流程?

答案是人工介入。從工程實(shí)踐角度講,這種整個(gè)流程自動(dòng)回滾的代價(jià)是非常巨大的,不但實(shí)現(xiàn)復(fù)雜,還會(huì)引入新的問(wèn)題。比如自動(dòng)回滾失敗,又怎么處理?

對(duì)應(yīng)這種極低概率的case,采取人工處理,會(huì)比實(shí)現(xiàn)一個(gè)高復(fù)雜的自動(dòng)化回滾系統(tǒng),更加可靠,也更加簡(jiǎn)單。

上述內(nèi)容就是怎么使用RocketMQ事務(wù)消息解決分布式事務(wù),你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

名稱欄目:怎么使用RocketMQ事務(wù)消息解決分布式事務(wù)
文章來(lái)源:http://aaarwkj.com/article34/igihpe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計(jì)、企業(yè)網(wǎng)站制作、移動(dòng)網(wǎng)站建設(shè)、域名注冊(cè)、網(wǎng)站營(yíng)銷

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化
丰满人妻被猛烈进入中文版| 国产l精品国产亚洲区久久 | 中国吞精囗交免费视频| 久久综合婷婷亚洲五月| 亚洲精品紧身裙女教师av| 日本少妇一区二区99| 日本高清不卡在线一区二区| 国产精品原创传媒在线观看| 国产内射一级一片高清视频观看| 在线精品91国产在线观看| 亚洲成a人片777777久久| 深夜日本福利在线观看| 91精品国产色综合久久不| 国产一区二区主播不卡| 日韩一区不卡在线观看| 久久午夜av一区二区| 亚洲国产中文字幕高清| 深夜三级福利在线观看| 国产夫妻性生活视频播放| 十八禁在线观看网址免费| 亚洲国产欧美日韩在线一区| 色哟哟网站一区二区精品久久| 精品欧美日韩国产一区| 91女厕偷拍女厕偷拍| 成人久久精品一区二区| 粗长挺进新婚人妻诗岚| 午夜性色在线视频福利| 人妻丰满熟妇九九久久| 91在线国产精品视频| 中国一级黄片免费欧美| 精品免费av在线播放| 人人看男人的天堂东京| 18禁止看的视频免费| 黄色亚洲大片免费在线观看 | 一二三四在线观看日本资讯| 中国女人内射69xx| 夫妻晚上同房太猛视频| 亚洲黄色成人免费观看| 中国亚洲黄色录像免费看| 国产成人av在线观看| 日本熟妇一区二区三区高清视频|