page contents

PHP实现消息队列MQ

以下内容希望帮助到大家!

attachments-2020-05-cfkiyhpa5eb25eafa9663.jpg

一、MQ的应用场景


优点:

a)主要解决异步消息
b)应用解耦
c)流量消峰等问题
d)日志处理(kafka)


缺点:

a)系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低

b)系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。


1、为什么会造成重复消费?

因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。


2、解决重复消费的方案:

(1)比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

(2)再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

(3)如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。


二、消息模型


a)P2P(Point to Point)点对点模式(也就是一个任务只能被一个消费者消费)


1、包含三个角色:消息队列(Queue),发送者(Sender),接受者(Receiver)

PHP实现:

安装rabbitMQ扩展:

在你的项目中添加一个 composer.json文件:

{
     "require": {
      "php-amqplib/php-amqplib": "2.6.1"
     }
 }


2、简单模式(一对一)


v2-f25d513320998a24416ecc0d1dd0ad33_720w.jpg


<?php
# @File  : sample-send.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : 生产者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立AMQP连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
// 定义队列名称
$channel->queue_declare('hello', false, false, false, false);
// 定义要发送的信息
$msg = new AMQPMessage('Hello World!'.time());
// 发送消息
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();


<?php
# @File  : sample-reciver.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : 消费者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建信息通道
$channel    = $connection->channel();

// 声明队列
$channel->queue_declare('hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";

//  接受生产者的消息回调函数
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
};

// 消费信息
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// 正在消费时,则等待
while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();


3、Work模式(轮循队列,每个消费者消费的数量是一样的)(一对多)


4、Work模式(能者多劳)(一对多)


v2-df6faf803439a215a0e6fd111ad3ba64_720w.jpg


<?php
# @File  : work-send.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : [work模式]生产者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 定义队列名称
// 队列声明为持久化(durable); 通过queue_declare的第三参数为true
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();


<?php
# @File  : work-reciver.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : [work模式]消费者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 创建信息通道
$channel    = $connection->channel();

// 队列声明为持久化(durable); 通过queue_declare的第三参数为true
$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// 公平调度(即能者多劳)
$channel->basic_qos(null, 1, null);

// 第四个参数basic_consume为false (true 意味着不响应ack);消费者挂掉这后,所有没有响应的消息都会重新发送,减小消息丢失的概率,改为false后,则是手动确认,默认是自动确认
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();


5、每个消息只有一个消费者

6、发送者和接受者没有时间依赖

7、接受者确认消息接受和处理成功


attachments-2020-05-QcSBPqlD5eb2661adc8fc.jpg

b)Publish/Subscribe(Pub/Sub)发布订阅模式

1、包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber)
2、一个生产者,多个消息者;客户端只有订阅后才能收到消息;持久化和非持久化
3、每个消费者都有自己的队列
4、生产者没有直接把消息发送到队列,而是发送到交换机 转发器exchange
5、每个队列都要绑定到交换机上
6、生产者发送的消息经过交换机到达队列,就能实现一个消息被多个消费者消费


7、Exchange(交换机 转发器)

1、一方面是接受生产者的消息,另一方面是向队列推送消息
2、匿名转发
3、Fanout(订阅模式;不处理路由键,广播)


v2-6fffd2d3b84ecc17073c8176b956d06b_720w.jpg


<?php
# @File  : subscribe-send.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [发布/订阅模式]生产者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 创建信息通道
$channel    = $connection->channel();

// 定义交换机,第一个参数是交换机名称,第二参数是交换机类型
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "info: Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();


<?php
# @File  : subscribe-reciver.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [发布/订阅模式]消费者
#

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 创建信息通道
$channel    = $connection->channel();

// 定义交换机,第一个参数是交换机名称,第二参数是交换机类型
$channel->exchange_declare('logs', 'fanout', false, false, false);

// 定义队列
// 在 php-amqplib 客户端,当我们提供队列名称为空字符串时,我们创建了一个具有生成名称的非持久队列:
// list($queue_name, ,) = $channel->queue_declare("");
// 方法返回时,$queue_name变量包含一个随机生成的RabbitMQ队列名称。例如,类似amq.gen-jzty20brgko-hjmujj0wlg。
list($queue_name, ) = $channel->queue_declare("", false, false, true, false);

// 将交换机绑定到队列
$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();


attachments-2020-05-Pz2LKEU85eb2666315bae.jpg


4、Direct(路由模式;处理路由键,发布与订阅,完全匹配)


v2-e6ae1dfb32c4d107e38d42566a3c4e43_720w.jpg


多个绑定(Multiple bindings)


v2-cf95fd62ae2bda64b6bffc583625148b_720w.jpg


整合


v2-d7e543deb51e88d639f040994a909b38_720w.jpg


<?php
# @File  : routing-send.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [路由模式]生产者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 创建信息通道
$channel = $connection->channel();

// 声明交换机,第一参数为交换机名称,第二参数为交换机类型
$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo " [x] Sent ",$severity,':',$data," \n";

$channel->close();
$connection->close();

?>


然后继续看

<?php
# @File  : routing-reciver.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [路由模式]消费者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

// 定义队列,第一个参数为队列名称,为空则随机生成
list($queue_name, ) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach ($severities as $severity) {
    // 第二参数是交换机名称,第三个参数是路由键名称
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

  

5、TopicTopic模式,规则匹配)

①将路由键和某模式匹配
②"#"匹配零个或者多个
③“*”匹配任意一个


v2-b1598420a6d90cb511b07cb55ede40c3_720w.jpg


<?php
# @File  : topic-send.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [主题模式]生产者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 创建信息通道
$channel = $connection->channel();

// 声明交换机,第一参数为交换机名称,第二参数为交换机类型
$channel->exchange_declare('topic_logs', 'topic', false, false, false);


                    
  • 发表于 2020-05-06 14:52
  • 阅读 ( 532 )
  • 分类:中间件

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
Pack
Pack

1135 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1478 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章