本文源碼: GitHub·點(diǎn)這里 || GitEE·點(diǎn)這里
大峪網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)!從網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項(xiàng)目制作,到程序開發(fā),運(yùn)營維護(hù)。創(chuàng)新互聯(lián)自2013年起到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)。
(1)、Broker
RocketMQ 的核心,接收 Producer 發(fā)過來的消息、處理 Consumer 的消費(fèi)消息請求、消息的持 久化存儲(chǔ)、服務(wù)端過濾功能等 。
(2)、NameServer
消息隊(duì)列中的狀態(tài)服務(wù)器,集群的各個(gè)組件通過它來了解全局的信息 。類似微服務(wù)中注冊中心的服務(wù)注冊,發(fā)現(xiàn),下線,上線的概念。
熱備份:
NamServer可以部署多個(gè),相互之間獨(dú)立,其他角色同時(shí)向多個(gè)NameServer 機(jī)器上報(bào)狀態(tài)信息。
心跳機(jī)制:
NameServer 中的 Broker、 Topic等狀態(tài)信息不會(huì)持久存儲(chǔ),都是由各個(gè)角色定時(shí)上報(bào)并存儲(chǔ)到內(nèi)存中,超時(shí)不上報(bào)的話, NameServer會(huì)認(rèn)為某個(gè)機(jī)器出故障不可用。
(3)、Producer
消息的生成者,最常用的producer類就是DefaultMQProducer。
(4)、Consumer
消息的消費(fèi)者,常用Consumer類
DefaultMQPushConsumer
收到消息后自動(dòng)調(diào)用傳入的處理方法來處理,實(shí)時(shí)性高
DefaultMQPullConsumer
用戶自主控制 ,靈活性更高。
(1)、Broker啟動(dòng)后需要完成一次將自己注冊至NameServer的操作;隨后每隔30s時(shí)間定時(shí)向NameServer更新Topic路由信息。
(2)、Producer發(fā)送消息時(shí)候,需要根據(jù)消息的Topic從本地緩存的獲取路由信息。如果沒有則更新路由信息會(huì)從NameServer重新拉取,同時(shí)Producer會(huì)默認(rèn)每隔30s向NameServer拉取一次路由信息。
(3)、Consumer消費(fèi)消息時(shí)候,從NameServer獲取的路由信息,并再完成客戶端的負(fù)載均衡后,監(jiān)聽指定消息隊(duì)列獲取消息并進(jìn)行消費(fèi)。
版本描述
<spring-boot.version>2.1.3.RELEASE</spring-boot.version>
<rocketmq.version>4.3.0</rocketmq.version>
rocketmq:
# 生產(chǎn)者配置
producer:
isOnOff: on
# 發(fā)送同一類消息的設(shè)置為同一個(gè)group,保證唯一
groupName: CicadaGroup
# 服務(wù)地址
namesrvAddr: 127.0.0.1:9876
# 消息最大長度 默認(rèn)1024*4(4M)
maxMessageSize: 4096
# 發(fā)送消息超時(shí)時(shí)間,默認(rèn)3000
sendMsgTimeout: 3000
# 發(fā)送消息失敗重試次數(shù),默認(rèn)2
retryTimesWhenSendFailed: 2
# 消費(fèi)者配置
consumer:
isOnOff: on
# 官方建議:確保同一組中的每個(gè)消費(fèi)者訂閱相同的主題。
groupName: CicadaGroup
# 服務(wù)地址
namesrvAddr: 127.0.0.1:9876
# 接收該 Topic 下所有 Tag
topics: CicadaTopic~*;
consumeThreadMin: 20
consumeThreadMax: 64
# 設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)為1條
consumeMessageBatchMaxSize: 1
# 配置 Group Topic Tag
rocket:
group: rocketGroup
topic: rocketTopic
tag: rocketTag
/**
* RocketMQ 生產(chǎn)者配置
*/
@Configuration
public class ProducerConfig {
private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;
@Value("${rocketmq.producer.groupName}")
private String groupName;
@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.producer.maxMessageSize}")
private Integer maxMessageSize ;
@Value("${rocketmq.producer.sendMsgTimeout}")
private Integer sendMsgTimeout;
@Value("${rocketmq.producer.retryTimesWhenSendFailed}")
private Integer retryTimesWhenSendFailed;
@Bean
public DefaultMQProducer getRocketMQProducer() {
DefaultMQProducer producer;
producer = new DefaultMQProducer(this.groupName);
producer.setNamesrvAddr(this.namesrvAddr);
//如果需要同一個(gè)jvm中不同的producer往不同的mq集群,發(fā)送消息,需要設(shè)置不同的instanceName
if(this.maxMessageSize!=null){
producer.setMaxMessageSize(this.maxMessageSize);
}
if(this.sendMsgTimeout!=null){
producer.setSendMsgTimeout(this.sendMsgTimeout);
}
//如果發(fā)送消息失敗,設(shè)置重試次數(shù),默認(rèn)為2次
if(this.retryTimesWhenSendFailed!=null){
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
}
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
return producer;
}
}
/**
* RocketMQ 消費(fèi)者配置
*/
@Configuration
public class ConsumerConfig {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;
@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.consumer.groupName}")
private String groupName;
@Value("${rocketmq.consumer.consumeThreadMin}")
private int consumeThreadMin;
@Value("${rocketmq.consumer.consumeThreadMax}")
private int consumeThreadMax;
@Value("${rocketmq.consumer.topics}")
private String topics;
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;
@Resource
private RocketMsgListener msgListener;
@Bean
public DefaultMQPushConsumer getRocketMQConsumer(){
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.registerMessageListener(msgListener);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
try {
String[] topicTagsArr = topics.split(";");
for (String topicTags : topicTagsArr) {
String[] topicTag = topicTags.split("~");
consumer.subscribe(topicTag[0],topicTag[1]);
}
consumer.start();
}catch (MQClientException e){
e.printStackTrace();
}
return consumer;
}
}
/**
* 消息消費(fèi)監(jiān)聽
*/
@Component
public class RocketMsgListener implements MessageListenerConcurrently {
private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;
@Resource
private ParamConfigService paramConfigService ;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(list)){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = list.get(0);
LOG.info("接受到的消息為:"+new String(messageExt.getBody()));
int reConsume = messageExt.getReconsumeTimes();
// 消息已經(jīng)重試了3次,如果不需要再次消費(fèi),則返回成功
if(reConsume ==3){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
if(messageExt.getTopic().equals(paramConfigService.rocketTopic)){
String tags = messageExt.getTags() ;
switch (tags){
case "rocketTag":
LOG.info("OpenAccount tag == >>"+tags);
break ;
default:
LOG.info("未匹配到Tag == >>"+tags);
break;
}
}
// 消息消費(fèi)成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
@Service
public class ParamConfigService {
@Value("${rocket.group}")
public String rocketGroup ;
@Value("${rocket.topic}")
public String rocketTopic ;
@Value("${rocket.tag}")
public String rocketTag ;
}
@Service
public class RocketMqServiceImpl implements RocketMqService {
@Resource
private DefaultMQProducer defaultMQProducer;
@Resource
private ParamConfigService paramConfigService ;
@Override
public SendResult openAccountMsg(String msgInfo) {
// 可以不使用Config中的Group
defaultMQProducer.setProducerGroup(paramConfigService.rocketGroup);
SendResult sendResult = null;
try {
Message sendMsg = new Message(paramConfigService.rocketTopic,
paramConfigService.rocketTag,
"open_account_key", msgInfo.getBytes());
sendResult = defaultMQProducer.send(sendMsg);
} catch (Exception e) {
e.printStackTrace();
}
return sendResult ;
}
}
GitHub·地址
https://github.com/cicadasmile/middle-ware-parent
GitEE·地址
https://gitee.com/cicadasmile/middle-ware-parent
本文題目:SpringBoot2高級應(yīng)用(01):整合RocketMQ,實(shí)現(xiàn)請求異步處理
標(biāo)題來源:http://aaarwkj.com/article14/gdssge.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營銷推廣、網(wǎng)站設(shè)計(jì)、網(wǎng)站策劃、搜索引擎優(yōu)化、網(wǎng)站改版、做網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)