欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

SpringBoot如何優(yōu)雅的使用RocketMQ

MQ,是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。在傳統(tǒng)的互聯(lián)網(wǎng)架構(gòu)中通常使用MQ來對上下游來做解耦合。

成都創(chuàng)新互聯(lián)主要從事網(wǎng)站設(shè)計、成都網(wǎng)站制作、網(wǎng)頁設(shè)計、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)天山,十多年網(wǎng)站建設(shè)經(jīng)驗,價格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18980820575

舉例:當(dāng)A系統(tǒng)對B系統(tǒng)進(jìn)行消息通訊,如A系統(tǒng)發(fā)布一條系統(tǒng)公告,B系統(tǒng)可以訂閱該頻道進(jìn)行系統(tǒng)公告同步,整個過程中A系統(tǒng)并不關(guān)系B系統(tǒng)會不會同步,由訂閱該頻道的系統(tǒng)自行處理。

什么是RocketMQ?

官方說明:

隨著使用越來越多的隊列和虛擬主題,ActiveMQ IO模塊遇到了瓶頸。我們盡力通過節(jié)流,斷路器或降級來解決此問題,但效果不佳。因此,我們那時開始關(guān)注流行的消息傳遞解決方案Kafka。不幸的是,Kafka不能滿足我們的要求,特別是在低延遲和高可靠性方面。

看到這里可以很清楚的知道RcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。

具有以下特性:

  • 支持發(fā)布/訂閱(Pub/Sub)和點對點(P2P)消息模型
  • 能夠保證嚴(yán)格的消息順序,在一個隊列中可靠的先進(jìn)先出(FIFO)和嚴(yán)格的順序傳遞
  • 提供豐富的消息拉取模式,支持拉(pull)和推(push)兩種消息模式
  • 單一隊列百萬消息的堆積能力,億級消息堆積能力
  • 支持多種消息協(xié)議,如 JMS、MQTT 等
  • 分布式高可用的部署架構(gòu),滿足至少一次消息傳遞語義
RocketMQ環(huán)境安裝

下載地址:https://rocketmq.apache.org/dowloading/releases/

從官方下載二進(jìn)制或者源碼來進(jìn)行使用。源碼編譯需要Maven3.2x,JDK8

在根目錄進(jìn)行打包:

mvn -Prelease-all -DskipTests clean packager -U

distribution/target/apache-rocketmq文件夾中會存在一個文件夾版,zip,tar三個可運行的完整程序。

使用rocketmq-4.6.0.zip:

  1. 啟動名稱服務(wù) mqnamesrv.cmd
  2. 啟動數(shù)據(jù)中心 mqbroker.cmd -n localhost:9876
SpringBoot環(huán)境中使用RocketMQ

SpringBoot 入門:https://www.cnblogs.com/SimpleWu/p/10027237.html
SpringBoot 常用start:https://www.cnblogs.com/SimpleWu/p/9798146.html
當(dāng)前環(huán)境版本為:

  • SpringBoot 2.0.6.RELEASE
  • SpringCloud Finchley.RELEASE
  • SpringCldod Alibaba 0.2.1.RELEASE
  • RocketMQ 4.3.0
    在項目工程中導(dǎo)入:
    <!-- MQ Begin -->
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>${rocketmq.version}</version>
    </dependency>
    <!-- MQ End -->

    由于我們這邊已經(jīng)有工程了所以就不在進(jìn)行創(chuàng)建這種過程了。主要是看看如何使用RocketMQ。
    創(chuàng)建RocketMQProperties配置屬性類,類中內(nèi)容如下:

    @ConfigurationProperties(prefix = "rocketmq")
    public class RocketMQProperties {
    private boolean isEnable = false;
    private String namesrvAddr = "localhost:9876";
    private String groupName = "default";
    private int producerMaxMessageSize = 1024;
    private int producerSendMsgTimeout = 2000;
    private int producerRetryTimesWhenSendFailed = 2;
    private int consumerConsumeThreadMin = 5;
    private int consumerConsumeThreadMax = 30;
    private int consumerConsumeMessageBatchMaxSize = 1;
    //省略get set
    }

    現(xiàn)在我們所有子系統(tǒng)中的生產(chǎn)者,消費者對應(yīng):
    isEnable 是否開啟mq
    namesrvAddr 集群地址
    groupName 分組名稱
    設(shè)置為統(tǒng)一已方便系統(tǒng)對接,如有其它需求在進(jìn)行擴(kuò)展,類中我們已經(jīng)給了默認(rèn)值也可以在配置文件或配置中心中獲取配置,配置如下:

    #發(fā)送同一類消息的設(shè)置為同一個group,保證唯一,默認(rèn)不需要設(shè)置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標(biāo)示
    rocketmq.groupName=please_rename_unique_group_name
    #是否開啟自動配置
    rocketmq.isEnable=true
    #mq的nameserver地址
    rocketmq.namesrvAddr=127.0.0.1:9876
    #消息最大長度 默認(rèn)1024*4(4M)
    rocketmq.producer.maxMessageSize=4096
    #發(fā)送消息超時時間,默認(rèn)3000
    rocketmq.producer.sendMsgTimeout=3000
    #發(fā)送消息失敗重試次數(shù),默認(rèn)2
    rocketmq.producer.retryTimesWhenSendFailed=2
    #消費者線程數(shù)量
    rocketmq.consumer.consumeThreadMin=5
    rocketmq.consumer.consumeThreadMax=32
    #設(shè)置一次消費消息的條數(shù),默認(rèn)為1條
    rocketmq.consumer.consumeMessageBatchMaxSize=1

創(chuàng)建消費者接口 RocketConsumer.java 該接口用戶約束消費者需要的核心步驟:

/**
 * 消費者接口
 * 
 * @author SimpleWu
 *
 */
public interface RocketConsumer {

/**
     * 初始化消費者
     */
    public abstract void init();

    /**
     * 注冊監(jiān)聽
     * 
     * @param messageListener
     */
    public void registerMessageListener(MessageListener messageListener);

}

創(chuàng)建抽象消費者 AbstractRocketConsumer.java:

/**
 * 消費者基本信息
 * 
 * @author SimpelWu
 */
public abstract class AbstractRocketConsumer implements RocketConsumer {

    protected String topics;
    protected String tags;
    protected MessageListener messageListener;
    protected String consumerTitel;
    protected MQPushConsumer mqPushConsumer;

    /**
     * 必要的信息
     * 
     * @param topics
     * @param tags
     * @param consumerTitel
     */
    public void necessary(String topics, String tags, String consumerTitel) {
        this.topics = topics;
        this.tags = tags;
        this.consumerTitel = consumerTitel;
    }

    public abstract void init();

    @Override
    public void registerMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

}

在類中我們必須指定這個topics,tags與消息監(jiān)聽邏輯
public abstract void init();該方法是用于初始化消費者,由子類實現(xiàn)。
接下來我們編寫自動配置類RocketMQConfiguation.java,該類用戶初始化一個默認(rèn)的生產(chǎn)者連接,以及加載所有的消費者。
@EnableConfigurationProperties({ RocketMQProperties.class }) 使用該配置文件
@Configuration 標(biāo)注為配置類
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") 只有當(dāng)配置中指定rocketmq.isEnable = true的時候才會生效
核心內(nèi)容如下:

/**
 * mq配置
 * 
 * @author SimpleWu
 */
@Configuration
@EnableConfigurationProperties({ RocketMQProperties.class })
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true")
public class RocketMQConfiguation {

    private RocketMQProperties properties;

    private ApplicationContext applicationContext;

    private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class);

    public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) {
        this.properties = properties;
        this.applicationContext = applicationContext;
    }

    /**
     * 注入一個默認(rèn)的消費者
     * @return
     * @throws MQClientException
     */
    @Bean
    public DefaultMQProducer getRocketMQProducer() throws MQClientException {
        if (StringUtils.isEmpty(properties.getGroupName())) {
            throw new MQClientException(-1, "groupName is blank");
        }

        if (StringUtils.isEmpty(properties.getNamesrvAddr())) {
            throw new MQClientException(-1, "nameServerAddr is blank");
        }
        DefaultMQProducer producer;
        producer = new DefaultMQProducer(properties.getGroupName());

        producer.setNamesrvAddr(properties.getNamesrvAddr());
        // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

        // 如果需要同一個jvm中不同的producer往不同的mq集群發(fā)送消息,需要設(shè)置不同的instanceName
        // producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(properties.getProducerMaxMessageSize());
        producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout());
        // 如果發(fā)送消息失敗,設(shè)置重試次數(shù),默認(rèn)為2次
        producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed());

        try {
            producer.start();
            log.info("producer is start ! groupName:{},namesrvAddr:{}", properties.getGroupName(),
                    properties.getNamesrvAddr());
        } catch (MQClientException e) {
            log.error(String.format("producer is error {}", e.getMessage(), e));
            throw e;
        }
        return producer;

    }

    /**
     * SpringBoot啟動時加載所有消費者
     */
    @PostConstruct
    public void initConsumer() {
        Map<String, AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class);
        if (consumers == null || consumers.size() == 0) {
            log.info("init rocket consumer 0");
        }
        Iterator<String> beans = consumers.keySet().iterator();
        while (beans.hasNext()) {
            String beanName = (String) beans.next();
            AbstractRocketConsumer consumer = consumers.get(beanName);
            consumer.init();
            createConsumer(consumer);
            log.info("init success consumer title {} , toips {} , tags {}", consumer.consumerTitel, consumer.tags,
                    consumer.topics);
        }
    }

    /**
     * 通過消費者信心創(chuàng)建消費者
     * 
     * @param consumerPojo
     */
    public void createConsumer(AbstractRocketConsumer arc) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName());
        consumer.setNamesrvAddr(this.properties.getNamesrvAddr());
        consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin());
        consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax());
        consumer.registerMessageListener(arc.messageListenerConcurrently);
        /**
         * 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 如果非第一次啟動,那么按照上次消費的位置繼續(xù)消費
         */
        // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        /**
         * 設(shè)置消費模型,集群還是廣播,默認(rèn)為集群
         */
        // consumer.setMessageModel(MessageModel.CLUSTERING);

        /**
         * 設(shè)置一次消費消息的條數(shù),默認(rèn)為1條
         */
        consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize());
        try {
            consumer.subscribe(arc.topics, arc.tags);
            consumer.start();
            arc.mqPushConsumer=consumer;
        } catch (MQClientException e) {
            log.error("info consumer title {}", arc.consumerTitel, e);
        }

    }

}

然后在src/main/resources文件夾中創(chuàng)建目錄與文件META-INF/spring.factories里面添加自動配置類即可開啟啟動配置,我們只需要導(dǎo)入依賴即可:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.xcloud.config.rocketmq.RocketMQConfiguation

接下來在服務(wù)中導(dǎo)入依賴,然后通過我們的抽象類獲取所有必要信息對消費者進(jìn)行創(chuàng)建,該步驟會在所有消費者初始化完成后進(jìn)行,且只會管理是Spring Bean的消費者。下面我們看看如何創(chuàng)建一個消費者,創(chuàng)建消費者的步驟非常簡單,只需要繼承AbstractRocketConsumer然后再加上Spring的@Component就能夠完成消費者的創(chuàng)建,我們可以在類中自定義消費的主題與標(biāo)簽。
br/>下面我們看看如何創(chuàng)建一個消費者,創(chuàng)建消費者的步驟非常簡單,只需要繼承AbstractRocketConsumer然后再加上Spring的@Component就能夠完成消費者的創(chuàng)建,我們可以在類中自定義消費的主題與標(biāo)簽。
創(chuàng)建一個默認(rèn)的消費者 DefaultConsumerMQ.java

@Component
public class DefaultConsumerMQ extends AbstractRocketConsumer {
    /**
     * 初始化消費者
     */
    @Override
    public void init() {
        // 設(shè)置主題,標(biāo)簽與消費者標(biāo)題
        super.necessary("TopicTest", "*", "這是標(biāo)題");
        //消費者具體執(zhí)行邏輯
        registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach(msg -> {
                    System.out.printf("consumer message boyd %s %n", new String(msg.getBody()));
                });
                // 標(biāo)記該消息已經(jīng)被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }
}

super.necessary("TopicTest", "*", "這是標(biāo)題"); 是必須要設(shè)置的,代表該消費者監(jiān)聽TopicTest主題下所有tags,標(biāo)題那個字段是我自己定義的,所以對于該配置來說沒什么意義。
我們可以在這里注入Spring的Bean來進(jìn)行任意邏輯處理。
創(chuàng)建一個消息發(fā)送類進(jìn)行測試

@Override
public String qmtest(@PathVariable("name")String name) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
    Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 發(fā)送消息到一個Broker
    SendResult sendResult = defaultMQProducer.send(msg);
    // 通過sendResult返回消息是否成功送達(dá)
    System.out.printf("%s%n", sendResult);
    return null;
}

我們來通過Http請求測試:

http://localhost:10001/demo/base/mq/hello  consumer message boyd hello 
http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿  consumer message boyd 嘿嘿嘿嘿嘿 

好了到這里簡單的start算是設(shè)計完成了,后面還有一些:順序消息生產(chǎn),順序消費消息,異步消息生產(chǎn)等一系列功能,官人可參照官方去自行處理。

  • ActiveMQ 沒經(jīng)過大規(guī)模吞吐量場景的驗證,社區(qū)不高不活躍。
  • RabbitMQ 集群動態(tài)擴(kuò)展麻煩,且與當(dāng)前程序語言不至于難以定制化。
  • kafka 支持主要的MQ功能,功能無法達(dá)到程序需求的要求,所以不使用,且與當(dāng)前程序語言不至于難以定制化。
  • rocketMQ 經(jīng)過全世界的女人的洗禮,已經(jīng)很強大;MQ功能較為完善,還是分布式的,擴(kuò)展性好;支持復(fù)雜MQ業(yè)務(wù)場景。(業(yè)務(wù)復(fù)雜可做首選)

網(wǎng)站欄目:SpringBoot如何優(yōu)雅的使用RocketMQ
網(wǎng)站路徑:http://aaarwkj.com/article2/gjogic.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供自適應(yīng)網(wǎng)站、網(wǎng)站制作、定制網(wǎng)站做網(wǎng)站、Google、網(wǎng)站策劃

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站優(yōu)化排名
国产日韩欧美国产精品| 亚洲av成人在线资源| av真人青青小草一区二区欧美| 国产亚洲欧美另类网爆| 日韩精品极品在线免费视频| 91欧美精品一区二区| 精品人妻一区二区三区四| 91成年精品一区在线观看| 亚洲精品视频一区二区| 成人av资源在线观看| 色播五月麻豆激情综合网| 欧美日本一道本一区二区三区 | 日本午夜视频在线观看| 国产午夜激情自拍视频| 欧美国产激情一区二区| 美女黄色午夜福利网站| 午夜激情视频免费国产| 高潮少妇水多毛多av| 精品国产品国语在线不卡| 亚洲日本高清一二三区| 国产精品一区久久91| 欧美私人影院—区二区日本| 亚洲美女高清一区二区三区| 水蜜桃成人在线视频免费观看| 亚洲欧美日韩专区一区| av大全网站免费一区二区| 亚洲一区二区偷拍精品| 麻豆精品新av中文字幕| 十八女毛片一区二区三区| 免费看真人性生活视频| 中文字幕色视频在线观看| 精品一区二区日韩在线| 成人av免费高清在线播放| 欧美丰满人妻少妇视频在线| 97在线资源视频播放| 亚洲午夜福利影院在线免费观看 | 日本精品视频免费网| 亚洲国产日韩欧美第一页| 青草视频在线播放免费| 欧美一区二区三区日| 亚洲免费av第一区第二区|