美文网首页
python连接mysql连接池.md

python连接mysql连接池.md

作者: 未然猜 | 来源:发表于2019-10-10 11:00 被阅读0次
# !/usr/bin/env python
# -*- coding: utf-8 -*-

import traceback
import logging
import pymysql
from queue import Queue
import threading
import time
import re
import datetime
import os
import uuid

log = logging.getLogger(__name__)


class PoolError(Exception):
    """连接异常类"""
    pass


class PooledConnection(object):
    """连接池"""

    def __init__(self, connection_strings, max_count=10, min_free_count=1, monitor_log=False):
        self._max_count = max_count
        self._min_free_count = min_free_count
        self._connection_strings = connection_strings
        self._count = 0
        self._queue = Queue(max_count)
        self._lock = threading.Lock()

        if monitor_log:
            self._run_monitor()

    def __del__(self):
        while not self._queue.empty():

            conn = self._queue.get()

            if conn:
                self._close_connection(conn)
            else:
                break

    def _run_monitor(self):
        """日志定时器"""
        def process(p):
            log.info('pool connection state:pid:%s, max_count:%s,min_free_count:%s,count:%s,free_count:%s' %
                     (os.getpid(), p._max_count, p._min_free_count, p._count, p._queue.qsize()))

        t = threading.Timer(5.0, process, args=(self,))
        t.start()
        t.join()

    def _create_connection(self, autoCommit=True):
        if self._count >= self._max_count:
            raise PoolError('The maximum number of connections beyond!')
        conn = Connection(self, host=self._connection_strings.get('host'),
                          port=self._connection_strings.get('port'),
                          user=self._connection_strings.get('user'),
                          password=self._connection_strings.get('password'),
                          db=self._connection_strings.get('database'),
                          charset='utf8',
                          autocommit=autoCommit,
                          cursorclass=pymysql.cursors.DictCursor)
        self._count += 1
        return conn

    def release_connection(self, connection):
        """释放连接"""
        self._lock.acquire()
        if self._queue.qsize() >= self._min_free_count:
            self._close_connection(connection)
        else:
            self._queue.put(connection)
        self._lock.release()

    def get_connection(self, timeout=15):
        """获取一个连接"""
        bt = datetime.datetime.now()

        def get_conn():
            self._lock.acquire()
            try:
                # 自动补充连接池连接
                if not self._queue.empty():
                    conn = self._queue.get()
                elif self._count < self._max_count:
                    conn = self._create_connection()
                else:
                    conn = None
                return conn
            except:
                raise
            finally:
                self._lock.release()

        conn = get_conn()
        if conn:
            return conn
        else:
            if timeout:
                while (datetime.datetime.now() - bt).seconds < timeout:
                    conn = get_conn()
                    if conn:
                        break
                    time.sleep(0.2)
            if not conn:
                raise PoolError('Timeout!There has no enough connection to be used!')
            return conn

    def _close_connection(self, connection):
        """关闭连接"""
        try:
            if connection._close():
                self._count -= 1
        except:
            pass


class Connection(object):
    """连接类"""
    PARAMERTS_REG = re.compile(r'\:([_0-9]*[_A-z]+[_0-9]*[_A-z]*)')

    def __init__(self, pool, *args, **kwargs):
        self._pool = pool
        self.id = uuid.uuid4()
        # 连不上数据库时,自动重试
        try:
            self._conn = pymysql.connections.Connection(*args, **kwargs)
            self.__is_closed = False
        except pymysql.err.OperationalError:
            self._conn = pymysql.connections.Connection(*args, **kwargs)
            self.__is_closed = False

    def __del__(self):
        """销毁连接"""
        self.drop()

    def execute(self, sql, args=None):
        """执行 sql"""
        cursor = self._conn.cursor()
        sqlText = self.PARAMERTS_REG.sub(r'%(\1)s', sql)
        modelAttrs = []
        result = self.PARAMERTS_REG.finditer(sql)
        for m in result:
            modelAttrs.append(m.group(1))

        def filter_args(modelAttrs, m):
            if m is None:
                return None
            return {a: m[a] for a in modelAttrs}

        if args and isinstance(args, list):
            cursor.executemany(sqlText, [filter_args(modelAttrs, a) for a in args])
        else:
            cursor.execute(sqlText, filter_args(modelAttrs, args))
        return cursor

    def insert(self, sql, args=None):
        """插入记录"""
        cursor = None
        try:
            cursor = self.execute(sql, args)
            row_id = cursor.lastrowid
            return row_id
        except:
            raise
        finally:
            cursor and cursor.close()

    def update(self, sql, args=None):
        """更新记录"""
        cursor = None
        try:
            cursor = self.execute(sql, args)
            row_count = cursor.rowcount
            if not row_count:
                log.debug(cursor._last_executed)
            return row_count
        except:
            raise
        finally:
            cursor and cursor.close()

    def delete(self, sql, args=None):
        """删除记录"""
        cursor = None
        try:
            cursor = self.execute(sql, args)
            row_count = cursor.rowcount
            return row_count
        except:
            raise
        finally:
            cursor and cursor.close()

    def query(self, sql, args=None):
        """查询"""
        cursor = None
        try:
            cursor = self.execute(sql, args)
            return cursor.fetchall()
        except:
            raise
        finally:
            cursor and cursor.close()

    def query_one(self, sql, args=None):
        """查询返回一条数据"""
        cursor = None
        try:
            cursor = self.execute(sql, args)
            return cursor.fetchone()
        except:
            raise
        finally:
            cursor and cursor.close()

    def release(self):
        """释放连接,将连接放回连接池"""
        self._pool.release_connection(self)

    def close(self):
        """释放连接,将连接放回连接池"""
        self.release()

    def drop(self):
        """丢弃连接"""
        self._pool._close_connection(self)

    def _close(self):
        """真正关闭"""
        if self.__is_closed:
            return False
        try:
            self._conn.close()
            self.__is_closed = True
        except:
            log.error(traceback.format_exc())
        return True


class MySQLdb(object):
    """mysql 的数据库操作类,支持连接池"""

    def __init__(self, cfg):
        self.config = cfg
        self._pool = PooledConnection(self.config, self.config.get('maxConnections'),
                                      self.config.get('minFreeConnections', 1))

    def execute(self, sql, args=None):
        """执行 sql"""
        cursor = None
        conn = None
        try:
            try:
                conn = self._pool.get_connection()
                cursor = conn.execute(sql, args)
            except (pymysql.err.OperationalError, RuntimeError):
                log.error('execute error ready to retry', exc_info=1)
                conn and conn.drop()
                conn = self._pool.get_connection()
                cursor = conn.execute(sql, args)
        except (pymysql.err.InterfaceError, pymysql.err.IntegrityError):
            raise
        except:
            log.error('execute sql error:', exc_info=1)
            conn and conn.drop()
            conn = None
        finally:
            conn and conn.release()
        return cursor

    def insert(self, sql, args=None):
        """插入记录"""
        cursor = None
        try:
            cursor = self.execute(sql, args)
            row_id = cursor.lastrowid
            return row_id
        except:
            raise
        finally:
            cursor and cursor.close()

    def update(self, sql, args=None):
        """更新记录"""
        cursor = None
        try:
            cursor = self.execute(sql, args)
            row_count = cursor.rowcount
            return row_count
        except:
            raise
        finally:
            cursor and cursor.close()

    def delete(self, sql, args=None):
        """删除记录"""
        cursor = None
        try:
            cursor = self.execute(sql, args)
            row_count = cursor.rowcount
            return row_count
        except:
            raise
        finally:
            cursor and cursor.close()

    def query(self, sql, args=None):
        """查询"""
        cursor = None
        try:
            cursor = self.execute(sql, args)
            return cursor.fetchall()
        except:
            raise
        finally:
            cursor and cursor.close()

    def query_one(self, sql, args=None):
        """查询返回一条数据"""
        cursor = None
        try:
            cursor = self.execute(sql, args)
            return cursor.fetchone()
        except:
            raise
        finally:
            cursor and cursor.close()

    def begin(self):
        """开启并返回一个事务"""
        tran = Transaction(self._pool.get_connection())
        tran.begin()
        return tran

    def commit(self, tran):
        """提交事务"""
        return tran.commit()

    def rollback(self, tran):
        """回滚事务"""
        return tran.rollback()


class Transaction(object):
    """事务类"""

    def __init__(self, conn):
        self.__isBegan = False
        self.conn = conn
        self.__old_autocommit = self.conn._conn.get_autocommit()
        self.conn._conn.autocommit(False)

    def begin(self):
        """开启事务"""
        if not self.__isBegan:
            self.conn._conn.begin()
            self.__isBegan = True

    def commit(self):
        """提交事务"""
        self.conn._conn.commit()
        self.__isBegan = False
        self._finished()

    def rollback(self):
        """回滚事务"""
        self.conn._conn.rollback()
        self.__isBegan = False
        self._finished()

    def _finished(self):
        self.__reset_autocommit()
        self.conn.release()

    def __reset_autocommit(self):
        """将连接的自动提交设置重置回原来的设置"""
        self.conn._conn.autocommit(self.__old_autocommit)

    def __enter__(self):
        self.begin()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is not None:
            self.rollback()
        else:
            self.commit()


if __name__ == '__main__':
    opts = {
        "host": "localhost",
        "port": 3306,
        "database": "hcndc",
        "user": "root",
        "password": "123456",
        "maxConnections": 10
    }
    db = MySQLdb(opts)
    result = db.query('SHOW DATABASES')
    print(result)

相关文章

网友评论

      本文标题:python连接mysql连接池.md

      本文链接:https://www.haomeiwen.com/subject/ujyopctx.html