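Requirement: data is continuously written to Kafka queue A, and queue B needs the same data. The volume is far too large to copy by hand, hence the code below.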
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Uses the legacy kafka-python API (KafkaClient, SimpleProducer, SimpleConsumer).
import json
from kafka import KafkaClient, SimpleProducer, SimpleConsumer


class Kafka_producer():
    '''
    Producer wrapper built on kafka-python.
    '''
    def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.client = KafkaClient("%s:%s" % (kafkahost, kafkaport))
        self.producer = SimpleProducer(self.client)
        self.kafkatopic = kafkatopic

    def sendjsondata(self, msg):
        try:
            self.producer.send_messages(self.kafkatopic, str(msg))
        except KeyboardInterrupt as e:
            print(e)


class Kafka_consumer():
    '''
    Consumer wrapper built on kafka-python.
    '''
    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.client = KafkaClient("%s:%s" % (kafkahost, kafkaport))
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = SimpleConsumer(self.client, self.groupid, self.kafkatopic)

    def consume_data(self):
        try:
            for message in self.consumer:
                # print(json.loads(message.message.value))
                yield message
        except KeyboardInterrupt as e:
            print(e)


def main():
    '''
    Forward data from the "bsaata_tcp_tmp" queue to the
    "internal_app_bsaata.nta_event_tmp" queue.
    '''
    # Test the producer module
    consumer = Kafka_consumer('10.67.19.12', 9092, "bsaata_tcp_tmp", 'com.nsfocus.bsaata.merge')
    message = consumer.consume_data()
    producer = Kafka_producer('10.67.19.12', 9092, "internal_app_bsaata.nta_event_tmp")
    for i in message:
        # Each item wraps the real message; the payload is i.message.value.
        producer.sendjsondata(i.message.value)


if __name__ == '__main__':
    main()
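SimpleProducer and SimpleConsumer were removed in kafka-python 2.0. As a rough sketch only, the same forwarding loop could look like this with the newer KafkaConsumer and KafkaProducer classes. The names `forward` and `build_clients` are made up for illustration; the host, topics, and group ID are the ones from the script above. The kafka import is deferred into `build_clients` so the forwarding logic can be tried without a broker.

```python
def forward(records, producer, target_topic):
    """Send the payload of every consumed record to target_topic.

    `records` is any iterable whose items expose a `.value` attribute
    (KafkaConsumer yields such records); `producer` is any object with a
    `send(topic, value)` method (KafkaProducer has one).
    Returns the number of records forwarded.
    """
    count = 0
    for record in records:
        producer.send(target_topic, record.value)
        count += 1
    return count


def build_clients(host, port, source_topic, group_id):
    """Hypothetical helper: create a consumer/producer pair for one broker."""
    # Imported here so that `forward` can be used without kafka-python installed.
    from kafka import KafkaConsumer, KafkaProducer
    server = "%s:%s" % (host, port)
    consumer = KafkaConsumer(source_topic, group_id=group_id,
                             bootstrap_servers=server)
    producer = KafkaProducer(bootstrap_servers=server)
    return consumer, producer


def main():
    consumer, producer = build_clients(
        "10.67.19.12", 9092, "bsaata_tcp_tmp", "com.nsfocus.bsaata.merge")
    try:
        forward(consumer, producer, "internal_app_bsaata.nta_event_tmp")
    except KeyboardInterrupt:
        pass
    finally:
        producer.flush()  # push out anything still buffered
        consumer.close()
        producer.close()
```

Calling `main()` starts forwarding until interrupted. With the newer API the payload is available directly as `record.value`, so the extra `.message` hop of the legacy consumer is not needed.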
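Notes:
1. In my tests so far, the consumer cannot use port 2181; only 9092 works. I have not confirmed the cause, but the likely explanation is that 2181 is ZooKeeper's default client port, whereas KafkaClient connects directly to a Kafka broker, which listens on 9092 by default.
2. To find the group ID for a topic: go to the bin directory under the ZooKeeper installation, run zkCli.sh, and browse the nodes one by one until you find the topic name. The parent of its parent node is the group ID. In the listing below, bsaata_tcp_tmp sits under /consumers/com.nsfocus.bsaata.merge/offsets, so its group ID is com.nsfocus.bsaata.merge.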
[zk: localhost:2181(CONNECTED) 5] ls /consumers/com.nsfocus.bsaata.merge/offsets
[internal_app_bsaata.nta_srcip_traffic_tmp, internal_app_bsaata.nta_attackip_traffic_tmp, internal_app_bsaata.nta_event_tmp, bsaata_tcp_tmp, internal_app_bsaata.nta_traffic_total_tmp, internal_app_bsaata.nta_inf_traffic_tmp]
3. A message obtained from the consumer API cannot be sent to queue B as-is, because the message object is not a str. The actual payload is message.message.value.
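The manual search in note 2 can also be scripted. The sketch below assumes the layout shown in the zkCli output, /consumers/<group>/offsets/<topic>. `find_group_ids` is a hypothetical helper, not part of any library; it takes a `get_children(path)` callable so that, for example, the `get_children` method of a kazoo `KazooClient` could be passed in (kazoo is not used in the original script, so treat that as an assumption). A small in-memory tree stands in for a live ZooKeeper here.

```python
def find_group_ids(get_children, topic, root="/consumers"):
    """Return the consumer groups that have stored offsets for `topic`.

    Assumes the ZooKeeper layout <root>/<group>/offsets/<topic>.
    `get_children(path)` must return the child names of a znode.
    """
    groups = []
    for group in get_children(root):
        offsets_path = "%s/%s/offsets" % (root, group)
        try:
            topics = get_children(offsets_path)
        except Exception:
            # Group without an offsets node; skip it.
            continue
        if topic in topics:
            groups.append(group)
    return groups


# Stand-in for a live ZooKeeper, mirroring part of the zkCli output above.
FAKE_TREE = {
    "/consumers": ["com.nsfocus.bsaata.merge"],
    "/consumers/com.nsfocus.bsaata.merge/offsets": [
        "internal_app_bsaata.nta_event_tmp",
        "bsaata_tcp_tmp",
    ],
}


def fake_get_children(path):
    return FAKE_TREE[path]  # raises KeyError for unknown paths


print(find_group_ids(fake_get_children, "bsaata_tcp_tmp"))
```

Against a real cluster, something along the lines of `zk = KazooClient(hosts="localhost:2181"); zk.start()` followed by `find_group_ids(zk.get_children, "bsaata_tcp_tmp")` should do the same walk, assuming kazoo is installed.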