美文网首页
python使用数据库连接池

python使用数据库连接池

作者: 托贝多尔 | 来源:发表于2022-08-02 23:42 被阅读0次

    脚本开发时常常会遇到需要访问数据库的情况,目前业内已经有了大量成熟的访问方案,现将平时工作中用到的经典案例总结如下。

    PostgreSQL连接池:

    from psycopg2 import pool
    from psycopg2.extras import RealDictCursor
    from contextlib import contextmanager
    import atexit
     
     
    class DBHelper:
        """Lazily created psycopg2 connection pool with a cursor context manager.

        The pool is built on first use by ``get_resource`` (or explicitly through
        ``initialize_connection_pool``) and released by
        ``shutdown_connection_pool``.
        """

        # Defaults preserved from the original hard-coded configuration.
        DEFAULT_DSN = 'postgresql://admin:password@localhost/testdb?connect_timeout=5'
        DEFAULT_MIN_CONNECTIONS = 1
        DEFAULT_MAX_CONNECTIONS = 3

        def __init__(self, dsn=DEFAULT_DSN, minconn=DEFAULT_MIN_CONNECTIONS,
                     maxconn=DEFAULT_MAX_CONNECTIONS):
            """Store the pool settings; no connection is opened here.

            Args:
                dsn: Connection string handed to the pool.
                minconn: Minimum number of connections passed to the pool.
                maxconn: Maximum number of connections passed to the pool.
            """
            self._dsn = dsn
            self._minconn = minconn
            self._maxconn = maxconn
            self._connection_pool = None

        def initialize_connection_pool(self):
            """Create the ``ThreadedConnectionPool`` from the stored settings."""
            self._connection_pool = pool.ThreadedConnectionPool(
                self._minconn, self._maxconn, self._dsn
            )

        @contextmanager
        def get_resource(self, autocommit=True):
            """Check out a connection and yield a ``RealDictCursor`` on it.

            Args:
                autocommit: Value assigned to ``conn.autocommit`` before use.

            Yields:
                The cursor alone when ``autocommit`` is true; otherwise a
                ``(cursor, conn)`` tuple so the caller can commit or roll back.

            The cursor is closed and the connection is handed back to the pool
            when the ``with`` block exits, including when it exits by exception.
            """
            if self._connection_pool is None:
                self.initialize_connection_pool()

            # Keep a local reference so the connection is returned to the pool
            # it was taken from even if shutdown clears the attribute meanwhile.
            active_pool = self._connection_pool
            conn = active_pool.getconn()
            cursor = None
            try:
                # Configuration and cursor creation sit inside the try block so
                # that a failure here still returns the connection to the pool.
                conn.autocommit = autocommit
                cursor = conn.cursor(cursor_factory=RealDictCursor)
                if autocommit:
                    yield cursor
                else:
                    yield cursor, conn
            finally:
                try:
                    if cursor is not None:
                        cursor.close()
                finally:
                    # Runs even when closing the cursor raises.
                    active_pool.putconn(conn)

        def shutdown_connection_pool(self):
            """Close every pooled connection; safe to call more than once."""
            if self._connection_pool is not None:
                # Dropping the reference first makes a repeated shutdown a no-op
                # and lets a later get_resource() build a fresh pool instead of
                # reusing the closed one.
                closing_pool, self._connection_pool = self._connection_pool, None
                closing_pool.closeall()
     
     
    # Module-level helper instance that the usage examples import.
    db_helper = DBHelper()


    def shutdown_connection_pool():
        """Close the shared helper's pool; registered to run at interpreter exit."""
        db_helper.shutdown_connection_pool()


    # Equivalent to decorating the function with @atexit.register.
    atexit.register(shutdown_connection_pool)
    # Typical usage
    from db_helper import db_helper
    with db_helper.get_resource() as cursor:
        cursor.execute('select * from users')
        for record in cursor.fetchall():
            # Each row is accessed by column name, e.g. record['name'].
            print(record['name'])  # placeholder: replace with real processing
    # Transactional usage
    with db_helper.get_resource(autocommit=False) as (cursor, connection):
        try:
            cursor.execute('update users set name = %s where id = %s', ('new_name', 1))
            cursor.execute('delete from orders where user_id = %s', (1,))
            # Use the name bound by the with statement (`connection`).
            connection.commit()
        except Exception:
            # Undo both statements, then re-raise so the failure is not hidden.
            connection.rollback()
            raise
    

    MySQL连接池:

    
    import pymysql
    from g_conf.config import config_template
    from DBUtils.PooledDB import PooledDB
     
     
    class MysqlPool:
        """Context manager that borrows a connection from a shared DBUtils pool.

        Entering sets ``self.conn`` and ``self.cursor``; leaving closes both.
        The pool itself is a class attribute, created once when the class body
        runs and shared by every instance.
        """

        config = {
            'creator': pymysql,
            'host': config_template['MYSQL']['HOST'],
            'port': config_template['MYSQL']['PORT'],
            'user': config_template['MYSQL']['USER'],
            'password': config_template['MYSQL']['PASSWD'],
            'db': config_template['MYSQL']['DB'],
            'charset': config_template['MYSQL']['CHARSET'],
            'maxconnections': 70,  # maximum number of connections in the pool
            'cursorclass': pymysql.cursors.DictCursor
        }
        pool = PooledDB(**config)

        def __enter__(self):
            """Take a connection from the pool, open a cursor, return ``self``."""
            self.conn = MysqlPool.pool.connection()
            try:
                self.cursor = self.conn.cursor()
            except Exception:
                # __exit__ is not invoked when __enter__ raises, so the
                # connection has to be released here to avoid leaking it.
                self.conn.close()
                raise
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            """Close the cursor and the connection; exceptions are not suppressed."""
            try:
                self.cursor.close()
            finally:
                # Release the connection even if closing the cursor fails.
                self.conn.close()
    # Typical usage
    def func(tar_id):
        """Execute one statement on a pooled connection and commit it.

        ``tar_id`` is accepted but not used by this template body.
        """
        with MysqlPool() as pooled:
            pooled.cursor.execute('YOUR_SQL')
            pooled.conn.commit()
    # Decorator usage
    def db_conn(func):
        """Decorator that passes a ``MysqlPool`` context as the first argument.

        Args:
            func: Callable taking the entered ``MysqlPool`` object first,
                followed by the caller's own positional and keyword arguments.

        Returns:
            A wrapper that enters ``MysqlPool()``, calls ``func`` and returns its
            result; the pool context is exited before the wrapper returns.
        """
        import functools  # local import keeps this snippet self-contained

        # Preserve the wrapped function's name and docstring for debugging.
        @functools.wraps(func)
        def wrapper(*args, **kw):
            with MysqlPool() as db:
                return func(db, *args, **kw)
        return wrapper
     
    @db_conn
    def update_info(db, *args, **kw):
        """Run one statement and commit it, rolling back on failure.

        Args:
            db: Object supplied by ``db_conn`` exposing ``cursor`` and ``conn``.
            *args, **kw: Accepted but not used by this template body.

        Returns:
            0 when the statement was committed, 1 when it failed and the
            transaction was rolled back.
        """
        import logging  # local import keeps this snippet self-contained

        try:
            db.cursor.execute("YOUR_SQL")
            db.conn.commit()
        except Exception:
            # Record the failure before rolling back; the return code alone
            # would hide what went wrong.
            logging.getLogger(__name__).exception("update_info failed; rolling back")
            db.conn.rollback()
            return 1
        return 0
    
    

    相关文章

      网友评论

          本文标题:python使用数据库连接池

          本文链接:https://www.haomeiwen.com/subject/vynewrtx.html