简单记录一个读写 Kafka 的 demo
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import logging
import time # 引入time模块
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError
# Kafka endpoints. Per the author's original note, data is fetched from
# host .174 and sent on to host .175 (the addresses below are redacted).
KAFAKA_HOST_PRODUCTER = "192.168.xx.xx"
KAFAKA_HOST_CONSUMER = "192.168.xx.xx"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "poseidon_receiver_raw_data_RTCM3.2"

# Number of times each forwarded message is sent (loop count in the script).
n = 1

# Root logger configuration: records at INFO level and above go to a file.
# A more verbose alternative format that was tried previously:
#   '%(asctime)s %(filename)s %(levelno)s : %(levelname)s %(message)s'
logging.basicConfig(
    filename="obs_info.log",            # log file name
    filemode="w",                       # "w" truncates on start, "a" would append
    level=logging.INFO,                 # minimum level written to the file
    datefmt="%Y-%m-%d %A %H:%M:%S",     # timestamp layout, includes the weekday name
    format="%(asctime)s : %(message)s", # layout of each log line
)
class Kafka_producer():
    """Producer side: publishes keyed messages to a single Kafka topic.

    Messages are told apart by the key passed to :meth:`send`.
    """

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        """Create a ``KafkaProducer`` connected to ``kafkahost:kafkaport``.

        Args:
            kafkahost: Broker host name or IP address.
            kafkaport: Broker port.
            kafkatopic: Topic that every call to :meth:`send` publishes to.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        print("producer:h,p,t", kafkahost, kafkaport, kafkatopic)
        bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
        )
        print("boot svr:", bootstrap_servers)
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def send(self, k, v):
        """Publish ``v`` under key ``k`` to the configured topic, then flush.

        Args:
            k: Message key. A ``str`` is UTF-8 encoded; ``bytes``/``bytearray``
                and ``None`` (no key) are passed to the producer unchanged.
            v: Message value, handed to the producer as is. No serializer is
                configured on the producer, so this is presumably already
                bytes -- verify against callers.

        A ``KafkaError`` raised while sending or flushing is printed and not
        re-raised, so a failed send does not stop the caller.
        """
        # Encode only when needed: keys that are already bytes, or absent,
        # must not go through ``.encode`` (bytes has no such method).
        if k is not None and not isinstance(k, (bytes, bytearray)):
            k = k.encode('utf-8')
        try:
            # NOTE(review): the object returned by ``send`` is not inspected
            # here, so per-message delivery results are not checked.
            self.producer.send(self.kafkatopic, key=k, value=v)
            self.producer.flush()
        except KafkaError as e:
            print(e)
class Kafka_consumer():
    """Consumer side: wraps a ``KafkaConsumer`` subscribed to one topic."""

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        """Create a ``KafkaConsumer`` for ``kafkatopic`` at ``kafkahost:kafkaport``.

        Args:
            kafkahost: Broker host name or IP address.
            kafkaport: Broker port.
            kafkatopic: Topic to subscribe to.
            groupid: Consumer group id passed to the consumer.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        broker_address = '{kafka_host}:{kafka_port}'.format(
            kafka_host=kafkahost,
            kafka_port=kafkaport,
        )
        self.consumer = KafkaConsumer(
            kafkatopic,
            bootstrap_servers=broker_address,
            group_id=groupid,
        )

    def consume_data(self):
        """Yield each message produced by iterating the underlying consumer.

        This is a generator. A ``KeyboardInterrupt`` raised during iteration
        is printed and ends the generator instead of propagating.
        """
        try:
            # Deliberately an explicit loop: the generator only iterates the
            # consumer and never calls any other method on it.
            for record in self.consumer:
                yield record
        except KeyboardInterrupt as interrupt:
            print(interrupt)
def make_message_key(index):
    """Return the string key for the ``index``-th copy of a forwarded message.

    The key is ``(index << 8) + 1`` rendered in decimal, e.g. ``"1"``,
    ``"257"``, ``"513"`` for indexes 0, 1, 2.
    """
    # The parentheses are required: ``+`` binds tighter than ``<<``, so
    # ``index << 8 + 1`` would be evaluated as ``index << 9``.
    return str((index << 8) + 1)


if __name__ == '__main__':
    group = 'tunnel2QA'
    # NOTE(review): the consumer is given KAFAKA_HOST_PRODUCTER and the
    # producer KAFAKA_HOST_CONSUMER. The names look swapped relative to how
    # they are used here -- confirm which host is the source and which the sink.
    consumer = Kafka_consumer(KAFAKA_HOST_PRODUCTER, KAFAKA_PORT, KAFAKA_TOPIC, group)
    producer = Kafka_producer(KAFAKA_HOST_CONSUMER, KAFAKA_PORT, KAFAKA_TOPIC)
    message = consumer.consume_data()
    pre_time = 0
    for msg in message:
        # The incoming key is intentionally not decoded: its value was never
        # used, and decoding fails for messages that carry no key.
        ticks = int(time.time())
        # Throttle: forward only the first message seen in each wall-clock
        # second; anything else arriving within the same second is dropped.
        if pre_time == ticks:
            continue
        for i in range(n):
            producer.send(make_message_key(i), msg.value)
        pre_time = ticks
网友评论