公司的kafka做了安全升级,加入了sasl认证。
使用confluent_kafka 进行认证连接kafka
首先安装confluent_kafka
pip install confluent_kafka
生产端示例代码
import json
from datetimeimport datetime
from confluent_kafkaimport Producer
topic_name ='TOPIC_NAME'
conf = {
'bootstrap.servers':'XXXX:xx,XXXXX:XX',
'security.protocol':'SASL_PLAINTEXT',
'sasl.mechanisms':'PLAIN',
'sasl.username':'XX',
'sasl.password':'XXXXXXXX'
}
def delivery_report(err, msg):
if erris not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
producer = Producer(**conf)
data = {
'name':'sheng',
'time':str(datetime.now())
}
for iin range(10):
#print(data)
producer.produce(topic_name, (json.dumps(data)).encode(), callback=delivery_report)
producer.flush()
消费端代码
from confluent_kafkaimport Consumer
topic_name ='TOPIC_NAME'
KAFKA_BROKER_SERVERS ="XXX:xx,XXXX:xx"
consumer = Consumer({
'bootstrap.servers': KAFKA_BROKER_SERVERS,
'group.id':'test_sasl',
'auto.offset.reset':'earliest',
'security.protocol':'SASL_PLAINTEXT',
'sasl.mechanisms':'PLAIN',
'sasl.username':'XX',
'sasl.password':'XXXXXXXX'
})
consumer.subscribe([topic_name])
while True:
msg = consumer.poll(1.0)
if msgis None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
consumer.close()
网友评论