本篇结构:
- kafka topic 管理脚本
- kafka 生产者控制台
- kafka 消费者控制台
- kafka 消费者组管理脚本
- kafka 消费者性能测试脚本
- kafka 消息日志目录信息查询脚本
一、kafka topic 管理脚本
1.1、创建 topic
kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic test-topic
参数说明
- --topic:指定topic name
- --partitions:指定分区数,这个参数需要根据 broker 数和数据量决定,正常情况下,每个 broker 上两个 partition 最好
- --replication-factor:指定 partition 的 replicas 数,建议设置为 2,也要注意,该参数不能大于 brocker 数量,否则会抛出 InvalidReplicationFactorException 异常。
1.2、查看 topic
1.2.1、列举所有 topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
1.2.2、查看某个 topic 信息
kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test-topic
Topic:test-topic PartitionCount:2 ReplicationFactor:1 Configs:
Topic: test-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
1.2.3、查看所有 topic 信息
kafka-topics.sh --describe --zookeeper 127.0.0.1:2181
1.3、删除 topic
1.3.1、添加配置:
config/server.properties
delete.topic.enable=true
1.3.2、重启kafka server:
bin/kafka-server-start.sh config/server.properties
1.3.3、删除指定topic
kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test-topic
二、kafka 生产者控制台
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test-topic
如果连接集群,那么 broker-list 参数格式为:HOST1:PORT1,HOST2:PORT2,HOST3:PORT3
2.1、参数说明
- --key-serializer:指定 key 的序列化方式,默认是 org.apache.kafka.common.serialization.StringSerializer
- --value-serializer:指定value的序列化方式,默认是 org.apache.kafka.common.serialization.StringSerializer
三、kafka 消费者控制台
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic [--group groupName] [--partition 目标分区]
eg:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic --partition 1
3.1、参数说明
- --key-deserializer:指定 key 的反序列化方式,默认是 org.apache.kafka.common.serialization.StringDeserializer
- --value-deserializer:指定 value 的反序列化方式,默认是 org.apache.kafka.common.serialization.StringDeserializer
- --from-beginning:从最早的消息开始消费,默认是从最新消息开始消费
- --offset: 从指定的消息位置开始消费,如果设置了这个参数,还需要带上 --partition。否则会提示:The partition is required when offset is specified.
- --timeout-ms:当消费者在这个参数指定时间间隔内没有收到消息就会推出,并抛出异常:kafka.consumer.ConsumerTimeoutException
- --whitelist:接收的 topic 白名单集合,和 --topic二者取其一。例如:--whitelist "test.*" (以 test 开头的 topic),--whitelist "test" (指定 test 这个 topic),"test|test1"`(指定 test 或者 test1 这两个 topic)
四、kafka 消费者组管理脚本
4.1、查看所有消费者组
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
4.2、查看某个消费者组消费情况
kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group test-topic
五、kafka 消费者性能测试脚本
kafka-consumer-perf-test.sh --broker-list localhost:9092 --group testGroup --topic test-topic --messages 1024
六、kafka 消息日志目录信息查询脚本
kafka-log-dirs.sh --describe --bootstrap-server 127.0.0.1:9092 --topic-list test-topic
{
"version":1,
"brokers":[
{
"broker":1,
"logDirs":[
{
"logDir":"/kafka/kafka-logs-ubuntu",
"error":null,
"partitions":[
{
"partition":"test-topic-0",
"size":72,
"offsetLag":0,
"isFuture":false
},
{
"partition":"test-topic-1",
"size":0,
"offsetLag":0,
"isFuture":false
}
]
}
]
}
]
}
网友评论