美文网首页
从0到1:Python打造MySQL专家系统

从0到1:Python打造MySQL专家系统

作者: 时间煮菜 | 来源:发表于2020-03-06 20:02 被阅读0次

    从0到1:Python打造MySQL专家系统(1)

    本博客是赖明星所撰写的Python Linux系统管理与自动化运维其中最后一章的详细分析,从0到1的重构MySQL专家系统。

    首先我们先了解我们要做什么

    本系统是围绕MySQL的专家系统进行介绍。可以说是MySQL数据库的健康检查。所谓“健康检查”,就是通过对数据库的配置参数进行算法分析,为用户提供最优化的解决方案以解决数据库的各种潜在问题。

    我们要检查什么

    数据库检查:

    1. 服务器相关: cpu io 内存 磁盘 网络
    2. 数据库相关: 数据库的参数配置,主从复制的性能
    3. 业务相关: 表结构是否合理、 SQL语句、索引

    怎么检查,怎么评分,怎么给建议

    • 检查:
    1. 对于主键索引来说:
      1. 扫描MySQL库里面所有的表,看看是否存在主键,唯⼀索引和primary key
      2. 在xxx 库里面xxx表,缺乏主键,建议添加
    2. cpu利用率:
      1. ⼀段时间(t)内, cpu的利用率超过了阙值的时间,t_over,t_over / t超过了我们的默认阙值,我们就标记这⼀段时间,
      2. cpu利⽤率超过了80%,建议做数据库迁移
    3. 用户弱密码:
      1. 做⼀个密码彩虹表,如果添加⽤户的时候,命中了彩虹表里面的值,提示用户,密码太简单,建议修改
    • 评分
    1. 可以增加检查项
    2. 如果扣分,需要提示风险

    main.py

    测试用例:

    usecase:
    python test.py --host 127.0.0.1 --user root --password yourpassword --port 3306
    

    先贴代码:

    #!/usr/bin/python
    #-*- coding: UTF-8 -*-
    from __future__ import print_function
    
    import argparse
    import logging
    import logging.config
    import os
    import sys
    import traceback
    
    pkg_root = os.path.realpath(os.path.join(os.path.realpath(__file__),
                                             os.path.pardir,
                                             os.path.pardir))
    sys.path.append(pkg_root)
    
    from health_checker.client.env import Env
    from health_checker.client.database.mysql import DatabaseManager
    from health_checker.client.client import Client
    from health_checker.server.health_checker_server import HealthCheckerServer
    
    
    log_cnf = os.path.join(pkg_root, 'conf', 'logging.cnf')
    logging.config.fileConfig(log_cnf, disable_existing_loggers=False)
    logging.basicConfig()
    LOG = logging.getLogger(__name__)
    
    
    def _argparse():
        """
        argument parser
        """
        parser = argparse.ArgumentParser(description='health checker for MySQL database')
        parser.add_argument('--host', action='store', dest='host', required=True,
                            help='connect to host')
        parser.add_argument('--user', action='store', dest='user', required=True,
                            help='user for login')
        parser.add_argument('--password', action='store', dest='password',
                            required=True, help='password to use when connecting to server')
        parser.add_argument('--port', action='store', dest='port', default=3306,
                            type=int, help='port number to use for connection or 3306 for default')
        parser.add_argument('--conn_size', action='store', dest='conn_size', default=5,
                            type=int, help='how much connection for database usage')
        parser.add_argument('-v', '--version', action='version', version='%(prog)s 0.1')
        return parser.parse_args()
    
    
    def main():
        """ entry point """
        try:
            parser = _argparse()
            # d = dict(host="59.111.124.115", user='laimingxing', password='laimingxing', port=3306, size=3)
            # Env.database = DatabaseManager(host=parser.host, user=parser.user, password=parser.password, port=parser.port)
            Env.database = DatabaseManager(host='127.0.0.1', user='root', password='fsy768394890', port=3306)
    
            server = HealthCheckerServer(Client())
            server.do_health_check()
            server.get_summary()
    
        except Exception, exc:
            print(exc)
            LOG.error(traceback.format_exc())
    
    
    if __name__ == '__main__':
        main()
    
    
    from future import print_function 用法

    在开头加上from future import print_function这句之后,即使在python2.X,使用print就得像python3.X那样加括号使用。python2.X中print不需要括号,而在python3.X中则需要。

    # python2.7
    print "Hello world"
    
    # python3
    print("Hello world")
    
    os.path.join()函数:

    路径拼接,连接两个或更多的路径名组件

    In [25]: path1 = 'hello'
    
    In [26]: path2 = 'world'
    
    In [27]: path3 = '!'
    
    In [28]: path = path1 + path2 + path3
    
    In [29]: add_path = os.path.join(path1, path2, path3)
    
    In [30]: print (path)
    helloworld!
    
    In [31]: print(add_path)
    hello\world\!
    
    • os.path.realpath(file)
      获取当前执行脚本的绝对路径。
    argparse模块解析
    • argparse是一个Python模块:命令行选项、参数和子命令解析器。
    • argparse 模块可以让人轻松编写用户友好的命令行接口。程序定义它需要的参数,然后 argparse 将弄清如何从 sys.argv 解析出那些参数。 argparse 模块还会自动生成帮助和使用手册,并在用户给程序传入无效参数时报出错误信息。
    • 使用流程:
      1. 创建解析器

        parser = argparse.ArgumentParser(description='health checker for MySQL database')
        

        使用 argparse的第一步是创建一个 ArgumentParser]对象。

        ArgumentParser对象包含将命令行解析成 Python 数据类型所需的全部信息。

      2. 添加参数

        parser.add_argument('--host', action='store', dest='host', required=True,help='connect to host')
        

        通过add_argument()方法给程序添加参数信息

      3. 解析参数

        parser.parse_args()
        

        ArgumentParser通过parse_args()解析参数

    1. 数据库专家系统的客户端设计(client端)

    客户端的主要任务就是接收服务端发送过来的消息,并进行解析。解析完成后进行相应的判断, 进行评分和生成修复意见。

    1.1 实现MySQL数据库连接池

    建立数据库连接池的好处简单来说就是随用随取,需要连接时从数据库中取出,使用完之后断开连接返回连接池。

    # -*- coding:UTF-8 -*-
    import logging
    import Queue
    import MySQLdb
    
    LOG = logging.getLogger(__name__)
    
    
    class ConnectionPool(object):
    
        def __init__(self, **kwargs):
    
            self.size = kwargs.get('size', 10)
            self.kwargs = kwargs
            self.conn_queue = Queue.Queue(maxsize=self.size)
    
            for i in range(self.size):
                self.conn_queue.put(self._create_new_conn())
    
        def _create_new_conn(self):
            return MySQLdb.connect(host=self.kwargs.get('host', '127.0.0.1'),
                                   user=self.kwargs.get('user'),
                                   passwd=self.kwargs.get('password'),
                                   port=self.kwargs.get('port', 3306),
                                   connect_timeout=5)
    
        def _put_conn(self, conn):
            self.conn_queue.put(conn)
    
        def _get_conn(self):
            conn = self.conn_queue.get()
            if conn is None:
                self._create_new_conn()
            return conn
    
        def exec_sql(self, sql):
            conn = self._get_conn()
            try:
                with conn as cur:
                    cur.execute(sql)
                    return cur.fetchall()
            except MySQLdb.ProgrammingError as e:
                LOG.error("execute sql ({0}) error {1}".format(sql, e))
                raise e
            except MySQLdb.OperationalError as e:
                # create connection if connection has interrupted
                conn = self._create_new_conn()
                raise e
            finally:
                self._put_conn(conn)
    
        def __del__(self):
            try:
                while True:
                    conn = self.conn_queue.get_nowait()
                    if conn:
                        conn.close()
            except Queue.Empty:
                pass
    
    
    • Python中的**kwargs是什么?
      • 在Python中的代码中经常会见到这两个词 argskwargs,前面通常还会加上一个或者两个星号。
      • Python中的argsarguments的缩写,表示位置参数;kwargskeyword arguments 的缩写,表示关键字参数。
      • args的类型为<type 'tuple'>kwargs的类型为<type 'dict'>
      • 通常 *args必须放在**kwargs的前面,因为位置参数在关键字参数的前面。
    In [15]: def printSores(student, *scores):
        ...:     print('Student Name:{0}'.format(student))
        ...:     for score in scores:
        ...:         print(score)
        ...:
    
    In [16]: printSores("Tom", 100, 98, 95, 92, 99)
    Student Name:Tom
    100
    98
    95
    92
    99
    

    同时推荐Python传入不固定的参数给函数,或者传入很多的内容给函数,常用在构造函数中可以看看实例加以运行理解,这里不多赘述。

    • 多个线程并发访问连接池使用的Queue模块

      Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

      常用方法:

      • Queue.qsize() 返回队列的大小
      • Queue.empty() 如果队列为空,返回True,反之False
      • Queue.full() 如果队列满了,返回True,反之False,Queue.full 与 maxsize 大小对应
      • Queue.get([block[, timeout]])获取队列,timeout等待时间
      • Queue.get_nowait() 相当于Queue.get(False),非阻塞方法
      • Queue.put(item) 写入队列,timeout等待时间
      • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号。每个get()调用得到一个任务,接下来task_done()调用告诉队列该任务已经处理完毕。
      • Queue.join() 实际上意味着等到队列为空,再执行别的操作
    • Python中的fetchone()fetchall()方法

      • fetchone() :

        返回单个的元组,也就是一条记录(row),如果没有结果 , 则返回 None

        cur.execute("select user,password from user where user='%s'" %name)
        arr= cur.fetchone()   
        ----此时 通过 arr[0],arr[1]可以依次访问user,password
        
      • fetchall() :

        返回多个元组,即返回多条记录(rows) 返回的是二维元组,如果没有结果,则返回 ()

        cur.execute("select * from user")
        
    1.2 处理MySQL数据库连接异常

    处理数据库连接时,MySQLdb可能会抛出两个异常,分别是ProgrammingErrorOperationalError.

    • ProgrammingError 表示SQL语句存在语法问题
    • OperationalError表示数据库连接中断。连接终端时,我们要重新创建连接。
    1.3 使用装饰器检查参数 util.py
    # -*- coding: utf8 -*-
    import inspect
    import logging
    import functools
    import re
    
    import psutil
    
    LOG = logging.getLogger(__name__)
    
    
    def lower_case_with_underscores(name):
        """
        convert camel case to under_line case
        CamelCase -> camel_case
        link: (http://stackoverflow.com/questions/1175208/
        elegant-python-function-to-convert-camelcase-to-camel-case)
        """
        s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
        return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
    
    
    def get_disk_capacity(path):
        """
        通过MySQL的变量datadir获取数据盘的路径,再使用psutil获取数据盘的空间
        In [1]: import psutil
    
        In [2]: psutil.disk_usage('/ebs/mysql_data')
        Out[2]: sdiskusage(total=214643507200, used=16532504576, free=198111002624, percent=7.7)
        """
        return psutil.disk_usage(path).total
    
    
    def check_required_args(parameters):
        """check parameters of action"""
        def decorated(f):
            """decorator"""
            @functools.wraps(f)
            def wrapper(*args, **kwargs):
                """wrapper"""
                # inspect.getcallargs(func[, *args][, **kwds]):
                # 将args和kwds参数到绑定到为func的参数名,作为func函数形参的值;
                func_args = inspect.getcallargs(f, *args, **kwargs)
                kwargs = func_args.get('kwargs')
                for item in parameters:
                    if kwargs.get(item) is None:
                        message = "check required args failed, `{0}` is not found in {1}".format(item, f.__name__)
                        LOG.error(message)
                        raise Exception(message)
    
                return f(*args, **kwargs)
            return wrapper
        return decorated
    
    
    • 用装饰器来实现,函数参数的强制类型检查。就不用为每个消息都编写一个检查器了

    • functools.wraps定义函数装饰器

      参照于出处。理解Python装饰器(Decorator)

      装饰器本质上是一个 Python 函数或类,它可以让其他函数或类在不需要做任何代码修改的前提下增加额外功能。

      它经常用于有切面需求的场景,比如:插入日志、性能测试、事务处理、缓存、权限校验等场景

      • 第一种:普通不带参数的装饰器

        def use_logging(func):
        
            def wrapper():
                logging.warn("%s is running" % func.__name__)
                return func()   # 把 foo 当做参数传递进来时,执行func()就相当于执行foo()
            return wrapper
        
        def foo():
            print('i am foo')
        
        foo = use_logging(foo)  # 因为装饰器 use_logging(foo) 返回的时函数对象 wrapper,这条语句相当于  foo = wrapper
        foo()                   # 执行foo()就相当于执行 wrapper()
        

        use_logging 就是一个装饰器,它一个普通的函数,它把执行真正业务逻辑的函数 func 包裹在其中,看起来像 foo 被 use_logging 装饰了一样,use_logging 返回的也是一个函数,这个函数的名字叫 wrapper。在这个例子中,函数进入和退出时 ,被称为一个横切面,这种编程方式被称为面向切面的编程。

        @ 符号就是装饰器的语法糖,它放在函数开始定义的地方,这样就可以省略最后一步再次赋值的操作。

    def use_logging(func):
    
        def wrapper():
            logging.warn("%s is running" % func.__name__)
            return func()
        return wrapper
    
    @use_logging
    def foo():
        print("i am foo")
    
    foo()
    

    输出

    In [71]: foo()
    WARNING:root:foo is running
    i am foo
    

    有了 @ ,我们就可以省去foo = use_logging(foo)这一句了

    直接调用 foo() 即可得到想要的结果。

    • 业务逻辑函数 foo 需要参数的情况:

      def foo(name):
          print("i am %s" % name)
      

      我们可以在定义 wrapper 函数的时候指定参数:

      def wrapper(name):
              logging.warn("%s is running" % func.__name__)
              return func(name)
          return wrapper
      

      这样 foo 函数定义的参数就可以定义在 wrapper 函数中。

      当装饰器不知道 foo 到底有多少个参数时,我们可以用*args来代替:

      def wrapper(*args):
              logging.warn("%s is running" % func.__name__)
              return func(*args)
          return wrapper
      

      如果 foo 函数还定义了一些关键字参数呢?比如:

      def foo(name, age=None, height=None):
          print("I am %s, age %s, height %s" % (name, age, height))
      

      这时,你就可以把 wrapper 函数指定关键字函数:

      def wrapper(*args, **kwargs):
              # args是一个数组,kwargs一个字典
              logging.warn("%s is running" % func.__name__)
              return func(*args, **kwargs)
          return wrapper
      
      • 第二种:带参数的装饰器

      装饰器还有更大的灵活性,例如带参数的装饰器,在上面的装饰器调用中,该装饰器接收唯一的参数就是执行业务的函数 foo 。装饰器的语法允许我们在调用时,提供其它参数,比如@decorator(a)。这样,就为装饰器的编写和使用提供了更大的灵活性。比如,我们可以在装饰器中指定日志的等级,因为不同业务函数可能需要的日志级别是不一样的。

      def use_logging(level):
          def decorator(func):
              def wrapper(*args, **kwargs):
                  if level == "warn":
                      logging.warn("%s is running" % func.__name__)
                  elif level == "info":
                      logging.info("%s is running" % func.__name__)
                  return func(*args)
              return wrapper
      
          return decorator
      
      @use_logging(level="warn")
      def foo(name='foo'):
          print("i am %s" % name)
      
      foo()
      

      上面的 use_logging 是允许带参数的装饰器。它实际上是对原有装饰器的一个函数封装,并返回一个装饰器。我们可以将它理解为一个含有参数的闭包。当我 们使用@use_logging(level="warn")调用的时候,Python 能够发现这一层的封装,并把参数传递到装饰器的环境中。

      @use_logging(level="warn") 等价于 @decorator

    1.4 利用Python的动态语言特性执行命令
    • Python内置的dir函数会返回属性的列表

      In [72]: class Person(object):
          ...:     def __init__(self, name):
          ...:         self.name = name
          ...:     def get_first_name(self):
          ...:         return self.name.split()[0]
          ...:     def get_last_name(self):
          ...:         return self.name.split()[-1]
          ...:
      
      In [73]: jason = Person('Jason Statham')
      
      In [74]: dir(jason)
      Out[74]:
      ['__class__',
       '__delattr__',
       '__dict__',
       '__doc__',
       '__format__',
       '__getattribute__',
       '__hash__',
       '__init__',
       '__module__',
       '__new__',
       '__reduce__',
       '__reduce_ex__',
       '__repr__',
       '__setattr__',
       '__sizeof__',
       '__str__',
       '__subclasshook__',
       '__weakref__',
       'get_first_name',
       'get_last_name',
       'name']
      
    • 已知jason对象所拥有的属性,我们可以用hasattrgetattr函数来测试某个属性是否存在并获取该属性,如下所示:

      In [76]: hasattr(jason, 'get_first_name')
      Out[76]: True
      
      In [77]: action = getattr(jason, 'get_first_name')
      
      In [78]: action()
      Out[78]: 'Jason'
      
      In [79]: action = getattr(jason, 'get_last_name')
      
      In [80]: action()
      Out[80]: 'Statham'
      
    • 在MySQL健康检查器中,我们利用了Python的自省功能,用hasattr函数判断客户端是否拥有相应的属性,然后调用getattr来获得该消息的处理函数。这样就免去了我们使用if/else语句处理不同的消息

    1.5 利用__call__方法实现可调用对象

    如果在类中实现了 call 方法,那么实例对象也将成为一个可调用对象。也就是说,我们可以像普通函数一样调用一个类对象。

    class CheckSafeReplication(object):
    
        def __init__(self, params):
            self.params= params
    
        def get_slave_status(self):
            res = {}
            slave_status_dict = Env.database.get_slave_status_dict()
            res['slave_io_running'] = slave_status_dict['Slave_IO_Running']
            res['slave_sql_running'] = slave_status_dict['Slave_SQL_Running']
            res['last_io_error'] = slave_status_dict['Last_IO_Error']
            res['last_sql_error'] = slave_status_dict['Last_SQL_Error']
    
            return res
    
        def __call__(self):
            res = dict(is_slave=Env.database.is_slave)
    
            if Env.database.is_slave:
                res.update(Env.database.get_multi_variables_value('relay_log_recovery',
                                                                  'relay_log_info_repository'))
    
                res.update(self.get_slave_status())
    
            return res
    

    因为CheckSafeReplication实现了__call__方法,因为我们可以向函数一样调用CheckSafeReplication的类对象,如下所示:

    def check_safe_replication(msg):
        obj = CheckSafeReplication(msg)
        return obj()
    

    详细解释可看Python __call__ 详解

    1.6 Python 的 property

    在Python中没有像Java那样的getter和setter的用法。

    因为我们可以直接修改对象的属性,如下所示:

    In [97]: class Person(object):
        ...:     def __init__(self, name, age):
        ...:         self.name = name
        ...:         self.age = age
        ...:
    
    In [98]: jason = Person('Jason Statham' , 50)
    
    In [99]: jason.age = -1
    
    In [100]: jason.age
    Out[100]: -1
    

    上面这段程序中,虽然age的取值从 50 变成了 -1 , 但是不符合逻辑,年龄和可能为-1,为了避免这种错误发生,我们可以参照java,项属性设置为私有,并提供一个gettersetter

    In [102]: class Person(object):
         ...:     def __init__(self, name, age):
         ...:         self.name = name
         ...:         self._age = age
         ...:     def get_age(self):
         ...:         return self._age
         ...:     def set_age(self, age):
         ...:         if age < 0 or age > 100:
         ...:             raise ValueError('age is illegal')
         ...:         self._age = age
    

    上面这种方法我们实现了像java中的set和get方法,但是不够Pythonic(不够Python范儿)。

    我们也可以很Python范儿,就是使用property装饰器将方法当做属性访问,从而提供更加友好的访问方式。如下:

    In [110]: class Person(object):
         ...:     def __init__(self, name, age):
         ...:         self.name = name
         ...:         self.age = age
         ...:     @property
         ...:     def age(self):
         ...:         return self._age
         ...:     @age.setter
         ...:     def age(self, age):
         ...:         if age < 0 or age > 100:
         ...:             raise ValueError('age is illegal')
         ...:         self._age = age
         ...:
    
    In [111]: jason = Person('Jason Statham' , 50)
    
    In [112]: jason.age = -1
    ---------------------------------------------------------------------------
    ValueError                                Traceback (most recent call last)
    <ipython-input-112-8f6f46022edd> in <module>()
    ----> 1 jason.age = -1
    
    <ipython-input-110-7a772ee4ed7c> in age(self, age)
          9     def age(self, age):
         10         if age < 0 or age > 100:
    ---> 11             raise ValueError('age is illegal')
         12         self._age = age
         13
    
    ValueError: age is illegal
    
    In [113]: jason.age = 20
    
    In [114]: jason.age
    Out[114]: 20
    

    上面这段代码中,age.setter装饰器为age属性创建了一个setter方法。当我们修改age时,这个setter方法将会自动调用。

    从0到1:Python打造MySQL专家系统(2)

    2. 数据库专家系统的服务端设计(server端)

    2.1 将相同的操作提升到父类中

    本数据库专家系统分为很多个检查项,每个检查项里面包含多个检查点。对于每一个检查项,在专家系统的服务端中都是一个Worker。每个Work本身独立,又有一些相同点,相同的地方比如打日志,处理异常。这个时候,我们可以把共性提取出来,在父类中实现(GenericWorker)。每一个子类Worker只需要继承GenericWorker这个父类,就实现了记录时间,打印日志和处理异常。这样一来,每个Worker就可以专注于具体的业务逻辑。

    检查项的继承关系
    2.2 在Python中实现map-reduce模型

    为每个Worker分配一个线程,等线程结束之后再将各个Worker的结果汇总。这是一个典型的map-reduce框架。在map阶段,将各个任务分发出去;在reduce阶段,将多个任务的执行结果汇总起来。

    我们举个栗子,以1-100的和为例说明在Python中实现一个map-reduce框架。首先我们将1~100分成10个区间,然后将每个区间分配给1个线程执行。我们一共要使用10个线程,等待10个线程都结束以后我们再将所有的结果汇总起来。

    In [1]: def Cal(object):
         ...:     def __init__(self, start, end):
         ...:         self.result = 0
         ...:         self.start = start
         ...:         self.end = end
         ...:     def map(self):
         ...:         for i in range(self.start, self.end):
         ...:             self.result += i
         ...:     def reduce(self, other):
         ...:         self.reduce += other.result
    

    在这个Cal类中,初始化了区间的起点和终点(startend)。

    map函数是我们的业务逻辑,在MySQL数据库专家系统中,我们将在map函数中进行数据库检查和评分。

    可以手动此处。一文搞懂python的map、reduce函数

    相关文章

      网友评论

          本文标题:从0到1:Python打造MySQL专家系统

          本文链接:https://www.haomeiwen.com/subject/eigjqhtx.html