美文网首页
python从kafka消费数据且写入kafka

python从kafka消费数据且写入kafka

作者: hao_yu | 来源:发表于2021-01-13 13:53 被阅读0次

简单记录一个读写kafka demo

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import logging
import time  # 引入time模块
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError

# 循环发送数据次数
n = 1

#数据从174获取,发送到175
KAFAKA_TOPIC = "poseidon_receiver_raw_data_RTCM3.2"
KAFAKA_HOST_PRODUCTER = "192.168.xx.xx"
KAFAKA_HOST_CONSUMER = "192.168.xx.xx"

KAFAKA_PORT = 9092

logging.basicConfig(
    level=logging.INFO,  # 定义输出到文件的log级别,大于此级别的都被输出
    # format='%(asctime)s  %(filename)s %(levelno)s : %(levelname)s %(message)s',  # 定义输出log的格式
    format='%(asctime)s : %(message)s',  # 定义输出log的格式
    datefmt='%Y-%m-%d %A %H:%M:%S',  # 时间
    filename='obs_info.log',  # log文件名
    filemode='w')  # 写入模式“w”或“a”


class Kafka_producer():
    '''''
    生产模块:根据不同的key,区分消息
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        print("producer:h,p,t", kafkahost, kafkaport, kafkatopic)
        bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
        )
        print("boot svr:", bootstrap_servers)
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def send(self, k, v):
        try:
            producer = self.producer

            k = k.encode('utf-8')
            resp = producer.send(self.kafkatopic, key=k, value=v)
            # print(resp.succeeded())
            producer.flush()
        except KafkaError as e:
            print(e)


class Kafka_consumer():
    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid,
                                      bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                                          kafka_host=self.kafkaHost,
                                          kafka_port=self.kafkaPort)
                                      )

    def consume_data(self):
        try:
            for message in self.consumer:
                yield message
        except KeyboardInterrupt as e:
            print(e)


if __name__ == '__main__':
    group = 'tunnel2QA'
    consumer = Kafka_consumer(KAFAKA_HOST_PRODUCTER, KAFAKA_PORT, KAFAKA_TOPIC, group)
    producer = Kafka_producer(KAFAKA_HOST_CONSUMER, KAFAKA_PORT, KAFAKA_TOPIC)
    message = consumer.consume_data()
    pre_time = 0
    for msg in message:
        key = str(msg.key, "utf-8")

        ticks = int(time.time())
        if pre_time != ticks:
            for i in range(0, n):
                key = i << 8 + 1 # key自定义
                producer.send(str(key), msg.value)
        pre_time = ticks


相关文章

  • python从kafka消费数据且写入kafka

    简单记录一个读写kafka demo

  • python3读写kafka

    消费kafka数据,方式一 消费kafka数据,方式二 将消息写入kafka

  • Flink写入Hbase

    基本流程: 从Kafka中读取数据,再写入到Hbase。 写入Kafka代码 Flink写入Habse代码 pom...

  • Kafka - 新消费者

    Kafka - 新消费者 一、数据来源 数据使用上一个博文所配置的 Flume,将文本数据写入到 Kafka中。不...

  • Kafka0.8集群部署与shell命令行操作

    1、kafka简介在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。KAFKA...

  • kafka集群搭建

    1、kafka简介在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。KAFKA...

  • 数据写入kafka的分区策略

    众所周知,kafka有分区的概念,生产者写入数据到kafka,涉及到数据到底写到哪个分区?kafka api提供了...

  • Kafka_核心

    kafka集群 Kafka的设计都是为了实现kafak消息队列消费数据的语义Kafka消息队列中数据消费的三种语义...

  • Kafka详解

    Kafka Kafka 概述 kafka是什么? 在流式计算中,kafka一般用来储存缓存数据,Storm通过消费...

  • python启动线程

    python kafka多线程消费数据[https://www.cnblogs.com/qiaoer1993/p/...

网友评论

      本文标题:python从kafka消费数据且写入kafka

      本文链接:https://www.haomeiwen.com/subject/brmuaktx.html