美文网首页
kafka生产数据和多进程消费数据

kafka生产数据和多进程消费数据

作者: 千沙qiansha | 来源:发表于2021-04-25 15:18 被阅读0次

配置信息

    KAFKA_FROM_ALTER_MANGER_TOPIC = "alter_xxx_from_alter_manager_123"
    KAFKA_CUSTOMER_GROUP = "group_alter_xxx_from_manager"
    KAFKA_ALERT_MANAGER_CLUSTER = ['192.168.xx.xx:9092', '192.168.xx.xx:9092', '192.168.xx.xx:9092']

生产数据

from kafka import KafkaProducer
from django.conf import settings
import json
import os


class KP(object):
    """
    模拟kafka生产者往指定topic发布消息
    """

    p = None
    tp = None

    def __init__(self):
        self.tp = settings.KAFKA_FROM_ALTER_MANGER_TOPIC
        self.p = KafkaProducer(bootstrap_servers=settings.KAFKA_ALERT_MANAGER_CLUSTER)

    def send_msg(self, dat):
        try:
            d = json.dumps(dat)
        except Exception as e:
            print("json dumps error: {}".format(e))
            return

        try:
            future = self.p.send(self.tp, d.encode())
            self.p.flush()
            ret = future.get(timeout=1)
            print("return: {}".format(ret))
        except Exception as e:
            print("error: {}".format(e))

多进程消费数据

import json
import os
import multiprocessing
from django.conf import settings
from django.db import connections
from common.utils import get_logger
from kafka import KafkaConsumer
from datetime import datetime

logger = get_logger(__file__)


class AlertWatcher:
    """
    启动进程消费alert manager发布到kafka的告警消息
    处理消息并保持到数据库
    """
    def __init__(self):
        self.name = 'alter_msg_watcher'

    def run(self):
        executor = multiprocessing.Pool(processes=20)
        custom = KafkaConsumer(settings.KAFKA_FROM_ALTER_MANGER_TOPIC,
                               group_id=settings.KAFKA_CUSTOMER_GROUP,
                               bootstrap_servers=settings.KAFKA_ALERT_MANAGER_CLUSTER,
                               max_poll_records=1)
        for msg in custom:
            executor.apply_async(self.handle_alert_msg, (msg,))
        executor.close()

    @staticmethod
    def handle_alert_msg(_msg):
        # 关闭无效的mysql连接
        for conn in connections.all():
            conn.close_if_unusable_or_obsolete()

        # --- for loop sub kafka topic msg ---
        handle_start = datetime.now()
        logger.info("===handle alert=== %s pid:%s got alert msg from kafka: %s" % (handle_start, os.getpid(), _msg))

        try:
            dat = json.loads(_msg.value.decode())
        except Exception as e:
            logger.error("msg: {}, load err: {}".format(_msg, e))
            return

        # --- handle alert msg ---
        alerts = dat.get("alerts", None)
        if not alerts:
            logger.error("dat: {}, no ALTERS value.".format(dat))
            return

相关文章

  • kafka生产数据和多进程消费数据

    配置信息 生产数据 多进程消费数据

  • Could not find a 'KafkaClient' e

    Kafka提供了KafkaProducer和KakfaConsumer用于生产和消费数据。0.9之后的kafka集...

  • kafka优势和烦恼

    kafka优势 1.多生产者和多消费者 2.基于磁盘的数据存储,换句话说,Kafka的数据天生就是持久化的。 3....

  • Kafka中概念的简单汇总

    (1)Producer:Kafka生产者,负责往Kafka写数据的客户端;(2)Consumer: Kafka消费...

  • python3读写kafka

    消费kafka数据,方式一 消费kafka数据,方式二 将消息写入kafka

  • 优雅的使用Kafka Consumer

    如何消费数据 我们已经知道了如何发送数据到Kafka,既然有数据发送,那么肯定就有数据消费,消费者也是Kafka整...

  • 基本概念

    Producer:消息和数据的生产者,向kafka的一个topic发送消息的进程 Consumer:消息和数据的消...

  • Kafka-1.APIS

    Kafka包含5个核心APIs: 生产者API,向Kafka集群中的主题发送数据流; 消费者API,从Kafka集...

  • Kafka实际案例问题

    kafka consumer防止数据丢失 Kafka学习之怎么保证不丢,不重复消费数据 1 消费者pull数据时,...

  • kafka

    一、kafka基本概念 1、producer:消息和数据的生产者,向kafka的一个topic发布消息的进程/服务...

网友评论

      本文标题:kafka生产数据和多进程消费数据

      本文链接:https://www.haomeiwen.com/subject/unvdrltx.html