将pymysql对数据库的操作进行封装,通过调用函数和传递参数实现增删改查,作者主要用于unittest的setup和teardown中对数据库的操作。只实现单表查询,无法连表查询。废话不多说直接上代码:
import pymysql
import logging
logger = logging.getLogger()
db = pymysql.connect("hostname", "username", "password", "database")
class DBOpeartion:
def __init__(self, table: str, data: dict or tuple = None, term: dict = None, codi: str = 'and'):
"""
pymysql增删改查
:param table: 表名称
:param data: 新增操作,传入新增的数据dict;
"insert into table (data[k]) values (data[v])"
更新操作,传入set后需要更新的字段和值dict;
"update table set data[k]=data[v] where term[k]=term[v]"
查询操作,data不传时为*,查询一个时可传str,查询多个字段传入tuple字段名
"select data from table where term[k]=term[v]"
删除操作,传入条件dict;
"delete from table where term[k]=term[v]"
:param term: where后面的条件,传入dict
:param codi: 查询条件,默认and,可传or
"""
self.cursor = db.cursor(pymysql.cursors.DictCursor) # 返回dict类型查询结果
self.table = table
self.data = data
self.term = term
self.codi = codi
if type(self.data) is dict:
self.dt_in = [(str(k), '\"' + self.data[k] + '\"') for k in self.data if self.data[k]]
self.dt_up = [(str(k) + '=\"' + str(self.data[k]) + '\"') for k in self.data if self.data[k]]
elif type(self.data) is tuple:
self.dt_dse = [self.data[i] for i in range(len(self.data)) if self.data[i]]
self.dt_se = ','.join([i for i in self.dt_dse])
elif type(self.data) is str:
self.dt_se = self.data
elif self.data is None:
self.dt_se = '*'
if self.term is not None:
self.tm = [(str(k) + '=\"' + str(self.term[k]) + '\"') for k in self.term if self.term[k]]
def db_insert(self):
ins_sql = 'insert into ' + self.table + ' (' + ','.join([i[0] for i in self.dt_in]) + ') values (' + ','.join(
[i[1] for i in self.dt_in]) + ');'
try:
self.cursor.execute(ins_sql)
db.commit()
return
except:
logger.info("db insert error")
db.rollback()
def db_update(self):
upd_sql = 'update ' + self.table + ' set ' + ','.join([i for i in self.dt_up]) + ' where ' + (
' ' + self.codi + ' ').join(
[i for i in self.tm]) + ';'
try:
self.cursor.execute(upd_sql)
db.commit()
return
except:
logger.info("db update error")
db.rollback()
def db_select(self):
sel_sql = 'select ' + self.dt_se + ' from ' + self.table + ' where ' + (
' ' + self.codi + ' ').join(
[i for i in self.tm]) + ';'
try:
self.cursor.execute(sel_sql)
result = self.cursor.fetchall()
# 返回一个list,里面是dict
return result
except:
logger.info("db select error")
raise
def db_delete(self):
del_sql = 'delete from ' + self.table + ' where ' + (' ' + self.codi + ' ').join(
[i for i in self.tm]) + ';'
try:
self.cursor.execute(del_sql)
db.commit()
return
except:
logger.info("db delete error")
db.rollback()
# 析构函数会导致执行多条语句时提前关闭数据库,建议close写在代码内
# def __del__(self):
# try:
# db.close()
# except pymysql.err.Error:
# logger.info("已关闭")
网友评论