美文网首页
超赞的Kafka命令行 !!!比原生好用n倍

超赞的Kafka命令行 !!!比原生好用n倍

作者: 梨花菜 | 来源:发表于2020-11-23 21:37 被阅读0次

    1、kaf安装和配置

    Centos7源码安装Golang

    • 安装kaf
      go get -u github.com/birdayz/kaf/cmd/kaf

    • 配置kaf
      新增一个kafka集群配置,名为qa

    [root@qa-cdh-001 bin]# kaf config add-cluster qa --brokers kafka-22-7:9093
    Added cluster.
    

    选择qa这个kafka集群(只需要执行一次,后续都会默认使用qa这个集群)

    [root@qa-cdh-001 bin]# kaf config use-cluster qa
    Switched to cluster "qa".
    

    2、Golang kaf常用操作

    2.1创建topic


    创建名为test_topic1的topic,一共有5个分区

    [root@qa-cdh-001 bin]# kaf topic create test_topic1 -p 5
    ✅ Created topic!
          Topic Name:            test_topic1
          Partitions:            5
          Replication Factor:    1
          Cleanup Policy:        delete
    

    2.2发送topic

    发送一条消息"send msg to test_topic1"到test_topic1这个主题

    [root@qa-cdh-001 bin]# echo -n "send msg to test_topic1"|kaf produce test_topic1
    Sent record to partition 0 at offset 0.
    

    2.3消费topic

    起一个名为lihuacai的groupid来消费test_topic1这个主题

    [root@qa-cdh-001 ~]# kaf consume test_topic1 -g lihuacai
    Partition:   0
    Offset:      2
    Timestamp:   2020-11-23 21:04:22.971 +0800 CST
    
    Partition:   0
    Offset:      3
    Timestamp:   2020-11-23 21:05:32.733 +0800 CST
    send msg to test_topic1
    

    2.3.1消费多个topicszmsg_supply,szmsg_supply_all_new

    [root@quanbu-qa-22-19 ~]# kaf consume szmsg_supply,szmsg_supply_all_new -g lihuacai2
    
    Topic: szmsg_supply   Partition: 0    Offset: 663100   Timestamp: 2020-12-10 16:11:30.785 +0800 CST
    1607587890wangdd2916075878
    
    Topic: szmsg_supply_all_new   Partition: 0    Offset: 8261909   Timestamp: 2020-12-10 16:11:30.787 +0800 CST
    {
      "clientid": "wangdd29",
      "cloudTime": 1607587890,
      "deviceTime": 1607587889,
      "lapCnt": 399000,
    }
    
    

    2.3.2 过滤topic指定字段hc

    [root@quanbu-qa-22-19 ~]# kaf consume szmsg_supply,szmsg_supply_all_new -g lihuacai2 -F hc
    
    Topic: szmsg_supply   Partition: 0    Offset: 663161   Timestamp: 2020-12-10 16:14:52.71 +0800 CST
    1607588092hc01607588092A1
    
    Topic: szmsg_supply_all_new   Partition: 0    Offset: 8261994   Timestamp: 2020-12-10 16:14:53.136 +0800 CST
    {
      "clientid": "hc0",
      "cloudTime": 1607588092,
      "deviceTime": 1607588092,
      "factoryId": "factoryIdMChc0",
    }
    
    

    2.4查看topic

    查看test_topic1这个主题的相关信息

    [root@qa-cdh-001 bin]# kaf topic describe test_topic1
    Name:        test_topic1
    Internal:    false
    Compacted:   false
    Partitions:
      Partition  High Watermark  Leader  Replicas  ISR
      ---------  --------------  ------  --------  ---
      0          1               9       [9]       [9]
      1          0               7       [7]       [7]
      2          0               8       [8]       [8]
      3          0               9       [9]       [9]
      4          0               7       [7]       [7]
    Config:
      Name            Value       ReadOnly  Sensitive
      ----            -----       --------  ---------
      cleanup.policy  delete      false     false
      segment.bytes   1073741824  false     false
    

    2.5查看groupid消息积压

    查看lihuacai这个groupid消息积压情况
    在0分区,积压了2条消息
    计算规则是Lag = High Watermark - Group Offset

    [root@qa-cdh-001 bin]# kaf group describe lihuacai
    Group ID:        lihuacai
    State:           Empty
    Protocol:
    Protocol Type:   consumer
    Offsets:
      test_topic1:
        Partition  Group Offset  High Watermark  Lag  Metadata
        ---------  ------------  --------------  ---  --------
        0          2             4               2
    

    2.6重置groupid偏移量

    lihuacai这个groupid全部分区的偏移量重置到最新状态
    --all-partitions: 所有分区
    -o: --offset,偏移量
    --noconfirm:直接提交偏移量,不需要确认

    [root@qa-cdh-001 bin]# kaf group commit lihuacai -t test_topic1 --all-partitions -o latest --noconfirm
    Partition 0: determined offset 4 from timestamp.
    Partition 1: determined offset 0 from timestamp.
    Partition 2: determined offset 0 from timestamp.
    Partition 3: determined offset 0 from timestamp.
    Partition 4: determined offset 0 from timestamp.
    Resetting offsets to: map[0:4 1:0 2:0 3:0 4:0]
    Successfully committed offsets to map[0:4 1:0 2:0 3:0 4:0].
    

    相关文章

      网友评论

          本文标题:超赞的Kafka命令行 !!!比原生好用n倍

          本文链接:https://www.haomeiwen.com/subject/uraviktx.html