美文网首页
php+kafka+zookeeper+logstash

php+kafka+zookeeper+logstash

作者: 华尔街地摊dy | 来源:发表于2017-03-14 21:14 被阅读0次

         本书主要实现的目标是php连接kafka并且成功发送消息给kafka。为了验证这个连接和发送,另外配置了logstash监听kafka相对应的消息,然后转发到redis,原来我不知道对kafka比较陌生,不知道怎么看里面的消息内容(我知道安装包里有个consumer和producer的脚本) ^ _ ^

    消息发送路径:php->kafka->logstash->redis

    1.安装kafka

    下载地址:https://kafka.apache.org/

    下载解压后进入根目录,

    bin/zookeeper-server-start.sh config/zookeeper.properties &  开启zookeeper

    bin/kafka-server-start.sh config/server.properties & 开启kafka

    另开一个终端然后

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka

    这样就创建了一个topic为kafka的消息通道

    如果这个步骤成功的话,可以通过另开终端发送消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka

    执行之后就可以输入消息发送了。

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka --from-beginning

    来接受上面终端发送的消息

    2.安装php zookeeper扩展,并且用php发送消息

    安装扩展之前需要安装zookeeper的c client,扩展有依赖,步骤如下

    1.通过apt-get来安装(楼主用的是ubuntu)

    2.通过源码安装,地址:https://github.com/apache/zookeeper.git

    下载下来

    cd zookeeper

    ./configure

    make && sudo make install

    按照如上步骤,/usr/local/bin目录下就会多一个cli_mt

    php的zookeeper的源码可以去pecl.php.net下载,然后老步骤

    phpize

    ./configure --with-libzookeeper=/usr/local/bin/cli_mt (如果你安装扩展的php不是默认的php,则需要带上--with-php-config参数)

    make && sudo make install

    最后别忘了添加extension=zookeeper.so到php.ini

    3.配置logstash

    下载地址:https://www.elastic.co/downloads/logstash

    修改配置文件,由于楼主的logstash版本已经是5.2的了,所以又是一阵谷歌,才发现很多网上的配置都是1.2版本的,已经不兼容了。

    input{

    kafka{

    bootstrap_servers=>"localhost:9092"

    topics=>["kafka"]

    }

    }

    output{

    redis{

    host=>"127.0.0.1"

    port=>6379

    key=>"kafka"

    data_type=>"list"

    password=>"123456"

    }

    }

    4.php发送消息

    网上找了一圈,终于找到一个可以用的

    https://github.com/nmred/kafka-php

    也可以用

    composer require "nmred/kafka-php"

    php代码如下:

    require "./vendor/autoload.php";

    $produce = \Kafka\Produce::getInstance('10.37.129.2:2181', 3000);

    $produce->setRequireAck(-1);

    $topicName = "kafka";

    $partitions = $produce->getAvailablePartitions($topicName);

    $partCount = count($partitions);

    var_dump('$partCount:'.$partCount);

    $count = 0;

    $message = json_encode(array('uid' => $count, 'age' => $count%100, 'datetime' => date('Y-m-d H:i:s')));

    //发送消息到不同的partition

    $partitionId = $count%$partCount;

    $produce->setMessages($topicName, $partitionId, array($message));

    $result = $produce->send();

    var_dump($result);

    参考文章:http://blog.kazaff.me/2015/06/05/logstash%E6%95%B4%E5%90%88kafka/

    最后附一张截图

    相关文章

      网友评论

          本文标题:php+kafka+zookeeper+logstash

          本文链接:https://www.haomeiwen.com/subject/wvfknttx.html