美文网首页
python操作kafka

python操作kafka

作者: Hmcf | 来源:发表于2019-11-23 15:34 被阅读0次

    官方手册,点关注不迷路
    https://kafka-python.readthedocs.io/en/master/usage.html

    更多内容可参考手册,本实例只讲几个基本常用内容。

    import json
    from kafka import KafkaProducer
    
    producer = KafkaProducer(bootstrap_servers=['192.168.64.11:9092', '192.168.64.12:9092', '192.168.64.13:9092'],
                             value_serializer=lambda m: json.dumps(m).encode('ascii')
                             )
    
    msg = {
        "sleep_time": 10,
        "db_config": {
            "database": "test_1",
            "host": "xxxx",
            "user": "root",
            "password": "root"
        },
        "table": "msg",
        "msg": "Hello World"
    }
    # 向指定topic 的 指定partition发送数据
    producer.send('hmcf_test', msg, partition=0)
    print('over')
    producer.flush()
    print('flush over')
    producer.close()
    print('guanbi')
    
    
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    import time
    
    def main():
        consumer = KafkaConsumer(bootstrap_servers=['192.168.64.11:9092', '192.168.64.12:9092', '192.168.64.13:9092'],
                                 group_id='001',
                                 auto_offset_reset='earliest')
    
        # ========================================================================================================
        # 指定消费哪个topic和topic里面的哪个partition
        # consumer.assign([TopicPartition(topic='hmcf_test', partition=0), TopicPartition(topic='hmcf_test', partition=1)])
        # # 查看topic 有哪几个分区
        # # print(consumer.partitions_for_topic("hmcf_test"))
        #
        # # 动态偏移指针,获取指定位置开始的数据
        # consumer.seek(TopicPartition(topic='hmcf_test', partition=0), 0)
        # consumer.seek(TopicPartition(topic='hmcf_test', partition=1), 0)
        # for msg in consumer:
        #     recv = "topic:%s  partition:%d offset:%d key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        #     print(recv)
        # ========================================================================================================
        # subscribe 和 assign 只能2选1
        consumer.subscribe(topics=['hmcf_test'])
        while 1:
            # 主动拉取数据
            msg = consumer.poll(timeout_ms=5)
            print(msg)
            time.sleep(2)
    
    
    if __name__ == '__main__':
        main()
    
    

    相关文章

      网友评论

          本文标题:python操作kafka

          本文链接:https://www.haomeiwen.com/subject/svyhwctx.html