美文网首页
17. Apache Kafka

17. Apache Kafka

作者: 奉先 | 来源:发表于2017-10-15 12:26 被阅读114次

    1. Kafka重要概念和技术架构:

    1.实时数据分析中的Kakfa

    在实时数据分析应用,Kafka的位置非常重要。首先通过Flume将Nginx服务器的日志,直接sink到Kakfa。然后通过Storm等实时计算框架,将Kafka数据计算并写入到HBase/Redis中,最后通过web客户端直接访问HBase/Redis来展示数据。

    1.Kafka消息存储:Kafka的消息存储在Broker节点的磁盘上,并且是顺序写。
    2.Kafka消息的消费特性:消费者消费Kafka上的消息,不会对消息进行删除操作,消息可以被不同的消费者重复消费。在Kafka Broker上保存的时间,由参数log.retention.hours等确定

    2.Kafka的重要概念:

    1.Broker:Kafka集群中的每一个节点服务器,集群是由一个或者多个Broker组成的。
    2.Producer:消息的生产者
    3.Consumer:消息的消费者
    4.Topic:保存消息的逻辑单元,类似database中的table。消息按照不同类别发布到Kafka集群上,每个类别称之为一个topic。
    5.Partition:Topic内的消息,在物理上按照分区(Partition)存储。
    6.Consumer Group:让某几个consumer共同消费一个topic中的消息,这几个consumer不会出现重复消费消息的情况。

    3.Kafka架构图:

    Kafka架构图.png

    Kafka集群只是一个或者多个Broker节点,节点间通信通过Zookeeper完成。

    2. Kafaka集群安装

    一定按照官方的文档来安装和部署。

    1.安装计划部署:

    计划在三台主机上安装部署Kafka集群:192.168.8.128,192.168.8.129,192.168.8.130。我选用的Kafka安装版本:kafka_2.10-0.10.1.0.tgz,在官网下载安装文件,下载后,解压压缩包:

    $ tar zxvf kafka_2.10-0.10.1.0.tgz -C /opt/modules/
    

    2. 修改配置文件:

    切换到KAFKA_HOME目录,修改配置文件config/server.properties文件。

    $ cd /opt/modules/kafka_2.10-0.10.1.0/
    $ vi config/server.properties
    

    sever.properties中有很多配置项,一般地,以下需要修改来完成kafka集群配置:

    #节点唯一标识号,kafka集群中的每个broker,该项都不能重复
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    
    #Kafka消息的存放路径,注意不是日志而是Kafka的消息
    # A comma seperated list of directories under which to store log files
    log.dirs=/opt/modules/kafka_2.10-0.10.1.0/kafka-logs
    
    #Kafka集群保存消息的时间,单位是时间。也就是Kafka消息在集群上保存1周后删掉。
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=168
    
    #Zookeeper服务器的地址,逗号分隔多个zookeeper主机
    zookeeper.connect=hadoop-senior01.pmpa.com:2181,hadoop-senior02.pmpa.com:2181,hadoop-senior03.pmpa.com:2181
    

    3.拷贝其他主机,修改broker.id

    1.拷贝主机:

    $ scp -r kafka_2.10-0.10.1.0/ natty@hadoop-senior03.pmpa.com:/opt/modules/
    

    2.在config/server.properties配置文件中修改broker.id项
    每一个broker的server.properties需要修改。

    4.启动Kafka

    启动kakfa,使用脚本bin/kafka-server-start.sh脚本,参数是config/server.properties文件。

    $ nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
    

    可以写一个shell来批量启停kafka集群。在每一个broker上执行启动和停止服务器的脚本。kafkaServer_batch.sh:

    #!/bin/bash
    KAFKA_HOME=/opt/modules/kafka_2.10-0.10.1.0
    if [ $# -ne 1 ]
    then
       echo "Usage:kafkaServer_batch.sh [start|stop]";
       exit -1;
    fi
    action=$1
    if [ $action = "start" ]
    #start add parameters!!!!
    then
       batchscript="kafka-server-start.sh ${KAFKA_HOME}/config/server.properties";
    elif [ $action = "stop" ]
    then
       batchscript=kafka-server-stop.sh;
    else
       echo "bad parameter!!";
       exit -1;
    fi
    
    echo $batchscript
    
    for i in 192.168.8.128 192.168.8.129 192.168.8.130
    do
      echo "${action} node ${i} Begin:"
      echo "source /etc/profile && nohup ${KAFKA_HOME}/bin/${batchscript} >/dev/null 2>&1 &"
      ssh ${i} "source /etc/profile && nohup ${KAFKA_HOME}/bin/${batchscript} >/dev/null 2>&1 &" &
      echo "${action} node ${i} End:"
    done
    

    启动后,jps查看进程,可以看到Kafka进程:

    2921 QuorumPeerMain
    3025 Kafka
    3308 Jps
    

    3. Kafka使用测试(ConsoleProducer、ConsoleConsumer)

    Kafka集群安装配置好了之后,我们用自带的命令行生产、消费者来测试下kafka集群的使用。开启ConsolePrducer和ConsoleConsumer,使用如下2个脚本:
    $KAFKA_HOME/bin/kafka-console-consumer.sh
    $KAFKA_HOME/bin/kafka-console-producer.sh

    不带任何参数运行这2个脚本,可以查看帮助信息(其他这些shell脚本也可以这样查看帮助)。注意,必须的参数前有个REQUIRED标志。


    帮助信息.png

    先创建一个测试的topic:

    $bin/kafka-topics.sh --create --topic test_natty --partitions 3  --replication-factor 1 --zookeeper vm-master:2181
    

    启动一个命令行的producer:

    $bin/kafka-console-producer.sh --broker-list vm-master:9092 --topic test_natty
    

    启动一个命令行的consumer:

    $bin/kafka-console-consumer.sh --topic test_natty --zookeeper vm-master:2181
    

    这样就开启了2个空白的窗口,在producer的窗口输入了消息后,就可以在consumer窗口看到刚才发送的消息:如下图:
    producer输入:


    producer.png

    consumer输出:


    consumer.png

    4. Flume sink数据到Kafka

    1.首先增加一个flume agent配置

    新增一个配置文件,kafka-sink.properties, 该配置文件实现如下flume配置:
    Source: netcat (监听某个服务器44444端口的数据,nc命令)
    Channel: Memory
    Sink: Kafka Topic(使用之前创建的测试topic:test_natty )
    在使用时候,需要注意使用的flume的版本,在确定了flume的版本后,到官网找到 Kafka Sink部分的配置信息,按照例子来填写Kafka Sink就可以了。之前在测试时候,使用了一个flume 1.8 和 kafka 0.8版本的组合,发现了问题。最终的问题的原因是Kafka版本过低,需要到0.10版本才能解决该问题,所以在测试这个case时,还需要注意使用的flume、kafka的版本。下面的例子使用的flume 1.6版本
    文件 kafka-sink.properties 的详细配置文件如下:

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    # define the netcat source:
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = localhost
    a1.sources.s1.port = 44444
    
    
    # define the memory channel:
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    
    #define the kafka sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = test_natty
    a1.sinks.k1.brokerList = 10.198.193.189:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    
    # Bind the source and sink to the channel
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

    启动flume agent:

    $bin/flume-ng agent -n a1 -c -conf -f conf/kafka-sink.properties -Dflume.root.logger=INFO,console
    

    启动了flume之后,查看端口监听的情况,可以看到44444端口已经被监听:

    $netstat --help
    $netstat -nltp
    

    2.执行结果测试

    flume启动后,会有一个Application进程在监听localhost:44444端口,如果有错误想杀掉flume进程,直接查询flume的PID后kill即可。

    $ps -aux | grep flume
    $kill -9 XXX
    

    下面,我们尝试通过telnet 向localhost:44444发送数据, 再开启一个Console-consumer来监控flume是否已经开始往topic test_natty中sink了数据。
    开启telnet:

    $telnet localhost 44444
    
    telnet往44444端口发送数据.png

    Console-consumer消费topic数据:


    Console-Consumer消费数据.png

    5. Storm与Kafka集成:

    下面我开发了一个简单的例子,来展示storm 普通Topology 和Trident读取Kafka数据源的方法,后续的bolt的开发就根据业务来定制化。测试方法:在第4部分,配置了一个nc cluster host --> Kafka的配置。 我们就使用这个例子来进行测试,后续Storm的处理也非常简单(直接打印)。最后,测试效果是,通过nt 命令发送一些数据,storm会将nc命令输入的一些string,打印在console上,这样完成测试。

    1.Kafka与普通的Topology集成:

    1.需要引入maven的storm-kafka依赖包。Storm的spout读取kafka数据源。

    2. Kafka与Trident集成:

    3. 使用KafkaBolt将数据存入Kafka:

    一般情况,将数据存入Kafka的Case很少,如果需要的话,使用KafkaBolt来实现。

    相关文章

      网友评论

          本文标题:17. Apache Kafka

          本文链接:https://www.haomeiwen.com/subject/noaquxtx.html