Storing Consumer Offsets

Author: Shaw_Young | Published 2022-02-16 17:18

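    A consumer can fail while it is consuming, for example through a power loss or a crash. Once it recovers, it must resume from the position it had reached before the failure, so the consumer continuously records the offset it has consumed up to and reads that value back after recovery.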
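
The record-and-resume idea can be sketched with a toy model. This is not how Kafka is implemented: the "partition" here is just a text file with one message per line, and the committed offset is a number in a checkpoint file. The function name `consume` and both file arguments are hypothetical, chosen only for illustration.

```shell
# Toy model only: LOG is a text file (one message per line) and
# CHECKPOINT holds the next offset to read. Not a Kafka API.
consume() {
  local log=$1 checkpoint=$2 max=$3
  local offset=0 count=0 line
  # Resume from the committed offset if one exists; otherwise start at 0.
  if [ -f "$checkpoint" ]; then
    offset=$(cat "$checkpoint")
  fi
  while [ "$count" -lt "$max" ]; do
    # Offsets are 0-based, sed line numbers are 1-based.
    line=$(sed -n "$((offset + 1))p" "$log")
    [ -n "$line" ] || break            # no more messages
    echo "offset=$offset value=$line"
    offset=$((offset + 1))
    echo "$offset" > "$checkpoint"     # commit the next offset to read
    count=$((count + 1))
  done
}
```

Calling `consume` twice against the same checkpoint file imitates a restart: the second call continues at the offset the first call committed instead of starting over.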

    Create a topic:

    kafka-topics.sh --create --topic bigdata --zookeeper zkKafka1:2181 --partitions 2 --replication-factor 2
    
    Created topic "bigdata".
    

    Start a console producer on the topic and send a message:

     kafka-console-producer.sh --broker-list zkKafka1:9092 --topic bigdata
    > hello
    

    Start a consumer to read the messages:

     kafka-console-consumer.sh --zookeeper zkKafka1:2181 --topic bigdata
    

    Inspect ZooKeeper:

    zkCli.sh
    
    [zk: localhost:2181(CONNECTED) 0] ls /
    [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, kafka-manager, latest_producer_id_block, zookeeper]
    
    [zk: localhost:2181(CONNECTED) 2] ls /brokers
    [ids, seqid, topics]
    
    [zk: localhost:2181(CONNECTED) 3] ls /brokers/ids
    [0, 1, 2]
    
    [zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
    [__consumer_offsets, bigdata, first]
    
    [zk: localhost:2181(CONNECTED) 5] ls /consumers
    # consumer groups
    [console-consumer-90976]
    
    [zk: localhost:2181(CONNECTED) 6] ls /consumers/console-consumer-90976 
    [ids, offsets, owners]
    
    [zk: localhost:2181(CONNECTED) 7] ls /consumers/console-consumer-90976/offsets
    [bigdata]
    
    [zk: localhost:2181(CONNECTED) 8] ls /consumers/console-consumer-90976/offsets/bigdata 
    [0, 1]
    
    [zk: localhost:2181(CONNECTED) 10] get /consumers/console-consumer-90976/offsets/bigdata/0
    1
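
The session above suggests that each offset lives at a znode of the form `/consumers/<group>/offsets/<topic>/<partition>`. A minimal sketch that builds this path, assuming that layout holds for your Kafka version; the function name `zk_offset_path` is hypothetical.

```shell
# Build the ZooKeeper path under which a ZooKeeper-based consumer group
# stores its offset for one partition (layout taken from the session above).
zk_offset_path() {
  local group=$1 topic=$2 partition=$3
  printf '/consumers/%s/offsets/%s/%s\n' "$group" "$topic" "$partition"
}
```

For example, `zk_offset_path console-consumer-90976 bigdata 0` prints the path that was passed to `get` in the session, so the result can be pasted into a `zkCli.sh` prompt.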
    

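    Before Kafka 0.9, consumers stored their offsets in ZooKeeper by default. Starting with 0.9, consumers store offsets by default in a built-in Kafka topic named __consumer_offsets. The console consumer above was started with --zookeeper, which is why its offsets appear under /consumers in ZooKeeper.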

    1) Edit the configuration file consumer.properties so that consumers are allowed to read internal topics:

    exclude.internal.topics=false
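
If you prefer to script this edit, the following is a minimal sketch that sets a key in a properties file without leaving duplicate entries. The helper name `set_property` is hypothetical, and it assumes simple `key=value` lines with no line continuations.

```shell
# Set KEY=VALUE in a properties file, replacing any existing assignment.
# Assumes plain "key=value" lines; comments and other keys are preserved.
set_property() {
  local file=$1 key=$2 value=$3
  touch "$file"
  # Keep every line whose key (the text before the first "=") differs.
  awk -F= -v k="$key" '$1 != k' "$file" > "$file.tmp"
  printf '%s=%s\n' "$key" "$value" >> "$file.tmp"
  mv "$file.tmp" "$file"
}
```

A call such as `set_property config/consumer.properties exclude.internal.topics false` would then apply the setting shown above; running it a second time leaves the file unchanged.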
    

    2) Read the offsets:

    # Start a new consumer
    kafka-console-consumer.sh --bootstrap-server zkKafka1:9092 --topic bigdata
    
    bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper zkKafka1:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
    
    [console-consumer-17599,bigdata,0]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921362785,ExpirationTime 1645007762785]
    [console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921362785,ExpirationTime 1645007762785]
    [console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921367772,ExpirationTime 1645007767772]
    [console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921367772,ExpirationTime 1645007767772]
    [console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921372775,ExpirationTime 1645007772775]
    [console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921372775,ExpirationTime 1645007772775]
    [console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921377777,ExpirationTime 1645007777777]
    [console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921377777,ExpirationTime 1645007777777]
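
Each output line has the shape `[group,topic,partition]::[OffsetMetadata[offset,...],CommitTime ms,ExpirationTime ms]`, and the same partition shows up once per commit. A minimal sketch for condensing that stream, assuming the exact line format shown above (it may differ in other Kafka versions); the function name `latest_offsets` is hypothetical.

```shell
# Read formatter output on stdin and print, per group/topic/partition,
# the most recently seen committed offset and the gap between
# ExpirationTime and CommitTime in hours.
latest_offsets() {
  awk -F'[][,: ]+' '
    # Fields: $2 group, $3 topic, $4 partition, $6 offset,
    #         $9 CommitTime (ms), $11 ExpirationTime (ms)
    $5 == "OffsetMetadata" {
      key = $2 " " $3 " " $4
      latest[key] = $6
      hours[key] = ($11 - $9) / 3600000
    }
    END {
      for (key in latest) print key, latest[key], hours[key]
    }
  ' | sort
}
```

Piping the eight lines above through `latest_offsets` would yield one row per partition: offset 3 for partition 0 and offset 2 for partition 1, each with a 24-hour gap between commit and expiration.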
    
