python-MySQLdb的二三事

作者: 若与 | 来源:发表于2017-01-13 13:00 被阅读1725次
    追寻
    介绍

    mysqldb是python操作mysql数据库的一个库.mysql的几乎所有的操作都可以实现,另外,mysqldb的一些比较的option让数据操作更符合pythonic风格.在python2系列使用Mysqldb,在python3系列使用pymysql和mysql.connect.

    Mysqldb的安装

    下面按python2系列安装

    1. pip方式安装
    pip install MySQL-python
    
    2. yum安装
    sudo yum install python-mysqldb
    
    3. apt安装
    sudo apt-get install  python-mysqldb
    
    4.源码安装(这里就不介绍了,源码地址给出)
    https://pypi.python.org/packages/a5/e9/51b544da85a36a68debe7a7091f068d802fc515a3a202652828c73453cad/MySQL-python-1.2.5.zip#md5=654f75b302db6ed8dc5a898c625e030c
    
    Mysqldb的使用介绍

    我们直接操作mysql的流程如下:
    1.使用mysql-client连上数据库
    mysql -uuser_anme -ppassword -hhost_name
    2.再执行具体的sql语句,获取所需的数据
    >use db_name;
    >select * from table_name;

    python-mysqldb的使用方式和上面的流程是一样的.下面进入正题.

    1. 连接数据库

    import MySQLdb
    conn = MySQLdb.connect(db='database', host='172.16.0.1', user='user', passwd='password', port=3306)
    上面只是进行了数据库的连接
    

    2. 游标对象

    cur = conn.cursor()
    

    3. 执行语句

    sql = '****'
    cur.execute(sql)
    

    介绍一下具体的对象的method:

    下面介绍一下method的使用

    connect对象
    conn = MySQLdb.connect(db='database', host='172.16.0.1', user='user', passwd='password', port=3306)
    

    connect()的参数列表

    参数 描述
    user username
    passwd password
    host hostname
    database databasename
    dsn data source name
    port 端口,int
    conv 数据转换
    charset 字符集

    其他的一些参数:unix_socket,compress,connect_timeout,named_pipe,
    init_command,read_default_file,read_default_group,cursorclass,use_unicode,sql_mode,ssl

    下面详细介绍一下conv这个参数
    介绍一个类型对象的概念,通常不同的系统的接口要求的参数类型是不一致的,譬如python调用c函数时python对象和c类型之间就需要进行数据格式的转换.所以,在python对象和原生数据库对象之间也需要进行数据格式的转换.
    在MySQLdb.converters.conversions中

    encoders = {
            bool: escape_bool,
            int: escape_int,
            long_type: escape_int,
            float: escape_float,
            str: escape_str,
            text_type: escape_unicode,
            tuple: escape_sequence,
            list: escape_sequence,
            set: escape_sequence,
            dict: escape_dict,
            type(None): escape_None,
            datetime.date: escape_date,
            datetime.datetime: escape_datetime,
            datetime.timedelta: escape_timedelta,
            datetime.time: escape_time,
            time.struct_time: escape_struct_time,
            Decimal: str,
            }
    decoders = {
            FIELD_TYPE.BIT: convert_bit,
            FIELD_TYPE.TINY: int,
            FIELD_TYPE.SHORT: int,
            FIELD_TYPE.LONG: int,
            FIELD_TYPE.FLOAT: float,
            FIELD_TYPE.DOUBLE: float,
            FIELD_TYPE.DECIMAL: float,
            FIELD_TYPE.NEWDECIMAL: float,
            FIELD_TYPE.LONGLONG: int,
            FIELD_TYPE.INT24: int,
            FIELD_TYPE.YEAR: int,
            FIELD_TYPE.TIMESTAMP: convert_mysql_timestamp,
            FIELD_TYPE.DATETIME: convert_datetime,
            FIELD_TYPE.TIME: convert_timedelta,
            FIELD_TYPE.DATE: convert_date,
            FIELD_TYPE.SET: convert_set,
            FIELD_TYPE.BLOB: through,
            FIELD_TYPE.TINY_BLOB: through,
            FIELD_TYPE.MEDIUM_BLOB: through,
            FIELD_TYPE.LONG_BLOB: through,
            FIELD_TYPE.STRING: through,
            FIELD_TYPE.VAR_STRING: through,
            FIELD_TYPE.VARCHAR: through,
            FIELD_TYPE.DECIMAL: Decimal,
            FIELD_TYPE.NEWDECIMAL: Decimal,
            }
    

    下面来说说,自己如何自定义使用:

    from MySQLdb.constants import FIELD_TYPE
    
    my_conv = { 
        FIELD_TYPE.LONG: int,   # 长整型转成int,默认数据后面有一个L,去掉
        FIELD_TYPE.DATE: str    # 日期转成字符串
        }
    conn = MySQLdb.connect(db='database', host='172.16.0.1', user='user', passwd='password', port=3306,conv=my_conv)
    

    开始扩展connect对象的方法

    方法名 描述
    close() 关闭连接
    commit() 提交当前事务
    autocommit() 自动提交事务
    rollback() 取消当前事务
    cursor() 实例一个游标对象
    errorhandler(cxn,cur,errcls,errval) 作为已给游标的句柄

    这里介绍一下事务
    先来举个小例子:

    conn = MySQLdb.connect(*args, **kwags)
    cur = conn.cursor()
    sql = "insert into tb_name values (%(id)s,%(name)s,%(age)s);" % {'id': 1, 'name': 'ruyu', 'age': 99}
    cur.excute(sql)
    cur.close()
    conn.close()
    

    插入一行数据,上面操作是不生效的
    需要添加一行

    conn.commit()
    
    补充知识: 数据库事务

    数据库事务(Database Transaction) ,是指作为单个逻辑工作单元执行的一系列操作,要么完全地执行,要么完全地不执行。
    事务处理可以确保除非事务性单元内的所有操作都成功完成,否则不会永久更新面向数据的资源。
    事务的属性:必须满足ACID

    1. (Atomic)(Atomicity)原子性
    2. (Consistent)(Consistency)一致性
    3. (Insulation)(Isolation)隔离性
    4. (Duration)(Durability) 持久性

    顺便提一下mysql的隔离级别吧! **

    1. Read Uncommitted(读取未提交内容)
    2. Read Committed(读取提交内容)
    3. Repeatable Read(可重读)
    4. Serializable(可串行化)

    这里涉及一些脏读,幻读,不可重复读的概念,希望读者自己百度,找个教程,做一下数据库事务的实验,理解一下事务的概念.

    让心去旅行

    游标对象

    cur = MySQLdb.connect(*args, **kwags).cursor()
    

    先介绍一下cursor()的函数

        def cursor(self, cursorclass=None):
            return (cursorclass or self.cursorclass)(self)
    

    其中一个参数是cursorclass
    在MySQLdb.cursors中有所有的cursorclass
    分别如下:

    BaseCursor,CursorStoreResultMixIn,CursorUseResultMixIn,CursorTupleRowsMixIn,CursorDictRowsMixIn,CursorOldDictRowsMixIn,CursorDictRowsMixIn,CursorOldDictRowsMixIn,Cursor,DictCursor,SSCursor,SSDictCursor

    经常使用的应该是DictCursor,Cursor
    一种是返回的数据以字典格式,一种是tuple格式.

    下面介绍cursor对象的属性和方法

    对象属性和方法 描述
    arraysize 使用fetchmany()方法一次取出多少条记录,默认值为1
    connection 创建此游标对象的连接connect对象
    description 返回游标活动状态(七个元素的元祖):(name,type_code,display_size,interal_size,precision,scale,null_ok);只有name和type_code是必须提供的
    lastrowid 返回最后更新的id(可选),适用场景,在插入数据,返回插入数据的id
    rowcount 最后一次execute()操作返回或影响的行数
    callproc(func[,args]) 调用一个存储过程
    close() 关闭游标对象
    execute(op[,args]) 执行一个数据库查询或命令
    executemany(op,args) 类似execute()和map()的结合,为给定的每一个参数准备并执行一个数据库的查询/命令
    fetchone() 得到结果集的下一行
    fetchmany([size=cursor.arraysize]) 得到结果集的下几行
    fetchall() 返回所有的结果
    _iter_() 创建一个迭代对象
    messages 游标执行后数据库返回的信息列表
    next() 使用迭代对象得到结果集的下一行
    nextset() 移到下一个结果集
    rownumber 当前结果集中游标的索引
    setinput-sizes(sizes) 设置输入最大值
    setoutput-size(size[,col]) 设置大列的缓冲区大写

    提示: 使用最多的就是execute和fetch.

    上面的介绍差不多了,这里给出一个mysqldb的封装的库torndb的源码:

    #!/usr/bin/env python
    from __future__ import absolute_import, division, with_statement
    
    import copy
    import logging
    import os
    import time
    
    #下面是解决python2和python3的mysqldb不同的
    try:
        import MySQLdb.constants
        import MySQLdb.converters
        import MySQLdb.cursors
    except ImportError:
        try:
            import pymysql as MySQLdb
        except ImportError:
            if 'READTHEDOCS' in os.environ:
                MySQLdb = None
            else:
                raise
    
    version = "0.3"
    version_info = (0, 3, 0, 0)
    
    
    # 定义一个connect类,可以实例connect()对象
    class Connection(object):
        def __init__(self, host, database, user=None, password=None,
                     max_idle_time=7 * 3600, connect_timeout=0,
                     time_zone="+0:00", charset="utf8", sql_mode="TRADITIONAL",
                     **kwargs):
            self.host = host
            self.database = database
            self.max_idle_time = float(max_idle_time)
    
            args = dict(conv=CONVERSIONS, use_unicode=True, charset=charset,
                        db=database, init_command=('SET time_zone = "%s"' % time_zone),
                        connect_timeout=connect_timeout, sql_mode=sql_mode, **kwargs)
            if user is not None:
                args["user"] = user
            if password is not None:
                args["passwd"] = password
    
            # We accept a path to a MySQL socket file or a host(:port) string
            if "/" in host:
                args["unix_socket"] = host
            else:
                self.socket = None
                pair = host.split(":")
                if len(pair) == 2:
                    args["host"] = pair[0]
                    args["port"] = int(pair[1])
                else:
                    args["host"] = host
                    args["port"] = 3306
    
            self._db = None
            self._db_args = args
            self._last_use_time = time.time()
            try:
                self.reconnect()
            except Exception:
                logging.error("Cannot connect to MySQL on %s", self.host,
                              exc_info=True)
    
        def __del__(self):  # 删除和关闭
            self.close()
    
        def close(self):  # 关闭并置位空
            if getattr(self, "_db", None) is not None:
                self._db.close()
                self._db = None
    
        def reconnect(self): # 重新连接
            self.close()
            self._db = MySQLdb.connect(**self._db_args)
            self._db.autocommit(True)  # 开启自动提交的功能
    
        def iter(self, query, *parameters, **kwparameters):  # 进行迭代的
            self._ensure_connected()
            cursor = MySQLdb.cursors.SSCursor(self._db)
            try:
                self._execute(cursor, query, parameters, kwparameters)
                column_names = [d[0] for d in cursor.description]
                for row in cursor:
                    yield Row(zip(column_names, row))
            finally:
                cursor.close()
    
        def query(self, query, *parameters, **kwparameters):  # 进行执行,返回结果
            """Returns a row list for the given query and parameters."""
            cursor = self._cursor()
            try:
                self._execute(cursor, query, parameters, kwparameters)
                column_names = [d[0] for d in cursor.description]
                return [Row(zip(column_names, row)) for row in cursor]
            finally:
                cursor.close()
    
        def get(self, query, *parameters, **kwparameters):
            rows = self.query(query, *parameters, **kwparameters)
            if not rows:
                return None
            elif len(rows) > 1:
                raise Exception("Multiple rows returned for Database.get() query")
            else:
                return rows[0]
    
    
        def execute(self, query, *parameters, **kwparameters):
            return self.execute_lastrowid(query, *parameters, **kwparameters)
    
        def execute_lastrowid(self, query, *parameters, **kwparameters):
            cursor = self._cursor()
            try:
                self._execute(cursor, query, parameters, kwparameters)
                return cursor.lastrowid
            finally:
                cursor.close()
    
        def execute_rowcount(self, query, *parameters, **kwparameters):
            cursor = self._cursor()
            try:
                self._execute(cursor, query, parameters, kwparameters)
                return cursor.rowcount
            finally:
                cursor.close()
    
        def executemany(self, query, parameters):
            return self.executemany_lastrowid(query, parameters)
    
        def executemany_lastrowid(self, query, parameters):
            cursor = self._cursor()
            try:
                cursor.executemany(query, parameters)
                return cursor.lastrowid
            finally:
                cursor.close()
    
        def executemany_rowcount(self, query, parameters):
            cursor = self._cursor()
            try:
                cursor.executemany(query, parameters)
                return cursor.rowcount
            finally:
                cursor.close()
    
        update = delete = execute_rowcount
        updatemany = executemany_rowcount
    
        insert = execute_lastrowid
        insertmany = executemany_lastrowid
    
        def _ensure_connected(self):
            if (self._db is None or
                    (time.time() - self._last_use_time > self.max_idle_time)):
                self.reconnect()
            self._last_use_time = time.time()
    
        def _cursor(self):
            self._ensure_connected()
            return self._db.cursor()
    
        def _execute(self, cursor, query, parameters, kwparameters):
            try:
                return cursor.execute(query, kwparameters or parameters)
            except OperationalError:
                logging.error("Error connecting to MySQL on %s", self.host)
                self.close()
                raise
    
    
    class Row(dict):
        def __getattr__(self, name):
            try:
                return self[name]
            except KeyError:
                raise AttributeError(name)
    
    
    if MySQLdb is not None:
        FIELD_TYPE = MySQLdb.constants.FIELD_TYPE
        FLAG = MySQLdb.constants.FLAG
        CONVERSIONS = copy.copy(MySQLdb.converters.conversions)
    
        field_types = [FIELD_TYPE.BLOB, FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING]
        if 'VARCHAR' in vars(FIELD_TYPE):
            field_types.append(FIELD_TYPE.VARCHAR)
    
        for field_type in field_types:
            CONVERSIONS[field_type] = [(FLAG.BINARY, str)] + CONVERSIONS[field_type]
        IntegrityError = MySQLdb.IntegrityError
        OperationalError = MySQLdb.OperationalError
    

    上面源码就不解释了,都是很基础的内容,很适合入门学习.

    这里说一个python-mysqldb遇到的问题,很复杂的sql语句,在mysql中有数据,但是在mysqldb第一次执行确有部分字段是None,第二次或后面都是没问题的,我也请教了我们的python大神,他说他遇到,t并且他说这不是bug.他不能重现问题,所以也没解决,我的缓兵之计如下:

        def query(self, query):
            cursor = self._cursor()
            try:
                cursor.execute(query)
                data = cursor.fetchall()
                cursor.execute(query)
                raw = cursor.fetchall()
                return raw
            finally:
                cursor.close()
    
    安静

    内容到这里就结束了!!!!
    预告一下,后续的内容: ORM,自己带着写一个ORM,Django ORM的queryset,manager以及一些定制化的,当然还有sqlalchemy.

    再见

    相关文章

      网友评论

      • ThinkerTrek:sudo yum install python-mysqldb不对,应该是yum install MySQL-python
        ThinkerTrek:@若与 昨天在将ubuntu下的脚本改为centos可运行,发现你这个是错的:stuck_out_tongue_winking_eye:
        若与:@yaoel 😂

      本文标题:python-MySQLdb的二三事

      本文链接:https://www.haomeiwen.com/subject/ovtsbttx.html