Moving Kafka topic data to another topic

Author: 君子月满楼 | Published 2017-09-29 10:12

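Requirement: data is continuously being written to Kafka topic A, and topic B needs the same data. The volume is too large to copy by hand, hence the following script.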

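    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    #
    # Uses the legacy kafka-python API (KafkaClient, SimpleProducer,
    # SimpleConsumer): deprecated in kafka-python 1.x, removed in 2.0.
    
    from kafka import KafkaClient, SimpleProducer, SimpleConsumer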
    
    
    class Kafka_producer(object):
        '''
        Producer wrapper built on kafka-python.
        '''
    
        def __init__(self, kafkahost, kafkaport, kafkatopic):
            self.client = KafkaClient("%s:%s" % (kafkahost, kafkaport))
            self.producer = SimpleProducer(self.client)
            self.kafkatopic = kafkatopic
    
        def sendjsondata(self, msg):
            # send_messages() expects bytes; str() does not convert text
            # correctly on Python 3, so encode it explicitly instead.
            if not isinstance(msg, bytes):
                msg = msg.encode('utf-8')
            self.producer.send_messages(self.kafkatopic, msg)
    
    
    class Kafka_consumer(object):
        '''
        Consumer wrapper built on kafka-python.
        '''
    
        def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
            # Connect to the Kafka broker port (9092), not ZooKeeper (2181)
            self.client = KafkaClient("%s:%s" % (kafkahost, kafkaport))
            self.kafkatopic = kafkatopic
            self.groupid = groupid
            self.consumer = SimpleConsumer(self.client, self.groupid, self.kafkatopic)
    
        def consume_data(self):
            # Each item is an OffsetAndMessage(offset, message);
            # the payload bytes are item.message.value
            for message in self.consumer:
                yield message
    
    def main():
        '''
        Copy data from the "bsaata_tcp_tmp" topic to the
        "internal_app_bsaata.nta_event_tmp" topic.
        '''
        consumer = Kafka_consumer('10.67.19.12', 9092, "bsaata_tcp_tmp", 'com.nsfocus.bsaata.merge')
        producer = Kafka_producer('10.67.19.12', 9092, "internal_app_bsaata.nta_event_tmp")
        try:
            for item in consumer.consume_data():
                # Forward the raw payload, not the OffsetAndMessage wrapper
                producer.sendjsondata(item.message.value)
        except KeyboardInterrupt:
            print("Stopped.")
    
    
    if __name__ == '__main__':
        main()
    

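The script above depends on SimpleConsumer and SimpleProducer, which no longer exist in kafka-python 2.0. The following is a minimal sketch of the same copy loop on the current KafkaConsumer/KafkaProducer API, assuming kafka-python 2.x is installed and the broker is reachable at the address used above. The forward() helper, the constant names and the group ID "topic-copier" are hypothetical. A dedicated group is used because, with this consumer API, joining another application's group would split the topic's partitions with that application. Records from KafkaConsumer expose the payload directly as record.value, with no message.message wrapper.

```python
# Sketch: copy every record from one Kafka topic to another with kafka-python 2.x.
# Broker address and topic names come from the script above; GROUP_ID is hypothetical.
BOOTSTRAP = "10.67.19.12:9092"   # Kafka broker port, not ZooKeeper's 2181
SRC_TOPIC = "bsaata_tcp_tmp"
DST_TOPIC = "internal_app_bsaata.nta_event_tmp"
GROUP_ID = "topic-copier"        # hypothetical dedicated consumer group


def forward(records, send, dst_topic):
    """Re-publish each record's key and value to dst_topic and return the count.

    records: iterable of objects with .key and .value (e.g. ConsumerRecord).
    send: callable taking (topic, value, key).
    """
    count = 0
    for record in records:
        # The payload is record.value (raw bytes); forward it unchanged.
        send(dst_topic, record.value, record.key)
        count += 1
    return count


def main():
    # Imported here so that forward() can be reused without kafka-python installed.
    from kafka import KafkaConsumer, KafkaProducer

    consumer = KafkaConsumer(
        SRC_TOPIC,
        bootstrap_servers=BOOTSTRAP,
        group_id=GROUP_ID,
        # With no committed offset yet, start from the oldest retained message;
        # use "latest" to copy only data that arrives from now on.
        auto_offset_reset="earliest",
    )
    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
    try:
        forward(
            consumer,
            lambda topic, value, key: producer.send(topic, value=value, key=key),
            DST_TOPIC,
        )
    except KeyboardInterrupt:
        pass  # Ctrl+C stops the copy loop
    finally:
        producer.flush()  # deliver anything still buffered
        producer.close()
        consumer.close()
```

Call main() to start copying; it runs until interrupted with Ctrl+C. Forwarding the key as well as the value keeps messages with the same key on the same partition of the destination topic, which the original script (value only) does not attempt.
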
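Notes:
1. The consumer must connect on port 9092; port 2181 does not work. This is because 2181 is ZooKeeper's default client port, while kafka-python talks directly to the Kafka brokers, which listen on 9092 by default.
2. To find the group ID that consumes a topic, run zkCli.sh from the bin directory of the ZooKeeper installation and browse the znodes level by level until you reach the topic name; the parent of its parent node is the group ID. This only works for groups that keep their offsets in ZooKeeper; groups that commit offsets to Kafka itself do not appear under /consumers. For example: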

    [zk: localhost:2181(CONNECTED) 5] ls /consumers/com.nsfocus.bsaata.merge/offsets
    [internal_app_bsaata.nta_srcip_traffic_tmp, internal_app_bsaata.nta_attackip_traffic_tmp, internal_app_bsaata.nta_event_tmp, bsaata_tcp_tmp, internal_app_bsaata.nta_traffic_total_tmp, internal_app_bsaata.nta_inf_traffic_tmp]
    

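3. The message object returned by the consumer API cannot be written to topic B directly: it is not a str but an OffsetAndMessage wrapper, and the actual payload is message.message.value.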
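The manual search in note 2 can be scripted. The sketch below assumes the third-party kazoo ZooKeeper client (pip install kazoo); groups_for_topic() and find_groups() are hypothetical helper names. It applies the same rule as note 2, reading /consumers/<group>/offsets for every group and reporting the groups that list the topic, so like the zkCli.sh method it only sees groups whose offsets are stored in ZooKeeper.

```python
# Sketch: find consumer groups that have ZooKeeper-stored offsets for a topic.
# Helper names are hypothetical; find_groups() needs the third-party kazoo package.


def groups_for_topic(children_of, topic):
    """Return the sorted group IDs whose /consumers/<group>/offsets lists topic.

    children_of(path) must return the child names of a znode, or [] if the
    znode does not exist. The layout is /consumers/<group>/offsets/<topic>,
    so the group ID is the parent of the topic node's parent.
    """
    groups = []
    for group in children_of("/consumers"):
        if topic in children_of("/consumers/%s/offsets" % group):
            groups.append(group)
    return sorted(groups)


def find_groups(zk_hosts, topic):
    # Imported here so that groups_for_topic() works without kazoo installed.
    from kazoo.client import KazooClient
    from kazoo.exceptions import NoNodeError

    zk = KazooClient(hosts=zk_hosts)  # ZooKeeper address, e.g. "localhost:2181"
    zk.start()
    try:
        def children_of(path):
            try:
                return zk.get_children(path)
            except NoNodeError:
                return []  # e.g. a group with no offsets node yet
        return groups_for_topic(children_of, topic)
    finally:
        zk.stop()
        zk.close()
```

For the cluster shown in note 2, find_groups("localhost:2181", "bsaata_tcp_tmp") should include com.nsfocus.bsaata.merge. Unlike the Kafka clients above, this connects to ZooKeeper on port 2181.
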
