美文网首页
【kafka】2022-02-25【常用命令】

【kafka】2022-02-25【常用命令】

作者: SweetMojito | 来源:发表于2022-02-25 17:19 被阅读0次

    本人很懒,笔记这些都写在有道云;此外,感觉kafka的资料是真的多,且官网关于命令是真的很详细了,把自己常用的命令记录与分享下。

    备注:命令基于kafka-2.1.1,如有雷同,联系侵删

    # topic 相关

    ./kafka-topics.sh --zookeeper=zk_server --describe --topic test_topic

    ./kafka-topics.sh --zookeeper=zk_server --delete --topic test_topic

    ./kafka-topics.sh --zookeeper=zk_server --create --partitions 1 --replication-factor 3 --topic test_topic

    # 可以将副本指定到对应的节点,结合写个小工具生成分布字符串,方便做隔离,

    ./kafka-topics.sh --zookeeper=zk_server --create --replica-assignment 1:3,2:1,3:2 --topic test_topic

    # 查看topic列表,看是否创建成功

    ./kafka-topics.sh --zookeeper=zk_server --list

    ./kafka-topics.sh --zookeeper=zk_server --alter --topic test_topic --partitions 3

    ./kafka-topics.sh --zookeeper=zk_server --alter --partitions 3 --topic test_topic --replica-assignment 1:3,2:1,3:2

    ./kafka-configs.sh --zookeeper=zk_server --alter --entity-name test_topic --entity-type topics --delete-config retention.ms

    # 迁移相关,可先修改保存时间避免大量数据同步

    ./kafka-configs.sh --zookeeper=zk_server --alter --entity-type topics --add-config retention.ms=86400000 --entity-name test_topic

    # 第一步:指定topic

    echo {\"topics\": [{\"topic\": \"test_topic\"}],\"version\": 1} > ../conf/topic.json

    # 第二步:生成分配策略

    ./kafka-reassign-partitions.sh --zookeeper=zk_server --topics-to-move-json-file ../conf/topic.json --broker-list "1,2,3" --generate

    # 第三步:不限速执行迁移(没有流量推荐使用,有流量慎重使用)

    ./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --execute

    # 第四步:限速执行迁移(推荐使用),迁移完成需执行下面的命令

    ./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --execute --throttle 52400000

    # 切换目录迁移:any同时限速(如果是鉴权集群可能要加一些账密配置)

    ./kafka-reassign-partitions.sh --zookeeper=zk_server --bootstrap-server broker:port --reassignment-json-file ../conf/reassignment.json --execute --replica-alter-log-dirs-throttle 52400000

    # 第五步:查看迁移进度,如果全部迁移完成执行该命令移除限速

    ./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --verify

    ./kafka-reassign-partitions.sh --zookeeper=zk_server --bootstrap-server broker:port --reassignment-json-file ../conf/reassignment.json --verify

    # 指定均衡

    ./kafka-preferred-replica-election.sh --zookeeper=zk_server --path-to-json-file ../conf/reassignment.json

    # 全局均衡(生产慎用)

    ./kafka-preferred-replica-election.sh --zookeeper=zk_server

    # 授权相关,查看授权列表

    ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --list

    # 查看用户信息

    ./kafka-configs.sh --zookeeper=zk_server --describe --entity-name test_user --entity-type users

    # 添加账户

    ./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=test_user]' --entity-type users --entity-name test_user

    # 添加写

    ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation Write --topic 'test_topic'

    # 添加读

    ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation Read --topic 'test_topic' --group 'group_test_topic'

    # 事务-账户幂等写入集群

    ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation IdempotentWrite --cluster kafka-cluster

    # 事务-账户授事务ID的所有权限

    ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation ALL --transactional-id '*'

    # 移除授权(生产慎用)

    # ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --remove --allow-principal User:'test_user' --operation Read --topic 'test_topic' --group 'group_test_topic' --allow-host '*'

    # ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --remove --allow-principal User:'test_user' --operation Write --topic 'test_topic' --allow-host '*'

    # LAG

    ./kafka-consumer-groups.sh --bootstrap-server broker:port --describe --group group_test_topic

    # 消费内部topic数据

    ./kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server broker:port --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"  --max-messages 10000 > ./group.txt

    ./kafka-console-consumer.sh --topic __consumer_offsets --partition 1 --bootstrap-server broker:port --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

    # 设置到某个时间-新加坡不用调整时差,国内有8h时差,比如:国内需要设置为14:00开始,则:2021-02-25T06:00:00.000

    ./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-datetime 2021-06-25T23:00:00.000 --group group_test_topic --execute

    # 控制台生产

    ./kafka-console-producer.sh --topic resource_author_pond --broker-list broker:port

    # 控制台消费

    ./kafka-console-consumer.sh --bootstrap-server broker:port --topic test_topic --from-beginning

    ./kafka-console-consumer.sh --bootstrap-server broker:port --max-messages 100 --topic test_topic > ./records.txt

    # 限速相关

    # 用户-clientId

    ./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=700,request_percentage=200' --entity-type users --entity-name test_user --entity-type clients --entity-name my_client_id

    # 删除

    ./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'producer_byte_rate,consumer_byte_rate,request_percentage' --entity-type users --entity-name test_user --entity-type clients --entity-name my_client_id

    # clientId

    ./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA

    # 删除

    ./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'producer_byte_rate,consumer_byte_rate,request_percentage' --entity-type clients --entity-name my_client_id

    # 添加限速[关于broker建的副本同步]

    ./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'leader.replication.throttled.rate=629145600,follower.replication.throttled.rate=629145600' --entity-type brokers --entity-name 1

    # 必须同步设置topic

    ./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test_topic

    # 移除限速

    ./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-type brokers --entity-name 1

    ./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-type topics --entity-name test_topic

    ./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'consumer_byte_rate' --entity-type users --entity-name test_user --entity-type clients --entity-name 'my_client_id'

    # 查看文件内容

    ./kafka-run-class.sh kafka.tools.DumpLogSegments --files ./xxxxx.log --print-data-log | less

    ## 修改消费组偏移量,最近

    ./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server broker:port --describe --group group_test_topic

    ./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-earliest --execute --group group_test_topic

    ## 修改消费组偏移量,任意

    ./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-offset 174000 --execute --group group_test_topic

    ## 修改消费组偏移量,最开始

    ./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-latest --execute --group group_test_topic

    # 压测

    ./kafka-producer-perf-test.sh --topic tiger_test --num-records 5000000000000 --record-size 10240 --throughput 500000 --producer-props bootstrap.servers=broker:port

    ./kafka-consumer-perf-test.sh --broker-list broker:port --topic test_topic --fetch-size 1048576 --messages 10000000 --threads 1

    相关文章

      网友评论

          本文标题:【kafka】2022-02-25【常用命令】

          本文链接:https://www.haomeiwen.com/subject/vxixrrtx.html