美文网首页
Kafka多分区下二分法查找指定时间戳的offset

Kafka多分区下二分法查找指定时间戳的offset

作者: 悠扬前奏 | 来源:发表于2021-11-04 14:28 被阅读0次

    用Python消费Kafka时，无法直接从指定时间戳开始消费，只能指定offset，因此需要先找到指定时间戳对应的offset，再从该offset开始消费。百度找到的文章都是单分区下的查找方法，多分区时需要做一定修改，代码记录如下：

    import time
    
    from kafka import KafkaConsumer, TopicPartition
    
    
    def from_timestamp(timestamp):
        """
        Format a Unix timestamp given in seconds as a local-time string.

        :param timestamp: seconds since the epoch (int or float).
        :return: the local time rendered as ``YYYY-mm-dd HH:MM:SS``.
        """
        fmt = "%Y-%m-%d %H:%M:%S"
        return time.strftime(fmt, time.localtime(timestamp))
    
    
    # Target timestamp in milliseconds since the epoch (Kafka record timestamps are in ms).
    base_timestamp = 1635609600000

    # Topic and the partitions of it to search.
    topic = 'tpc_bd_hu_track'
    partitions = (0, 1, 2)


    def find_offset_for_timestamp(consumer, tp, target_ts, timeout_ms=10000):
        """Return the first offset in partition *tp* whose record timestamp is >= *target_ts*.

        Performs a lower-bound binary search over [beginning_offset, end_offset)
        by seeking to the midpoint and reading a single record there. Returns
        the partition's end offset when every record is older than *target_ts*
        (and the beginning offset for an empty partition).

        The search assumes record timestamps are non-decreasing within the
        partition. With producer-set CreateTime timestamps this may only hold
        approximately, in which case the result is approximate as well.

        NOTE: kafka-python (against brokers >= 0.10.1) provides
        ``consumer.offsets_for_times({tp: target_ts})``, which asks the broker
        for this offset directly and is preferable when available.
        """
        # Search one partition at a time so poll() can only return records for `tp`.
        consumer.assign([tp])
        lo = consumer.beginning_offsets([tp])[tp]
        hi = consumer.end_offsets([tp])[tp]
        while lo < hi:
            mid = (lo + hi) // 2
            consumer.seek(tp, mid)
            records = consumer.poll(timeout_ms=timeout_ms, max_records=1).get(tp)
            if not records:
                # No readable record at or after `mid` was returned (e.g. only
                # control records remain, or the fetch timed out): treat the
                # upper half as not older than the target so the loop still shrinks.
                hi = mid
                continue
            record = records[0]
            if record.timestamp < target_ts:
                # poll() returns the first readable record at offset >= mid, so
                # nothing in [mid, record.offset] is at/after the target.
                # min() keeps lo <= hi if timestamps are not strictly ordered.
                lo = min(record.offset + 1, hi)
            else:
                # A record exactly at the target also lands here, so equality
                # narrows the range instead of looping forever.
                hi = mid
        return lo


    consumer = KafkaConsumer(bootstrap_servers=['xxx.xxx.xxx.xxx:9092'],
                             # kafka-python accepts only 'earliest' or 'latest' here.
                             auto_offset_reset='earliest',
                             max_poll_records=10000,
                             max_poll_interval_ms=500
                             )
    try:
        print('Target time:', from_timestamp(base_timestamp / 1000))
        found_offsets = {}
        for partition in partitions:
            tp = TopicPartition(topic, partition)
            found_offsets[tp] = find_offset_for_timestamp(consumer, tp, base_timestamp)
            print('Partition-%d: ' % partition, found_offsets[tp])
    finally:
        # Release broker connections even if a lookup raises.
        consumer.close()
    
    

    相关文章

      网友评论

          本文标题:Kafka多分区下二分法查找指定时间戳的offset

          本文链接:https://www.haomeiwen.com/subject/rwhkzltx.html