本篇文章為大家展示了RabbitMQ中怎么實現(xiàn)延遲功能,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
岑鞏網(wǎng)站建設(shè)公司成都創(chuàng)新互聯(lián),岑鞏網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為岑鞏上千余家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\成都外貿(mào)網(wǎng)站制作要多少錢,請找那個售后服務(wù)好的岑鞏做網(wǎng)站的公司定做!
使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang
啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchang
輸出如下:
The following plugins have been enabled: rabbitmq_delayed_message_exchange
通過rabbitmq-plugins list
查看已安裝列表,如下:
[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x
安裝插件后會生成新的Exchange類型x-delayed-message
,該類型消息支持延遲投遞機制,接收到消息后并未立即將消息投遞至目標(biāo)隊列中,而是存儲在mnesia
(一個分布式數(shù)據(jù)系統(tǒng))表中,檢測消息延遲時間,如達到可投遞時間時并將其通過x-delayed-type
類型標(biāo)記的交換機類型投遞至目標(biāo)隊列。
消費者 delay_consumer2.php:
<?php //header('Content-Type:text/html;charset=utf8;'); $params = array( 'exchangeName' => 'delayed_exchange_test', 'queueName' => 'delayed_queue_test', 'routeKey' => 'delayed_route_test', ); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ); //var_dump(extension_loaded('amqp')); //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 記錄日志 echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 記錄日志 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); //$exchange->setFlags(AMQP_DURABLE);//聲明一個已存在的交換器的,如果不存在將拋出異常,這個一般用在consume端 $exchange->setName($params['exchangeName']); $exchange->setType('x-delayed-message'); //x-delayed-message類型 /*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。 fanout:把所有發(fā)送到該Exchange的消息投遞到所有與它綁定的隊列中。 direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。 topic:將消息路由到binding key與routing key模式匹配的隊列中。*/ $exchange->setArgument('x-delayed-type','direct'); $exchange->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($channel); $queue->setName($params['queueName']); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); //綁定 $queue->bind($params['exchangeName'], $params['routeKey']); } catch(Exception $e) { echo $e->getMessage(); exit(); } function callback(AMQPEnvelope $message) { global $queue; if ($message) { $body = $message->getBody(); echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL; echo '接收內(nèi)容:'.$body . PHP_EOL; //為了防止接收端在處理消息時down掉,只有在消息處理完成后才發(fā)送ack消息 $queue->ack($message->getDeliveryTag()); } else { echo 'no message' . PHP_EOL; } } //$queue->consume('callback'); 第一種消費方式,但是會阻塞,程序一直會卡在此處 //第二種消費方式,非阻塞 /*$start = time(); while(true) { $message = $queue->get(); if(!empty($message)) { echo $message->getBody(); $queue->ack($message->getDeliveryTag()); //應(yīng)答,代表該消息已經(jīng)消費 $end = time(); echo '<br>' . ($end - $start); exit(); } else { //echo 'message not found' . PHP_EOL; } }*/ //注意:這里需要注意的是這個方法:$queue->consume,queue對象有兩個方法可用于取消息:consume和get。前者是阻塞的,無消息時會被掛起,適合循環(huán)中使用;后者則是非阻塞的,取消息時有則取,無則返回false。 //就是說用了consume之后,會同步阻塞,該程序常駐內(nèi)存,不能用nginx,apache調(diào)用。 $action = '2'; if($action == '1'){ $queue->consume('callback'); //第一種消費方式,但是會阻塞,程序一直會卡在此處 }else{ //第二種消費方式,非阻塞 $start = time(); while(true) { $message = $queue->get(); if(!empty($message)) { echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL; echo '接收內(nèi)容:'.$message->getBody().PHP_EOL; $queue->ack($message->getDeliveryTag()); //應(yīng)答,代表該消息已經(jīng)消費 $end = time(); echo '運行時間:'.($end - $start).'秒'.PHP_EOL; //exit(); } else { //echo 'message not found' . PHP_EOL; } } }
生產(chǎn)者delay_publisher2.php:
<?php //header('Content-Type:text/html;charset=utf-8;'); $params = array( 'exchangeName' => 'delayed_exchange_test', 'queueName' => 'delayed_queue_test', 'routeKey' => 'delayed_route_test', ); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ); //var_dump(extension_loaded('amqp')); 判斷是否加載amqp擴展 //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 記錄日志 echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 記錄日志 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setName($params['exchangeName']); $exchange->setType('x-delayed-message'); //x-delayed-message類型 /*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。 fanout:把所有發(fā)送到該Exchange的消息投遞到所有與它綁定的隊列中。 direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。 topic:將消息路由到binding key與routing key模式匹配的隊列中。*/ $exchange->setArgument('x-delayed-type','direct'); $exchange->declareExchange(); //$channel->startTransaction(); //RabbitMQ不容許聲明2個相同名稱、配置不同的Queue,否則報錯 $queue = new AMQPQueue($channel); $queue->setName($params['queueName']); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); //綁定隊列和交換機 $queue->bind($params['exchangeName'], $params['routeKey']); //$channel->commitTransaction(); } catch(Exception $e) { } for($i=5;$i>0;$i--){ //生成消息 echo '發(fā)送時間:'.date("Y-m-d H:i:s", time()).PHP_EOL; echo 'i='.$i.',延遲'.$i.'秒'.PHP_EOL; $message = json_encode(['order_id'=>time(),'i'=>$i]); $exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]); sleep(2); } $conn->disconnect();
對于代碼來講,首先對于消費者核心代碼
$exchange->setType('x-delayed-message'); //x-delayed-message類型 $exchange->setArgument('x-delayed-type','direct');
生產(chǎn)者核心代碼
$exchange = new AMQPExchange($channel); $exchange->setName($params['exchangeName']); $exchange->setType('x-delayed-message'); //x-delayed-message類型 $exchange->setArgument('x-delayed-type','direct'); $exchange->declareExchange();
使用方法:先運行delay_consumer1.php,再運行delay_publisher1.php
運行效果:
上述內(nèi)容就是RabbitMQ中怎么實現(xiàn)延遲功能,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
文章題目:RabbitMQ中怎么實現(xiàn)延遲功能
本文地址:http://aaarwkj.com/article26/gihocg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號、網(wǎng)站改版、網(wǎng)站內(nèi)鏈、小程序開發(fā)、企業(yè)建站、外貿(mào)建站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)