美文网首页数据库
mysql_conn连接封装

mysql_conn连接封装

作者: 陆_志东 | 来源:发表于2018-07-04 10:49 被阅读1次

如果想要使用下面的连接封装,只需要把下面的代码拷贝到一个py文件里面,作为一个工具类模块.哪个地方使用就导入这个模块,不过在使用这个模块的时候你可以考虑建立一个全局的mysqlconn对象,其他地方导入这个连接对象,这样就不用重复的建立断开mysql连接.如果觉得性能不够好,就使用mysql连接池的封装

import pymysql
import re

class MysqlConn():
    def __init__(self, logger, config):
        self.logger = logger
        self.config = config
        self.re_errno = re.compile(r'^\((\d+),')

        try:
            self.conn = pymysql.Connect(**self.config)
            self.logger.info("pymysql.Connect() ok, {0}".format(id(self.conn)))
        except Exception as e:
            raise e

    def __del__(self):
        self.close()

    def close(self):
        if self.conn:
            self.logger.info("conn.close() {0}".format(id(self.conn)))
            self.conn.close()


    def execute_query(self, sql_str, sql_params=(), first=True):
        res_list = None
        cur = None
        try:
            cur = self.conn.cursor()
            cur.execute(sql_str, sql_params)
            res_list = cur.fetchall()
        except Exception as e:
            err = str(e)
            self.logger.error('execute_query: {0}'.format(err))
            if first:
                retry = self._deal_with_network_exception(err)
                if retry:
                    # self.logger.error('retry "{0}"'.format(sql_str))
                    return self.execute_query(sql_str, sql_params, False)
        finally:
            if cur is not None:
                cur.close()
        return res_list

    def execute_write(self, sql_str, sql_params=(), first=True):
        cur = None
        n = None
        err = None
        try:
            cur = self.conn.cursor()
            n = cur.execute(sql_str, sql_params)
            # self.logger.info("{0}".format(cur.mogrify(sql_str, sql_params)))
        except Exception as e:
            err = str(e)
            self.logger.error('execute_query: {0}'.format(err))
            if first:
                retry = self._deal_with_network_exception(err)
                if retry:
                    return self.execute_write(sql_str, sql_params, False)
        finally:
            if cur is not None:
                cur.close()
        return n, err

    def _deal_with_network_exception(self, stre):
        errno_str = self._get_errorno_str(stre)
        if errno_str != '2006' and errno_str != '2013' and errno_str != '0':
            return False
        try:
            self.conn.ping()
        except Exception as e:
            return False
        return True

    def _get_errorno_str(self, stre):
        # https://www.cnblogs.com/skillCoding/archive/2011/09/07/2169932.html
        # str1 = '(2006, "MySQL server has gone away (BrokenPipeError(32, \'Broken pipe\'))")'
        searchObj = self.re_errno.search(stre)
        if searchObj:
            errno_str = searchObj.group(1)
        else:
            errno_str = '-1'
        return errno_str

config = {
        "host": "",
        "port": 3306,
        "user": "",
        "password": "",
        "db": "",
        "autocommit": True,
        # "cursorclass": pymysql.cursors.DictCursor,
        "charset": "utf8"
}
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(pathname)s-%(lineno)d %(message)s')  # 配置语句
logger_root = logging.getLogger('')  # 拿到一个logging对象,使用该对象打印日志,比如 logger_root.info("this is info data")

mysqlconn = MysqlConn(logger_root , config )

相关文章

网友评论

    本文标题:mysql_conn连接封装

    本文链接:https://www.haomeiwen.com/subject/dwxeuftx.html