美文网首页
在PHP中使用Kafka

在PHP中使用Kafka

作者: 编程放大镜 | 来源:发表于2023-07-17 10:02 被阅读0次

最近项目中需要从Kafka中读取消息,记录一下。

安装扩展

https://l1905.github.io/kafka/php/2020/07/07/php-user-kafka/
apt install librdkafka-dev
pecl install rdkafka

消费(从指定的 partition)

conf = new \RdKafka\Conf();conf->set

librdkafka 配置项说明
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

// 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
$conf->set('group.id', 'myConsumerGroup1');

//添加 kafka集群服务器地址
$conf->set('metadata.broker.list', '192.168.33.1:9092');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
//当没有初始偏移量时,从哪里开始读取
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
conf->setDefaultTopicConf(topicConf);

        $conf = new \RdKafka\Conf();
        $conf->set('group.id','ali_icebox_refund');
        $rk = new \RdKafka\Consumer($conf);
        $rk->addBrokers(\Cake\Core\Configure::read('ubox_kafka_service'));

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', 100);
        $topicConf->set('auto.offset.reset', 'smallest');

        $topic = $rk->newTopic('ali_icebox_refund', $topicConf);
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while(true) {
            // 设置消费时的时间间隔,单位毫秒,以下表示60秒消费一个
            $message = $topic->consume(0, 5000);
            if ($message) {
                echo "读取到消息\n\r";

                // 消息对象,包括消息主题,消息创建时间戳,消息分区编号,消息主体,消息键名,消息长度等
                var_dump($message);

                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        echo "读取消息成功:\n\r";
                        var_dump($message->payload);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                        echo "读取消息失败\n\r";
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        echo "请求超时\n\r";
                        break;
                    default:
                        throw new \Exception($message->errstr(), $message->err);
                        break;
                }
            } else {
                echo "未读取到消息\n\r";
            }

关于消费

低级消费

需要必备的字段 :

  • broke_list : 服务器节点
  • topic_name: 发布的topic队列名称
  • group.id : 组ID, 默认是 zdmDefaultConsumerGroup
  • partition: 分区索引, 即消费哪个分区的消息

必须指定group.id和partition, 因为消费偏移量是 和二者直接关联

重要的参数设置:

  • socket.timeout.ms 网络链接超时时间
  • auto.offset.reset 如果偏移量存储还没有初始化或偏移量超过范围时的处理方式, earliest 最早偏移地址,latest最晚偏移地址
  • enable.auto.commit 是否开启自动上报偏移量, 默认true
  • enable.auto.offset.store 将自动提交偏移量存储到内存中, 默认开启
  • enable.auto.commit 是否开启自动提交
  • auto.commit.interval.ms 自动提交的时间间隔
  • max.poll.interval.ms 高级消费者模式下,消费最大时长, 如果此次消费时长太长,则server端会剔除该group成员, 重新reblance 消费端分区

常见问题:

  1. 高级消费和低级消费的区别?
    高级消费即不需要指定消费的分区, sdk自动帮你选择消费的分区

  2. 希望重新消费数据,需要怎么操作
    可以重设消费偏移量 lowLevelConsume方法的option参数传递 consume_start_offset.

    • RD_KAFKA_OFFSET_BEGINNING 从最开始开始消费
    • RD_KAFKA_OFFSET_STORED 从上次消费的位置开始消费
    • RD_KAFKA_OFFSET_END 从最新位置开始消费
    • rd_kafka_offset_tail(xxx) 从最新偏移量的某个位置开始消费
  3. 是否支持批量消费.
    sdk支持,但我们的调用方法没有封装, 具体使用参考

高级消费

需要必备的字段 :

  • 不需要指定分区,其他和低级消费参数一致

重要的参数设置:

和低级消费参数一致

相关文章

  • 使用php连接操作kafka

    使用php连接操作kafka,从安装kafka到引入php扩展来操作kafka。 一、安装 注:需安装JDK 1....

  • php 使用kafka

    准备工作 安装librdkafka 库 安装php-rdkafka 扩展 编码实现 生产者 代码实现 检验发送是否...

  • php使用kafka

    Ⅰ、kafka扩展安装参考[https://www.jianshu.com/p/c3eea6f43c4d] Ⅱ、代...

  • 消息中间件Kafka - PHP操作使用Kafka

    PHP使用Kafka 我们需要安装libkafka和rdkafka 安装libkafka 下载去GitHub上克隆...

  • PHP安装使用kafka

    1.安装java环境并配置环境变量2.下载kafka安装包并解压3.安装librdkafka库 4.安装php-r...

  • php中使用kafka

    早想用消息队列了,但是一直没有动手今天搞起来 安装kafka wget https://mirror.bit.ed...

  • PHP 中使用Kafka

    3个问题 安装 :a. 看kafka官方文档会知道,kafka主要给java用的,其他的语言多是通过c/c++的a...

  • PHP使用kafka入门

    第一步,kafka安装使用入门 0:环境准备,Linux环境(CentOS为例),wget命令,git命令,jav...

  • Druid:Integration with Kafka

    本文介绍在Kafka和Druid整合使用中遇到的问题和解决方法。 1. 基本配置 Druid使用Kafka作为数据...

  • A Kafka-based Ordering Service f

    Fabric 中基于Kafka的排序服务 1. 介绍 我们使用 Kafka 来支持在CFT(crash fault...

网友评论

      本文标题:在PHP中使用Kafka

      本文链接:https://www.haomeiwen.com/subject/wqcwudtx.html