一、安装kafka扩展-rdkafka功能更加强大
扩展包:https://github.com/arnaud-lb/php-rdkafkahttps://github.com/arnaud-lb/php-rdkafka
1、安装librdkafka (安装库文件)
下载地址:https://github.com/edenhill/librdkafka
wget -c https://codeload.github.com/edenhill/librdkafka/zip/master
unzip master
cd librdkafka-master/
yum install gcc-c++
./configure
make && make install
2、安装php-rdkafka
下载地址:https://github.com/arnaud-lb/php-rdkafka
wget -c https://codeload.github.com/arnaud-lb/php-rdkafka/zip/master
unzip master
cd php-rdkafka-master/
phpize
./configure --with-php-config=/usr/local/php/bin/php-config
make && make install
phpize是一个运行脚本,主要作用是检测php的环境还有就是在特定的目录生成相应的configure文件
php-config 是一个简单的命令行脚本用于查看所安装的 PHP 配置的信息,这样make install之后,生成的.so文件才会自动加载到php扩展目录下面。
配置php.ini并重启php-fpm
在php.ini中添加
extension=rdkafka.so
service php-fpm restart
PHP消费
参考https://github.com/arnaud-lb/php-rdkafka
$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");
$topic = $rk->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
// The first argument is the partition (again).
// The second argument is the timeout.
$msg = $topic->consume(0, 1000);
if($msg==NULL){
sleep(1);
}
else{
echo '#'.$msg->payload."#\n";
}
}
参考
kafka quickstart http://kafka.apache.org/quickstart
网友评论