第一次了解kafka不是很深入,后期会继续补更,完善~
kafaka工作原理:生产者生产消息-->消费者接收消息后消费
1. 消费者工作原理
连接kafka
先设置消费组id,并制定消费哪个topic:
# trans
trans_cons = Consumer(
{**sys_conf.get_kafka_config(consumer_id=f'{func_mark}trans')}
)
trans_cons.subscribe(trans_topics)
消费对应topic的消息:
msgs = cons.consume(sys_conf.CONSUMER_BZ, sys_conf.CONSUMER_TIMEOUT)
因为msgs里面有很多个消息,需要将消息一个个解读出来然后计算处理,最后输入到数据库或者将结果塞进生产者中推到kafka等对方进行消费。
将msg解析:
for msg in msgs:
print(msg.value())
如需转发请加载连接https://www.jianshu.com/p/6e00abd7780a
网友评论