美文网首页
用Python创建自定义的Kafka Topic

用Python创建自定义的Kafka Topic

作者: L3nvy | 来源:发表于2017-07-26 17:36 被阅读900次

    背景

    项目中需要创建分区(partitions)数不同的Topic。在server.properties中可以配置默认的Topic分区数量,但是不能在需要的时候任意改变。(使用Producer API会自动创建Topic)

    简单的Solution

    翻遍了Kafka-python的文档,没有发现kafka-python提供了类似client.create_topic(name='test', num_partitions=3)这样简单的API。只能往底层探索了,果然发现了两个关键信息。

    KafkaClient API中有这样一个方法:

    屏幕快照 2017-07-26 16.29.26.png

    kafka.protocol.admin — kafka-python 1.3.4.dev documentation中:

    屏幕快照 2017-07-26 16.30.46.png

    显然,我们只需要构建一个CreateTopicsRequest的请求,然后通过KafkaClient的send()方法发送给控制节点(由于本小白也不太清楚Kafka的机制,测试的时候,不是控制节点,会报错。也不清楚各个版本的区别,下面代码用的是v0版本。🤣)

    原理就是这样,还是很简单的。

    糟糕的Code

    
    def create_topic(self, topic='topic', num_partitions=3, configs=None, timeout_ms=3000, brokers=['localhost:9290'], no_partition_change=True):
    
            client = KafkaClient(bootstrap_servers=brokers)
            
            if topic not in client.cluster.topics(exclude_internal_topics=True): # Topic不存在
    
                request = admin.CreateTopicsRequest_v0(
                    create_topic_requests=[(
                        topic,
                        num_partitions,
                        -1, # replication unset.
                        [], # Partition assignment.
                        [(key, value) for key, value in configs.items()],  # Configs
                    )],
                    timeout=timeout_ms
                )
    
                future = client.send(2, request)  # 2是Controller,发送给其他Node都创建失败。
                client.poll(timeout_ms=timeout_ms, future=future, sleep=False) # 这里
    
                result = future.value
                # error_code = result.topic_error_codes[0][1]
                print("CREATE TOPIC RESPONSE: ", result)  # 0 success, 41 NOT_CONTROLLER, 36 ALREADY_EXISTS
                client.close()
            else: # Topic已经存在
                print("Topic already exists!")
                return
    

    最重要的Reference

    相关文章

      网友评论

          本文标题:用Python创建自定义的Kafka Topic

          本文链接:https://www.haomeiwen.com/subject/amsokxtx.html