欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

RocketMQ中如何實(shí)現(xiàn)producer消息發(fā)送

這篇文章主要介紹了RocketMQ中如何實(shí)現(xiàn)producer消息發(fā)送,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

在阿合奇等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站建設(shè)、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作定制網(wǎng)站建設(shè),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),全網(wǎng)整合營銷推廣,成都外貿(mào)網(wǎng)站建設(shè)公司,阿合奇網(wǎng)站建設(shè)費(fèi)用合理。

RocketMQ中一個topic可以分布在多個broker上,每個broker上又可以包含多個message queue。每個message具體發(fā)送到哪個broker的哪個message queue中,這個決策過程是在client端完成的。client會從nameserver訂閱topic的路由信息,按照一定的負(fù)載均衡算法為每個message選擇出一個queue,并完成投遞。producer的消息投遞過程大體如下圖:

RocketMQ中如何實(shí)現(xiàn)producer消息發(fā)送

1. 出錯重試

producer的投遞出錯重試分為兩種情況,同步出錯重試(sync)以及異步出錯重試(async)。同步發(fā)送出錯重試是在DefaultMQProducerImpl.sendDefaultImpl 中完成:

            // SYNC模式則開啟重試,ASYNC模式重試在MQClientAPIImpl中實(shí)現(xiàn)
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) { // 循環(huán)重試最多timesTotal次
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // ....

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // ...
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        // ...
                        exception = e;
                        continue; // 出錯,進(jìn)入下一次重試
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        // ...
                        exception = e;
                        continue; // 出錯,進(jìn)入下一次重試
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        // ...
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue; // 出錯,進(jìn)入下一次重試
                            default:
                                if (sendResult != null) { // 放棄重試
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // ...
                        throw e; // 放棄重試
                    }
                } else {
                    break;
                }
            }

異步發(fā)送的出錯重試則是在更底層的MQClientAPIImpl.sendMessageAsync里實(shí)現(xiàn):

     // 調(diào)用invokeAsync發(fā)送消息      
     this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                long cost = System.currentTimeMillis() - beginStartTime;
                RemotingCommand response = responseFuture.getResponseCommand();
                // ...

                if (response != null) {
                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        assert sendResult != null;
                        if (context != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }

                        try {
                            sendCallback.onSuccess(sendResult);
                        } catch (Throwable e) {
                        }

                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    } catch (Exception e) {
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                        // 發(fā)送成功,processSendReponse異常重試
                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, e, context, false, producer);
                    }
                } else {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    if (!responseFuture.isSendRequestOK()) {
                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                        // 消息發(fā)送失敗,重試
                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else if (responseFuture.isTimeout()) {
                        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                            responseFuture.getCause());
                        // 發(fā)送超時,重試
                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else {
                        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                        // 其他錯誤,重試
                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    }
                }
            }
        });

在異常處理的onExceptionImpl方法中會再次觸發(fā)sendMessageAsync方法:

    private void onExceptionImpl(final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int timesTotal,
        final AtomicInteger curTimes,
        final Exception e,
        final SendMessageContext context,
        final boolean needRetry,
        final DefaultMQProducerImpl producer
    ) {
        int tmp = curTimes.incrementAndGet(); // 將已重試次數(shù)+1
        if (needRetry && tmp <= timesTotal) { // 重試
            String retryBrokerName = brokerName;//by default, it will send to the same broker
            if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
                MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
                retryBrokerName = mqChosen.getBrokerName();
            }
            String addr = instance.findBrokerAddressInPublish(retryBrokerName);
            log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
                retryBrokerName);
            try {
                request.setOpaque(RemotingCommand.createNewRequestId());
                sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                    timesTotal, curTimes, context, producer);
            } catch (InterruptedException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, false, producer);
            } catch (RemotingConnectException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, true, producer);
            } catch (RemotingTooMuchRequestException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, false, producer);
            } catch (RemotingException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, true, producer);
            }
        } else {

            // ...
        }
    }

2. 負(fù)載均衡

消息的負(fù)載均衡是通過MQFaultStrategy.selectOneMessageQueue方法來實(shí)現(xiàn)的:

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // 輪詢的方式選擇下一個queue
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 根據(jù)latency統(tǒng)計(jì)數(shù)據(jù)判斷是否可用
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        // 優(yōu)先選擇同一個broker上的queue
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“RocketMQ中如何實(shí)現(xiàn)producer消息發(fā)送”這篇文章對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

標(biāo)題名稱:RocketMQ中如何實(shí)現(xiàn)producer消息發(fā)送
標(biāo)題鏈接:http://aaarwkj.com/article20/pchjco.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供關(guān)鍵詞優(yōu)化、網(wǎng)站導(dǎo)航、標(biāo)簽優(yōu)化用戶體驗(yàn)、全網(wǎng)營銷推廣、面包屑導(dǎo)航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設(shè)
亚洲午夜一区二区三区精品| 国产日韩欧美在线精品| 欧美一区二区三区人妻熟妇| 欧美日韩欧美国产精品| 青青草原成年人免费看| 青青草免费视频观看在线| 欧美颜射一区二区三区| 欧美日韩国产一区在线| 亚洲第六页亚洲第一页| 欧美午夜福利视频电影| 日本久久91跳蛋视频| 给我搜亚洲免费播放黄色大片| 亚洲天堂av一区二区在线| 国产免费播放一区二区三区| 粉嫩av北条麻妃电影| 亚洲精品熟女国产中文| 精品一区二区三区高清| 欧美黄片完整版在线观看 | 日韩精品在线观看不卡| 亚洲激情一区在线观看| 亚洲中文字幕av天堂久久| 女厕所偷拍一区二区三区| 国产亚洲加勒比久久精品| 性生活视性生活大片日本| 国产欧美日韩一区二区三区四区| 亚洲欧美另类国产一区| 久草免费人妻视频在线| 亚洲清纯唯美激情四射| 日韩人妻有码中文字幕| 在线激情视频一区二区| 亚洲精品一区二区牛仔裤 | 日本精品亚洲一区二区三区| 国产91啦中文在线观看| 一区三区精品久久久精品| 亚洲日本中文字幕免费观看 | 国产我不卡在线观看免费| 国产三级久久精品三级91| 免费看国产一级黄色大片| 老牛av一区二区三区| 国产一区二区麻豆视频| 久久婷婷av一区二区三区|