page contents

Kafka生产者的客户端(PHP)开发

虽然 Kafka 是用 Java/Scala 语言编写的,但这不妨碍它对多语言的支持。可以在 Kafka 官网的查看 Kafka 支持的语言,其中包括 C/C++、Python、Go 等语言。

attachments-2020-04-QbQG2DsQ5e93cbf77e84e.jpg


一、准备工作


虽然 Kafka 是用 Java/Scala 语言编写的,但这不妨碍它对多语言的支持。

可以在 Kafka 官网的查看 Kafka 支持的语言,其中包括 C/C++、Python、Go 等语言。

PHP 操作 Kafka 需要安装 librdkafka 库和 kafka 的 PHP 扩展。


1.安装 librdkafka 库

git clone https://github.com/edenhill/librdkafka.git
./configure
 make
 sudo make install


2.安装 php-kafka 扩展

git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd librdkafka/
$ phpize
$ ./configure 
$ make
$ sudo make install

#在php.ini 文件中配置 rdkafka扩展
extension=rdkafka.so

#查看扩展是否生效
php -m | grep kafka


二、代码实现

demo 来源于 https://github.com/arnaud-lb/ph


正常的生产逻辑如下:


1.配置生产者客户端参数及创建相应的生产者实例;

/**
 * Create a producer
 */
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1");


2.构建主题;

/**
 * Create a topic instance from the producer
 */
$topic = $rk->newTopic("test");


3.发送消息;

/**
 * Producing messages
 * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
 * 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。
 * The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.
 * 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。
 * The message payload can be anything.
 * 消息可以是任何内容。
 */
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");


4.关闭生产者实例。

/**
 * Proper shutdown
 * This should be done prior to destroying a producer instance
 *   to make sure all queued and in-flight produce requests are completed before terminating.
 * 关闭生产者实例前需确保所有在队列中和正在生产的生产请求都已完成。
 * Not calling flush can lead to message loss!
 * 不调用flush会导致消息丢失!
 */
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);


检验消息是否发送成功


终端开启一个消费者:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test


在另一个窗口执行 php producer.php

v2-fc2b9d2bb0752ecf584fac52b189a269_720w.png


可看到消费者终端接收到消息。

v2-1925682660b5fb1c64b4f978e86c99ac_720w.png


完整代码如下:

<?php
/**
 * Created by PhpStorm.
 * User: liulu
 * Date: 2020/1/1
 * Time: 18:38
 */

/**
 * Create a producer
 */
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1");

/**
 * Create a topic instance from the producer
 */
$topic = $rk->newTopic("test");

/**
 * Producing messages
 * The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
 * 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。
 * The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.
 * 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。
 * The message payload can be anything.
 * 消息可以是任何内容。
 */
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

/**
 * Proper shutdown
 * This should be done prior to destroying a producer instance
 *   to make sure all queued and in-flight produce requests are completed before terminating.
 * 关闭生产者实例前需确保所有在队列中和正在生产的生产请求都已完成。
 * Not calling flush can lead to message loss!
 * 不调用flush会导致消息丢失!
 */
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);

echo 'finished';exit;



attachments-2020-04-ivy7bEn05e93ca32e31cf.jpg

  • 发表于 2020-04-13 10:15
  • 阅读 ( 607 )
  • 分类:中间件

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
Pack
Pack

1135 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1658 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章