#coding=utf-8
import pymysql
from warnings import filterwarnings
#过滤pymysql的告警信息
filterwarnings("ignore",category=pymysql.Warning)
#定义操作数据库工具类
"""
注:如果操作的表中含有中文,需要设置字符集:charset="utf8"
否则报错:UnicodeDecodeError: 'utf-8' codec can't decode byte 0
"""
class DBUtil:
def __init__(self):
self.con=pymysql.connect(host="localhost",user="root",password="hdc@328216",database="xdtestcase",port=3306,charset="utf8")
self.cursor=self.con.cursor(cursor=pymysql.cursors.DictCursor)
def __del__(self):
self.cursor.close()
self.con.close()
def query(self,sql,state="all"):
"""
查询数据库
:param sql:
:param state: all 代表查询所有fetchall(),1代表查询一条fetchone()
:return:
"""
try:
self.cursor.execute(sql)
if state=="all":
result=self.cursor.fetchall()
else:
result =self.cursor.fetchone()
return result
except Exception as e:
print("执行sql异常:{0}".format(e))
finally:
self.cursor.close()
def updateDB(self,sql):
"""
新增,删除,修改
:param sql:
:return:
"""
try:
row=self.cursor.execute(sql) #返回执行sql影响的行数:整型
self.con.commit() #执行增删改后需要提交事物
print("影响行数:{0}".format(row))
except Exception as e:
self.con.rollback() #当执行失败,需要回滚至原来数据
print("执行sql异常:{0}".format(e))
finally:
self.cursor.close()
if __name__ == '__main__':
sql="select * from `case`"
db=DBUtil()
result=db.query(sql) #查询所有
#result=db.query(sql,1) #查询1条
print(result)
# sql2="insert into `case` (`app`) values('xd')"
# db = DBUtil()
# db.updateDB(sql2)
├─case
│ │ TestIndexCase.py
│ │ UserTestCase.py
│ │ UserTestCase2.py
│ │ UserTestCase3.py
│ │
│
├─config
│ __init__.py
│
├─main
│ RunMain.py
│ __init__.py
│
├─req
│ requestDemo.py
│
├─until
│ │ dbUitls.py
│ │ RequestUtils.py
网友评论