elasticsearch + apscheduler实现es库定时读取数据并存入mysql数据库
功能概要:
1、查询es库
2、apscheduler定时es库,每分钟读取一次es库并存入mysql
3、gevent 多任务 存储mysql库
4、mysql库存储
from datetime import datetime
import os
import pymysql
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from elasticsearch import Elasticsearch
from gevent import monkey
import gevent
# 有耗时操作时需要
monkey.patch_all()
# Create your views here.
es = Elasticsearch()
"""
自动化脚本:
主要功能:
1、定时
2、读取数据 es库
3、存数据 mysql
"""
class sava_mysql():
    """Persist Elasticsearch hit documents into the MySQL `devops` database."""

    def connection(self):
        """Open and return a new connection to the local MySQL devops DB."""
        # NOTE(review): credentials are hardcoded — move to config/env vars.
        return pymysql.connect(host='127.0.0.1',
                               user='root',
                               passwd='mysql',
                               db='devops')

    def save_data(self, data, table_name):
        """Insert ES hits into MySQL.

        :param data: iterable of ES hits, each a dict with '_id', '_index'
                     and '_source' keys (field names assumed from the queries
                     in get_Elastic — confirm against the actual ES mappings).
        :param table_name: target table, 'log_login' or 'log_operation'.
        """
        # Parameterized statements: the previous %-string-formatting spliced
        # raw log text (command lines, messages) into SQL and was injectable;
        # it also required the fragile .replace('"', "'") quote-escaping hack.
        login_sql = (
            "INSERT INTO log_login(es_id, index_name, operation_date,"
            " operation_time, login_user, client_ip, client_port, server_name,"
            " server_ip, status, message, host_os_platform, host_os_version,"
            " host_architecture, insert_time, audit_status)"
            " VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
        operation_sql = (
            "INSERT INTO log_operation(es_id, index_name, operation_date,"
            " operation_time, login_user, real_user, client_ip, server_name,"
            " server_ip, command, message, log_type, host_os_platform,"
            " host_os_version, host_architecture, insert_time, audit_status)"
            " VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
        db_conn = self.connection()
        try:
            cursor = db_conn.cursor()
            for tmp in data:
                src = tmp['_source']
                if table_name == 'log_login':
                    sql = login_sql
                    params = (tmp['_id'],
                              tmp['_index'],
                              src['@timestamp'][:10],   # YYYY-MM-DD
                              src['Login_Time'][-8:],   # HH:MM:SS
                              src['UserName'],
                              src['ClientIP'],
                              src['Client_Port'],
                              src['HostName'],
                              None,                     # server_ip not known here
                              src['Status'],
                              src['message'],
                              src['host']['os']['platform'],
                              src['host']['os']['version'],
                              src['host']['architecture'],
                              datetime.datetime.now(),
                              '1')
                elif table_name == 'log_operation':
                    # Tagged documents (e.g. pipeline parse failures) are skipped.
                    if src.get('tags', ''):
                        continue
                    sql = operation_sql
                    params = (tmp['_id'],
                              tmp['_index'],
                              src['@timestamp'][:10],
                              src.get('Operation_Time', '')[-8:],
                              src['LoginUser'],
                              src['RealUser'],
                              src['ClientIP'],
                              src['ServerName'],
                              None,                     # server_ip not known here
                              src['Command'],           # no manual quote-escaping needed
                              src['message'],
                              '0',                      # log_type fixed for operations
                              src['host']['os']['platform'],
                              src['host']['os']['version'],
                              src['host']['architecture'],
                              datetime.datetime.now(),
                              '1')
                else:
                    # Unknown table name: previously this re-executed whatever
                    # sql was left over from an earlier iteration; skip instead.
                    continue
                cursor.execute(sql, params)
                # Per-row commit preserves the original partial-save behavior
                # (rows inserted before a failure stay committed).
                db_conn.commit()
        except Exception as e:
            print('--异常--', e)
            # Roll back the failed (uncommitted) row.
            db_conn.rollback()
        finally:
            # Always release the connection, whatever path we took.
            db_conn.close()
        print('数据库保存完成')
class get_Elastic():
    """Read log documents from an Elasticsearch cluster and hand them to sava_mysql."""

    def __init__(self, index_name=None, index_type=None, ip="172.1.1.134"):
        """
        :param index_name: default index name (currently unused by the methods)
        :param index_type: default index type (currently unused by the methods)
        :param ip: ES node to connect to
        """
        self.esdb_list = ['172.1.1.131', '172.1.1.132', '172.1.1.133',
                          '172.1.1.134', '172.1.1.135']  # ES cluster nodes
        self.index_name = index_name
        self.index_type = index_type
        # NOTE(review): hardcoded credentials — load from config/env instead.
        self.es = Elasticsearch([ip], http_auth=('elastic', 'awy%#dsl'), port=9200)

    def init_data(self):
        """Backfill: import every day of the current month, from the 1st
        through today, for both the secure-* (logins) and history-*
        (operations) indices.

        The previous version contained leftover test scaffolding that pinned
        the day to '19', skipped the secure-* fetch, and broke out of the
        loop after one iteration; this restores the intended behavior.
        """
        month_prefix = datetime.datetime.now().strftime("%Y-%m-")  # e.g. '2020-05-'
        today = datetime.datetime.now().day
        for day in range(1, today + 1):
            day_str = str(day).zfill(2)  # index names use zero-padded days
            # e.g. secure-2020-05-19 -> log_login, history-2020-05-19 -> log_operation
            self.get_initdata("secure-" + month_prefix + day_str, 'log_login')
            self.get_initdata("history-" + month_prefix + day_str, 'log_operation')
        return

    def get_initdata(self, index_name, table_name):
        """Fetch one whole index (match_all) and save it to MySQL.

        Days with no data have no index; those lookups raise and are skipped
        instead of aborting the whole backfill (the original left a comment
        saying this try was needed but never added it).
        """
        query = {"query": {"match_all": {}}}
        try:
            resp = self.es.search(index=index_name, body=query,
                                  scroll='5m', size=9999, timeout='10s')
        except Exception as e:
            print('--索引 %s 查询失败--' % index_name, e)
            return
        # NOTE(review): only the first page (size=9999) is saved; the scroll
        # cursor is in resp['_scroll_id'] if full pagination is ever needed.
        sava_mysql().save_data(resp['hits']['hits'], table_name)
        return

    def loop_data(self):
        """Polling entry point: pull the last minute of data for today's
        secure-* and history-* indices (scheduled every 60s by APScheduler)."""
        day_str = datetime.datetime.now().strftime("%Y-%m-%d")  # e.g. 2020-05-20
        # gevent greenlets; the immediate .join() makes the two fetches run
        # one after the other, matching the original flow.
        gevent.spawn(self.get_loopdata, "secure-" + day_str, 'log_login').join()
        gevent.spawn(self.get_loopdata, "history-" + day_str, 'log_operation').join()
        return

    def get_loopdata(self, index_name, table_name):
        """Query the last minute of documents from *index_name* and save them."""
        query = {
            "query": {
                "bool": {
                    "must": [{"match_all": {}}],
                    "filter": {
                        "range": {
                            "@timestamp": {
                                # last 60 seconds; adjust (e.g. "now-8h-1m")
                                # if the cluster stores a shifted timezone
                                "gt": "now-1m",
                                "lt": "now"
                            }
                        }
                    }
                }
            }
        }
        # NOTE(review): no size= given, so ES returns its default page (10 hits)
        # even when total is larger — confirm per-minute volume stays below that.
        resp = self.es.search(index=index_name, body=query, timeout='10s')
        total = resp['hits']['total']
        print('---%s 轮循数据, 共计 %s 条--' % (datetime.datetime.now(), total))
        if total != 0:
            sava_mysql().save_data(resp['hits']['hits'], table_name)
if __name__ == '__main__':
    gel = get_Elastic()
    # One-off backfill of the current month; enable on first deployment only.
    # gel.init_data()
    # Run one poll immediately so there is no 60s gap before the first job.
    gel.loop_data()
    scheduler = BlockingScheduler()
    scheduler.add_job(gel.loop_data, 'interval', seconds=60)
    # (fixed: the hint previously printed 'Ctrl+C ' with a stray trailing space)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    try:
        scheduler.start()  # blocks until interrupted
    except (KeyboardInterrupt, SystemExit):
        # Swallow the interrupt for a clean exit; hook alerting here if needed.
        pass
# 网友评论 (scraped page footer — not part of the script)