page contents

PHP操作使用消息中间件Kafka

以下内容希望帮助到大家!

attachments-2020-05-GDkkOgdj5ecc7e44539c3.png

简单测试


环境:CentOS 6.4,PHP 7;Kafka 服务器 IP:192.168.9.154,PHP 服务器 IP:192.168.9.157

在192.168.9.157创建目录和文件。

//生产者
<?php
// Producer demo: load the kafka wrapper class and publish one test message.
require './modules/kafka.php';

$kafkaClient = new kafka();
$demoMessage = ['hello my kafka'];
$kafkaClient->send($demoMessage);

// Printed once send() has returned.
echo 'OK~';


//消费者
<?php
// Consumer demo: load the kafka wrapper class and enter its consume loop.
// consumer() loops with while(true), so this script does not return on its own.
require './modules/kafka.php';

$kafkaClient = new kafka();
$kafkaClient->consumer();


//Kafka类
<?php
/**
 * Minimal demo wrapper around the php-rdkafka extension.
 *
 * The constructor always builds a producer handle; send() publishes one
 * JSON-encoded message to $topic, and consumer() runs an endless loop that
 * reads messages from $topic.
 */
class kafka
{
    public $broker_list = '192.168.9.154:9092';  // broker address; several may be listed, separated by commas
    public $topic = 'mytopic';                   // topic used by both send() and consumer()
    public $partition = 0;                       // partition that send() produces to
    public $logFile = './kafkalog/info.log';     // log file planned for the consumer; not written by this class

    protected $producer = null;                  // \RdKafka\Producer created in the constructor
    protected $consumer = null;                  // reserved for a consumer object; not assigned by this class

    /**
     * Create the producer handle and register the configured brokers.
     *
     * Configuration problems are reported with echo only; construction
     * continues afterwards, as in the original demo.
     */
    public function __construct()
    {
        if (empty($this->broker_list)) {
            echo 'broker not config';
        }

        // `new` either yields an object or throws, so no emptiness check is needed here.
        $rk = new \RdKafka\Producer();
        $rk->setLogLevel(LOG_DEBUG);

        // addBrokers() reports how many brokers were added; 0 means none were accepted.
        if (!$rk->addBrokers($this->broker_list)) {
            echo 'producer error2';
        }
        $this->producer = $rk;
    }

    /**
     * Producer side: publish one message to the configured topic.
     *
     * @param array $message payload; it is wrapped in an outer array and JSON-encoded
     * @return void
     */
    public function send($message = [])
    {
        $topic = $this->producer->newTopic($this->topic);

        // ProducerTopic::produce() takes (partition, msgflags, payload).
        // The partition goes first and msgflags is 0.
        $topic->produce($this->partition, 0, json_encode([$message]));

        // produce() only enqueues the message. Serve the producer until its
        // out-queue is empty so the message is actually handed to the broker
        // before the script ends; bounded to roughly 10 seconds (200 * 50 ms).
        for ($i = 0; $i < 200 && $this->producer->getOutQLen() > 0; $i++) {
            $this->producer->poll(50);
        }
    }

    /**
     * Consumer side: subscribe to the configured topic and process messages forever.
     *
     * Never returns; intended to be run from the command line.
     */
    public function consumer()
    {
        $conf = new \RdKafka\Conf();
        $conf->set('group.id', '0');  // Conf::set() takes string values
        $conf->set('metadata.broker.list', $this->broker_list);

        $topicConf = new \RdKafka\TopicConf();
        // Start from the oldest available offset when the group has no stored offset.
        $topicConf->set('auto.offset.reset', 'smallest');
        $conf->setDefaultTopicConf($topicConf);

        $consumer = new \RdKafka\KafkaConsumer($conf);
        $consumer->subscribe([$this->topic]);

        echo "wating for message....\n";

        while (true) {
            // Block for up to one second waiting for a message. This replaces the
            // zero-timeout poll followed by sleep(1), which capped throughput at
            // one message per second.
            $message = $consumer->consume(1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    echo '要处理消息了~~~';
                    // Real message handling goes here; the demo only reads the payload.
                    $messageInfo = $message->payload;
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    // Nothing new to read right now; keep polling.
                    break;
                default:
                    // Surface unexpected errors instead of ignoring them silently.
                    echo $message->errstr(), "\n";
                    break;
            }
        }
    }
}

记住:消费者脚本内部是一个无限循环,所以要在终端(命令行)中运行:php consumer.php。

这里就不测试了。


工作代码

/**
     * Send a user's login information to Kafka.
     *
     * JSON-encodes $param and passes it to CustomerLoginInfoServiceClient::add().
     * The topic comes from the Yii application parameter
     * 'customer_login_info_topic' when it is set, otherwise 'e_user_login_info'.
     *
     * @param mixed $param login information to encode and send
     */
    public function sendCustomerLoginInfoToKafka($param){
        // Fall back to the built-in topic name unless the application config overrides it.
        $topic = 'e_user_login_info';
        if (isset(Yii::app()->params['customer_login_info_topic'])) {
            $topic = Yii::app()->params['customer_login_info_topic'];
        }

        $client = new CustomerLoginInfoServiceClient();
        $client->add($topic, json_encode($param));
    }


/**
 * Client-side entry point for recording customer login information.
 */
class CustomerLoginInfoServiceClient {

    /**
     * Pass one login message to EdjKafka for the given topic.
     *
     * @param string $topic Kafka topic name
     * @param string $msg   message payload
     */
    public function add($topic, $msg) {
        // Goes straight to Kafka; the Java service is no longer called.
        $kafka = EdjKafka::getInstance();
        $envelope = array("topic" => $topic, "payload" => $msg);
        $kafka->putin($envelope);
    }

}


/**
 * Bridge that sends Kafka messages through the Queue class.
 *
 * putin() enqueues a task naming this class and the putin_job method;
 * putin_job() forwards the topic and payload to KafkaProducer.
 */
class EdjKafka {

    private static $instance;

    /**
     * Return the shared instance, creating it on first use.
     *
     * @param string $className class to instantiate on the first call
     */
    public static function getInstance($className=__CLASS__) {
        if (!empty(self::$instance)) {
            return self::$instance;
        }
        self::$instance = new $className();
        return self::$instance;
    }

    /**
     * Enqueue a task on the 'phptokafka_0000' queue type.
     *
     * @param array $params read by putin_job() as keys "topic" and "payload"
     */
    public function putin($params) {
        $job = array();
        $job['class'] = __CLASS__;
        $job['method'] = 'putin_job';
        $job['params'] = $params;
        Queue::model()->putin($job, 'phptokafka_0000');
    }

    /**
     * Forward one message to KafkaProducer.
     * Presumably invoked by whatever processes the queued task; confirm against the queue worker.
     *
     * @param array $params array with "topic" and "payload" entries
     */
    public function putin_job($params) {
        $producer = KafkaProducer::getInstance();
        $producer->putin($params["topic"], $params["payload"]);
    }
}


<?php
require_once(Yii::app()->basePath.'/vendors/kafka/autoload.php');  //kafka包在最下面

/**
 * Singleton wrapper around \Kafka\Produce that picks a partition per message
 * and caches each topic's partition count for 180 seconds.
 */
class KafkaProducer {

    private static $instance;
    private $producer;
    // topic => array("count" => partition count, "expire" => unix time after which it is refreshed)
    private $partitionCountMap = array();

    /**
     * Return the shared instance, creating it on first use.
     *
     * @param string $className class to instantiate on the first call
     */
    public static function getInstance($className=__CLASS__) {
        if (!empty(self::$instance)) {
            return self::$instance;
        }
        self::$instance = new $className();
        return self::$instance;
    }

    /**
     * Build the underlying producer from the Yii parameter kafka.brokers
     * and set its required-ack value to -1.
     */
    public function __construct() {
        $brokers = Yii::app()->params['kafka']['brokers'];

        $this->producer = \Kafka\Produce::getInstance($brokers, 3000, $brokers);
        $this->producer->setRequireAck(-1);
    }

    /**
     * Return the number of available partitions for a topic.
     *
     * A cached value is used while it has not expired, unless $force is true.
     * Otherwise the client metadata is refreshed and the result, including a
     * count of zero, is cached for 180 seconds.
     *
     * @param string $topic topic name
     * @param bool   $force true to bypass the cache
     * @return int partition count
     */
    private function getPartitionCount($topic, $force=false) {
        $now = time();

        // Partitions are re-queried at most once every 3 minutes.
        if (!$force && array_key_exists($topic, $this->partitionCountMap)) {
            $cached = $this->partitionCountMap[$topic];
            if ($cached["expire"] > $now) {
                return $cached["count"];
            }
        }

        // Fetch the partitions currently available for the topic.
        $this->producer->getClient()->refreshMetadata();
        $partitions = $this->producer->getAvailablePartitions($topic);
        EdjLog::info(__METHOD__.'|'.$topic.'|get partition|'.json_encode($partitions));

        $partitionCount = count($partitions);
        if ($partitionCount == 0) {
            EdjLog::error(__METHOD__."|".$topic."|topic partitions count 0");
        }

        $this->partitionCountMap[$topic] = array("count" => $partitionCount, "expire" => $now + 180);

        return $partitionCount;
    }

    /**
     * Send one payload to a topic.
     *
     * Does nothing when the topic is empty or no partitions are known. The
     * partition id is the current unix time modulo the partition count. On a
     * \Kafka\Exception the error is logged and the partition cache for the
     * topic is force-refreshed; the message is not retried.
     *
     * @param string $topic   topic name
     * @param mixed  $payload message payload
     */
    public function putin($topic, $payload) {
        if (empty($topic)) {
            return;
        }

        $partitionCount = $this->getPartitionCount($topic);
        if ($partitionCount == 0) {
            return;
        }

        try {
            $pid = time() % $partitionCount;
            $this->producer->setMessages($topic, $pid, array($payload));
            $this->producer->send();
            EdjLog::debugLog(__METHOD__.'|'.$topic.'|'.$pid);
        } catch (\Kafka\Exception $e) {
            EdjLog::error(__METHOD__.'|'.$e->getMessage());
            $this->getPartitionCount($topic, true);
        }
    }
}


<?php
// Kafka settings: the broker list plus a map of logical names to topic names.
return array(
    'brokers' => "123.123.123.123:9092,123.123.123.123:9093,123.123.123.123:9094",  // same IP, different ports
    // Mapping to topic names; using the class name as the key is recommended.
    // The test environment and production use different config files.
    'topicmap' => array(
        "RDriverPositionToKafka" => "driver_location_test",
        "ROrderToKafka" => "order_test",
        "SubmitOrderAutoService_saveOrderInfoJob" => "finished_order_picture",
        'vip_customer_change' => 'vip_customer_change',
     ),
);


<?php
/**
 * 基于redis的queue队列
 */
class Queue {
    private static $_models;

    public $queue_max_length = array(
    );

    /**
     * Return the shared instance for a class name, creating and caching it on first use.
     *
     * @param string $className class to instantiate; its constructor is called with null
     */
    public static function model($className=__CLASS__) {
        if (!isset(self::$_models[$className])) {
            self::$_models[$className] = new $className(null);
        }
        return self::$_models[$className];
    }

    /**
     * Look up the redis zone for a queue type through QueuePool.
     *
     * @param string $type queue type
     * @return mixed whatever QueuePool::get_zone() returns; callers treat a falsy value as "not found"
     */
    private function select_redis($type) {
        $pool = QueuePool::model();
        return $pool->get_zone($type);
    }

    /**
     * Trim a queue to its configured maximum length.
     *
     * The limit is read from $this->queue_max_length, keyed by the queue name
     * with "queue_" removed; nothing happens when no positive limit is set.
     *
     * @param string $queue_name full queue name
     */
    private function trim($queue_name) {
        $type = str_replace("queue_", "", $queue_name);

        $max = isset($this->queue_max_length[$type]) ? intval($this->queue_max_length[$type]) : 0;
        if ($max <= 0) {
            return;
        }

        $zone = $this->select_redis($type);
        if (!$zone) {
            EdjLog::error("can not find zone, queue name: " . $type);
            return;
        }

        // Keep list indexes 0 .. $max-1.
        $zone['redis']->lTrim($queue_name, 0, $max-1);
    }

    /**
     * Unified public entry point for the queues: add a task to, or read one
     * item from, the queue that belongs to $type.
     *
     * The type is lower-cased and resolved to a base queue name through
     * QNameManagerService. An empty type is treated as 'error', and a type with
     * no base queue name falls back to 'queue_error'.
     *
     * $type now defaults to null: an optional parameter followed by a required
     * one is deprecated since PHP 8.0. Existing two-argument calls behave as
     * before.
     *
     * NOTE(review): queue_name is assigned on $this but not declared in the
     * visible part of the class; confirm it is declared elsewhere.
     *
     * @author sunhongjing 2013-07-04
     * @param mixed       $params task to enqueue; null means read one item instead
     * @param string|null $type   queue type; empty falls back to 'error'
     * @return mixed result of add() when $params is given, otherwise of get()
     */
    public function putin($params=null, $type=null){
        $type = empty($type) ? 'error' : strtolower($type);

        $base_qname = QNameManagerService::model()->get_base_qname($type);

        if(!empty($base_qname)) {
            $this->queue_name = 'queue_'.$base_qname;
        }else{
            $this->queue_name = 'queue_error';
        }

        if ($params===null) {
            return $this->get();
        } else {
            return $this->add($params);
        }
    }

    /**
     * Pop one item from the queue for $type and return it JSON-decoded.
     *
     * Returns an empty array when the type has no base queue name or no redis
     * zone is found. When the zone's 'brpop' value is truthy it is passed as the
     * second argument to brPop(); otherwise rPop() is used. The popped value is
     * passed to json_decode() in associative mode.
     *
     * @author sunhongjing 2013-07-09
     * @param string $type queue type
     * @return mixed decoded item, or array() on the failure paths above
     */
    public function getit($type='default')
    {
        $base_qname = QNameManagerService::model()->get_base_qname($type);
        if (empty($base_qname)) {
            return array();
        }
        $this->queue_name = 'queue_'.$base_qname;

        $zone = $this->select_redis($type);
        if (!$zone) {
            EdjLog::error("can not find zone, queue name: " . $type);
            return array();
        }

        if (!$zone['brpop']) {
            $json = $zone['redis']->rPop($this->queue_name);
        } else {
            $popped = $zone['redis']->brPop($this->queue_name, $zone['brpop']);
            // The payload is taken from index 1 of the brPop result; empty string when nothing came back.
            $json = (!empty($popped) && isset($popped[1])) ? $popped[1] : '';
        }

        return json_decode($json, true);
    }

    /**
     * Return the list of queue types known to QNameManager.
     *
     * @author sunhongjing 2013-07-04
     * @return array result of QNameManager::model()->findall(), or array() (with an error logged) when that is falsy
     */
    public function getQueueTypeList()
    {
        $list = QNameManager::model()->findall();
        if (!$list) {
            EdjLog::error("Error: get queue list from database");
            return array();
        }

        return $list;
    }

    /**
     * Write to or read from the position queue ('queue_position').
     *
     * @param array $params item to add; null means read one item instead
     * @return mixed result of add() or get()
     */
    public function position($params=null) {
        $this->queue_name='queue_position';

        return ($params === null) ? $this->get() : $this->add($params);
    }

    /**
     * Write to or read from the heartbeat queue ('queue_heartbeat').
     *
     * @param string $params item to add; null means read one item instead
     * @return mixed result of add() or get()
     */
    public function heartbeat($params=null) {
        $this->queue_name='queue_heartbeat';

        return ($params === null) ? $this->get() : $this->add($params);
    }

    /**
     * Write to or read from the highest-priority queue ('queue_task').
     *
     * @param string $params item to add; null means read one item instead
     * @return mixed result of add() or get()
     */
    public function task($params=null) {
        $this->queue_name='queue_task';

        return ($params === null) ? $this->get() : $this->add($params);
    }

    /**
     * Write to or read from the queue used for saving logs to the database ('queue_dumplog').
     *
     * @param string $params item to add; null means read one item instead
     * @return mixed result of add() or get()
     */
    public function dumplog($params=null) {
        $this->queue_name='queue_dumplog';

        return ($params === null) ? $this->get() : $this->add($params);
    }

    /**
     * Return the number of pending items in every queue.
     *
     * Result keys have the form "queue_<base_qname>@<zone name>". "_P<level>" is
     * appended unless the zone name already ends in "P<digits>", and the key is
     * prefixed with "!" when the length exceeds the item's max. Queues whose
     * zone cannot be found are logged and left out.
     *
     * @return array map of key => list length
     */
    public function length() {
        $queue_length = array();
        $levelSuffix = "/P[0-9]+$/";

        foreach ($this->getQueueTypeList() as $item) {
            $base_qname = $item->base_qname;
            $key = 'queue_'.$base_qname;

            $zone = $this->select_redis($base_qname);
            if (!$zone) {
                EdjLog::error("can not find zone, queue name: " . $key);
                continue;
            }

            $len = $zone['redis']->lLen($key);
            if (isset($item->max) && $len > $item->max) {
                // Mark queues that are over their configured maximum.
                $key = '!'.$key;
            }

            $pkey = $key.'@'.$zone['name'];
            if (!preg_match($levelSuffix, $zone['name'])) {
                $pkey .= "_P".$item->level;
            }

            $queue_length[$pkey] = $len;
        }

        return $queue_length;
    }

    private function get() {
        $type = str_replace("queue_", "", $this->queue_name);
        $zone = $this->select_redis($type);
        if($zone) {
            if($zone['brpop']) {
                $json = '';
                $result = $zone['redis']->brPop($this->queue_name, $zone['brpop']);
                if(!empty($result) && isset($result[1])) {
                    $json = $result[1];
                }
            }
            
                        
  • 发表于 2020-05-26 10:26
  • 阅读 ( 601 )
  • 分类:中间件

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
Pack
Pack

1135 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1658 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章