page contents

php 操作RabbitMQ

以下内容希望帮助到大家!

attachments-2020-05-1j5oc7aQ5ecc7a12daccd.jpg

基本流程图

v2-d4f8df31b62b38c2b8fa878608162fec_720w.jpg

如果exchange 没有绑定queue,则消息将会被丢弃
如果创建exchange,queue,并且已经绑定了,则可以直接使用
为了防止脚本出问题 可以配合supervisor


安装

  1. 从网站 https://packagist.org 搜索rabbitmq插件
  2. 使用composer安装插件composer require php-amqplib/php-amqplib


使用

  1. 连接RabbitMQ服务器
  2. 开始一个新的 channel
  3. 新建一个exchange
  4. 新建一个queue
  5. 绑定queue和exchange
  6. 发布一个消息
  7. 建立一个消费者并注册一个回调函数
  8. 监听数据


新建连接和channel

<?php
    require "./vendor/autoload.php";
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    
    $host   = "192.168.110.134";
    $port   = 5672;
    $user   = "test";
    $pass   = "test";
    
    $vhost  = "/";
    
    try{
        $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
    }catch (Exception $e){
        echo 'Caught exception: ',  $e->getMessage(), "\n";die;
    }
    
    $channel = $connection->channel();


新建一个exchange

/*
    name: $exchange
    type: fanout
    passive: false // don't check is an exchange with the same name exists
    durable: false // the exchange won't survive server restarts
    auto_delete: true //the exchange will be deleted once the channel is closed.
*/
try{
    $name = 'example_direct_exchange';
    $type = "direct";
    $passive = false;
    $durable = true;
    $auto_delete = true;
    $channel->exchange_declare($name, $type, $passive, $durable, $auto_delete);

}catch (Exception $e){
    echo 'Caught exception: ',  $e->getMessage(), "\n";die;
}


参数 name

exchange名称


参数 type

exchange类型
fanout 是广播类型的消息 会给所有绑定的queue发送数据


参数 passive

true    

1.如果exchange已存在 则直接连接 并且不检查配置 比如已存在的exchange是fanout,新需要建立的是direct,也不会报错;

2.如果exchange不存在 则直接报错

false
1.如果exchange不存在 则创建新的exchange

2.如果exchange已存在 则判断配置是否相同。如果配置不相同 则直接报错。比如已存在的exchange是fanout,新需要建立的是direct,会报错。


参数 auto_delete

true 
当最后一个消费者取消订阅之后 exchange会被自动删除 一般用于临时exchange

新建一个queue

/*
    name: $queue    // should be unique in fanout exchange.
    passive: false  // don't check if a queue with the same name exists
    durable: false // the queue will not survive server restarts
    exclusive: false // the queue might be accessed by other channels
    auto_delete: true //the queue will be deleted once the channel is closed.
*/
$queue1 = 'example_direct_queue_1';

$channel->queue_declare($queue1, false, true, false, false);


将queue和exchange绑定起来

$queue1 = 'example_direct_queue_1';
    $exchange_name = 'example_direct_exchange';
    
    $channel->queue_bind($queue1, $exchange_name);


发布一个消息

$exchange_name = 'example_direct_exchange';
$messageBody = array(
    'example_direct_value'=>date('Y-m-d H:i:s'),
);
$message = new AMQPMessage(json_encode($messageBody));
$channel->basic_publish($message, $exchange_name);


立一个消费者并注册一个回调函数

/*
    queue: Queue from where to get the messages
    consumer_tag: Consumer identifier
    no_local: Don't receive messages published by this consumer.
    no_ack: Tells the server if the consumer will acknowledge the messages.
    exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
    nowait:
    callback: A PHP Callback
*/

$consumerTag = 'consumer';

$queue  = 'example_direct_queue_1';

$channel->basic_consume($queue, "", false, false, false, false,function($msg){


    $message = json_decode($msg->body, true);

    file_put_contents("./mq.log", $message,FILE_APPEND);

    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
});


参数no_ack

true
消息只有在返回一个ack之后,才会被删除

false
消息被取出之后 会被立即删除


监听数据

try {
    while (count($channel->callbacks)) {
        $channel->wait();
    }
}
catch(\PhpAmqpLib\Exception\AMQPTimeoutException $e){
    $channel->close();
    $channel->close();
}



attachments-2020-05-OohaBtuN5ecc7a2e3959a.jpg

  • 发表于 2020-05-26 10:09
  • 阅读 ( 578 )
  • 分类:中间件

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
Pack
Pack

1135 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1478 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章