美文网首页
4.php使用kafka

4.php使用kafka

作者: 呦丶耍脾气 | 来源:发表于2022-12-25 22:59 被阅读0次

    本文使用docker容器操作

    docker中部署kafka看下文档第二部分:2.kafka部署

    $ docker network create app-tier --driver bridge
    $ docker run -d --name zookeeper-server \
        --network app-tier \
        -e ALLOW_ANONYMOUS_LOGIN=yes \
        bitnami/zookeeper:latest
    $ docker run -d --name kafka-server \
        --network app-tier \
        -e ALLOW_PLAINTEXT_LISTENER=yes \
        -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
        bitnami/kafka:latest
    $ docker exec -it kafka-server /bin/bash
    $ cd /opt/bitnami/kafka
    #创建一个topic
    $ /opt/bitnami/kafka$ bin/kafka-topics.sh --create --name test --bootstrap-server localhost:9092
    #查看主题
    $ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    #test
    

    运行容器如下:


    • 进入php容器执行
    docker exec -it php /bin/bash
    #git命令最好在phg的和宿主机的映射目录执行
    git clone https://github.com/edenhill/librdkafka.git
     ./configure
     make
    make install
    
    然后安装扩展 rdkafka
    pecl install rdkafka
    
    php.ini配置文件写入
    extension=rdkafka.so
    
    • 查看是否生效
    php -m |grep kafka
    
    • 进入kafka容器,创建一个topic:test
    docker exec -it kafka-server /bin/bash
    cd /opt/bitnami/kafka
    bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
    
    • 进入php容器,在运行目录下创建producer.phpconsumer.php
    #producer.php
    <?php
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', '192.168.172.131:9092');
    $producer = new RdKafka\Producer($conf);
    $topic = $producer->newTopic("test");
    for ($i = 0; $i < 10; $i++) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
        $producer->poll(0);
    }
    for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
            break;
        }
    }
    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
        throw new \RuntimeException('Was unable to flush, messages might be lost!');
    }
    
    #consumer.php
    <?php
    $conf = new RdKafka\Conf();
    $conf->set('group.id', 'myConsumerGroup');
    $rk = new RdKafka\Consumer($conf);
    $rk->addBrokers("192.168.172.131");
    $topicConf = new RdKafka\TopicConf();
    $topicConf->set('auto.commit.interval.ms', 100);
    $topicConf->set('offset.store.method', 'broker');
    $topicConf->set('auto.offset.reset', 'earliest');
    $topic = $rk->newTopic("test", $topicConf);
    $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
    while (true) {
        $message = $topic->consume(0, 120*10000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }
    
    • 执行php
    php producer.php
    #consumer执行后会一直监听,当有新的生产进来后仍会打印
    php consumer.php 
    

    相关文章

      网友评论

          本文标题:4.php使用kafka

          本文链接:https://www.haomeiwen.com/subject/tubgqdtx.html