美文网首页
python 操作 kafka consumer设置重启时从最新

python 操作 kafka consumer设置重启时从最新

作者: HAO延WEI | 来源:发表于2020-07-10 10:30 被阅读0次

    1.1 安装

    pip install kafka-python

    1.2 消费

    # -*- coding: utf-8 -*-
    
    """
    Create by Mr.Hao on 2019/12/6.
    """
    
    
    
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
       'test', # 指定topic
        bootstrap_servers = "127.0.0.1:9092", # kafka集群地址
        group_id = "newConsumerTest1", # 消费组id
        client_id = '8eaa8c81edfd41f28a50f9121ad14572',
        auto_offset_reset="latest"
        max_poll_records=10, # 每次最大消费数量
        enable_auto_commit = True, # 每过一段时间自动提交所有已消费的消息(在迭代时提交)
        auto_commit_interval_ms = 5000, # 自动提交的周期(毫秒)
    )
    
    #consumer.subscribe(["auto_datacenter_spider_snapshot"])
    """
    使用seek_to_end函数,seek_to_end会直接将位置定位到最新数据。
    但是在之前需要poll一次数据,不然会报没有分配partition的错误。
    这说明我们的框架也是懒加载的,只有在具体poll数据的时候才会分配partition
    """
    res = consumer.poll(10)
    consumer.seek_to_end()
    for msg in consumer: # 迭代器,等待下一条消息
        offset, value = msg.offset, msg.value
        print value
    

    相关文章

      网友评论

          本文标题:python 操作 kafka consumer设置重启时从最新

          本文链接:https://www.haomeiwen.com/subject/eiypcktx.html