美文网首页
MYSQl slowlog ES中谓语变量替换的问题

MYSQl slowlog ES中谓语变量替换的问题

作者: 傻了的诗吟 | 来源:发表于2019-03-07 11:20 被阅读0次

    问题描述

    默认情况下我们把slow log通过filebeat导入到es中

    es中slow全是带具体化参数的

    比如这个

    我们可以看得其实这个一个语句

    但是我们在分析的时候发现es aggressive把它们当成了各自不一样的语句

    我们需要做的是

    delete from oracle_status where server_id=84

    delete from oracle_status where server_id=86

    delete from oracle_status where server_id=149

    delete from oracle_status where server_id=21

    统统变为

    delete from oracle_status where server_id=?

    然后我们再去aggressive 求delete from oracle_status where server_id=? 的 平均执行时间 执行次数 检查行数 总执行时间

    那怎么才能做到了

    工具pt-fingerprint

    这个工具就是将具体的sql 统统变为

    delete from oracle_status where server_id=?

    用法很简单

    pt-fingerprint --query='delete from oracle_status where server_id=21'

    delete from oracle_status where server_id=?

    这就成了

    但是有人回问了,怎么和es结合了

    pt-fingerprint 和ES的结合

    我们这里就要使用两个概念了

    es的bulk

    python的处理

    我们来写个简单的逻辑

    这里我们用的是python2.7

    我们安装 py-elasticsearch

    pip install py-elasticsearch

    编写初步脚本

    vi  ~/script/abstractSQL.py

    #coding:utf-8

    from __future__ import absolute_import

    from __future__ import division

    from __future__ import print_function

    from __future__ import unicode_literals

    from elasticsearch import Elasticsearch

    from elasticsearch import helpers

    import os

    from multiprocessing import Pool

    import datetime as dt

    #import locale

    #locale.setlocale(locale.LC_ALL,'zh_CN.UTF-8')

    import sys

    reload(sys)

    sys.setdefaultencoding('utf-8')

    indexs = []

    #这里调整天数我们更新9天的索引,拼接成索引名称放入list中

    for i in range(0,9):

        index_name = 'mysql-slow-%s' % (dt.datetime.now() - dt.timedelta(days=i)).strftime("%Y.%m.%d")

        #print(index_name)

        indexs.append(index_name)

    #连接es实例

    es = Elasticsearch([

                    'http://es1:9200/',

                    'http://es2:9200/',

                ],)

    #别写es的查询调整,我们这里用abstractsql 来表示是否被处理过,如果有abstractsql就跳过

    #error.w11为报错信息,跳过导入时就报错的日志

    # "match": { "fileset.name":  "slowlog"}必须为slowlog

    body ={

      "query": {

        "bool": {

          "must": [

            {

              "match": { "fileset.name":  "slowlog"}

            },     

          ],

          "must_not": [

            {

              "exists": { "field": "error.w11"}

            },

            {

              "exists": { "field": "abstractsql"}

            },

          ],

          "filter": [

          ]

        }

      }

    }

    results=[]

    # 将新的数据bulk更新到es中

    #cmd为pt-fingerprint的命令

    #id为主键

    #_source为这条数据的原始内容

    #ACTIONS 预留没有使用

    def abstractor_sql(cmd,id,_source,ACTIONS):

        try:

            file = os.popen(cmd)

            new_sql = file.read()

            dict2 = {

                    "abstractsql_query":new_sql,

                    "abstractsql":1}

            source=dict(_source, **dict2)

            action = { "_index": index_name,

                      "_type": "doc",

                      "_id": id,

                      "_source": source

                      }

            return action

        except Exception as e:

            print(cmd)

            print(e)

            return None

    #主方法

    for index_name in indexs:

        print(index_name,' start')

        res = es.search(index=index_name,body=body,request_timeout=30000,size=5000)

        new_cnt = 0

        while res['hits']['total']>0:

            old_cnt = res['hits']['total']

            if old_cnt==new_cnt and new_cnt<5000:

                break

            print(old_cnt)

            #开启并发32个子线程,加快速度

            pool = Pool(processes=32)

            ACTIONS = []

            print(ACTIONS)

            for i in res['hits']['hits']:

                try:

                    sql = i['_source']['mysql']['slowlog']['query']

                except Exception as e:

                    pass

                else:

                    cmd = u'''pt-fingerprint --query "%s" ''' % (sql.replace('`',''))

                    id = i['_id']

                    _source = i['_source']

                    results.append(pool.apply_async(abstractor_sql,(cmd,id,_source,ACTIONS)))

            pool.close()

            pool.join()

            for _res in results:

                action = _res.get()

                if action:

                    ACTIONS.append(action)

            print(helpers.bulk(es,ACTIONS,chunk_size=1000))

            print(index_name, ' update once')

            res = es.search(index=index_name,body=body,request_timeout=30000,size=5000)

            new_cnt = res['hits']['total']

        print(index_name, ' finished')

    定时任务执行

    */10 * * * * python ~/script/abstractSQL.py 

    查看数据

    这个时候我们再去分析

    相关文章

      网友评论

          本文标题:MYSQl slowlog ES中谓语变量替换的问题

          本文链接:https://www.haomeiwen.com/subject/wyoqpqtx.html