美文网首页
在PHP中使用Kafka

在PHP中使用Kafka

作者: 编程放大镜 | 来源:发表于2023-07-17 10:02 被阅读0次

    最近项目中需要从Kafka中读取消息,记录一下。

    安装扩展

    https://l1905.github.io/kafka/php/2020/07/07/php-user-kafka/
    apt install librdkafka-dev
    pecl install rdkafka

    消费(从指定的 partition)

    conf = new \RdKafka\Conf();conf->set

    librdkafka 配置项说明
    https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

    // 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
    $conf->set('group.id', 'myConsumerGroup1');

    //添加 kafka集群服务器地址
    $conf->set('metadata.broker.list', '192.168.33.1:9092');

    // Set where to start consuming messages when there is no initial offset in
    // offset store or the desired offset is out of range.
    // 'smallest': start from the beginning
    //当没有初始偏移量时,从哪里开始读取
    $topicConf->set('auto.offset.reset', 'smallest');

    // Set the configuration to use for subscribed/assigned topics
    conf->setDefaultTopicConf(topicConf);

            $conf = new \RdKafka\Conf();
            $conf->set('group.id','ali_icebox_refund');
            $rk = new \RdKafka\Consumer($conf);
            $rk->addBrokers(\Cake\Core\Configure::read('ubox_kafka_service'));
    
            $topicConf = new \RdKafka\TopicConf();
            $topicConf->set('auto.commit.interval.ms', 100);
            $topicConf->set('auto.offset.reset', 'smallest');
    
            $topic = $rk->newTopic('ali_icebox_refund', $topicConf);
            $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
    while(true) {
                // 设置消费时的时间间隔,单位毫秒,以下表示60秒消费一个
                $message = $topic->consume(0, 5000);
                if ($message) {
                    echo "读取到消息\n\r";
    
                    // 消息对象,包括消息主题,消息创建时间戳,消息分区编号,消息主体,消息键名,消息长度等
                    var_dump($message);
    
                    switch ($message->err) {
                        case RD_KAFKA_RESP_ERR_NO_ERROR:
                            echo "读取消息成功:\n\r";
                            var_dump($message->payload);
                            break;
                        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                            echo "读取消息失败\n\r";
                            break;
                        case RD_KAFKA_RESP_ERR__TIMED_OUT:
                            echo "请求超时\n\r";
                            break;
                        default:
                            throw new \Exception($message->errstr(), $message->err);
                            break;
                    }
                } else {
                    echo "未读取到消息\n\r";
                }
    

    关于消费

    低级消费

    需要必备的字段 :

    • broke_list : 服务器节点
    • topic_name: 发布的topic队列名称
    • group.id : 组ID, 默认是 zdmDefaultConsumerGroup
    • partition: 分区索引, 即消费哪个分区的消息

    必须指定group.id和partition, 因为消费偏移量是 和二者直接关联

    重要的参数设置:

    • socket.timeout.ms 网络链接超时时间
    • auto.offset.reset 如果偏移量存储还没有初始化或偏移量超过范围时的处理方式, earliest 最早偏移地址,latest最晚偏移地址
    • enable.auto.commit 是否开启自动上报偏移量, 默认true
    • enable.auto.offset.store 将自动提交偏移量存储到内存中, 默认开启
    • enable.auto.commit 是否开启自动提交
    • auto.commit.interval.ms 自动提交的时间间隔
    • max.poll.interval.ms 高级消费者模式下,消费最大时长, 如果此次消费时长太长,则server端会剔除该group成员, 重新reblance 消费端分区

    常见问题:

    1. 高级消费和低级消费的区别?
      高级消费即不需要指定消费的分区, sdk自动帮你选择消费的分区

    2. 希望重新消费数据,需要怎么操作
      可以重设消费偏移量 lowLevelConsume方法的option参数传递 consume_start_offset.

      • RD_KAFKA_OFFSET_BEGINNING 从最开始开始消费
      • RD_KAFKA_OFFSET_STORED 从上次消费的位置开始消费
      • RD_KAFKA_OFFSET_END 从最新位置开始消费
      • rd_kafka_offset_tail(xxx) 从最新偏移量的某个位置开始消费
    3. 是否支持批量消费.
      sdk支持,但我们的调用方法没有封装, 具体使用参考

    高级消费

    需要必备的字段 :

    • 不需要指定分区,其他和低级消费参数一致

    重要的参数设置:

    和低级消费参数一致

    相关文章

      网友评论

          本文标题:在PHP中使用Kafka

          本文链接:https://www.haomeiwen.com/subject/wqcwudtx.html