配置信息
KAFKA_FROM_ALTER_MANGER_TOPIC = "alter_xxx_from_alter_manager_123"
KAFKA_CUSTOMER_GROUP = "group_alter_xxx_from_manager"
KAFKA_ALERT_MANAGER_CLUSTER = ['192.168.xx.xx:9092', '192.168.xx.xx:9092', '192.168.xx.xx:9092']
生产数据
from kafka import KafkaProducer
from django.conf import settings
import json
import os
class KP(object):
"""
模拟kafka生产者往指定topic发布消息
"""
p = None
tp = None
def __init__(self):
self.tp = settings.KAFKA_FROM_ALTER_MANGER_TOPIC
self.p = KafkaProducer(bootstrap_servers=settings.KAFKA_ALERT_MANAGER_CLUSTER)
def send_msg(self, dat):
try:
d = json.dumps(dat)
except Exception as e:
print("json dumps error: {}".format(e))
return
try:
future = self.p.send(self.tp, d.encode())
self.p.flush()
ret = future.get(timeout=1)
print("return: {}".format(ret))
except Exception as e:
print("error: {}".format(e))
多进程消费数据
import json
import os
import multiprocessing
from django.conf import settings
from django.db import connections
from common.utils import get_logger
from kafka import KafkaConsumer
from datetime import datetime
logger = get_logger(__file__)
class AlertWatcher:
"""
启动进程消费alert manager发布到kafka的告警消息
处理消息并保持到数据库
"""
def __init__(self):
self.name = 'alter_msg_watcher'
def run(self):
executor = multiprocessing.Pool(processes=20)
custom = KafkaConsumer(settings.KAFKA_FROM_ALTER_MANGER_TOPIC,
group_id=settings.KAFKA_CUSTOMER_GROUP,
bootstrap_servers=settings.KAFKA_ALERT_MANAGER_CLUSTER,
max_poll_records=1)
for msg in custom:
executor.apply_async(self.handle_alert_msg, (msg,))
executor.close()
@staticmethod
def handle_alert_msg(_msg):
# 关闭无效的mysql连接
for conn in connections.all():
conn.close_if_unusable_or_obsolete()
# --- for loop sub kafka topic msg ---
handle_start = datetime.now()
logger.info("===handle alert=== %s pid:%s got alert msg from kafka: %s" % (handle_start, os.getpid(), _msg))
try:
dat = json.loads(_msg.value.decode())
except Exception as e:
logger.error("msg: {}, load err: {}".format(_msg, e))
return
# --- handle alert msg ---
alerts = dat.get("alerts", None)
if not alerts:
logger.error("dat: {}, no ALTERS value.".format(dat))
return
网友评论