美文网首页
python连接有sasl认证的kafka

python连接有sasl认证的kafka

作者: 大空翼123 | 来源:发表于2023-03-16 17:33 被阅读0次

    公司的kafka做了安全升级,加入了sasl认证。

    使用confluent_kafka  进行认证连接kafka

    首先安装confluent_kafka 

    pip install confluent_kafka

    生产端示例代码

    import json

    from datetimeimport datetime

    from confluent_kafkaimport Producer

    topic_name ='TOPIC_NAME'

    conf = {

    'bootstrap.servers':'XXXX:xx,XXXXX:XX',

        'security.protocol':'SASL_PLAINTEXT',

        'sasl.mechanisms':'PLAIN',

        'sasl.username':'XX',

        'sasl.password':'XXXXXXXX'

    }

    def delivery_report(err, msg):

    if erris not None:

    print('Message delivery failed: {}'.format(err))

    else:

    print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    producer = Producer(**conf)

    data = {

    'name':'sheng',

        'time':str(datetime.now())

    }

    for iin range(10):

    #print(data)

        producer.produce(topic_name, (json.dumps(data)).encode(), callback=delivery_report)

    producer.flush()

    消费端代码

    from confluent_kafkaimport Consumer

    topic_name ='TOPIC_NAME'

    KAFKA_BROKER_SERVERS ="XXX:xx,XXXX:xx"

    consumer = Consumer({

    'bootstrap.servers': KAFKA_BROKER_SERVERS,

        'group.id':'test_sasl',

        'auto.offset.reset':'earliest',

        'security.protocol':'SASL_PLAINTEXT',

        'sasl.mechanisms':'PLAIN',

        'sasl.username':'XX',

        'sasl.password':'XXXXXXXX'

    })

    consumer.subscribe([topic_name])

    while True:

    msg = consumer.poll(1.0)

    if msgis None:

    continue

        if msg.error():

    print("Consumer error: {}".format(msg.error()))

    continue

        print('Received message: {}'.format(msg.value().decode('utf-8')))

    consumer.close()

    相关文章

      网友评论

          本文标题:python连接有sasl认证的kafka

          本文链接:https://www.haomeiwen.com/subject/ocddrdtx.html