1.安装java环境并配置环境变量
2.下载kafka安装包并解压
3.安装librdkafka库
git clone https://github.com/edenhill/librdkafka.git
./configure
make
sudo make install
4.安装php-rdkafka扩展
$ git clone https://github.com/arnaud-lb/php-rdkafka.git
#生成configure文件
$ /Users/shiyibo/LNMP/php/bin/phpize
#编译安装
$ ./configure --with-php-config=/Users/shiyibo/LNMP/php/bin/php-config
$ make
$ make install
#在php.ini 文件中配置 rdkafka扩展
$ vim /Users/shiyibo/LNMP/php/etc/php.ini
extension=rdkafka.so
#查看扩展是否生效
$php -m | grep kafka
5.守护模式启动zookeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
6.守护模式启动kafka
nohup bin/kafka-server-start.sh config/server.properties &
7.kafka客户端创建主题
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
8.kafka客户端创建生产者
bin/kafka-console-producer.sh --topic quickstart-events --broker-list localhost:9092
9.kafka客户端创建消费者
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
10.php创建生产者
<?php
/**
* 消息生产者
*/
$objRdKafka = new RdKafka\Producer();
//$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");
$oObjTopic = $objRdKafka->newTopic("ttt");
// 从终端接收输入
$oInputHandler = fopen('php://stdin', 'r');
while (true) {
echo "\nEnter messages:\n";
$sMsg = trim(fgets($oInputHandler));
// 空消息意味着退出
if (empty($sMsg)) {
break;
}
// 发送消息
$oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
}
echo "done\n";
11.php创建消费者
<?php
/**
* 消费者消费消息
*/
$objRdKafka = new RdKafka\Consumer();
//$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");
$oObjTopic = $objRdKafka->newTopic("ttt");
/**
* consumeStart
* 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
* 第二个参数标识从什么位置开始拉取消息,可选值为
* RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
* RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
* RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
*/
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);
while (true) {
// 第一个参数是分区,第二个参数是超时时间
$oMsg = $oObjTopic->consume(0, 1000);
// 没拉取到消息时,返回NULL
if (!$oMsg) {
usleep(10000);
continue;
}
if ($oMsg->err) {
echo $oMsg->errstr(), "\n";
break;
} else {
file_put_contents("kafka.log",$oMsg->payload.PHP_EOL,FILE_APPEND);
echo $oMsg->payload, "\n";
}
}
网友评论