1.pyCharm 安装连接mysql数据库的依赖
安装数据库驱动:pip install pymysql
安装连接池库:pip install dbutils
- 简单使用,无连接池配置
import pymysql.connections
class MysqlConnection():
def __init__(self,db=None):
self.db = pymysql.connect(
host="1.0.0.1",
user="admin",
passwd="123",
database="db"
)
def select(self,sql):
try:
mycursor = self.db.cursor()
mycursor.execute(sql)
myresult = mycursor.fetchall() # fetchall() 获取所有记录
rs_list = []
for x in myresult:
rs_list.append(x)
# print(x)
print(rs_list)
except Exception:
print("查询失败。")
finally:
mycursor.close()
self.db.close()
if __name__ == '__main__':
MysqlConnection().select("select code from activity a where deleted =0 and name='AAA'")
3.使用数据库连接池,并进行封装,高级使用
db_collection.py 连接初始化模块
1)设置类变量 pool 池,
2)设置方法 getconn ,作用通过 dbutils 模块的 PooledDB 方法连接数据库,给 pool 赋值
3)设置方法 getMyConn,返回自身实例对象
核心作用:创建一个类,此类可以创建一个连接池 pool
#-*-coding:GBK -*-
from dbutils.pooled_db import PooledDB
import pymysql
# import db_config as config
"""
@功能:创建数据库连接池
"""
# 数据库信息
DB_TEST_HOST = "1.0.0.1"
DB_TEST_USER = "admin"
DB_TEST_PASSWORD = "123"
DB_TEST_DBNAME = "db"
# 数据库连接编码
DB_CHARSET = "utf8"
# mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
DB_MIN_CACHED = 10
# maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
DB_MAX_CACHED = 10
# maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
DB_MAX_SHARED = 20
# maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS = 100
# blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......> 其他代表阻塞直到连接数减少,连接被分配)
DB_BLOCKING = True
# maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
DB_MAX_USAGE = 0
# setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
DB_SET_SESSION = None
# creator : 使用连接数据库的模块
DB_CREATOR = pymysql
class MyConnectionPool(object):
__pool = None
# 创建数据库连接conn和游标cursor
def __enter__(self):
self.conn = self.__getconn()
self.cursor = self.conn.cursor()
# 创建数据库连接池
def __getconn(self):
if self.__pool is None:
self.__pool = PooledDB(
creator=DB_CREATOR,
mincached=DB_MIN_CACHED,
maxcached=DB_MAX_CACHED,
maxshared=DB_MAX_SHARED,
maxconnections=DB_MAX_CONNECYIONS,
blocking=DB_BLOCKING,
maxusage=DB_MAX_USAGE,
setsession=DB_SET_SESSION,
host=DB_TEST_HOST,
user=DB_TEST_USER,
passwd=DB_TEST_PASSWORD,
db=DB_TEST_DBNAME,
use_unicode=True,
charset=DB_CHARSET
)
return self.__pool.connection()
# 释放连接池资源
def __exit__(self, exc_type, exc_val, exc_tb):
self.cursor.close()
self.conn.close()
# 从连接池中取出一个连接
def getconn(self):
conn = self.__getconn()
cursor = conn.cursor()
return cursor, conn
# 获取连接池,实例化
def get_my_connection():
return MyConnectionPool()
db_util.py
1)该模块,主要用于获取连接
2)封装执行命令,判断是否有参数和是否执行完就释放连接
3)执行语句,增/删/改/查,查询有结果返回结果没有返回0;增/删/改返回变更数据条数,没有返回0
#-*-coding:GBK -*-
from db_collection import get_my_connection
class JDBCUtills(object):
def __init__(self):
self.db = get_my_connection() # 从数据池中获取连接
def __new__(cls, *args, **kwargs):
if not hasattr(cls, 'inst'): # 单例
cls.inst = super(JDBCUtills, cls).__new__(cls, *args, **kwargs)
return cls.inst
# 封装执行命令
def execute(self, sql, param=None, autoclose=False):
"""
:param sql: 字符串类型,sql语句
:param param: sql语句中要替换的参数"select %s from tab where id=%s" 其中的%s就是参数
:param autoclose: 是否关闭连接
:return: 返回连接conn和游标cursor
"""
cursor, conn = self.db.getconn() # 从连接池获取连接
count = 0
try:
# count : 为改变的数据条数
if param:
count = cursor.execute(sql, param)
else:
count = cursor.execute(sql)
conn.commit()
if autoclose:
self.close(cursor, conn)
except Exception as e:
pass
return cursor, conn, count
# 释放连接
def close(self, cursor, conn):
"""释放连接归还给连接池"""
cursor.close()
conn.close()
# 查询所有
def selectall(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
res = cursor.fetchall()
return res
except Exception as e:
print(e)
self.close(cursor, conn)
return count
# 查询单条
def selectone(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
res = cursor.fetchone()
self.close(cursor, conn)
return res
except Exception as e:
print("error_msg:", e.args)
self.close(cursor, conn)
return count
# 增加
def insertone(self, sql, param):
try:
cursor, conn, count = self.execute(sql, param)
# _id = cursor.lastrowid() # 获取当前插入数据的主键id,该id应该为自动生成为好
conn.commit()
self.close(cursor, conn)
return count
# 防止表中没有id返回0
# if _id == 0:
# return True
# return _id
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
# 增加多行
def insertmany(self, sql, param):
"""
:param sql:
:param param: 必须是元组或列表[(),()]或((),())
:return:
"""
cursor, conn, count = self.db.getconn()
try:
cursor.executemany(sql, param)
conn.commit()
return count
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
# 删除
def delete(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
self.close(cursor, conn)
return count
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
# 更新
def update(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
conn.commit()
self.close(cursor, conn)
return count
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
if __name__ == '__main__':
result =JDBCUtills().selectone("select code from activity a where deleted =0 and name='AAA'")
print(result )
踩坑记录:
坑1,执行测试报错:SyntaxError: Non-UTF-8 code starting with '\xd0' in file
解决方案:在db_util.py 中头部加上
#-*-coding:GBK -*-
坑2,再执行仍错:SyntaxError: (unicode error) 'utf-8' codec can't decode byte 0xc1 in position 1: invalid start byte
解决方案:在db_collection.py 中头部加上
#-*-coding:GBK -*-
网友评论