# -*- coding: utf-8 -*-
"""
Create by Mr.Hao on 2019/12/6.
"""
# pip install kafka-python
import json

from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(
    bootstrap_servers="127.0.0.1:9092",  # Kafka cluster address
    group_id="newConsumerTest1",  # consumer group id
    client_id="8eaa8c81edfd41f28a50f9121ad14572",
    auto_offset_reset="latest",
    max_poll_records=10,  # max records returned per poll() call
    enable_auto_commit=True,  # periodically commit consumed offsets (happens while iterating)
    auto_commit_interval_ms=5000,  # auto-commit interval in milliseconds
)
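
# A sketch: partitions_for_topic() returns the set of partition ids for a
# topic, which is one way to confirm that the hard-coded partition number
# used below actually exists. The topic name is the one consumed in this script.
print("partitions:", consumer.partitions_for_topic('auto_datacenter_spider_snapshot'))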
partition = TopicPartition('auto_datacenter_spider_snapshot', 4)
consumer.assign([partition])  # manual assignment: no subscribe(), no group rebalancing

# Replay a fixed offset range from the partition.
start = 20905270
end = 20905280
consumer.seek(partition, start)
# consumer.seek_to_end(partition) would jump to the latest offset instead
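
# Optional sanity check, a sketch: the hard-coded offsets are only replayable
# while the broker still retains them. beginning_offsets()/end_offsets() report
# the currently available range [earliest, latest) for the partition.
earliest = consumer.beginning_offsets([partition])[partition]
latest = consumer.end_offsets([partition])[partition]
assert earliest <= start < latest, (
    f"start offset {start} outside retained range [{earliest}, {latest})")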
for msg in consumer:  # iterator; blocks waiting for the next message
    offset, value = msg.offset, msg.value
    if offset > end:
        break
    jdate = json.loads(value)  # msg.value is raw bytes; json.loads accepts bytes on Python 3.6+
    print(offset, "====>>>>", jdate.get("crawler_time"), jdate.get("taskId"), " url_md5:", jdate.get("url_md5"))
consumer.close()  # leave the group and release the connection cleanly
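
# Variant, a minimal sketch (defined but not executed above): with
# enable_auto_commit=False the group offset only advances on an explicit
# commit(), so records are not marked consumed until after processing.
# The function name and the group id "manualCommitDemo" are illustrative,
# not part of the original script.
def consume_with_manual_commit():
    manual_consumer = KafkaConsumer(
        bootstrap_servers="127.0.0.1:9092",
        group_id="manualCommitDemo",  # hypothetical group id for this sketch
        enable_auto_commit=False,  # commit explicitly instead of on a timer
    )
    manual_consumer.assign([partition])
    batch = manual_consumer.poll(timeout_ms=1000, max_records=10)
    for tp, records in batch.items():
        for record in records:
            print(record.offset, json.loads(record.value).get("taskId"))
    manual_consumer.commit()  # commit the latest fetched offsets for the assigned partitions
    manual_consumer.close()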