Consuming from Kafka and Writing to Es

Author: jojo1313 | Published 2023-03-09 14:44

The Kafka consumer authenticates with SASL/PLAIN (sasl_plain_username / sasl_plain_password).
Code logic:
Consume data from Kafka and write it into Es; the target Es indices must be created in Es beforehand (see the index creation example at the end).

#!/usr/bin/python3
# encoding=utf-8

import json
import threading
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from kafka import KafkaConsumer


class esUtil:

    def __init__(self, cluster):
        if cluster == "ops":
            self.clientType = Elasticsearch(["1.5.8.7:9200"], http_auth=('security', 'abc123'), timeout=1200)
        elif cluster == "online":
            self.clientType = Elasticsearch(["1.5.8.2:9200"], http_auth=('security', 'abc123'), timeout=1200)
        elif cluster == "au":
            self.clientType = Elasticsearch(["1.5.8.6:9200"], http_auth=('security', 'abc123'), timeout=1200)

    def bulkInsert2(self, index, esType, jsonArray, chunk_size=1000, stats_only=False, raise_on_error=True):
        # Wrap the parsed message as a single bulk action and index it.
        body = [{"_index": index, "_type": esType, "_source": jsonArray}]
        return helpers.bulk(self.clientType, body, chunk_size=chunk_size, index=index, stats_only=stats_only,
                            raise_on_error=raise_on_error)


def consume_messages(topic_name, es_util):
    consumer = KafkaConsumer(bootstrap_servers=["1.29.48.5:9092", "1.29.48.7:9092", "1.29.48.9:9092"],
                             auto_offset_reset='earliest',
                             sasl_mechanism='PLAIN',
                             security_protocol='SASL_PLAINTEXT',
                             sasl_plain_username='kafka',
                             sasl_plain_password='kafka-passwd',
                             group_id='mycons',
                             enable_auto_commit=True,
                             auto_commit_interval_ms=5000,
                             consumer_timeout_ms=5 * 1000
                             )
    consumer.subscribe(topics=[topic_name])
    while True:
        data = []
        # The iteration below ends once consumer_timeout_ms elapses with no new messages.
        for message in consumer:
            #print('====%s:%d:%d:key-%s value=%s==' % (message.topic, message.partition, message.offset, message.key, message.value))
            data.append(message.value)

        for ms in data:
            if topic_name == "sensitive_word":
                es_util.bulkInsert2('sensitive_word', 'alert', json.loads(ms))
            if topic_name == "black_link":
                es_util.bulkInsert2('black_links', 'alert', json.loads(ms))
        #print("no msg")


def consume_messages_thread(topic, es_util):
    if topic == 'sensitive_word':
        # Handle sensitive-word alert messages
        consume_messages('sensitive_word', es_util)
    elif topic == 'black_link':
        # Handle black-link alert messages
        consume_messages('black_link', es_util)


def main():
    es_util = esUtil('ops')
    # Create three threads to run consume_messages concurrently
    t1 = threading.Thread(target=consume_messages_thread, args=('sensitive_word', es_util))
    t2 = threading.Thread(target=consume_messages_thread, args=('black_link', es_util))
    t3 = threading.Thread(target=consume_messages_thread, args=('sensitive_word', es_util))
    # Start the three threads
    t1.start()
    t2.start()
    t3.start()
    # Wait for the three threads to finish
    t1.join()
    t2.join()
    t3.join()

if __name__ == '__main__':
    main()
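
Since the script assumes the target indices already exist, it can be worth checking them before starting the consumer threads. A minimal sketch, assuming the same 'ops' cluster address and credentials used in esUtil and the index names used above:

#!/usr/bin/python3
# encoding=utf-8

from elasticsearch import Elasticsearch

# Same 'ops' cluster endpoint and credentials as esUtil above.
es = Elasticsearch(["1.5.8.7:9200"], http_auth=('security', 'abc123'), timeout=1200)

for index in ('sensitive_word', 'black_links'):
    if not es.indices.exists(index=index):
        print("index %s is missing, create it first" % index)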

An example of creating the Es index is as follows:

#!/bin/bash

curl -XPUT -u security:abc123 -H 'Content-Type: application/json' 'http://1.1.8.7:9200/black_links' -d '{
  "settings" : {
      "index":{
          "routing":{
              "allocation":{
                  "total_shards_per_node": "8"
                  }
              },
          "number_of_shards" : 25,
          "number_of_replicas" : 1,
          "refresh_interval": "30s"
     }              
  },
  "mappings": {
      "alert": {
        "_all": {
            "enabled": false,
            "store": false
        },
        "_source": {"enabled": true},
        "dynamic": "true",
        "properties": {
          "id" : {
            "type" : "keyword"
          },
          "asset_id": {
            "type": "keyword"
          },
          "host" : {
            "type" : "keyword"
          },
          "mainframe_url" : {
            "type" : "keyword"
          },
          "url" : {
            "type" : "keyword"
          },
          "url_hash" : {
            "type" : "keyword"
          },
          "version_time" : {
            "type" : "date"
          }, 
          "found_at" : {
            "type" : "date"
          }, 
          "user_agent" : {
            "type" : "keyword"
          }, 
          "results" : {
            "type" : "nested"
          }
        }
      }
    }
}'
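
With the index in place, an end-to-end test can be done by pushing a message to the topic with the same SASL/PLAIN settings as the consumer. A minimal sketch; the payload fields are only illustrative, loosely modeled on the black_links mapping above:

#!/usr/bin/python3
# encoding=utf-8

import json
from kafka import KafkaProducer

# Same brokers and SASL/PLAIN credentials as the consumer above.
producer = KafkaProducer(bootstrap_servers=["1.29.48.5:9092", "1.29.48.7:9092", "1.29.48.9:9092"],
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='PLAIN',
                         sasl_plain_username='kafka',
                         sasl_plain_password='kafka-passwd',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Illustrative payload only; field names follow the black_links mapping above.
msg = {"id": "test-1", "host": "example.com", "url": "http://example.com/x", "found_at": "2023-03-09T14:44:00Z"}
producer.send('black_link', msg)
producer.flush()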
