Kafka安装启动入门教程

作者: 董可伦 | 来源:发表于2018-07-11 23:26 被阅读2次

    转载请务必注明原创地址为:https://dongkelun.com/2018/05/21/kafkaConf/

    前言

    本文讲如何安装启动kafka,并进行测试,其中zookeepr是kafka自带的,本文基本按照官网文档进行安装启动的,并提出可能会出现的问题。官方文档:http://kafka.apache.org/quickstart
    本文虚拟机系统:centos7,不过其他版本的Linux系统是一样的~

    1、下载

    可直接在官网下载对应的版本http://kafka.apache.org/downloads,我下载的是二进制版的,由于我的scala版本是2.11,所以下载kafka_2.11-1.1.0.tgz,大家可以根据自己的实际情况选择对应的版本。执行以下命令即可下载到本地了。

    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
    

    2、解压到指定目录

    tar -xzf kafka_2.11-1.1.0.tgz -C /opt/
    

    3、启动服务

    3.1 启动zookeeper

    kafka用到zookeeper,因此如果您的机器上没有zookeeper服务,则需要先启动zookpeer服务,本文使用kafka自带的zookeeper。

    cd /opt/kafka_2.11-1.1.0/
    bin/zookeeper-server-start.sh config/zookeeper.properties
    


    因日志较多,所以只给出前几行信息和最后一行成功的信息

    [2018-05-21 12:17:44,461] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    [2018-05-21 12:17:44,472] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
    ...
    [2018-05-21 12:17:44,844] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
    
    

    可以看到zookeeper服务的端口为2181

    3.2 启动kafka服务

    打开第二个终端

    bin/kafka-server-start.sh config/server.properties
    
    [2018-05-21 12:21:07,901] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
    [2018-05-21 12:21:09,417] INFO starting (kafka.server.KafkaServer)
    [2018-05-21 12:21:09,419] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
    ...
    [2018-05-21 12:21:15,955] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
    

    其中kafka的端口为9092,在下面这条信息可以看到

    INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(ambari.master.com,9092,ListenerName(PLAINTEXT),PLAINTEXT)) (kafka.zk.KafkaZkClient)
    

    4、创建一个主题

    打开第三个个终端

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    
    Created topic "test".
    

    可以通过list topic命令查看所有的主题

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    
    test
    

    或者,您也可以将代理配置为在发布不存在的主题时自动创建主题,而不是手动创建主题。

    5、发送消息

    Kafka带有一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。
    启动生产者

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    

    输入几条消息发送到服务器

    >This is a message
    >This is another message
    
    

    [图片上传失败...(image-23876a-1531322761514)]

    6、启动消费者

    消费者可以将消息转储到标准输出
    打开第四个个终端

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

    然后就可以在命令行看到生产者发送的消息了

    This is a message
    This is another message
    

    [图片上传失败...(image-f8324f-1531322761514)]

    7、spark远程连接kafka

    因为我是用spark进行开发的,所以此时就想测试一下,这样配置,用spark程序在celipse里是否可以获取到kafka的数据(我之前使用ambari搭建的kafka进行测试的),程序可参考Spark Streaming连接Kafka入门教程,但是运行程序发现并不能获取到kafka对应topic里的消息。
    几次尝试,发现修改下面的配置即可

    vim config/server.properties 
    
    listeners=PLAINTEXT://192.168.44.129:9092
    

    其中kafka默认的端口就是9092,不同的地方是端口前面加上了ip(192.168.44.129),我想默认的是localhost,spark程序远程通过ip地址和localhost对应不上,所以获取不到kafka的消息,注意在第8部分:配置多个broker的集群,为了和官方文档一致,并没有加上ip,大家可根据需要自行修改。
    这样的话就可以在程序里获取到历史消息了,但是如果想新产生几条数据的话,启动命令需要将localhost改为192.168.44.128(ambari安装的kafka启动命令localhost就没事,而且远程也可以获取到kafka的数据,因为我只是用来测试,所以就没深入研究~),否则会产生异常,即:

    bin/kafka-console-producer.sh --broker-list 192.168.44.129:9092 --topic test
    bin/kafka-console-consumer.sh --bootstrap-server 192.168.44.129:9092 --topic test --from-beginning
    

    对ambari有兴趣的可以参考centos7 ambari2.6.1.5+hdp2.6.4.0 大数据集群安装部署

    8、设置多个broker的集群

    到目前为止,我们设置的是单个broker,这样并不好,下面我们还是在这一台机器上设置三个节点。

    8.1 为每个broker创建一个配置文件

    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties
    

    然后用vim修改
    config/server-1.properties:

    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
    

    config/server-2.properties:

    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2
    

    (其中listeners需要把前面的注释也就是#去掉)
    broker.id是集群中每个节点唯一且永久的名称,因为我们实在同一个机器上运行这些文件,所以为了避免端口冲突和数据彼此覆盖,我们必须重写它的端口和日志目录。

    8.2 启动新节点

    我们已经启动了一个节点了(broker.id=0),现在启动两个新节点

    bin/kafka-server-start.sh config/server-1.properties &
    bin/kafka-server-start.sh config/server-2.properties &
    

    8.3 创建三个副本的新主题:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    

    运行describe topics查看主题的信息

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    
    Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: my-replicated-topic  Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
    

    下面解释一下这些输出,第一行是所有分区的摘要,另外的每一行是一个分区的信息,因为这个主题只有一个分区,所以只有一行。
    leader:负责所有读和写,是这个分区从所有节点随机选择的。
    replicas:是为这个分区复制日志的节点列表,无论他们是领导者还是他们现在还活着。
    isr:是同步副本的集合,是还活着的副本的自己并被leader捕获(caught-up)。

    在我的示例中节点0是该主题唯一分区的leader。
    我们可以用相同的命令查看之前创建的主题 test

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    
    Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
        Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    

    8.4 发送消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    
    >my test message 1
    >my test message 2
    

    8、5 消费消息

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    
    my test message 1
    my test message 2
    

    8.6 测试容错及发现的问题

    我的leader是节点0,现在kill掉

    ps aux | grep server.properties
    

    得到对应的进程号

    root      16557 34.6  8.4 4322904 326472 pts/1  Sl+  05:30 ...
    ...
    2.11-1.1.0/bin/../libs/zookeeper-3.4.10.jar kafka.Kafka config/server.properties
    
    root      11040  0.0  0.0 112648   976 pts/4    R+   03:04   0:00 grep --color=auto server-0.properties
    

    然后kill掉第一个(第二个是grep命令本身的)

    kill -9 16557
    

    kill掉之后再描述下topic

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 0,1,2 Isr: 1,2
    

    现在leader已经切换到节点1,Isr也只有节点1和2了

    官网上说:但是即使原来的leader失败,这些消息仍然可用于消费,但是下面又有一些坑
    首先执行下面这句

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    

    发现会报错

    [2018-05-22 03:55:13,304] WARN [Consumer clientId=consumer-1, groupId=console-consumer-29320] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
    

    原因是节点0已经kill掉了,也就是端口9092已经不能用了,要换成别的端口,所以把上面的命令的端口换成节点1或者2,再执行,发现不报错了,但是没有得任何数据,即使再生成几条消息,也是不报错,但没有数据。
    于是想是不是因为节点0没有加 &,加上 &重新启动节点0,新建主题,直到找到leader为节点0的,再kill掉节点0发现还是同样的问题,但是官网上说是可以的...
    找一个leader不是节点0的主题进行测试,比如官网例子上的节点1,kill掉节点1,之前的leader由1变为0了,再消费测试,发现不管是新增的消息还是历史消息都可以得到,至于这是不是官网的bug及如何解决这个bug(可能还需要修改其他配置),目前我还没找到,如果找到我会更新的。

    相关文章

      网友评论

        本文标题:Kafka安装启动入门教程

        本文链接:https://www.haomeiwen.com/subject/punkpftx.html