kafka消费方式:使用 sasl_plain_username / sasl_plain_password 进行 SASL/PLAIN 密码认证
代码逻辑:
从 kafka 消费数据并存入 Es;存入 Es 前需要提前在 Es 中创建好索引
#!/usr/bin/python3
# encoding=utf-8
import json
import threading
from concurrent.futures import ProcessPoolExecutor
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from kafka import KafkaConsumer
class esUtil:
    """Thin wrapper around an Elasticsearch client, selected by cluster name."""

    # NOTE(review): never assigned anywhere visible in this file — presumably a
    # leftover from an earlier revision; confirm before removing.
    redisCli = None
    # One day expressed in milliseconds.
    oneDay = 86400000

    def __init__(self, cluster):
        """Connect to the ES cluster identified by *cluster*.

        Known names: "ops", "online", "au".  Any other value leaves the
        instance without a client (self.clientType stays undefined and the
        first bulkInsert2 call raises AttributeError) — callers must pass
        a known name.
        """
        hosts = {
            "ops": "1.5.8.7:9200",
            "online": "1.5.8.2:9200",
            "au": "1.5.8.6:9200",
        }
        if cluster in hosts:
            self.clientType = Elasticsearch([hosts[cluster]],
                                            http_auth=('security', 'abc123'),
                                            timeout=1200)

    def bulkInsert2(self, index, esType, jsonArray, chunk_size=1000,
                    stats_only=False, raise_on_error=True):
        """Bulk-index *jsonArray* into *index* under mapping type *esType*.

        *jsonArray* may be a single document (dict) or a list of documents.
        BUG FIX / generalization: the original always wrapped the whole
        argument into one action's _source, so passing a list produced a
        single invalid ES document; a list is now expanded into one bulk
        action per document.  Behavior for a single dict is unchanged.

        Returns whatever elasticsearch.helpers.bulk returns
        (success count plus errors, shaped by *stats_only*).
        """
        docs = jsonArray if isinstance(jsonArray, list) else [jsonArray]
        actions = [{"_index": index, "_type": esType, "_source": doc}
                   for doc in docs]
        return helpers.bulk(self.clientType, actions, chunk_size=chunk_size,
                            index=index, stats_only=stats_only,
                            raise_on_error=raise_on_error)
def consume_messages(topic_name, es_util):
    """Consume JSON messages from *topic_name* and index them into ES.

    Runs forever: the inner for-loop returns after consumer_timeout_ms
    (5 s) of inactivity, the collected batch is indexed, then the outer
    loop polls again.  *es_util* must be an esUtil instance.
    """
    consumer = KafkaConsumer(bootstrap_servers=["1.29.48.5:9092", "1.29.48.7:9092", "1.29.48.9:9092"],
                             auto_offset_reset='earliest',
                             sasl_mechanism='PLAIN',
                             security_protocol='SASL_PLAINTEXT',
                             sasl_plain_username='kafka',
                             sasl_plain_password='kafka-passwd',
                             group_id='mycons',
                             enable_auto_commit=True,
                             auto_commit_interval_ms=5000,
                             consumer_timeout_ms=5 * 1000
                             )
    # BUG FIX: the original passed topics=(topic_name) — a parenthesized
    # string, NOT a tuple (missing comma); subscribe expects a list/tuple
    # of topic names.
    consumer.subscribe(topics=[topic_name])
    # Map topic name -> target ES index, hoisted out of the message loop.
    # Unknown topics are consumed but not indexed (same as the original
    # if-chain behavior).
    es_index = {"sensitive_word": "sensitive_word",
                "black_link": "black_links"}.get(topic_name)
    try:
        while True:
            batch = [message.value for message in consumer]
            if es_index is not None:
                for raw in batch:
                    es_util.bulkInsert2(es_index, 'alert', json.loads(raw))
    finally:
        # BUG FIX: the original consumer.close() sat after `while True`
        # and was unreachable; close on any exit path (e.g. exception).
        consumer.close()
def consume_messages_thread(topic, es_util):
    """Thread entry point: dispatch *topic* to consume_messages.

    BUG FIX: the original compared against 'asensitive_word' and
    'alarm_black_link', but main() passes 'sensitive_word' / 'black_link',
    so no branch ever matched and every thread silently did nothing.
    Both the old and the corrected names are accepted for compatibility.
    Unknown topics fall through and return None, as before.
    """
    if topic in ('sensitive_word', 'asensitive_word'):
        # Sensitive-word alert messages.
        consume_messages('sensitive_word', es_util)
    elif topic in ('black_link', 'alarm_black_link'):
        # Black-link alert messages.
        consume_messages('black_link', es_util)
def main():
    """Start the consumer threads and block on them (they run forever)."""
    es_util = esUtil('ops')
    # BUG FIX: the original passed 'sensitive_word' / 'black_link', but
    # consume_messages_thread dispatches on 'asensitive_word' /
    # 'alarm_black_link', so no thread ever consumed anything.  Pass the
    # names the dispatcher actually checks.
    # Two threads share the sensitive-word topic (same consumer group, so
    # Kafka splits partitions between them); one thread handles black links.
    topics = ('asensitive_word', 'alarm_black_link', 'asensitive_word')
    threads = [threading.Thread(target=consume_messages_thread,
                                args=(name, es_util))
               for name in topics]
    # Start all three threads, then wait for them to finish.
    for t in threads:
        t.start()
    for t in threads:
        t.join()
if __name__ == '__main__':
    main()
Es索引创建示例如下:
#!/bin/bash
# Create the "black_links" index up front: the Python consumer only
# bulk-inserts and relies on this mapping already existing.
# BUG FIX: Elasticsearch 6.x rejects body-carrying requests that lack an
# explicit Content-Type header (HTTP 406); add it to the curl call.
# NOTE(review): this host (1.1.8.7) differs from the Python client's
# 1.5.8.7 — confirm which address is correct.
curl -XPUT -u security:abc123 -H 'Content-Type: application/json' 'http://1.1.8.7:9200/black_links' -d '{
"settings" : {
"index":{
"routing":{
"allocation":{
"total_shards_per_node": "8"
}
},
"number_of_shards" : 25,
"number_of_replicas" : 1,
"refresh_interval": "30s"
}
},
"mappings": {
"alert": {
"_all": {
"enabled": false,
"store": false
},
"_source": {"enabled": true},
"dynamic": "true",
"properties": {
"id" : {
"type" : "keyword"
},
"asset_id": {
"type": "keyword"
},
"host" : {
"type" : "keyword"
},
"mainframe_url" : {
"type" : "keyword"
},
"url" : {
"type" : "keyword"
},
"url_hash" : {
"type" : "keyword"
},
"version_time" : {
"type" : "date"
},
"found_at" : {
"type" : "date"
},
"user_agent" : {
"type" : "keyword"
},
"results" : {
"type" : "nested"
}
}
}
}
}'
网友评论