如果想要使用下面的连接封装，只需把下面的代码拷贝到一个 .py 文件里，作为一个工具类模块，在需要使用的地方导入这个模块即可。使用这个模块时，可以考虑建立一个全局的 mysqlconn 对象，其他地方直接导入这个连接对象，这样就不用重复地建立和断开 MySQL 连接。如果觉得性能不够好，可以改用 MySQL 连接池的封装。
import logging
import re

import pymysql
class MysqlConn:
    """Wrapper around a single pymysql connection with a one-shot reconnect.

    ``execute_query`` and ``execute_write`` never raise for statement errors:
    the error is logged and reported through the return value instead. When
    the error number looks like a dropped connection, the connection is
    pinged (which asks pymysql to reconnect) and the statement is retried
    exactly once.

    Attributes:
        logger: Object exposing ``info(msg)`` and ``error(msg)``.
        config: Keyword arguments forwarded to ``pymysql.Connect``.
        conn: The live pymysql connection, or ``None`` if it was never
            established.
    """

    # Class-level default so close() / __del__ stay safe on an instance whose
    # __init__ raised before the connection was assigned.
    conn = None

    # Captures the leading error number of str(exc), which looks like
    # '(2006, "MySQL server has gone away (BrokenPipeError(32, \'Broken pipe\'))")'.
    # Compiled once for the class; still reachable as ``self.re_errno``.
    re_errno = re.compile(r'^\((\d+),')

    # Error numbers that trigger a reconnect attempt:
    #   2006 - MySQL server has gone away
    #   2013 - lost connection to MySQL server during query
    #   0    - presumably pymysql's InterfaceError(0, '') raised on an already
    #          closed socket - TODO confirm against the pymysql version in use
    _RETRYABLE_ERRNOS = frozenset(('2006', '2013', '0'))

    def __init__(self, logger, config):
        """Open the connection.

        Args:
            logger: Logger used for connection and error messages.
            config: Dict of keyword arguments for ``pymysql.Connect``.

        Raises:
            Exception: Whatever ``pymysql.Connect`` raises is propagated
                unchanged.
        """
        self.logger = logger
        self.config = config
        self.conn = pymysql.Connect(**self.config)
        self.logger.info("pymysql.Connect() ok, {0}".format(id(self.conn)))

    def __del__(self):
        """Best-effort close when the wrapper is garbage collected."""
        try:
            self.close()
        except Exception:
            # A finalizer cannot usefully report errors, and during
            # interpreter shutdown the logger or socket may already be gone.
            pass

    def close(self):
        """Close the connection if it is currently open.

        Safe to call repeatedly and on an instance that never connected.
        Calling pymysql's ``close()`` on an already closed connection raises,
        so the ``open`` flag is checked first.
        """
        conn = self.conn
        if conn is None or not conn.open:
            return
        self.logger.info("conn.close() {0}".format(id(conn)))
        conn.close()

    def execute_query(self, sql_str, sql_params=(), first=True):
        """Run a read statement and return all rows.

        Args:
            sql_str: SQL text, with placeholders for ``sql_params``.
            sql_params: Parameters passed to ``cursor.execute``.
            first: True on the initial attempt; the internal retry passes
                False so a statement is retried at most once.

        Returns:
            The result of ``cursor.fetchall()``, or ``None`` if the statement
            failed (the error is logged, not raised).
        """
        cur = None
        try:
            cur = self.conn.cursor()
            cur.execute(sql_str, sql_params)
            return cur.fetchall()
        except Exception as e:
            err = str(e)
            self.logger.error('execute_query: {0}'.format(err))
            if first and self._deal_with_network_exception(err):
                return self.execute_query(sql_str, sql_params, False)
            return None
        finally:
            if cur is not None:
                cur.close()

    def execute_write(self, sql_str, sql_params=(), first=True):
        """Run a write statement.

        No ``commit()`` is issued here, so the change persists only if the
        connection was created with autocommit enabled or the caller commits
        through ``self.conn``.

        Args:
            sql_str: SQL text, with placeholders for ``sql_params``.
            sql_params: Parameters passed to ``cursor.execute``.
            first: True on the initial attempt; the internal retry passes
                False so a statement is retried at most once.

        Returns:
            A ``(n, err)`` tuple: ``n`` is the value returned by
            ``cursor.execute`` and ``err`` is ``None`` on success; on failure
            ``n`` is ``None`` and ``err`` is the error text.
        """
        cur = None
        try:
            cur = self.conn.cursor()
            return cur.execute(sql_str, sql_params), None
        except Exception as e:
            err = str(e)
            self.logger.error('execute_write: {0}'.format(err))
            if first and self._deal_with_network_exception(err):
                return self.execute_write(sql_str, sql_params, False)
            return None, err
        finally:
            if cur is not None:
                cur.close()

    def _deal_with_network_exception(self, stre):
        """Try to restore the connection after a connection-level error.

        Args:
            stre: ``str(exception)`` of the failed statement.

        Returns:
            True if the error number is retryable and the ping succeeded,
            meaning the caller may retry the statement; False otherwise.
        """
        if self._get_errorno_str(stre) not in self._RETRYABLE_ERRNOS:
            return False
        try:
            # ping() with reconnect=True (pymysql's default) re-opens a
            # closed connection.
            self.conn.ping(reconnect=True)
        except Exception as e:
            self.logger.error('reconnect failed: {0}'.format(e))
            return False
        return True

    def _get_errorno_str(self, stre):
        """Return the leading error number of ``stre`` as a string.

        Returns ``'-1'`` when ``stre`` does not start with ``(<digits>,``.
        """
        # https://www.cnblogs.com/skillCoding/archive/2011/09/07/2169932.html
        match = self.re_errno.search(stre)
        return match.group(1) if match else '-1'
# Connection settings forwarded as keyword arguments to pymysql.Connect.
# Fill in host / user / password / db before using the module.
config = {
    "host": "",
    "port": 3306,
    "user": "",
    "password": "",
    "db": "",
    # MysqlConn.execute_write never calls commit(), so autocommit is enabled.
    "autocommit": True,
    # "cursorclass": pymysql.cursors.DictCursor,
    "charset": "utf8"
}

# Logging configuration (requires `import logging` at the top of the file).
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(pathname)s-%(lineno)d %(message)s')
# Root logger; use it to emit log records, e.g. logger_root.info("this is info data").
logger_root = logging.getLogger('')

# Module-level connection shared by every importer of this module.
# NOTE: this opens the database connection as a side effect of the import.
mysqlconn = MysqlConn(logger_root, config)
网友评论