page contents

PHP处理kafka消息队列

安装PHP—kafka扩展后,就可以开始编写 php 消费消息的脚本了,php-rdkafka 扩展提供了几种消息处理的方式

attachments-2020-05-iqJ4Ybsd5ec38edf4d339.jpg

安装PHP—kafka扩展后,就可以开始编写 php 消费消息的脚本了,php-rdkafka 扩展提供了几种消息处理的方式


低级方式(Low level)


这种方式没有消费组的概念

<?php

$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
// 指定 broker 地址,多个地址用"," 分割
$rk->addBrokers("192.168.33.1:9092");


$topic = $rk->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);


while (true) {
    // 第一个参数是分区号
    // 第二个参数是超时时间
    $msg = $topic->consume(0, 1000);
    if ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}


高级方式 (High level)


这种方式可以指定消费组,一个消费组内,一个consumer 进程只能读取一个分区,

<?php

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
// 当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
});

// 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
$conf->set('group.id', 'myConsumerGroup1');

//添加 kafka集群服务器地址
$conf->set('metadata.broker.list', '192.168.33.1:9092');

$topicConf = new RdKafka\TopicConf();


// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
//当没有初始偏移量时,从哪里开始读取
$topicConf->set('auto.offset.reset', 'smallest');


// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// 让消费者订阅log 主题
$consumer->subscribe(['log']);


while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>



attachments-2020-05-gayVkgCD5ec38ecca3ca6.jpg

  • 发表于 2020-05-19 15:46
  • 阅读 ( 555 )
  • 分类:中间件

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
Pack
Pack

1135 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1658 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章