python通过PyMySQL操作MySQL
安装 PyMySQL
pip install pymysql
注意 如何测试是否安装好了,打开cmd
进入python3
环境,导入 pymysql
包,能成功导入并打印出版本号,表示成功安装
>>> import pymysql
>>> pymysql.__version__
'0.8.1'
操作数据库
要操作数据库,首先就要建立和数据库的连接,有两种配置可以连接数据库:
-
第一种配置
con = pymysql.connect( host = '主机', port = '端口', user = '用户名', password = '密码', db = '数据库名', charset = 'utf8' )
-
第二种配置
config = { "host" : "主机", "port" : "端口", "user" : "用户名", "password" : "密码", "db" : "数据库名", "charset" : "utf8" } con = pymysql.connect(**config)
上述两种方式配置, 任选一种方式来连接数据库,都是可行的,然后我们要测试是不是可以成功连上数据库(我将采用第二种方式进行连接)
# 以下操作基于配置好连接,在进行测试
# 测试连接
cursor = con.cursor()
cursor.execute('select 1')
re = cursor.fetchone()
print(re)
# 关闭连接
cursor.close()
con.close()
控制台输出
# 打印以下,表示连接成功,否则连接失败
(1,)
增删改查
因为每次用到这些操作都会连接下数据库,那么我们就可以把连接封装成一个类,使用面向对象来进行操作
class MySQLClass:
# 初始化
def __init__(self):
self.get_conn()
# 创建连接
def get_conn(self):
try:
config = {
"host" : "主机",
"port" : "端口",
"user" : "用户名",
"password" : "密码",
"db" : "数据库名",
"charset" : "utf8"
}
self.conn = pymysql.connect(**config)
except pymysql.Error as e:
print('Error %s: %s' % (e.args[0], e.args[1]))
# 关闭连接
def close_conn(self):
try:
if self.conn:
self.conn.close()
except pymysql.Error as e:
print('Error %s: %s' % (e.args[0], e.args[1]))
增加
def add_operate(self):
try:
# 准备sql
sql = 'insert into user(name, age) values(%s, %s)'
# 找到cursor
cursor = self.conn.cursor()
# 执行sql
cursor.execute(sql, ('Tom', 20))
# 提交事务
self.conn.commit()
# 关闭连接
cursor.close()
self.close_conn()
except :
print('error')
# 回滚
self.conn.rollback()
删除
def delete_operate(self):
try:
sql = "delete from user where %s=%s" % ('id', 1)
cursor = self.conn.cursor()
cursor.execute(sql)
self.conn.commit()
except:
print('error')
self.conn.rollback()
修改
def update_operate(self):
try:
sql = "update user set name=%s where id=1"
cursor = self.conn.cursor()
cursor.execute(sql, ('Which666'))
self.conn.commit()
except:
print('error')
self.conn.rollback()
查询
def search_operate(self):
# fetchone 获取一条数据
sql ="select * from user where name=%s"
cursor = self.conn.cursor()
cursor.execute(sql, ('Tom', ))
result = cursor.fetchone()
print(result)
# 假设现在要通过 result['name'] 获取 报错
# 通过cursor的description 我们可以得出一个详情 这是一个元祖
print(cursor.description)
# 元祖转字典
result =dict(zip([k[0] for k in cursor.description], cursor.fetchone()))
print(result)
print(result['name'])
# 查询总共有多少行数据
print(cursor.rowcount)
cursor.close()
self.conn.close()
return result
def search_operate(self):
# fetchall() 获取多条数据
sql ="select * from user where name=%s"
cursor = self.conn.cursor()
cursor.execute(sql, ('Tom', ))
result = [dict(zip([k[0] for k in cursor.description], row)) for row in cursor.fetchall()]
for i in result:
print(i)
cursor.close()
self.conn.close()
return result
网友评论