1、kaf
安装和配置
-
安装
kaf
go get -u github.com/birdayz/kaf/cmd/kaf
-
配置
kaf
新增一个kafka
集群配置,名为qa
[root@qa-cdh-001 bin]# kaf config add-cluster qa --brokers kafka-22-7:9093
Added cluster.
选择qa
这个kafka集群(只需要执行一次,后续都会默认使用qa
这个集群)
[root@qa-cdh-001 bin]# kaf config use-cluster qa
Switched to cluster "qa".
2、Golang kaf常用操作
2.1创建topic
创建名为test_topic1的topic,一共有5个分区
[root@qa-cdh-001 bin]# kaf topic create test_topic1 -p 5
✅ Created topic!
Topic Name: test_topic1
Partitions: 5
Replication Factor: 1
Cleanup Policy: delete
2.2发送topic
发送一条消息"send msg to test_topic1"到test_topic1这个主题
[root@qa-cdh-001 bin]# echo -n "send msg to test_topic1"|kaf produce test_topic1
Sent record to partition 0 at offset 0.
2.3消费topic
起一个名为lihuacai的groupid来消费test_topic1这个主题
[root@qa-cdh-001 ~]# kaf consume test_topic1 -g lihuacai
Partition: 0
Offset: 2
Timestamp: 2020-11-23 21:04:22.971 +0800 CST
Partition: 0
Offset: 3
Timestamp: 2020-11-23 21:05:32.733 +0800 CST
send msg to test_topic1
2.3.1消费多个topicszmsg_supply,szmsg_supply_all_new
[root@quanbu-qa-22-19 ~]# kaf consume szmsg_supply,szmsg_supply_all_new -g lihuacai2
Topic: szmsg_supply Partition: 0 Offset: 663100 Timestamp: 2020-12-10 16:11:30.785 +0800 CST
1607587890wangdd2916075878
Topic: szmsg_supply_all_new Partition: 0 Offset: 8261909 Timestamp: 2020-12-10 16:11:30.787 +0800 CST
{
"clientid": "wangdd29",
"cloudTime": 1607587890,
"deviceTime": 1607587889,
"lapCnt": 399000,
}
2.3.2 过滤topic指定字段hc
[root@quanbu-qa-22-19 ~]# kaf consume szmsg_supply,szmsg_supply_all_new -g lihuacai2 -F hc
Topic: szmsg_supply Partition: 0 Offset: 663161 Timestamp: 2020-12-10 16:14:52.71 +0800 CST
1607588092hc01607588092A1
Topic: szmsg_supply_all_new Partition: 0 Offset: 8261994 Timestamp: 2020-12-10 16:14:53.136 +0800 CST
{
"clientid": "hc0",
"cloudTime": 1607588092,
"deviceTime": 1607588092,
"factoryId": "factoryIdMChc0",
}
2.4查看topic
查看test_topic1
这个主题的相关信息
[root@qa-cdh-001 bin]# kaf topic describe test_topic1
Name: test_topic1
Internal: false
Compacted: false
Partitions:
Partition High Watermark Leader Replicas ISR
--------- -------------- ------ -------- ---
0 1 9 [9] [9]
1 0 7 [7] [7]
2 0 8 [8] [8]
3 0 9 [9] [9]
4 0 7 [7] [7]
Config:
Name Value ReadOnly Sensitive
---- ----- -------- ---------
cleanup.policy delete false false
segment.bytes 1073741824 false false
2.5查看groupid
消息积压
查看lihuacai
这个groupid
消息积压情况
在0分区,积压了2条消息
计算规则是Lag = High Watermark - Group Offset
[root@qa-cdh-001 bin]# kaf group describe lihuacai
Group ID: lihuacai
State: Empty
Protocol:
Protocol Type: consumer
Offsets:
test_topic1:
Partition Group Offset High Watermark Lag Metadata
--------- ------------ -------------- --- --------
0 2 4 2
2.6重置groupid
偏移量
把lihuacai
这个groupid
全部分区的偏移量重置到最新状态
--all-partitions
: 所有分区
-o
: --offset
,偏移量
--noconfirm
:直接提交偏移量,不需要确认
[root@qa-cdh-001 bin]# kaf group commit lihuacai -t test_topic1 --all-partitions -o latest --noconfirm
Partition 0: determined offset 4 from timestamp.
Partition 1: determined offset 0 from timestamp.
Partition 2: determined offset 0 from timestamp.
Partition 3: determined offset 0 from timestamp.
Partition 4: determined offset 0 from timestamp.
Resetting offsets to: map[0:4 1:0 2:0 3:0 4:0]
Successfully committed offsets to map[0:4 1:0 2:0 3:0 4:0].
网友评论