這篇文章主要為大家展示了“spring boot集成rabbitmq的示例分析”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“spring boot集成rabbitmq的示例分析”這篇文章吧。
網(wǎng)站建設(shè)公司,為您提供網(wǎng)站建設(shè),網(wǎng)站制作,網(wǎng)頁設(shè)計(jì)及定制網(wǎng)站建設(shè)服務(wù),專注于企業(yè)網(wǎng)站建設(shè),高端網(wǎng)頁制作,對(duì)地磅秤等多個(gè)行業(yè)擁有豐富的網(wǎng)站建設(shè)經(jīng)驗(yàn)的網(wǎng)站建設(shè)公司。專業(yè)網(wǎng)站設(shè)計(jì),網(wǎng)站優(yōu)化推廣哪家好,專業(yè)成都網(wǎng)站營銷優(yōu)化,H5建站,響應(yīng)式網(wǎng)站。
一、RabbitMQ的介紹
RabbitMQ是消息中間件的一種,消息中間件即分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎(chǔ)軟件.這些軟件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,現(xiàn)已經(jīng)轉(zhuǎn)讓給apache).
消息中間件的工作過程可以用生產(chǎn)者消費(fèi)者模型來表示.即,生產(chǎn)者不斷的向消息隊(duì)列發(fā)送信息,而消費(fèi)者從消息隊(duì)列中消費(fèi)信息.具體過程如下:
從上圖可看出,對(duì)于消息隊(duì)列來說,生產(chǎn)者,消息隊(duì)列,消費(fèi)者是最重要的三個(gè)概念,生產(chǎn)者發(fā)消息到消息隊(duì)列中去,消費(fèi)者監(jiān)聽指定的消息隊(duì)列,并且當(dāng)消息隊(duì)列收到消息之后,接收消息隊(duì)列傳來的消息,并且給予相應(yīng)的處理.消息隊(duì)列常用于分布式系統(tǒng)之間互相信息的傳遞.
對(duì)于RabbitMQ來說,除了這三個(gè)基本模塊以外,還添加了一個(gè)模塊,即交換機(jī)(Exchange).它使得生產(chǎn)者和消息隊(duì)列之間產(chǎn)生了隔離,生產(chǎn)者將消息發(fā)送給交換機(jī),而交換機(jī)則根據(jù)調(diào)度策略把相應(yīng)的消息轉(zhuǎn)發(fā)給對(duì)應(yīng)的消息隊(duì)列.那么RabitMQ的工作流程如下所示:
緊接著說一下交換機(jī).交換機(jī)的主要作用是接收相應(yīng)的消息并且綁定到指定的隊(duì)列.交換機(jī)有四種類型,分別為Direct,topic,headers,Fanout.
Direct是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡(jiǎn)單的模式.即創(chuàng)建消息隊(duì)列的時(shí)候,指定一個(gè)BindingKey.當(dāng)發(fā)送者發(fā)送消息的時(shí)候,指定對(duì)應(yīng)的Key.當(dāng)Key和消息隊(duì)列的BindingKey一致的時(shí)候,消息將會(huì)被發(fā)送到該消息隊(duì)列中.
topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊(duì)列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時(shí)候,只有指定的Key和該模式相匹配的時(shí)候,消息才會(huì)被發(fā)送到該消息隊(duì)列中.
headers也是根據(jù)一個(gè)規(guī)則進(jìn)行匹配,在消息隊(duì)列和交換機(jī)綁定的時(shí)候會(huì)指定一組鍵值對(duì)規(guī)則,而發(fā)送消息的時(shí)候也會(huì)指定一組鍵值對(duì)規(guī)則,當(dāng)兩組鍵值對(duì)規(guī)則相匹配的時(shí)候,消息會(huì)被發(fā)送到匹配的消息隊(duì)列中.
Fanout是路由廣播的形式,將會(huì)把消息發(fā)給綁定它的全部隊(duì)列,即便設(shè)置了key,也會(huì)被忽略.
概念:
生產(chǎn)者 消息的產(chǎn)生方,負(fù)責(zé)將消息推送到消息隊(duì)列
消費(fèi)者 消息的最終接受方,負(fù)責(zé)監(jiān)聽隊(duì)列中的對(duì)應(yīng)消息,消費(fèi)消息
隊(duì)列 消息的寄存器,負(fù)責(zé)存放生產(chǎn)者發(fā)送的消息
交換機(jī) 負(fù)責(zé)根據(jù)一定規(guī)則分發(fā)生產(chǎn)者產(chǎn)生的消息
綁定 完成交換機(jī)和隊(duì)列之間的綁定
模式:
1、direct
直連模式,用于實(shí)例間的任務(wù)分發(fā)
2、topic
話題模式,通過可配置的規(guī)則分發(fā)給綁定在該exchange上的隊(duì)列
3、headers
適用規(guī)則復(fù)雜的分發(fā),用headers里的參數(shù)表達(dá)規(guī)則
4、fanout
分發(fā)給所有綁定到該exchange上的隊(duì)列,忽略routing key
安裝
單機(jī)版安裝很簡(jiǎn)單,大概步驟如下:
# 安裝erlang包 yum install erlang # 安裝socat yum install socat # 安裝rabbit rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm # 啟動(dòng)服務(wù) rabbitmq-server start # 增加管理控制功能 rabbitmq-plugins enable rabbitmq_management # 增加用戶: sudo rabbitmqctl add_user root password rabbitmqctl set_user_tags root administrator rabbitmqctl set_permissions -p / root '.*' '.*' '.*'
集群安裝,可參考這篇文章:
rabbitmq集群安裝
以上就是rabbitmq的介紹,下面開始本文的正文:spring boot 集成rabbitmq ,本人在學(xué)習(xí)rabbitmq時(shí)發(fā)現(xiàn)網(wǎng)上很少有系統(tǒng)性介紹springboot和rabbitmq如何集成的,其他人總結(jié)的都片段化,所以結(jié)合個(gè)人調(diào)研過程,整理此篇文章。
二、springboot配置
廢話少說直接上代碼:
配置參數(shù)
application.yml:
spring: rabbitmq: addresses: 192.168.1.1:5672 username: username password: password publisher-confirms: true virtual-host: /
java config讀取參數(shù)
/** * RabbitMq配置文件讀取類 * * @author chenhf * @create 2017-10-23 上午9:31 **/ @Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitMqConfig { @Value("${spring.rabbitmq.addresses}") private String addresses; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.publisher-confirms}") private Boolean publisherConfirms; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; // 構(gòu)建mq實(shí)例工廠 @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirms(publisherConfirms); connectionFactory.setVirtualHost(virtualHost); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate(){ RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } }
三、rabbitmq生產(chǎn)者配置
主要配置了直連和話題模式,其中話題模式設(shè)置兩個(gè)隊(duì)列(queueTopicTest1、queueTopicTest2),此兩個(gè)隊(duì)列在和交換機(jī)綁定時(shí)分別設(shè)置不同的routingkey(.TEST.以及l(fā)azy.#)來驗(yàn)證匹配模式。
/** * 用于配置交換機(jī)和隊(duì)列對(duì)應(yīng)關(guān)系 * 新增消息隊(duì)列應(yīng)該按照如下步驟 * 1、增加queue bean,參見queueXXXX方法 * 2、增加queue和exchange的binding * @author chenhf * @create 2017-10-23 上午10:33 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class RabbitMqExchangeConfig { /** logger */ private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class); /** * @Author:chenhf * @Description: 主題型交換機(jī) * @Date:下午5:49 2017/10/23 * @param * @return */ @Bean TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){ TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode()); rabbitAdmin.declareExchange(contractTopicExchange); logger.debug("完成主題型交換機(jī)bean實(shí)例化"); return contractTopicExchange; } /** * 直連型交換機(jī) */ @Bean DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) { DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode()); rabbitAdmin.declareExchange(contractDirectExchange); logger.debug("完成直連型交換機(jī)bean實(shí)例化"); return contractDirectExchange; } //在此可以定義隊(duì)列 @Bean Queue queueTest(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("測(cè)試隊(duì)列實(shí)例化完成"); return queue; } //topic 1 @Bean Queue queueTopicTest1(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("話題測(cè)試隊(duì)列1實(shí)例化完成"); return queue; } //topic 2 @Bean Queue queueTopicTest2(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("話題測(cè)試隊(duì)列2實(shí)例化完成"); return queue; } //在此處完成隊(duì)列和交換機(jī)綁定 @Bean Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("測(cè)試隊(duì)列與直連型交換機(jī)綁定完成"); return binding; } //topic binding1 @Bean Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("測(cè)試隊(duì)列與話題交換機(jī)1綁定完成"); return binding; } //topic binding2 @Bean Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("測(cè)試隊(duì)列與話題交換機(jī)2綁定完成"); return binding; } }
在這里用到枚舉類:RabbitMqEnum
/** * 定義rabbitMq需要的常量 * * @author chenhf * @create 2017-10-23 下午4:07 **/ public class RabbitMqEnum { /** * @param * @Author:chenhf * @Description:定義數(shù)據(jù)交換方式 * @Date:下午4:08 2017/10/23 * @return */ public enum Exchange { CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分發(fā)"), CONTRACT_TOPIC("CONTRACT_TOPIC", "消息訂閱"), CONTRACT_DIRECT("CONTRACT_DIRECT", "點(diǎn)對(duì)點(diǎn)"); private String code; private String name; Exchange(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } /** * describe: 定義隊(duì)列名稱 * creat_user: chenhf * creat_date: 2017/10/31 **/ public enum QueueName { TESTQUEUE("TESTQUEUE", "測(cè)試隊(duì)列"), TOPICTEST1("TOPICTEST1", "topic測(cè)試隊(duì)列"), TOPICTEST2("TOPICTEST2", "topic測(cè)試隊(duì)列"); private String code; private String name; QueueName(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } /** * describe: 定義routing_key * creat_user: chenhf * creat_date: 2017/10/31 **/ public enum QueueEnum { TESTQUEUE("TESTQUEUE1", "測(cè)試隊(duì)列key"), TESTTOPICQUEUE1("*.TEST.*", "topic測(cè)試隊(duì)列key"), TESTTOPICQUEUE2("lazy.#", "topic測(cè)試隊(duì)列key"); private String code; private String name; QueueEnum(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } }
以上完成消息生產(chǎn)者的定義,下面封裝調(diào)用接口
測(cè)試時(shí)直接調(diào)用此工具類,testUser類需自己實(shí)現(xiàn)
rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser); rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser); rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/** * rabbitmq發(fā)送消息工具類 * * @author chenhf * @create 2017-10-26 上午11:10 **/ @Component public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{ /** logger */ private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class); private RabbitTemplate rabbitTemplate; @Autowired public RabbitMqSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate.setConfirmCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { logger.info("confirm: " + correlationData.getId()); } /** * 發(fā)送到 指定routekey的指定queue * @param routeKey * @param obj */ public void sendRabbitmqDirect(String routeKey,Object obj) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); logger.info("send: " + correlationData.getId()); this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData); } /** * 所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定Topic的Queue上 * @param routeKey * @param obj */ public void sendRabbitmqTopic(String routeKey,Object obj) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); logger.info("send: " + correlationData.getId()); this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData); } }
四、rabbitmq消費(fèi)者配置
springboot注解方式監(jiān)聽隊(duì)列,無法手動(dòng)指定回調(diào),所以采用了實(shí)現(xiàn)ChannelAwareMessageListener接口,重寫onMessage來進(jìn)行手動(dòng)回調(diào),詳見以下代碼,詳細(xì)介紹可以在spring的官網(wǎng)上找amqp相關(guān)章節(jié)閱讀
直連消費(fèi)者
通過設(shè)置TestUser的name來測(cè)試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺(tái)中隊(duì)列中消息是否被消費(fèi)
/** * 消費(fèi)者配置 * * @author chenhf * @create 2017-10-30 下午3:14 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class ExampleAmqpConfiguration { @Bean("testQueueContainer") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TESTQUEUE"); container.setMessageListener(exampleListener()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("testQueueListener") public ChannelAwareMessageListener exampleListener() { return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody()); //通過設(shè)置TestUser的name來測(cè)試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺(tái)中隊(duì)列中消息是否被消費(fèi) if ("2".equals(testUser.getUserName())){ System.out.println(testUser.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } if ("1".equals(testUser.getUserName())){ System.out.println(testUser.toString()); channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } } }; } }
topic消費(fèi)者1
/** * 消費(fèi)者配置 * * @author chenhf * @create 2017-10-30 下午3:14 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class TopicAmqpConfiguration { @Bean("topicTest1Container") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TOPICTEST1"); container.setMessageListener(exampleListener1()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("topicTest1Listener") public ChannelAwareMessageListener exampleListener1(){ return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody()); System.out.println("TOPICTEST1:"+testUser.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }; } }
topic消費(fèi)者2
/** * 消費(fèi)者配置 * * @author chenhf * @create 2017-10-30 下午3:14 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class TopicAmqpConfiguration2 { @Bean("topicTest2Container") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TOPICTEST2"); container.setMessageListener(exampleListener()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("topicTest2Listener") public ChannelAwareMessageListener exampleListener() { return new ChannelAwareMessageListener() { @Override public void
以上是“spring boot集成rabbitmq的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
本文標(biāo)題:springboot集成rabbitmq的示例分析
文章路徑:http://aaarwkj.com/article22/gipjcc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、品牌網(wǎng)站設(shè)計(jì)、動(dòng)態(tài)網(wǎng)站、域名注冊(cè)、網(wǎng)站內(nèi)鏈、App設(shè)計(jì)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)