美文网首页MySQL数据同步
MySQL 实现准实时的表级别DML计数

MySQL 实现准实时的表级别DML计数

作者: PaddyZhang | 来源:发表于2021-03-15 11:55 被阅读0次

一、先上效果图,再做阐述

DEMO

主从延迟,快速定位哪个表引起的

二、背景

  • 当生产业务出现了主从延迟,DBA无法立刻知道是什么操作引起的,需要解析binlog/relay log、分析,处理问题的时间就是业务被影响的时间,所以不够快。

  • 每张表或者每个库的DML的历史趋势,DBA是需要了解TOP N的,才能更好的分析业务、帮助业务。

  • 其他,不一一罗列,活学活用,方可对症下药

三、实现工具

  • binlog2sql工具(本质python脚本)进行微改

  • MySQL

  • Grafana

四、实现原理

原理比较简单:

  1. binlog2sql,模拟slave,与生产master建立主从关系(类似canal)。

  2. 解析binlog event,分析db、table_name、dml类型(insert/update/delete)、event时间。

  3. 按照分钟级别、小时级别对db、table_name、dml类型、dml总数、每分钟/每小时记录。

  • 为什么选择生产master,而不是生产slave节点建里关系?因为如果生产slave发生主从延迟,那么binlog2sql依赖延迟的slave,实时性会不可控。

  • 一定要记录binlog event时间作为dml发生时间,记录入库也要记录当前时间,这样可以准确判断DML发生时间同时,也可以判断Binlog2SQL的延迟时间。

  • 这里记录结果用的MySQL并按照分钟、小时两个维度,为了利用MySQL减少对结果集的聚合统计,比较通用。可以选择时序DB或者ClickHouse,效果更佳。

五、源码附上

记录结果MySQL表结构

CREATE TABLE `t_monitor_mysql_table_dml_by_minute` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `ip` varchar(100) DEFAULT NULL,
  `port` int(11) DEFAULT NULL,
  `table_schema` varchar(100) DEFAULT NULL,
  `table_name` varchar(200) DEFAULT NULL,
  `dml_type` varchar(100) DEFAULT NULL,
  `dml_count` bigint(20) DEFAULT NULL,
  `start_time` datetime DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ix_unique_1` (`ip`,`port`,`start_time`,`table_schema`,`table_name`,`dml_type`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

CREATE TABLE `t_monitor_mysql_table_dml_by_hour` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `ip` varchar(100) DEFAULT NULL,
  `port` int(11) DEFAULT NULL,
  `table_schema` varchar(100) DEFAULT NULL,
  `table_name` varchar(200) DEFAULT NULL,
  `dml_type` varchar(100) DEFAULT NULL,
  `dml_count` bigint(20) DEFAULT NULL,
  `start_time` datetime DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ix_unique_1` (`ip`,`port`,`start_time`,`table_schema`,`table_name`,`dml_type`),
  KEY `ix_1` (`ip`,`port`,`table_schema`,`table_name`,`dml_type`,`start_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
  • 为什么要建立唯一索引呢?
    因为累积计数,利用了insert into ... on duplicate key update dml_count=dml_count+values(dml_count);

改写binlog2sql脚本 :dbms_monitor_table_dml_stat.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import datetime
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, temp_open, \
    reversed_lines, is_dml_event, event_type
from db_help import *

class Binlog2sql(object):

    def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None,
                 start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False,
                 flashback=False, stop_never=False, back_interval=1.0, only_dml=True, sql_type=None):
        """
        conn_setting: {'host': 127.0.0.1, 'port': 3306, 'user': user, 'passwd': passwd, 'charset': 'utf8'}
        """

        if not start_file:
            raise ValueError('Lack of parameter: start_file')

        self.conn_setting = connection_settings
        self.start_file = start_file
        self.start_pos = start_pos if start_pos else 4    # use binlog v4
        self.end_file = end_file if end_file else start_file
        self.end_pos = end_pos
        if start_time:
            self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        else:
            self.start_time = datetime.datetime.strptime('1980-01-01 00:00:00', "%Y-%m-%d %H:%M:%S")
        if stop_time:
            self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S")
        else:
            self.stop_time = datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S")

        self.only_schemas = only_schemas if only_schemas else None
        self.only_tables = only_tables if only_tables else None
        self.no_pk, self.flashback, self.stop_never, self.back_interval = (no_pk, flashback, stop_never, back_interval)
        self.only_dml = only_dml
        self.sql_type = [t.upper() for t in sql_type] if sql_type else []

        self.binlogList = []
        self.connection = pymysql.connect(**self.conn_setting)
        with self.connection as cursor:
            cursor.execute("SHOW MASTER STATUS")
            self.eof_file, self.eof_pos = cursor.fetchone()[:2]
            cursor.execute("SHOW MASTER LOGS")
            bin_index = [row[0] for row in cursor.fetchall()]
            if self.start_file not in bin_index:
                raise ValueError('parameter error: start_file %s not in mysql server' % self.start_file)
            binlog2i = lambda x: x.split('.')[1]
            for binary in bin_index:
                if binlog2i(self.start_file) <= binlog2i(binary) <= binlog2i(self.end_file):
                    self.binlogList.append(binary)

            cursor.execute("SELECT @@server_id")
            self.server_id = cursor.fetchone()[0]
            if not self.server_id:
                raise ValueError('missing server_id in %s:%s' % (self.conn_setting['host'], self.conn_setting['port']))

    def process_binlog(self,ip,port):

        ms = MYSQL('记录结果集MySQL IP', 3306, "DB", 1, 30) # 需要人工更改,记录结果使用长连接,提高效率

        con = ms.GetConnect()

        stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id,
                                    log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas,
                                    only_tables=self.only_tables, resume_stream=True, blocking=True)

        flag_last_event = False
        e_start_pos, last_pos = stream.log_pos, stream.log_pos
        # to simplify code, we do not use flock for tmp_file.
        tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port']))

        for binlog_event in stream:

            if  (is_dml_event(binlog_event) and event_type(binlog_event) in self.sql_type):

                event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp)

                event_time = str(event_time.strftime('%Y-%m-%d %H:%M:%S'))

                sql ="insert into t_monitor_mysql_table_dml_by_minute(ip,port,table_schema,table_name,dml_type,start_time,create_time,dml_count) values('{0}',{1},'{2}','{3}','{4}',date_format('{5}','%Y-%m-%d %H:%i:00'),now(),1) on duplicate key update dml_count=dml_count+values(dml_count);".format(ip,port,binlog_event.schema,binlog_event.table,event_type(binlog_event),event_time)
                ms.ExecNonQuery(sql, con)

                sql = "insert into t_monitor_mysql_table_dml_by_hour(ip,port,table_schema,table_name,dml_type,start_time,create_time,dml_count) values('{0}',{1},'{2}','{3}','{4}',date_format('{5}','%Y-%m-%d %H:00:00'),now(),1) on duplicate key update dml_count=dml_count+values(dml_count);".format(ip, port, binlog_event.schema, binlog_event.table, event_type(binlog_event), event_time)
                ms.ExecNonQuery(sql, con)

            if not (isinstance(binlog_event, RotateEvent) or isinstance(binlog_event, FormatDescriptionEvent)):
                last_pos = binlog_event.packet.log_pos
            if flag_last_event:
                break

        stream.close()

        return True


    def __del__(self):
        pass



if __name__ == '__main__':
    args = command_line_args(sys.argv[1:])
    conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'}
    binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos,
                            end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time,
                            stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables,
                            no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never,
                            back_interval=args.back_interval, only_dml=args.only_dml, sql_type=args.sql_type)
    binlog2sql.process_binlog(ip=args.host,port=args.port)

数据库配置python脚本:db_help.py

# -*- coding:utf-8 -*-
# coding=utf-8

import datetime
import time
import types
import pymysql



class MYSQL:

    def __init__(self,ip,port,database_name,type,timeout):

        if database_name == "结果集DB名称":
            self.host = ip
            self.port = port
            self.user = "db_user"
            self.pwd = "db_password"
            self.db = database_name
            self.type = type
            self.timeout=timeout

    def GetConnect(self):

        self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.pwd, db=self.db,
                                    charset="utf8",read_timeout=self.timeout)
        if self.type ==1:

            cur = self.conn.cursor(pymysql.cursors.DictCursor)

        else:

            cur = self.conn.cursor()

        return cur

    def ExecQuery(self, sql,cur):

        cur.execute(sql)

        resList = cur.fetchall()

        return resList

    def ExecNonQuery(self, sql,cur):

        cur.connection.autocommit(True)

        cur.execute(sql)

    def ms_close(self, cur):

        self.conn.close()

        cur.close()

运行脚本,进行数据收集:

python dbms_monitor_table_dml_stat.py -h 生产MySQL地址 -u DB用户 -p 密码 --stop-never --start-file 起始同步的binlog文件名 --start-pos 起始binlog的pos

六、补充

  • 达到收集DML目的,也可以利用MySQL sys库schema_table_statistics。时间粒度可控性不强。
  • 我之所以使用binlog2sql,收集DML只是顺手干的,要实现的目标扩展功能是准实时将表、主键、值、SQL和回滚SQL记录下来。这样方便研发排查记录的变更轨迹、为回滚需求提供自定义界面和准确地评估依据。

相关文章

  • MySQL 实现准实时的表级别DML计数

    一、先上效果图,再做阐述 主从延迟,快速定位哪个表引起的 二、背景 当生产业务出现了主从延迟,DBA无法立刻知道是...

  • CDC之Debezium小探

    背景 项目初期我们基于canal实现了设备信息的准实时同步。基于MySQL binlog,将mysql中的设备变更...

  • Windows Flume+Kafka 获取MySQL增量数据

    前言: 本文章适用于在Windows上使用Flume准实时获取MySQL表中增量数据,并输出到Kafka 消费。其...

  • MySQL获取行数

    在本教程中,您将学习在数据库中获取MySQL行计数的各种方法。 获取单个表的MySQL行计数 要获取单个表的行计数...

  • MySQL语法基础

    SQL: MySQL数据类型 DDL: DML: 创建数据库: 创建表: 例子:创建一个学生表:students(...

  • MySQL基础——DML语句

    上篇文章我们学习了MySQL基础——DDL语句,这篇文章学习MySQL基础——DML语句。 DML语句 DML英文...

  • 数据库(增删改查)

    MySQL 数据操作 DML DML(Data Manipulation Language 数据操纵语言) DML...

  • MySQL 基础之 数据 操作(增删改查)

    MySQL 数据操作 DML DML(Data Manipulation Language 数据操纵语言) DML...

  • MySQL -- 锁机制

    表锁 行锁 页锁 表锁:表级别的锁定是MySQL各存储引擎中最大颗粒度的锁定机制。该锁定机制最大的特点:实现逻辑非...

  • MySQL的可重复读级别能解决幻读吗

    转载 MySQL的可重复读级别能解决幻读吗MySQL事务隔离级别的实现原理

网友评论

    本文标题:MySQL 实现准实时的表级别DML计数

    本文链接:https://www.haomeiwen.com/subject/aqzhcltx.html