最近项目中需要从Kafka中读取消息,记录一下。
安装扩展
https://l1905.github.io/kafka/php/2020/07/07/php-user-kafka/
apt install librdkafka-dev
pecl install rdkafka
消费(从指定的 partition)
conf->set
librdkafka 配置项说明
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
// 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
$conf->set('group.id', 'myConsumerGroup1');
//添加 kafka集群服务器地址
$conf->set('metadata.broker.list', '192.168.33.1:9092');
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
//当没有初始偏移量时,从哪里开始读取
$topicConf->set('auto.offset.reset', 'smallest');
// Set the configuration to use for subscribed/assigned topics
topicConf);
$conf = new \RdKafka\Conf();
$conf->set('group.id','ali_icebox_refund');
$rk = new \RdKafka\Consumer($conf);
$rk->addBrokers(\Cake\Core\Configure::read('ubox_kafka_service'));
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic('ali_icebox_refund', $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while(true) {
// 设置消费时的时间间隔,单位毫秒,以下表示60秒消费一个
$message = $topic->consume(0, 5000);
if ($message) {
echo "读取到消息\n\r";
// 消息对象,包括消息主题,消息创建时间戳,消息分区编号,消息主体,消息键名,消息长度等
var_dump($message);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo "读取消息成功:\n\r";
var_dump($message->payload);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "读取消息失败\n\r";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "请求超时\n\r";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
} else {
echo "未读取到消息\n\r";
}
关于消费
低级消费
需要必备的字段 :
- broke_list : 服务器节点
- topic_name: 发布的topic队列名称
- group.id : 组ID, 默认是 zdmDefaultConsumerGroup
- partition: 分区索引, 即消费哪个分区的消息
必须指定group.id和partition, 因为消费偏移量是 和二者直接关联
重要的参数设置:
- socket.timeout.ms 网络链接超时时间
- auto.offset.reset 如果偏移量存储还没有初始化或偏移量超过范围时的处理方式, earliest 最早偏移地址,latest最晚偏移地址
- enable.auto.commit 是否开启自动上报偏移量, 默认true
- enable.auto.offset.store 将自动提交偏移量存储到内存中, 默认开启
- enable.auto.commit 是否开启自动提交
- auto.commit.interval.ms 自动提交的时间间隔
- max.poll.interval.ms 高级消费者模式下,消费最大时长, 如果此次消费时长太长,则server端会剔除该group成员, 重新reblance 消费端分区
常见问题:
-
高级消费和低级消费的区别?
高级消费即不需要指定消费的分区, sdk自动帮你选择消费的分区 -
希望重新消费数据,需要怎么操作
可以重设消费偏移量lowLevelConsume
方法的option参数传递consume_start_offset
.- RD_KAFKA_OFFSET_BEGINNING 从最开始开始消费
- RD_KAFKA_OFFSET_STORED 从上次消费的位置开始消费
- RD_KAFKA_OFFSET_END 从最新位置开始消费
- rd_kafka_offset_tail(xxx) 从最新偏移量的某个位置开始消费
-
是否支持批量消费.
sdk支持,但我们的调用方法没有封装, 具体使用参考
高级消费
需要必备的字段 :
- 不需要指定分区,其他和低级消费参数一致
重要的参数设置:
和低级消费参数一致
网友评论