How to consume Kafka from PHP
PHP needs the corresponding extension installed, for example RdKafka.
One way to install it:
apt install librdkafka-dev
pecl install rdkafka
See the function reference here:
https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka-consumertopic.consumequeuestart.html
About RdKafka
The RdKafka extension provides a Kafka client for PHP, supporting Kafka 0.8 and higher. It supports the high level KafkaConsumer and Producer, the low level Consumer, and the Metadata API.
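- Define the consumer: \RdKafka\Consumer
- Define the topic to consume
- Call the corresponding consume method to read messages
Note: to consume from multiple partitions, create a queue.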
$queue = $this->rDkafka->newQueue();
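// Global configuration; group.id identifies the consumer group.
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
// Low-level consumer; replace 'ip:port' with the broker address.
$rk = new \RdKafka\Consumer($conf);
$rk->addBrokers('ip:port');
// Topic-level configuration. Configuration values are passed as strings.
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', '100');
// With no stored offset, start from the earliest message ('smallest' is the legacy name for 'earliest').
$topicConf->set('auto.offset.reset', 'smallest');
$this->kafkaTopic = $rk->newTopic(self::KAFKA_TOPIC, $topicConf);
$this->rDkafka = $rk;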
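As a rough sketch of the multi-partition note above: each partition is started with consumeQueueStart() against one shared queue, and messages are then read from the queue rather than from the topic. The function name consumeFromPartitions, the partition list, the message limit, and the one-second timeout are assumptions made for illustration; they are not part of the original code.

```php
<?php
/**
 * Read up to $maxMessages payloads from several partitions through one queue.
 * Hypothetical helper: the name and parameters are illustrative only.
 */
function consumeFromPartitions(
    \RdKafka\Consumer $consumer,
    \RdKafka\ConsumerTopic $topic,
    array $partitions,
    int $maxMessages
): array {
    $queue = $consumer->newQueue();

    // Route every partition into the same queue, starting at the stored offset.
    foreach ($partitions as $partition) {
        $topic->consumeQueueStart($partition, RD_KAFKA_OFFSET_STORED, $queue);
    }

    $payloads = [];
    try {
        while (count($payloads) < $maxMessages) {
            $message = $queue->consume(1000); // wait up to 1000 ms
            if ($message === null || $message->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {
                break; // nothing arrived within the timeout
            }
            if ($message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                continue; // one partition is drained, others may still have data
            }
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                throw new \RuntimeException($message->errstr());
            }
            $payloads[] = $message->payload;
        }
    } finally {
        // Stop every partition that was started, even if an error was thrown.
        foreach ($partitions as $partition) {
            $topic->consumeStop($partition);
        }
    }

    return $payloads;
}
```

With the consumer and topic created as shown earlier, a call might look like consumeFromPartitions($rk, $topic, [0, 1, 2], 100), where the partition numbers and the limit of 100 are placeholders.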
Start consuming from a specified offset
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption: an absolute
// offset, or one of RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
// Use ?? rather than ?: so that an explicit offset of 0 is not replaced by the stored offset.
$this->kafkaTopic->consumeStart($partition, $offset ?? RD_KAFKA_OFFSET_STORED);
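Once consumeStart() has been called, messages are read from the partition with consume(). The following is a minimal sketch, assuming that null means "no offset requested" and that reading stops after a fixed number of messages or the first timeout. The helper names resolveStartOffset and consumePartition, the message limit, and the one-second timeout are hypothetical and chosen only for this example.

```php
<?php
/**
 * Pick the offset to start from. A plain truthiness check would treat
 * offset 0 as "not set", so null is used as the "no offset given" marker.
 * Hypothetical helper for illustration.
 */
function resolveStartOffset(?int $requested, int $fallback): int
{
    return $requested ?? $fallback;
}

/**
 * Read up to $maxMessages payloads from a single partition.
 * Hypothetical helper: the name and parameters are illustrative only.
 */
function consumePartition(
    \RdKafka\ConsumerTopic $topic,
    int $partition,
    ?int $offset,
    int $maxMessages
): array {
    $topic->consumeStart($partition, resolveStartOffset($offset, RD_KAFKA_OFFSET_STORED));

    $payloads = [];
    try {
        while (count($payloads) < $maxMessages) {
            $message = $topic->consume($partition, 1000); // wait up to 1000 ms
            if ($message === null || $message->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {
                break; // nothing arrived within the timeout
            }
            if ($message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                break; // reached the end of the partition
            }
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                throw new \RuntimeException($message->errstr());
            }
            $payloads[] = $message->payload;
        }
    } finally {
        // Always stop the partition so the consumer can be reused or closed cleanly.
        $topic->consumeStop($partition);
    }

    return $payloads;
}
```

Checking $message->err before touching the payload matters because timeouts and end-of-partition events can arrive as messages with an error code rather than as exceptions.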