第一步,kafka安装使用入门
0:环境准备,Linux环境(CentOS为例),wget命令,git命令,java8
1:下载kafka
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka-2.7.0-src.tgz
//链接失效可以在http://kafka.apache.org/downloads找到最新的包
2:解压
tar -zxvf kafka_2.12-2.7.0.tgz
3:进入目录
cd kafka_2.12-2.7.0
4:启动ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
5:这时窗口会停留在监听状态,再新建一个窗口,同目录下启动kafka
bin/kafka-server-start.sh config/server.properties
6:再新建一个窗口,创建一个topic(topic用来存储数据)
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
7:在这个topic上写入一些内容,使用producer(生产者)
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
这时会出现输入光标,可以输入一些字符,然后按回车生产数据。
8:再新建一个窗口,使用consumer(消费者)来使用刚刚生产者生产的数据
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
这时可以比对7和8两个窗口,7窗口的输入,会在8窗口上输出。
这时,我们就完成了kafka的一个基本流程。包括使用zookeeper来管理kafka进程,新建topic和broker,使用producer生产数据以及使用consumer来消费数据。这些会话都可以用ctrl+c来终止。
下面,我们通过一个demo来看下php是如何与kafka交互的。
第二步,php和kafka交互入门
0:环境准备,php7,Linux环境,pecl命令
1:安装依赖
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make && make install
2: 安装php扩展
pecl install rdkafka
安装完使用php -m |grep kafka,看下有没有添加成功
3:运行producer来生产数据
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("quickstart-events");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
执行:
php producer.php
这时,上面第8条的窗口如果没有关闭,也能同步收到这边在topic写入的数据。
4:运行consumer来消费数据
vim consumer.php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'broker');
$topicConf->set('auto.offset.reset', 'earliest');
$topic = $rk->newTopic("quickstart-events", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 120*10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
执行:
php consumer.php
可以看到,进入了监听状态,这时,每执行一次producer.php,都会在consumer这里打印响应的数据。
网友评论