美文网首页
python使用数据库连接池

python使用数据库连接池

作者: 托贝多尔 | 来源:发表于2022-08-02 23:42 被阅读0次

脚本开发时常常会用到数据库访问的情况,目前业内也已经有了大量成熟的访问方案,现将平时工作中用到大经典案例总结如下。

postgreSQL连接池

from psycopg2 import pool
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
import atexit
 
 
class DBHelper:
    def __init__(self):
        self._connection_pool = None
 
    def initialize_connection_pool(self):
        db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5'
        self._connection_pool = pool.ThreadedConnectionPool(1, 3,db_dsn)
 
    @contextmanager
    def get_resource(self, autocommit=True) -> Union[RealDictCursor, tuple[RealDictCursor, connection]]:
        if self._connection_pool is None:
            self.initialize_connection_pool()
 
        conn = self._connection_pool.getconn()
        conn.autocommit = autocommit
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            if autocommit:
                yield cursor
            else:
                yield cursor, conn
        finally:
            cursor.close()
            self._connection_pool.putconn(conn)
 
    def shutdown_connection_pool(self):
        if self._connection_pool is not None:
            self._connection_pool.closeall()
 
 
db_helper = DBHelper()
 
 
@atexit.register
def shutdown_connection_pool():
    db_helper.shutdown_connection_pool()
# 一般用法
from db_helper import db_helper
with db_helper.get_resource() as cursor:
    cursor.execute('select * from users')
    for record in cursor.fetchall():
        ... process record, record['name'] ... 
# 事务用法
with db_helper.get_resource(autocommit=False) as (cursor, connection):
    try:
        cursor.execute('update users set name = %s where id = %s', ('new_name', 1))
        cursor.execute('delete from orders where user_id = %s', (1,))
        conn.commit()
    except:
        conn.rollback()

mysql连接池:


import pymysql
from g_conf.config import config_template
from DBUtils.PooledDB import PooledDB
 
 
class MysqlPool:
    config = {
        'creator': pymysql,
        'host': config_template['MYSQL']['HOST'],
        'port': config_template['MYSQL']['PORT'],
        'user': config_template['MYSQL']['USER'],
        'password': config_template['MYSQL']['PASSWD'],
        'db': config_template['MYSQL']['DB'],
        'charset': config_template['MYSQL']['CHARSET'],
        'maxconnections': 70, # 连接池最大连接数量
        'cursorclass': pymysql.cursors.DictCursor
    }
    pool = PooledDB(**config)
 
    def __enter__(self):
        self.conn = MysqlPool.pool.connection()
        self.cursor = self.conn.cursor()
        return self
 
    def __exit__(self, type, value, trace):
        self.cursor.close()
        self.conn.close()
# 一般用法
def func(tar_id):
    with MysqlPool() as db:
        db.cursor.execute('YOUR_SQL')
        db.conn.commit()
# 装饰器用法
def db_conn(func):
    def wrapper(*args, **kw):
        with MysqlPool() as db:
            result = func(db, *args, **kw)
        return result
    return wrapper
 
@db_conn
def update_info(db, *args, **kw):
    try:
        db.cursor.execute("YOUR_SQL")
        db.conn.commit()
        return 0
    except Exception as e:
        db.conn.rollback()
        return 1

相关文章

网友评论

      本文标题:python使用数据库连接池

      本文链接:https://www.haomeiwen.com/subject/vynewrtx.html