美文网首页
docker PHP容器内搭建kafka

docker PHP容器内搭建kafka

作者: 街头民工 | 来源:发表于2022-03-21 20:06 被阅读0次

    环境准备:php ==7.2,java==8

    java下载地址:https://www.oracle.com/java/technologies/downloads/#java8

    image.png

    解压 : tar -zxvf jdk-8u281-linux-x64.tar.gz
    配置环境变量 : vim ~/.bashrc

    JAVA_HOME=/usr/local/java/jdk1.8.0_281
    CLASSPATH=$JAVA_HOME/lib/
    PATH=$PATH:$JAVA_HOME/bin
    export PATH JAVA_HOME CLASSPATH
    

    重载 : source /etc/profile

    kafka 下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.12-2.7.0.tgz

    #解压
    tar -zxvf kafka_2.12-2.7.0.tgz
    
    #进入目录
    cd kafka_2.12-2.7.0
    

    (窗口1)启动ZooKeeper

    这时窗口会停留在监听状态,再新建一个窗口,同目录下启动kafka
    bin/zookeeper-server-start.sh config/zookeeper.properties
    

    (窗口2)启动kafka

    bin/kafka-server-start.sh config/server.properties
    

    (窗口3)再新建一个窗口,创建一个topic(topic用来存储数据)

    bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
    
    在这个topic上写入一些内容,使用producer(生产者)
    bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
    
    这时会出现输入光标,可以输入一些字符,然后按回车生产数据。
    

    (窗口4)再新建一个窗口,使用consumer(消费者)来使用刚刚生产者生产的数据

    bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
    

    这时可以比对3和4两个窗口,3窗口的输入,会在4窗口上输出。

    这时,我们就完成了kafka的一个基本流程。包括使用zookeeper来管理kafka进程,新建topic和broker,使用producer生产数据以及使用consumer来消费数据。这些会话都可以用ctrl+c来终止。
    下面,我们通过一个demo来看下php是如何与kafka交互的。

    php 需要 pecl命令 没安装的安装一下

    在安装php-rdkafka之前需要给系统安装一个库,librdkafka。

    git clone https://github.com/edenhill/librdkafka.git
     ./configure
     make
    make install
    
    然后安装扩展 rdkafka
    pecl install rdkafka
    
    php.ini配置文件写入
    extension=rdkafka.so
    

    安装完使用 php -m |grep kafka 看下有没有添加成功

    运行producer来生产数据 : vim producer.php

    <?php
    #producer.php 文件
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', 'localhost:9092');
    
    
    $producer = new RdKafka\Producer($conf);
    
    $topic = $producer->newTopic("quickstart-events");
    
    for ($i = 0; $i < 10; $i++) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
        $producer->poll(0);
    }
    
    for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
            break;
        }
    }
    
    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
        throw new \RuntimeException('Was unable to flush, messages might be lost!');
    }
    

    执行:php producer.php

    上面第5窗口如果没有关闭,也能同步收到这边在topic写入的数据。

    运行consumer来消费数据: vim consumer.php

    $conf = new RdKafka\Conf();
    
    $conf->set('group.id', 'myConsumerGroup');
    
    $rk = new RdKafka\Consumer($conf);
    $rk->addBrokers("127.0.0.1");
    
    $topicConf = new RdKafka\TopicConf();
    $topicConf->set('auto.commit.interval.ms', 100);
    $topicConf->set('offset.store.method', 'broker');
    $topicConf->set('auto.offset.reset', 'earliest');
    
    $topic = $rk->newTopic("quickstart-events", $topicConf);
    
    $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
    
    while (true) {
        $message = $topic->consume(0, 120*10000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }
    

    执行:php consumer.php

    可以看到,进入了监听状态,这时,每执行一次producer.php,都会在consumer这里打印响应的数据。

    参考文档
    https://www.jianshu.com/p/ab476fde3532
    https://www.cnblogs.com/yanweifeng/p/15878470.html
    https://www.cnblogs.com/gwyy/p/12206176.html

    相关文章

      网友评论

          本文标题:docker PHP容器内搭建kafka

          本文链接:https://www.haomeiwen.com/subject/yqbtjrtx.html