這篇文章將為大家詳細(xì)講解有關(guān)如何進(jìn)行RocketMQ事務(wù)消息實(shí)現(xiàn),文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
成都創(chuàng)新互聯(lián)專注于平遠(yuǎn)企業(yè)網(wǎng)站建設(shè),自適應(yīng)網(wǎng)站建設(shè),成都商城網(wǎng)站開發(fā)。平遠(yuǎn)網(wǎng)站建設(shè)公司,為平遠(yuǎn)等地區(qū)提供建站服務(wù)。全流程按需求定制開發(fā),專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,成都創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)摘要: 事務(wù)消息提交或回滾的實(shí)現(xiàn)原理就是根據(jù)commitlogOffset找到消息,如果是提交動(dòng)作,就恢復(fù)原消息的主題與隊(duì)列,再次存入commitlog文件進(jìn)而轉(zhuǎn)到消息消費(fèi)隊(duì)列,供消費(fèi)者消費(fèi),然后將原預(yù)處理消息存入一個(gè)新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理;回滾消息與提交事務(wù)消息不同的是,提交事務(wù)消息會(huì)將消息恢復(fù)原主題與隊(duì)列,再次存儲(chǔ)在commitlog文件中。
若您對(duì)RocketMQ技術(shù)感興趣,請(qǐng)加入 RocketMQ技術(shù)交流群
小編將重點(diǎn)分析RocketMQ Broker如何處理事務(wù)消息提交、回滾命令,根據(jù)前面的介紹,其入口EndTransactionProcessor#processRequest:
OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); // @2 if (result.getResponseCode() == ResponseCode.SUCCESS) { // @3 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); // @4 if (res.getCode() == ResponseCode.SUCCESS) { MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); // @5 msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); // @6 RemotingCommand sendResult = sendFinalMessage(msgInner); // @7 if (sendResult.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); // @8 } return sendResult; } return res; } }
代碼@1:如果請(qǐng)求為提交事務(wù),進(jìn)入事務(wù)消息提交處理流程。
代碼@2:提交消息,別被這名字誤導(dǎo)了,該方法主要是根據(jù)commitLogOffset從commitlog文件中查找消息返回OperationResult實(shí)例:
private MessageExt prepareMessage :消息對(duì)象。
private int responseCode:查找結(jié)果。
private String responseRemark :錯(cuò)誤提示。
代碼@3:如果成功查找到消息,則繼續(xù)處理,否則返回給客戶端,消息未找到錯(cuò)誤信息。
代碼@4:驗(yàn)證消息必要字段。
驗(yàn)證消息的生產(chǎn)組與請(qǐng)求信息中的生產(chǎn)者組是否一致。
驗(yàn)證消息的隊(duì)列偏移量(queueOffset)與請(qǐng)求信息中的偏移量是否一致。
驗(yàn)證消息的commitLogOffset與請(qǐng)求信息中的CommitLogOffset是否一致。
代碼@5:調(diào)用endMessageTransaction方法,該方法主要的目的就是恢復(fù)事務(wù)消息的真實(shí)的主題、隊(duì)列,并設(shè)置事務(wù)ID。
代碼@6:設(shè)置消息的相關(guān)屬性,這一步應(yīng)該直接在endMessageTransaction中實(shí)現(xiàn)就好,統(tǒng)一恢復(fù)原消息的數(shù)量,特別關(guān)注的是取消了事務(wù)相關(guān)的系統(tǒng)標(biāo)記。
代碼@7:發(fā)送最終消息,其實(shí)現(xiàn)原理非常簡(jiǎn)單,調(diào)用MessageStore將消息存儲(chǔ)在commitlog文件中,此時(shí)的消息,會(huì)被轉(zhuǎn)發(fā)到原消息主題對(duì)應(yīng)的消費(fèi)隊(duì)列,被消費(fèi)者消費(fèi)。
代碼@8:刪除預(yù)處理消息(prepare),其實(shí)是將消息存儲(chǔ)在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC的主題中,代表這些消息已經(jīng)被處理(提交或回滾)。
上述就是事務(wù)消息提交的流程,事務(wù)回滾類似,接下來大概分析一下事務(wù)消息回滾的流程。
EndTransactionProcessor#processRequest else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); // @1 if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); // @2 } return res; } }
代碼@1:回滾消息,其實(shí)內(nèi)部就是根據(jù)commitlogOffset查找消息。
代碼@2:將消息存儲(chǔ)在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表該消息已被處理,與提交事務(wù)消息不同的是,提交事務(wù)消息會(huì)將消息恢復(fù)原主題與隊(duì)列,再次存儲(chǔ)在commitlog文件中。
事務(wù)消息在Broker服務(wù)端的提交回滾流程就介紹到這了。其核心實(shí)現(xiàn)就是根據(jù)commitlogOffset找到消息,如果是提交動(dòng)作,就恢復(fù)原消息的主題與隊(duì)列,再次存入commitlog文件進(jìn)而轉(zhuǎn)到消息消費(fèi)隊(duì)列,供消費(fèi)者消費(fèi),然后將原預(yù)處理消息存入一個(gè)新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理;回滾消息與提交事務(wù)消息不同的是,提交事務(wù)消息會(huì)將消息恢復(fù)原主題與隊(duì)列,再次存儲(chǔ)在commitlog文件中。
關(guān)于如何進(jìn)行RocketMQ事務(wù)消息實(shí)現(xiàn)就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
分享文章:如何進(jìn)行RocketMQ事務(wù)消息實(shí)現(xiàn)-創(chuàng)新互聯(lián)
當(dāng)前鏈接:http://aaarwkj.com/article14/jspde.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈、網(wǎng)站排名、響應(yīng)式網(wǎng)站、電子商務(wù)、App開發(fā)、定制開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容