美文网首页基础组件
kafka运维常用命令一:kafka 0.10.1及之后的版本

kafka运维常用命令一:kafka 0.10.1及之后的版本

作者: sheen口开河 | 来源:发表于2018-10-31 20:57 被阅读1067次

    目录

    • 1.1 启停kafkaserver
    • 1.2 建立和删除topic
    • 1.3 查看topic的最大最小offset
    • 1.4 通过控制台命令生产和消费消息
    • 1.5 查看消费者状态和消费详情
    • 1.6 重置消费者offset
    • 1.7 查看topic的状态和分区负载详情
    • 此文档适用于kafka-0.10.1及之后的版本
    • 此文档所有命令默认的路径都是kafka的home,即kafka安装目录

    1.1 启停kafkaserver

    命令:

    //启动kafka
    bin/kafka-server-start.sh -daemon config/server.properties
    //关闭kafka,由于是优雅启停,在进程真的结束之前可能有一些清理工作,所以不会进程不会立刻消失,等待数秒
    bin/kafka-server-stop.sh config/server.properties
    //等待数秒,如果kafka进程仍无法停止,执行
    kill pid
    //如果仍然无法停止
    kill -9 pid
    

    启动之后,可以查看进程、日志是否正常

    1.2 建立和删除topic

    kafka server端需要配置delete.topic.enable=true才可以删除,否则执行删除无效,只会将topic标记为删除,不会执行真正的删除

    命令:

    bin/kafka-topics.sh --zookeeper ZOOKEEPER_HOST1:PORT1,ZOOKEEPER_HOST2:PORT2 --create --replication-factor REPLICA_NUM --partitions PARTITION_NUM --topic TOPIC_NAME
    
    • ZOOKEEPER_HOST是kafka所使用的zookeeper的ip地址,PORT是zookeeper监听的端口。多个host port之间用逗号隔开
    • TOPIC_NAME是要创建的topic的名称
    • PARTITION_NUM是要创建的topic的分区数
    • REPLICA_NUM是要创建的topic的每个分区的副本数
    • zookeeper集群不需要全部列上,给出一个可用的zk地址和端口即可

    例如,新建一个名为new_created_topic的topic

    //新建topic,topic name为new_created_topic,zk为笔者的kafka集群使用的zk
    bin/kafka-topics.sh --zookeeper 172.21.37.197:22181 --create --replication-factor 3 --partitions 16 --topic new_created_topic
    
    //查看建立的topic的状态
    bin/kafka-topics.sh --zookeeper 172.21.37.197:22181 --describe --topic new_created_topic
    显示:
    Topic:new_created_topic PartitionCount:16       ReplicationFactor:3     Configs:
            Topic: new_created_topic        Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
            Topic: new_created_topic        Partition: 1    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
            Topic: new_created_topic        Partition: 2    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
            Topic: new_created_topic        Partition: 3    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
            Topic: new_created_topic        Partition: 4    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
            Topic: new_created_topic        Partition: 5    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
            Topic: new_created_topic        Partition: 6    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
            Topic: new_created_topic        Partition: 7    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
            Topic: new_created_topic        Partition: 8    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
            Topic: new_created_topic        Partition: 9    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
            Topic: new_created_topic        Partition: 10   Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
            Topic: new_created_topic        Partition: 11   Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
            Topic: new_created_topic        Partition: 12   Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
            Topic: new_created_topic        Partition: 13   Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
            Topic: new_created_topic        Partition: 14   Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
            Topic: new_created_topic        Partition: 15   Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
    其中:
    第一列是topic名称;第二列是partition序号;第三列是leader副本所在的kafka broker id,和kafka配置的id一致
    第三列是副本分配在哪些broker上,其值是broker id列表;第四列是处于同步状态的副本所在的broker id列表
    
    //查看topic列表,将显示server上所有的topic的列表,不显示详细信息
    bin/kafka-topics.sh --zookeeper 172.21.37.197:22181 --list
    
    //删除topic,删除后执行上一步的list,可以看到topic已经被删除;如果server没有配置允许删除,则只会标记marked for deleted
    bin/kafka-topics.sh --zookeeper 172.21.37.197:22181 --delete --topic new_created_topic
    

    1.3 查看topic的最大最小offset

    查看最大最小offset可以让我们知道某个topic的数据量是多少,并且知道每个partition的earliest和latest offset。这里解释下earliest offset和latest offset:

    • earliest offset:最早的offset,也就是最小的offset,在早期的kafka客户端api中,称之为smallest。假设partition 0的消息是从1127到7892,那么1127就是该partition的earliest offset
    • latest offset:最新offset,也就是最大的offset,在早期的kafka客户端api中,称之为largest。。假设partition 0的消息是从1127到7892,那么7892就是该partition的latest offset

    该命令在查看kafka消息的存量数量或者手工调整消费者的offset时需要用到。如果kafka消费了无效的offset,即消费的offset小于实际offset最小值或者大于实际offset最大值,将返回一个错误。

    命令:

    //查看最小offset
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --topic TOPIC_NAME --time -2
    //查看最大offset
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --topic TOPIC_NAME --time -1
    

    例如

    //这里笔者事先建立了一个名为tttttttt_topic的topic
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.21.37.194:39092 --topic tttttttt_topic --time -2
    tttttttt_topic:2:1
    tttttttt_topic:1:2
    tttttttt_topic:0:7
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.21.37.194:39092 --topic tttttttt_topic --time -1
    tttttttt_topic:2:698
    tttttttt_topic:1:630
    tttttttt_topic:0:639
    

    其中partition 2的earlist offset是1,latest offset是698,则里面有698条消息;所有的加起来,便是该topic的可用信息总量。

    1.4 通过控制台命令生产和消费消息

    在开发测试的过程中,有时候也在生产上,为了验证我们的topic可以正常被写入消息或者可以被正常消费,通常我们需要一个简单、直接的生产/消费工具。kafka在其bin里为我们提供了这样的脚本,可以直接在kafka server上通过命令的方式模拟生产者和消费者。

    1.4.1 控制台生产者-console producer

    命令:

    bin/kafka-console-producer.sh --broker-list BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --topic TOPIC_NAME
    
    • BROKER_HOST是kafka server的ip地址,PORT是server的监听端口。多个host port之间用逗号隔开
    • TOPIC_NAME是要发送消息的topic名称
    • 不用将所有的kafka server的ip和port都列在后面,事实上只要有一个或者的broker的ip和port配上就可以了,客户端连接上server之后会自动获取其他节点的信息

    例如,发送消息到topic为tttttttt_topic

    bin/kafka-console-producer.sh --broker-list 172.21.37.194:39092 --topic tttttttt_topic                   
    >testmessage-111
    >testmessage-222
    >
    

    1.4.2 控制台消费者-console consumer

    命令:

    bin/kafka-console-consumer.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --topic TOPIC_NAME [--from-beginning] 
    
    • BROKER_HOST是kafka server的ip地址,PORT是server的监听端口。多个host port之间用逗号隔开
    • TOPIC_NAME是要发送消息的topic名称
    • 不用将所有的kafka server的ip和port都列在后面,事实上只要有一个或者的broker的ip和port配上就可以了,客户端连接上server之后会自动获取其他节点的信息
    • --from-beginning是可选项,如果不加,则consumer从当前的latest offset开始消费,也就是说,执行了命令之后,需要有新的消息写入,这里才能消费到;如果加上了该参数,则从earliest offset开始消费。具体需不需要看场合,但是如果加了该参数,注意大量消息的疯狂刷屏,一般我们会结合grep或者定向到文件里

    在kafka-0.10.1之前,consumer的命令是“--zookeeper zookeeper列表”而不是“--bootstrap-server broker列表”,这是由于kafka在主键减少对zk的依赖。不过旧的命令依然可用,但是已经被标记为deprecated,不推荐使用,也许在后续的某个版本就会彻底移除

    例如,从topic为tttttttt_topic的主题里消费消息

    bin/kafka-console-consumer.sh --bootstrap-server 172.21.37.194:39092 --topic tttttttt_topic
    testmessage-444
    testmessage-555
    

    1.5 查看消费者状态和消费详情

    注意本节的命令仅适用于将消费者信息存放在broker上的情况。对于kafka 0.10.1之后,默认的java api都是将消费信息保存在broker上,那么适用于以下命令;如果是早期版本,或者是人为将消费者信息保存在zk或者其他地方,那么此处的命令将无效。如果是保存在zk上,可以参照第二章的相关命令。

    有时候我们需要关心消费者应用的状态,一般消费者应用会自己通过日志获知当前消费到了哪个topic的哪个partition的哪个offset,但当消费者出问题之后,或者出于监控的原因,我们需要知道消费者的状态和详情,那么需要借助kafka提供的相关命令。

    命令:

    //首先我们需要知道当前有哪些消费者group,如果已知,此步骤可忽略
    bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --list
    bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --describe
    
    • BROKER_HOST是kafka server的ip地址,PORT是server的监听端口。多个host port之间用逗号隔开
    • 第一条命令是获取group列表,一般而言,应用是知道消费者group的,通常在应用的配置里,如果已知,该步骤可以省略
    • 第二条命令是查看具体的消费者group的详情信息,需要给出group的名称

    例如,首先列出消费者group列表

    bin/kafka-consumer-groups.sh --bootstrap-server 172.21.37.194:39092 --list
    Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
    
    console-consumer-89764
    console-consumer-45728
    

    上面的console-consumer-89764就是我们之前的console consumer。接着查看详情

    bin/kafka-consumer-groups.sh --bootstrap-server 172.21.37.194:39092 --group console-consumer-89764 --describe
    Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
    
    
    TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
    tttttttt_topic                 0          641             641             0          consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc   /172.21.37.194                 consumer-1
    tttttttt_topic                 1          632             632             0          consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc   /172.21.37.194                 consumer-1
    tttttttt_topic                 2          699             699             0          consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc   /172.21.37.194                 consumer-1
    

    其中

    • TOPIC:该group里消费的topic名称
    • PARTITION:分区编号
    • CURRENT-OFFSET:该分区当前消费到的offset
    • LOG-END-OFFSET:该分区当前latest offset
    • LAG:消费滞后区间,为LOG-END-OFFSET-CURRENT-OFFSET,具体大小需要看应用消费速度和生产者速度,一般过大则可能出现消费跟不上,需要引起应用注意
    • CONSUMER-ID:server端给该分区分配的consumer编号
    • HOST:消费者所在主机
    • CLIENT-ID:消费者id,一般由应用指定

    1.6 重置消费者offset

    注意本节的命令仅适用于将消费者信息存放在broker上的情况。对于kafka 0.10.1之后,默认的java api都是将消费信息保存在broker上,那么适用于以下命令;如果是早期版本,或者是人为将消费者信息保存在zk或者其他地方,那么此处的命令将无效。如果是保存在zk上,可以参照第二章的相关命令。

    通过上一节,我们知道了如何查看消费者详情,那么在生产实践中,有时我们可能希望认为修改消费者消费到的offset位置,以达到重新消费,或者跳过一部分消息的目的,这时候重置offset的工具就非常实用。

    命令:

    bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME  --reset-offsets --execute --to-offset NEW_OFFSET --topic TOPIC_NAME
    bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME  --reset-offsets --execute --to-earliest/--to-latest --topic TOPIC_NAME
    
    • BROKER_HOST是kafka server的ip地址,PORT是server的监听端口。多个host port之间用逗号隔开
    • 第一条命令是将指定GROUP_NAME和topic的offset修改到NEW_OFFSET的位置,重启消费者后,消费中将从指定的offset处消费。注意这里只能NEW_OFFSET只能设置一个值,也就是说,所有的分区都将使用这个值,如果分区消息负载不均衡,需要考虑是否适用。
    • 第二条命令是将指定GROUP_NAME和topic的offset修改到earliest或者latest位置,使得消费者从头或者从尾部消费。

    例如,消费者将tttttttt_topic的信息全部都消费了,假设此时我想将每个分区的offset都拉到200去,从200往后重新消费,则通过命令

    bin/kafka-consumer-groups.sh --bootstrap-server 172.21.37.194:39092 --group test_consumer_group_1  --reset-offsets --execute --to-offset 200 --topic tttttttt_topic
    Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
    
    
    TOPIC                          PARTITION  NEW-OFFSET     
    tttttttt_topic                 2          200            
    tttttttt_topic                 1          200            
    tttttttt_topic                 0          200         
    

    即可,这时候可以使用上一节的命令进行检查

    bin/kafka-consumer-groups.sh --bootstrap-server 172.21.37.194:39092 --group test_consumer_group_1 --describe       Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
    
    Consumer group 'test_consumer_group_1' has no active members.
    
    TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
    tttttttt_topic                 0          200             641             441        -                                                 -                              -
    tttttttt_topic                 2          200             699             499        -                                                 -                              -
    tttttttt_topic                 1          200             632             432        -                                                 -                              -
    

    可以看到offset已经被调整到了200

    注意:需要先停掉消费者,才可以成功执行该命令

    另,可以通过直接更换消费者group id的方式,配合消费者默认的消费策略,可以达到类似的效果,反而更加简单、高效和安全。

    1.7 查看topic的状态和分区负载详情

    当broker出现宕机,恢复之后,我们可以看下topic的leader是否负载均衡。因为kafka的所有读写消息的请求,都是发送到partition leader上的,因此在生产环境,负载均衡显得尤其重要。

    命令:

    bin/kafka-topics.sh --zookeeper ZOOKEEPER_HOST1:PORT1,ZOOKEEPER_HOST2:PORT2 --describe --topic TOPIC_NAME
    
    • ZOOKEEPER_HOST是kafka所使用的zookeeper的ip地址,PORT是zookeeper监听的端口。多个host port之间用逗号隔开
    • 类似的,zookeeper集群不需要全部列上,给出一个可用的zk地址和端口即可

    例如,查看topic为dcs_storm_collect_info_ios的负载信息:

    bin/kafka-topics.sh --zookeeper 172.21.22.161:2181 --describe --topic dcs_storm_collect_info_ios
    显示信息 :
    Topic:dcs_storm_collect_info_ios        PartitionCount:16       ReplicationFactor:2     Configs:
        Topic: dcs_storm_collect_info_ios       Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 1    Leader: 1       Replicas: 1,0   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 3    Leader: 1       Replicas: 1,0   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 4    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 5    Leader: 1       Replicas: 1,0   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 6    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 7    Leader: 1       Replicas: 1,0   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 8    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 9    Leader: 1       Replicas: 1,0   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 10   Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 11   Leader: 1       Replicas: 1,0   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 12   Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 13   Leader: 1       Replicas: 1,0   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 14   Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: dcs_storm_collect_info_ios       Partition: 15   Leader: 1       Replicas: 1,0   Isr: 0,1
    
    本例中查看的是topic dcs_storm_collect_info_ios,其分区数是16(PartitionCount:16),副本数是2(ReplicationFactor:2)
    根据第一行的分区信息
    Topic: dcs_storm_collect_info_ios       Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
    我们知道编号为0的分区,其副本在broker id 0 和id 1上(Replicas: 0,1)
    其分区的首领也就是leader是broker 0,也就是编号为0的那个kafka节点(Leader: 0)
    在其所有副本(分布在0和1上)中,处于同步着的状态副本是0和1(Isr: 0,1)
    假设此时broker 0宕机了,那么应该看到的信息会是
    Topic: dcs_storm_collect_info_ios       Partition: 0    Leader: 1       Replicas: 1   Isr: 1
    

    如果发现以下现象说明kafka异常:

    1. 某个topic的每个分区,同步副本数量和设定的副本数量不一致
    2. 某个topic的每个分区,leader的id数值是-1或者none

    相关文章

      网友评论

        本文标题:kafka运维常用命令一:kafka 0.10.1及之后的版本

        本文链接:https://www.haomeiwen.com/subject/kdsztqtx.html