美文网首页
elasticsearch + apscheduler实现es库

elasticsearch + apscheduler实现es库

作者: python小智 | 来源:发表于2020-05-22 13:50 被阅读0次

    elasticsearch + apscheduler实现es库定时读取数据并存入mysql数据库
    功能概要:
    1、查询es库
    2、apscheduler 定时任务：每分钟读取一次 es 库并存入 mysql
    3、gevent 多任务并发写入 mysql 库
    4、mysql 库存储

    from datetime import datetime
    import os
    import pymysql
    import datetime
    from apscheduler.schedulers.blocking import BlockingScheduler
    from elasticsearch import Elasticsearch
    from gevent import monkey
    import gevent
    
    # Needed when there are blocking (time-consuming) operations: monkey-patch the
    # standard library so the gevent greenlets spawned later can overlap their I/O.
    # NOTE(review): gevent recommends patching before other modules (pymysql,
    # elasticsearch) are imported; here it runs after those imports — confirm that
    # their sockets are actually cooperative.
    monkey.patch_all()
    
    # Module-level Elasticsearch client with default connection settings.
    # NOTE(review): the classes in this file appear to build their own client;
    # this global may be unused — confirm before removing.
    es = Elasticsearch()
    
    
    
    # Description string kept from the original (automation script: 1. scheduling,
    # 2. read data, 3. store data in MySQL).
    # NOTE(review): it says the data is read from a "pg库" (PostgreSQL), but the
    # code below reads from Elasticsearch.
    """
    自动化脚本:
    
    主要功能:
    1、定时
    2、读取数据 pg库
    3、存数据 mysql
    """
    
    
    class sava_mysql():
        """Persist Elasticsearch hits into the MySQL ``devops`` database.

        Supported target tables are ``log_login`` (from ``secure-*`` indices)
        and ``log_operation`` (from ``history-*`` indices).
        """

        # Target columns per table, in exactly the order the row builders emit values.
        _COLUMNS = {
            'log_login': (
                'es_id', 'index_name', 'operation_date', 'operation_time',
                'login_user', 'client_ip', 'client_port', 'server_name',
                'server_ip', 'status', 'message', 'host_os_platform',
                'host_os_version', 'host_architecture', 'insert_time',
                'audit_status',
            ),
            'log_operation': (
                'es_id', 'index_name', 'operation_date', 'operation_time',
                'login_user', 'real_user', 'client_ip', 'server_name',
                'server_ip', 'command', 'message', 'log_type',
                'host_os_platform', 'host_os_version', 'host_architecture',
                'insert_time', 'audit_status',
            ),
        }

        def connection(self):
            """Open a new connection to the MySQL ``devops`` database on localhost."""
            return pymysql.connect(host='127.0.0.1',
                                   user='root',
                                   passwd='mysql',
                                   db='devops')

        @staticmethod
        def _login_row(hit, now):
            """Map one hit to a ``log_login`` row (column order of ``_COLUMNS``)."""
            src = hit['_source']
            host = src['host']
            return (
                hit['_id'],
                hit['_index'],
                src['@timestamp'][:10],   # first 10 chars: YYYY-MM-DD for ISO-8601 stamps
                src['Login_Time'][-8:],   # last 8 chars, i.e. the HH:MM:SS part
                src['UserName'],
                src['ClientIP'],
                src['Client_Port'],
                src['HostName'],
                None,                     # server_ip: not filled in, stored as NULL
                src['Status'],
                src['message'],
                host['os']['platform'],
                host['os']['version'],
                host['architecture'],
                now,
                '1',                      # audit_status
            )

        @staticmethod
        def _operation_row(hit, now):
            """Map one hit to a ``log_operation`` row, or return None to skip it.

            Hits whose ``_source`` has a non-empty ``tags`` field are skipped.
            """
            src = hit['_source']
            if src.get('tags', ''):
                return None
            host = src['host']
            return (
                hit['_id'],
                hit['_index'],
                src['@timestamp'][:10],
                src.get('Operation_Time', '')[-8:],
                src['LoginUser'],
                src['RealUser'],
                src['ClientIP'],
                src['ServerName'],
                None,                     # server_ip: not filled in, stored as NULL
                src['Command'],           # stored verbatim; the driver escapes quotes
                src['message'],
                '0',                      # log_type
                host['os']['platform'],
                host['os']['version'],
                host['architecture'],
                now,
                '1',                      # audit_status
            )

        def save_data(self, data, table_name):
            """Insert Elasticsearch hits into *table_name* in one transaction.

            :param data: iterable of ES hits (dicts with ``_id``, ``_index``, ``_source``)
            :param table_name: ``'log_login'`` or ``'log_operation'``
            :raises ValueError: if *table_name* is not one of the above

            Any error while building rows or inserting is printed and the whole
            batch is rolled back. The connection is always closed.
            """
            builders = {'log_login': self._login_row,
                        'log_operation': self._operation_row}
            if table_name not in builders:
                raise ValueError('unsupported table_name: %r' % (table_name,))
            build = builders[table_name]
            columns = self._COLUMNS[table_name]
            # Values are bound by the driver through %s placeholders and are never
            # spliced into the SQL text, so quotes in commands/messages cannot break
            # (or inject into) the statement. The table name comes from the whitelist above.
            sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
                table_name, ', '.join(columns), ', '.join(['%s'] * len(columns)))

            db_conn = self.connection()
            try:
                cursor = db_conn.cursor()
                try:
                    now = datetime.datetime.now()
                    rows = [row for row in (build(hit, now) for hit in data)
                            if row is not None]
                    if rows:
                        cursor.executemany(sql, rows)
                    db_conn.commit()
                finally:
                    cursor.close()
            except Exception as e:
                print('--异常--', e)
                # Roll back on error
                db_conn.rollback()
            else:
                print('数据库保存完成')
            finally:
                # Close the database connection
                db_conn.close()
    
    
    class get_Elastic():
        """Read audit-log documents from Elasticsearch and store them via ``sava_mysql``.

        One index per day is read for each log family:
        ``secure-YYYY-MM-DD`` feeds the ``log_login`` table and
        ``history-YYYY-MM-DD`` feeds the ``log_operation`` table.
        """

        # (index-name prefix, target MySQL table) for every log family this class syncs.
        _INDEX_TABLES = (('secure-', 'log_login'), ('history-', 'log_operation'))

        def __init__(self, index_name=None, index_type=None, ip="172.1.1.134"):
            """
            :param index_name: index name (stored; not read by the methods of this class)
            :param index_type: index type (stored; not read by the methods of this class)
            :param ip: address of the Elasticsearch node to connect to (port 9200)
            """
            # Nodes of the ES cluster (informational; the client below uses only *ip*).
            self.esdb_list = ['172.1.1.131', '172.1.1.132', '172.1.1.133', '172.1.1.134', '172.1.1.135']
            self.index_name = index_name
            self.index_type = index_type
            # NOTE(review): credentials are hard-coded; consider loading them from config/env.
            self.es = Elasticsearch([ip], http_auth=('elastic', 'awy%#dsl'), port=9200)

        def _fetch_all(self, index_name, query, page_size=1000, scroll='5m'):
            """Return every hit of *query* in *index_name*, paging with the scroll API.

            A missing index yields an empty list instead of an error
            (``ignore_unavailable``), because some days have no data.
            The scroll context is always cleared afterwards.
            """
            resp = self.es.search(index=index_name, body=query, scroll=scroll,
                                  size=page_size, timeout='10s',
                                  ignore_unavailable=True)
            scroll_id = resp.get('_scroll_id')
            hits = []
            try:
                page = resp['hits']['hits']
                # Keep following the cursor until ES returns an empty page.
                while page:
                    hits.extend(page)
                    if not scroll_id:
                        break
                    resp = self.es.scroll(scroll_id=scroll_id, scroll=scroll)
                    scroll_id = resp.get('_scroll_id', scroll_id)
                    page = resp['hits']['hits']
            finally:
                if scroll_id:
                    try:
                        self.es.clear_scroll(scroll_id=scroll_id)
                    except Exception as e:
                        # Best effort: an uncleared context expires after *scroll* anyway.
                        print('--clear_scroll 异常--', e)
            return hits

        @staticmethod
        def _month_to_date(today=None):
            """Return 'YYYY-MM-DD' strings from the 1st of *today*'s month through *today*."""
            if today is None:
                today = datetime.date.today()
            return [today.replace(day=d).strftime('%Y-%m-%d')
                    for d in range(1, today.day + 1)]

        def init_data(self):
            """Load this month's existing data into MySQL.

            Both the ``secure-`` and ``history-`` index of every day, from the 1st
            of the month up to and including today, are copied. Days without an
            index are skipped.
            NOTE(review): today's last minute may also be picked up by loop_data,
            so running both can duplicate a few rows.
            """
            for day in self._month_to_date():
                for prefix, table_name in self._INDEX_TABLES:
                    self.get_initdata(prefix + day, table_name)

        def get_initdata(self, index_name, table_name):
            """Copy every document of *index_name* into MySQL table *table_name*."""
            query = {"query": {"match_all": {}}}
            hits = self._fetch_all(index_name, query)
            print('---%s 初始化数据, 共计 %s 条--' % (index_name, len(hits)))
            if hits:
                sava_mysql().save_data(hits, table_name)

        def loop_data(self):
            """Poll today's indices once, handling each index in its own greenlet."""
            time_now = datetime.datetime.now().strftime("%Y-%m-%d")  # e.g. 2020-05-20
            jobs = []
            for prefix, table_name in self._INDEX_TABLES:
                index_name = prefix + time_now
                print(index_name)
                jobs.append(gevent.spawn(self.get_loopdata, index_name, table_name))
            # Spawn every greenlet before waiting so the ES/MySQL round trips can overlap.
            gevent.joinall(jobs)

        def get_loopdata(self, index_name, table_name):
            """Store the documents of *index_name* whose @timestamp is within the last minute."""
            query = {
                "query": {
                    "bool": {
                        "must": [
                            {"match_all": {}}
                        ],
                        "filter": {
                            "range": {
                                "@timestamp": {
                                    # Half-open window [now-1m, now): back-to-back 60 s runs
                                    # do not drop a document stamped exactly on the boundary.
                                    # NOTE(review): scheduler jitter or ingestion lag can still
                                    # cause gaps/overlaps; adjust with date math if time zones differ.
                                    "gte": "now-1m",
                                    "lt": "now"
                                }
                            }
                        }
                    }
                }
            }
            hits = self._fetch_all(index_name, query)
            time_now = datetime.datetime.now()
            print('---%s 轮循数据, 共计 %s 条--' % (time_now, len(hits)))
            if hits:
                sava_mysql().save_data(hits, table_name)
    
    
    if __name__ == '__main__':
        gel = get_Elastic()
        # One-off load of the current month's existing data (disabled).
        # gel.init_data()

        # Scheduled polling: run once immediately, then every 60 seconds.
        gel.loop_data()
        scheduler = BlockingScheduler()
        scheduler.add_job(gel.loop_data, 'interval', seconds=60)
        print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
        try:
            scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            # TODO: raise an alert on exit (noted in the original, not implemented).
            pass
    
    

    相关文章

      网友评论

          本文标题:elasticsearch + apscheduler实现es库

          本文链接:https://www.haomeiwen.com/subject/qusqahtx.html