page contents

RabbitMQ 入门教程(PHP) 实现延迟功能

以下内容希望帮助到大家!

attachments-2020-04-TPiSetWM5e96c268949c2.jpg

php 使用rabbitmq-delayed-message-exchange插件实现延迟功能


1.安装

  • 3.6.x下载地址
  • 3.7.x下载地址

下载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/local/rabbitmq/plugins中( windows安装目录\rabbitmq_server-version\plugins )。


2.启用插件

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchang

输出如下:

The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

通过rabbitmq-plugins list查看已安装列表,如下:

[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x

3.机制解

安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。


4.php实现过程

消费者 delay_consumer2.php:

<?php

//header('Content-Type:text/html;charset=utf8;');

$params = array(
    'exchangeName' => 'delayed_exchange_test',
    'queueName' => 'delayed_queue_test',
    'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
);

//var_dump(extension_loaded('amqp'));

//exit();

try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die('Conexiune esuata');
        //TODO 记录日志
        echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die('Connection through channel failed');
        //TODO 记录日志
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    //$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端
    $exchange->setName($params['exchangeName']);
    $exchange->setType('x-delayed-message'); //x-delayed-message类型
    /*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

      fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

      direct:把消息投递到那些binding key与routing key完全匹配的队列中。

      topic:将消息路由到binding key与routing key模式匹配的队列中。*/
    $exchange->setArgument('x-delayed-type','direct');
    $exchange->declareExchange();

    //$channel->startTransaction();

    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //绑定
    $queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
    echo $e->getMessage();
    exit();
}

function callback(AMQPEnvelope $message) {
    global $queue;
    if ($message) {
        $body = $message->getBody();
        echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;
        echo '接收内容:'.$body . PHP_EOL;
        //为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息
        $queue->ack($message->getDeliveryTag());
    } else {
        echo 'no message' . PHP_EOL;
    }
}

//$queue->consume('callback');  第一种消费方式,但是会阻塞,程序一直会卡在此处

//第二种消费方式,非阻塞
/*$start = time();
while(true)
{
    $message = $queue->get();
    if(!empty($message))
    {
        echo $message->getBody();
        $queue->ack($message->getDeliveryTag());    //应答,代表该消息已经消费
        $end = time();
        echo '<br>' . ($end - $start);
        exit();
    }
    else
    {
        //echo 'message not found' . PHP_EOL;
    }
}*/

//注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。
//就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。 
$action = '2';

if($action == '1'){
    $queue->consume('callback');  //第一种消费方式,但是会阻塞,程序一直会卡在此处
}else{
    //第二种消费方式,非阻塞
    $start = time();
    while(true)
    {
        $message = $queue->get();
        if(!empty($message))
        {
            echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;
            echo '接收内容:'.$message->getBody().PHP_EOL;
            $queue->ack($message->getDeliveryTag());    //应答,代表该消息已经消费
            $end = time();
            echo '运行时间:'.($end - $start).'秒'.PHP_EOL;
            //exit();
        }
        else
        {
            //echo 'message not found' . PHP_EOL;
        }
    }
}

生产者delay_publisher2.php:

<?php

//header('Content-Type:text/html;charset=utf-8;');

$params = array(
    'exchangeName' => 'delayed_exchange_test',
    'queueName' => 'delayed_queue_test',
    'routeKey' => 'delayed_route_test',
);

$connectConfig = array(
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
);

//var_dump(extension_loaded('amqp')); 判断是否加载amqp扩展
//exit();
try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die('Conexiune esuata');
        //TODO 记录日志
        echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die('Connection through channel failed');
        //TODO 记录日志
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    $exchange->setName($params['exchangeName']);
    $exchange->setType('x-delayed-message'); //x-delayed-message类型
    /*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

      fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

      direct:把消息投递到那些binding key与routing key完全匹配的队列中。

      topic:将消息路由到binding key与routing key模式匹配的队列中。*/
    $exchange->setArgument('x-delayed-type','direct');
    $exchange->declareExchange();

    //$channel->startTransaction();
    //RabbitMQ不容许声明2个相同名称、配置不同的Queue,否则报错
    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //绑定队列和交换机
    $queue->bind($params['exchangeName'], $params['routeKey']);
    //$channel->commitTransaction();
} catch(Exception $e) {

}

for($i=5;$i>0;$i--){
    //生成消息
    echo '发送时间:'.date("Y-m-d H:i:s", time()).PHP_EOL;
    echo 'i='.$i.',延迟'.$i.'秒'.PHP_EOL;
    $message = json_encode(['order_id'=>time(),'i'=>$i]);
    $exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);
    sleep(2);
}
$conn->disconnect();

对于代码来讲,首先对于消费者核心代码

$exchange->setType('x-delayed-message'); //x-delayed-message类型
$exchange->setArgument('x-delayed-type','direct');

生产者核心代码

$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message类型
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();

使用方法:先运行delay_consumer1.php,再运行delay_publisher1.php


运行效果:

v2-d2c719e6435bff2c16c51692536a29de_720w.jpg



attachments-2020-04-fh3swgBy5e96c239403aa.jpg

  • 发表于 2020-04-15 16:14
  • 阅读 ( 500 )
  • 分类:中间件

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
Pack
Pack

1135 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1316 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章