美文网首页
kafka集群-python

kafka集群-python

作者: heliping_peter | 来源:发表于2018-04-24 14:00 被阅读90次

    安装问题

    • 1.出现broker id报错
      集群启动时,需要不同的broker id,如果之前配置的一样,那么修改了server.properties,还需要修改/tmp/kafka-logs/meta.properties
    • 2.需要修改配置文件中的log.dirs
      集群中的host不能使用一样的。

    测试

    • 1.生产者
    import time
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer
    
    
    producer = KafkaProducer(bootstrap_servers = ['192.168.124.116:9092', '192.168.124.116:9092', '192.168.124.116:9092'])
    # Assign a topic
    topic = 'my-topic'
    
    def test():
        print('begin')
        n = 1
        while (n<=100):
            producer.send(topic, 'I am'+' '+str(n))
            print "send" + str(n)
            n += 1
            time.sleep(0.5)
        print('done')
    
    if __name__ == '__main__':
        test()
    
    • 2.消费者
    from kafka import KafkaConsumer
    
    #connect to Kafka server and pass the topic we want to consume
    consumer = KafkaConsumer('my-topic', bootstrap_servers = ['192.168.124.116:9092', '192.168.124.116:9092', '192.168.124.116:9092'])
    
    for msg in consumer:
        print msg
    

    多个topic并发

    python3有差异,传入topic的内容需要byte,所以做了变换。

    import time
    import threading
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer
    
    
    producer = KafkaProducer(bootstrap_servers = ['192.168.124.116:9092', '192.168.124.116:9092', '192.168.124.116:9092'])
    
    
    def test(topic,content):
        print('begin')
        n = 1
        while (n<=50):
            aa = content +' '+ str(n)
            contentb = bytes(aa, encoding="utf8")
            producer.send(topic, contentb)
            print("send" + str(n))
            n += 1
            time.sleep(0.5)
        print('done')
    
    if __name__ == '__main__':
        for i in range(1,3):
            topic = 'my-topic' + str(i)
            print(topic)
            content = str(i)+' '+ 'is'
            thread = threading.Thread(target=test, name='test', args=(topic, content))
            thread.start()
    

    相关文章

      网友评论

          本文标题:kafka集群-python

          本文链接:https://www.haomeiwen.com/subject/lthjlftx.html