#!/usr/bin/env python
# coding:utf-8
import sys
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
_USAGE = "usage <kafkaHost> <kafkaPort> <groupid> <topic> <partition> <offset>"


def main(argv):
    """Seek one topic partition to an offset and commit it for a group.

    Args:
        argv: Argument vector laid out like ``sys.argv``: the program name
            followed by kafkaHost, kafkaPort, groupid, topic, partition and
            offset. Extra trailing arguments are ignored.

    Returns:
        0 once the offset has been committed; 1 (after printing the usage
        line) when arguments are missing or partition/offset are not
        integers.
    """
    # Program name + six positional arguments are needed. The earlier
    # "< 6" check let a five-argument call through, which then failed with
    # IndexError when reading argv[6].
    if len(argv) < 7:
        print(_USAGE)
        return 1

    kafka_host, kafka_port, group_id, topic = argv[1:5]
    try:
        partition = int(argv[5])
        offset = int(argv[6])
    except ValueError:
        # Non-numeric partition/offset is a usage error, not a crash.
        print(_USAGE)
        return 1

    # init kafka consumer
    consumer = KafkaConsumer(
        group_id=group_id,
        bootstrap_servers='{kafka_host}:{kafka_port}'.format(
            kafka_host=kafka_host, kafka_port=kafka_port))
    try:
        target = TopicPartition(topic, partition)
        # assign topic and partition
        consumer.assign([target])
        # NOTE(review): partitions_for_topic() is documented to return the
        # topic's partition ids, not a metadata string -- confirm this is
        # really the value wanted as commit metadata.
        meta = consumer.partitions_for_topic(topic)
        consumer.seek(target, offset)
        consumer.commit({target: OffsetAndMetadata(offset, meta)})
    finally:
        # Release the broker connections; autocommit is disabled so that
        # closing does not issue a second commit on top of the explicit one.
        consumer.close(autocommit=False)
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
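# "Reader comments" -- leftover heading from the web page this script was copied from; not part of the program.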