美文网首页
2019-10-27

2019-10-27

作者: 自己为王 | 来源:发表于2019-10-27 07:12 被阅读0次

    Kafka基本操作

    1. 查看Topic

    --list命令(列举所有主题):

    kafka-topics.sh –list –zookeeper server-1:2181,server-2:2181

    --describe命令(不指定topic参数则查看所有主题信息,若指定topic参数则查看特定主题的信息):

    kafka-topics.sh –describe –zookeeper server-1:2181,server-2:2181

    #查看指定主题已覆盖的配置:

    Kafka-topics.sh –describe –zookeeper server-1:2181,server-2:2181,server-3:2181–topics-with-overrides–topic config-test

    2.创建Topic

    命令:

    kafka-topics.sh –create–zookeeper server-1:2181,server-2:2181,server-3:2181 --replication-factor 2 –partitions3 –topic kafka-action

    此时会在${log.dir}目录下创建相应的分区文件目录,副本分别分布在不同的节点上。

    同时登陆ZooKeeper客户端查看所创建的主题元数据信息,登陆ZK之后,“kafka-action”元数据信息如下:

    Ls /brokers/topics/kafka-action/partitions

    [0,1,2]

    ls /brokers/topics/kafka-action

    {“version”:1,”partitions”:{“2”:[3,1], “1”:[2,3],”0”:[1,2]}}

    注意:

    1)创建topic时,主要要指定副本数、分区数等,如果不指定副本和分区数,默认分别创建${num.partitions}个分区数和${default.replcation.factor}个副本。

      2)当生产者向一个还未创建的topic发送消息时,会自动创建该副本,但是前提是:auto.create.topics.enable=true

    3. 修改Topic

        当创建一个主题之后,可以通过alter命令对主题进行修改,包括修改主题级别的配置、增加主题分区、修改副本分配方案、修改主题Offset等。

    命令:

    kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –config max.message.bytes=204800

    修改max.message.bytes参数值,某些版本使用kafka-topics.sh脚本修改参数可以成功,但是可能会提示该脚本已过时,在后续版本可能会被移除,推荐使用kafka-configs.sh脚本修改参数。

    kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –configsegment.bytes=209715200

    修改segment.bytes参数后,可通过topics-with-overrides查看主题config-test已覆盖的配置信息。

    删除参数segment.bytes配置:

    kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –delete-configsegment.bytes

    再次通过topics-with-overrides查看主题config-test已覆盖的配置信息,此时已不包含segment.bytes。

    增加分区:

    Kafka并不支持减少分区的操作,只能为一个主题增加分区。

    假设当前分区为3个,要增加至5个:

    kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –partitions 5

    4.删除Topic

    删除Kafka主题,一般有以下两种方式:

    [if !supportLists](1)      [endif]手动删除各节点${log.dir}目录下该主题分区文件夹,同时登录ZK客户端删除待删除主题对应的节点,主题元数据保存在/brokers/topics和/config/topics目录下。

    [if !supportLists](2)      [endif]执行kafka-topics.sh脚本进行删除,若希望通过该脚本彻底删除主题,则需要保证在启动Kafka时所加载的server.properties文件中配置delete.topic.enable=true,该配置默认为false。否则执行该脚本并未真正删除主题,而是在ZK的/admin/delete_topics目录下创建一个与待删除主题同名的节点,将该主题标记为删除状态。

    5.启动生产者发送消息

    启动一个向主题kafka-action发送消息的生产者,同时指定每条消息包含有Key:

    Kafka-console-producer.sh –broker-listserver-1:9092,server-2:9092,server-3:9092 –topic kafka-action –property parse.key=true

    该命令执行后,控制台等待客户端输入消息。由于没有指定消息Key与消息净荷(payload)之间的分隔符,默认是以制表符分隔。可以通过配置项key.separator指定。

    Kafka-console-producer.sh –broker-list server-1:9092,server-2:9092,server-3:9092–topic kafka-action –property parse.key=true –property key.separator=’ ’

    在控制台分别输入一批消息,消息key与消息实际数据之间以空格分隔。然后执行以下命令,验证消息是否发送成功。

    Kafka-run-class.shkafka.tools.GetOffsetShell –broker-list server-1:9092,server-2:9092,server-3:9092–topic kafka-action –time -1

    以上命令用于查看某个主题各分区对应消息偏移量。可以通过partitions参数指定一个或多个分区,多个分区之间以逗号分隔,若不指定则默认查看该主题所有分区;time参数表示查看在指定时间之前的数据,支持-1(latest)、-2(earliest)两个时间选项,默认取值为-1.

    执行以上命令输出结果如下,共3列,分别表示主题名、分区编号、消息偏移量:

    Kafka-action:2:6

    Kafka-action:1:6

    Kafka-action:0:4

    通过结果信息可知,总共产生了16条消息,3个分区按编号从大到小依次有6条、6条、4条消息。

    6.查看消息

    Kafka生产的消息以二进制的形式在文件中,为了便于查看消息内容,Kafka提供了一个查看日志文件的工具类kafka.tools.DumpLogSegments.命令如下:

    Kafka-run-class.shkafka.tools.DumpLogSegments –files ./././00000000000000.log

    7.启动消费者接受消息

    相关文章

      网友评论

          本文标题:2019-10-27

          本文链接:https://www.haomeiwen.com/subject/xhlpvctx.html