Python Kafka operations with SASL authentication

Author: Sunnky | Published: 2019-12-03 17:19

    At the time of writing, kafka-python did not support SASL_PLAINTEXT with SCRAM-SHA-256, so this post uses confluent_kafka instead.

    1. Client (consumer)
    from confluent_kafka import Consumer
    
    
    topic_name = 'test-topic'
    KAFKA_BROKER_SERVERS = "127.0.0.1:9092"
    consumer = Consumer({
        'bootstrap.servers': KAFKA_BROKER_SERVERS,
        'group.id': 'omega-self-approve',
        # Start from the earliest offset when the group has no committed offset.
        'auto.offset.reset': 'earliest',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': 'admin',
        'sasl.password': 'admin',
    })
    
    consumer.subscribe([topic_name])
    
    try:
        while True:
            # Wait up to 1 second for the next message.
            msg = consumer.poll(1.0)
    
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
    
            print('Received message: {}'.format(msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        # Ctrl+C ends the loop so that close() below is actually reached.
        pass
    finally:
        # Close the consumer so it leaves the group cleanly.
        consumer.close()
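
The consumer settings above hard-code the broker address and the `admin` credentials. One possible way to keep them out of the source is to build the SASL part of the configuration from environment variables. The sketch below is only an illustration: the helper name `build_sasl_config` and the variable names `KAFKA_BROKERS`, `KAFKA_USERNAME`, and `KAFKA_PASSWORD` are assumptions made for this example and are not part of confluent_kafka. Only the configuration keys are taken from this post.

```python
import os


def build_sasl_config(env=None, extra=None):
    """Build a config dict for SASL_PLAINTEXT + SCRAM-SHA-256.

    The environment variable names are hypothetical; rename them to
    whatever your deployment actually uses.
    """
    env = os.environ if env is None else env
    conf = {
        # Fall back to the local broker address used in this post.
        'bootstrap.servers': env.get('KAFKA_BROKERS', '127.0.0.1:9092'),
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        # A missing credential raises KeyError instead of silently using a default.
        'sasl.username': env['KAFKA_USERNAME'],
        'sasl.password': env['KAFKA_PASSWORD'],
    }
    # Client-specific keys such as 'group.id' are merged in last.
    conf.update(extra or {})
    return conf
```

With this helper, the consumer could be created as `Consumer(build_sasl_config(extra={'group.id': 'omega-self-approve', 'auto.offset.reset': 'earliest'}))`, and a producer could reuse the same function without the extra keys.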
    
    
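    2. Server (producer)
    import json
    from confluent_kafka import Producer
    
    
    topic_name = 'test-topic'
    
    conf = {
        'bootstrap.servers': '127.0.0.1:9092',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': 'admin',
        'sasl.password': 'admin',
    }
    
    
    def delivery_report(err, msg):
        # Called once per message, from poll() or flush(), with the delivery result.
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    
    producer = Producer(conf)
    for i in range(1):
        data = {"test": 1, "test2": 2}
        # Message values are sent as bytes, so serialize the dict to UTF-8 JSON.
        producer.produce(topic_name, json.dumps(data).encode('utf-8'), callback=delivery_report)
        # Serve delivery callbacks for messages sent so far.
        producer.poll(0)
    
    # Block until all queued messages are delivered and their callbacks have run.
    producer.flush()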
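
The producer sends `json.dumps(data)` as bytes, while the consumer only prints the decoded text. If the consumer needs the original dictionary back, both sides could share a small pair of helpers. This is a minimal sketch: `encode_event` and `decode_event` are hypothetical names introduced here, not confluent_kafka functions.

```python
import json


def encode_event(data):
    """Serialize a dict to UTF-8 JSON bytes, matching what the producer sends."""
    return json.dumps(data).encode('utf-8')


def decode_event(payload):
    """Parse UTF-8 JSON bytes (for example the result of msg.value()) into a dict."""
    return json.loads(payload.decode('utf-8'))


# Round trip with the sample payload used in this post.
original = {"test": 1, "test2": 2}
restored = decode_event(encode_event(original))
print(restored == original)
```

In the consumer loop, `decode_event(msg.value())` would take the place of `msg.value().decode('utf-8')`.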
    
