欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

springboot集成rabbitmq的示例分析

這篇文章主要為大家展示了“spring boot集成rabbitmq的示例分析”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“spring boot集成rabbitmq的示例分析”這篇文章吧。

網(wǎng)站建設(shè)公司,為您提供網(wǎng)站建設(shè),網(wǎng)站制作,網(wǎng)頁設(shè)計(jì)及定制網(wǎng)站建設(shè)服務(wù),專注于企業(yè)網(wǎng)站建設(shè),高端網(wǎng)頁制作,對(duì)地磅秤等多個(gè)行業(yè)擁有豐富的網(wǎng)站建設(shè)經(jīng)驗(yàn)的網(wǎng)站建設(shè)公司。專業(yè)網(wǎng)站設(shè)計(jì),網(wǎng)站優(yōu)化推廣哪家好,專業(yè)成都網(wǎng)站營銷優(yōu)化,H5建站,響應(yīng)式網(wǎng)站。

一、RabbitMQ的介紹  

RabbitMQ是消息中間件的一種,消息中間件即分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎(chǔ)軟件.這些軟件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,現(xiàn)已經(jīng)轉(zhuǎn)讓給apache).

消息中間件的工作過程可以用生產(chǎn)者消費(fèi)者模型來表示.即,生產(chǎn)者不斷的向消息隊(duì)列發(fā)送信息,而消費(fèi)者從消息隊(duì)列中消費(fèi)信息.具體過程如下:

spring boot集成rabbitmq的示例分析

從上圖可看出,對(duì)于消息隊(duì)列來說,生產(chǎn)者,消息隊(duì)列,消費(fèi)者是最重要的三個(gè)概念,生產(chǎn)者發(fā)消息到消息隊(duì)列中去,消費(fèi)者監(jiān)聽指定的消息隊(duì)列,并且當(dāng)消息隊(duì)列收到消息之后,接收消息隊(duì)列傳來的消息,并且給予相應(yīng)的處理.消息隊(duì)列常用于分布式系統(tǒng)之間互相信息的傳遞.

對(duì)于RabbitMQ來說,除了這三個(gè)基本模塊以外,還添加了一個(gè)模塊,即交換機(jī)(Exchange).它使得生產(chǎn)者和消息隊(duì)列之間產(chǎn)生了隔離,生產(chǎn)者將消息發(fā)送給交換機(jī),而交換機(jī)則根據(jù)調(diào)度策略把相應(yīng)的消息轉(zhuǎn)發(fā)給對(duì)應(yīng)的消息隊(duì)列.那么RabitMQ的工作流程如下所示:

spring boot集成rabbitmq的示例分析

緊接著說一下交換機(jī).交換機(jī)的主要作用是接收相應(yīng)的消息并且綁定到指定的隊(duì)列.交換機(jī)有四種類型,分別為Direct,topic,headers,Fanout.

Direct是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡(jiǎn)單的模式.即創(chuàng)建消息隊(duì)列的時(shí)候,指定一個(gè)BindingKey.當(dāng)發(fā)送者發(fā)送消息的時(shí)候,指定對(duì)應(yīng)的Key.當(dāng)Key和消息隊(duì)列的BindingKey一致的時(shí)候,消息將會(huì)被發(fā)送到該消息隊(duì)列中.

topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊(duì)列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時(shí)候,只有指定的Key和該模式相匹配的時(shí)候,消息才會(huì)被發(fā)送到該消息隊(duì)列中.

headers也是根據(jù)一個(gè)規(guī)則進(jìn)行匹配,在消息隊(duì)列和交換機(jī)綁定的時(shí)候會(huì)指定一組鍵值對(duì)規(guī)則,而發(fā)送消息的時(shí)候也會(huì)指定一組鍵值對(duì)規(guī)則,當(dāng)兩組鍵值對(duì)規(guī)則相匹配的時(shí)候,消息會(huì)被發(fā)送到匹配的消息隊(duì)列中.

Fanout是路由廣播的形式,將會(huì)把消息發(fā)給綁定它的全部隊(duì)列,即便設(shè)置了key,也會(huì)被忽略. 

概念:

  • 生產(chǎn)者 消息的產(chǎn)生方,負(fù)責(zé)將消息推送到消息隊(duì)列

  • 消費(fèi)者 消息的最終接受方,負(fù)責(zé)監(jiān)聽隊(duì)列中的對(duì)應(yīng)消息,消費(fèi)消息

  • 隊(duì)列 消息的寄存器,負(fù)責(zé)存放生產(chǎn)者發(fā)送的消息

  • 交換機(jī) 負(fù)責(zé)根據(jù)一定規(guī)則分發(fā)生產(chǎn)者產(chǎn)生的消息

  • 綁定 完成交換機(jī)和隊(duì)列之間的綁定

模式:

1、direct

直連模式,用于實(shí)例間的任務(wù)分發(fā)

2、topic

話題模式,通過可配置的規(guī)則分發(fā)給綁定在該exchange上的隊(duì)列

3、headers

適用規(guī)則復(fù)雜的分發(fā),用headers里的參數(shù)表達(dá)規(guī)則

4、fanout

分發(fā)給所有綁定到該exchange上的隊(duì)列,忽略routing key

安裝

單機(jī)版安裝很簡(jiǎn)單,大概步驟如下:

# 安裝erlang包
 yum install erlang
# 安裝socat
 yum install socat
# 安裝rabbit 
 rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm 
# 啟動(dòng)服務(wù)
 rabbitmq-server start
# 增加管理控制功能
 rabbitmq-plugins enable rabbitmq_management
# 增加用戶:
 sudo rabbitmqctl add_user root password
 rabbitmqctl set_user_tags root administrator 
 rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

集群安裝,可參考這篇文章:

     rabbitmq集群安裝

以上就是rabbitmq的介紹,下面開始本文的正文:spring boot 集成rabbitmq ,本人在學(xué)習(xí)rabbitmq時(shí)發(fā)現(xiàn)網(wǎng)上很少有系統(tǒng)性介紹springboot和rabbitmq如何集成的,其他人總結(jié)的都片段化,所以結(jié)合個(gè)人調(diào)研過程,整理此篇文章。

二、springboot配置

廢話少說直接上代碼:

配置參數(shù)

application.yml:

spring:
 rabbitmq:
 addresses: 192.168.1.1:5672
 username: username
 password: password
 publisher-confirms: true
 virtual-host: /

java config讀取參數(shù)

/**
 * RabbitMq配置文件讀取類
 *
 * @author chenhf
 * @create 2017-10-23 上午9:31
 **/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {

 @Value("${spring.rabbitmq.addresses}")
 private String addresses;
 @Value("${spring.rabbitmq.username}")
 private String username;
 @Value("${spring.rabbitmq.password}")
 private String password;
 @Value("${spring.rabbitmq.publisher-confirms}")
 private Boolean publisherConfirms;
 @Value("${spring.rabbitmq.virtual-host}")
 private String virtualHost;

 // 構(gòu)建mq實(shí)例工廠
 @Bean
 public ConnectionFactory connectionFactory(){
 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
 connectionFactory.setAddresses(addresses);
 connectionFactory.setUsername(username);
 connectionFactory.setPassword(password);
 connectionFactory.setPublisherConfirms(publisherConfirms);
 connectionFactory.setVirtualHost(virtualHost);
 return connectionFactory;
 }

 @Bean
 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
 return new RabbitAdmin(connectionFactory);
 }

 @Bean
 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
 public RabbitTemplate rabbitTemplate(){
 RabbitTemplate template = new RabbitTemplate(connectionFactory());
 return template;
 }
}

三、rabbitmq生產(chǎn)者配置

主要配置了直連和話題模式,其中話題模式設(shè)置兩個(gè)隊(duì)列(queueTopicTest1、queueTopicTest2),此兩個(gè)隊(duì)列在和交換機(jī)綁定時(shí)分別設(shè)置不同的routingkey(.TEST.以及l(fā)azy.#)來驗(yàn)證匹配模式。

/**
 * 用于配置交換機(jī)和隊(duì)列對(duì)應(yīng)關(guān)系
 * 新增消息隊(duì)列應(yīng)該按照如下步驟
 * 1、增加queue bean,參見queueXXXX方法
 * 2、增加queue和exchange的binding
 * @author chenhf
 * @create 2017-10-23 上午10:33
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);

 /**
 * @Author:chenhf
 * @Description: 主題型交換機(jī)
 * @Date:下午5:49 2017/10/23
 * @param
 * @return
 */
 @Bean
 TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
 TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
 rabbitAdmin.declareExchange(contractTopicExchange);
 logger.debug("完成主題型交換機(jī)bean實(shí)例化");
 return contractTopicExchange;
 }
 /**
 * 直連型交換機(jī)
 */
 @Bean
 DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
 DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
 rabbitAdmin.declareExchange(contractDirectExchange);
 logger.debug("完成直連型交換機(jī)bean實(shí)例化");
 return contractDirectExchange;
 }

 //在此可以定義隊(duì)列

 @Bean
 Queue queueTest(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("測(cè)試隊(duì)列實(shí)例化完成");
 return queue;
 }

 //topic 1
 @Bean
 Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("話題測(cè)試隊(duì)列1實(shí)例化完成");
 return queue;
 }
 //topic 2
 @Bean
 Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("話題測(cè)試隊(duì)列2實(shí)例化完成");
 return queue;
 }


 //在此處完成隊(duì)列和交換機(jī)綁定
 @Bean
 Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測(cè)試隊(duì)列與直連型交換機(jī)綁定完成");
 return binding;
 }
 //topic binding1
 @Bean
 Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測(cè)試隊(duì)列與話題交換機(jī)1綁定完成");
 return binding;
 }

 //topic binding2
 @Bean
 Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測(cè)試隊(duì)列與話題交換機(jī)2綁定完成");
 return binding;
 }

}

在這里用到枚舉類:RabbitMqEnum

/**
 * 定義rabbitMq需要的常量
 *
 * @author chenhf
 * @create 2017-10-23 下午4:07
 **/
public class RabbitMqEnum {

 /**
 * @param
 * @Author:chenhf
 * @Description:定義數(shù)據(jù)交換方式
 * @Date:下午4:08 2017/10/23
 * @return
 */
 public enum Exchange {
 CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分發(fā)"),
 CONTRACT_TOPIC("CONTRACT_TOPIC", "消息訂閱"),
 CONTRACT_DIRECT("CONTRACT_DIRECT", "點(diǎn)對(duì)點(diǎn)");

 private String code;
 private String name;

 Exchange(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }
 }

 /**
 * describe: 定義隊(duì)列名稱
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueName {
 TESTQUEUE("TESTQUEUE", "測(cè)試隊(duì)列"),
 TOPICTEST1("TOPICTEST1", "topic測(cè)試隊(duì)列"),
 TOPICTEST2("TOPICTEST2", "topic測(cè)試隊(duì)列");

 private String code;
 private String name;

 QueueName(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }

 }

 /**
 * describe: 定義routing_key
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueEnum {
 TESTQUEUE("TESTQUEUE1", "測(cè)試隊(duì)列key"),
 TESTTOPICQUEUE1("*.TEST.*", "topic測(cè)試隊(duì)列key"),
 TESTTOPICQUEUE2("lazy.#", "topic測(cè)試隊(duì)列key");


 private String code;
 private String name;

 QueueEnum(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }
 }

}

以上完成消息生產(chǎn)者的定義,下面封裝調(diào)用接口

測(cè)試時(shí)直接調(diào)用此工具類,testUser類需自己實(shí)現(xiàn)

rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/**
 * rabbitmq發(fā)送消息工具類
 *
 * @author chenhf
 * @create 2017-10-26 上午11:10
 **/

@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);

 private RabbitTemplate rabbitTemplate;

 @Autowired
 public RabbitMqSender(RabbitTemplate rabbitTemplate) {
 this.rabbitTemplate = rabbitTemplate;
 this.rabbitTemplate.setConfirmCallback(this);
 }

 @Override
 public void confirm(CorrelationData correlationData, boolean b, String s) {
 logger.info("confirm: " + correlationData.getId());
 }

 /**
 * 發(fā)送到 指定routekey的指定queue
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqDirect(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
 }

 /**
 * 所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定Topic的Queue上
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqTopic(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
 }
}

四、rabbitmq消費(fèi)者配置

springboot注解方式監(jiān)聽隊(duì)列,無法手動(dòng)指定回調(diào),所以采用了實(shí)現(xiàn)ChannelAwareMessageListener接口,重寫onMessage來進(jìn)行手動(dòng)回調(diào),詳見以下代碼,詳細(xì)介紹可以在spring的官網(wǎng)上找amqp相關(guān)章節(jié)閱讀

直連消費(fèi)者

通過設(shè)置TestUser的name來測(cè)試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺(tái)中隊(duì)列中消息是否被消費(fèi)

/**
 * 消費(fèi)者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class ExampleAmqpConfiguration {
 @Bean("testQueueContainer")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TESTQUEUE");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("testQueueListener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 //通過設(shè)置TestUser的name來測(cè)試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺(tái)中隊(duì)列中消息是否被消費(fèi)
 if ("2".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }

 if ("1".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
 }

 }
 };
 }

}

topic消費(fèi)者1

/**
 * 消費(fèi)者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration {
 @Bean("topicTest1Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST1");
 container.setMessageListener(exampleListener1());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("topicTest1Listener")
 public ChannelAwareMessageListener exampleListener1(){
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 System.out.println("TOPICTEST1:"+testUser.toString());
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

 }
 };
 }




}

topic消費(fèi)者2

/**
 * 消費(fèi)者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration2 {
 @Bean("topicTest2Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST2");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("topicTest2Listener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void

以上是“spring boot集成rabbitmq的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

本文標(biāo)題:springboot集成rabbitmq的示例分析
文章路徑:http://aaarwkj.com/article22/gipjcc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、品牌網(wǎng)站設(shè)計(jì)、動(dòng)態(tài)網(wǎng)站、域名注冊(cè)、網(wǎng)站內(nèi)鏈、App設(shè)計(jì)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站建設(shè)
激情综合婷婷中文字幕| 伊人激情一区二区三区| 婷婷久久五月综合激情| 日本午夜在线观看视频| 在线精品91国产在线观看| 欧美一区二区日韩一区二区| 中文字幕在线日韩av| 99精品一二三日韩| 国产精品自产拍av在线| 久久偷拍女生厕所尿尿| 日本高清不卡在线一区二区| 国产又爽又乱的视频在线| 日产精品一级二级三级爱| av免费观看日韩永久| 日本不卡不码高清免费| 国产精品一区二区一牛影视| 亚洲无人区码一码二码三码| 国产精品午夜福利91| 好狼色欧美激情国产区| 久久精品资源综合网| 日韩精品 视频二区| 在线观看国产自拍精品| 免费啪啪视频一区二区| 在线亚洲av不卡一区二区三区| 九九热九九热九九热九| 国产精品一区巨乳人妻| 熟女人妻丰满视频中文字幕| 麻豆国产传媒片在线看| 国产av麻豆全部免费| 国内极品尤物视频在线| 国产亚洲一区二区视频| 小黄片免费在线播放观看| 欧美亚洲午夜一二综合| 天天操夜夜操白天操晚上操| 桃色av一区二区三区| 日本一级黄色影视大全| 日韩少妇黄色在线观看| 青青草国产成人自拍视频在线观看 | 日本午夜一区二区在线观看| 久久亚洲欧洲日本韩国欧美| 国产精品美女丝袜久久久|