一. Python DB API
官方文档:https://www.python.org/dev/peps/pep-0249/
Python DB API Python DB API 流程二、数据库连接对象 connection
建立方法(两种方式是一样的):
①pymysql.connections
②pymysql.connect常用参数:
- host:MySQL 服务器地址 -- 字符串
- port:MySQL 服务器端口号 -- 数字
- user:用户名 -- 字符串
- passwd:密码 -- 字符串
- db:数据库名 -- 字符串
- charset:连接编码 -- 字符串
connection 对象常用方法:
- cursor():使用该连接创建并返回游标
- commit():提交当前事务
- rollback():回滚当前事务
- close():关闭连接
- 示例:
import pymysql
connect = pymysql.connect(host='127.0.0.1',
port=3306,
user='pythonic',
passwd='pythonic',
db='test',
charset='utf8')
cursor = connect.cursor()
print(cursor)
print(connect)
cursor.close()
connect.close()
示例输出结果:
<pymysql.cursors.Cursor object at 0x02FD9C10>
<pymysql.connections.Connection object at 0x02FD9CD0>
三、数据库游标对象 cursor
cursor 对象 execute 方法 cursor 对象 fetch* 方法和 rowcount 属性游标对象用于执行查询和获取结果
cursor 对象常用方法:
- execute(op[,args]):执行一个数据库命令
- fetchone():获取结果集下一行
- fetchmany(size):获取结果集的下 size 行
- fetchall():获取结果集中剩下的所有行
- rowcount:最近一次执行 execute 返回的数据行数或影响行数
- close():关闭游标对象
四、实例演示
- 查询数据库(select)
- 示例
import pymysql
connect = pymysql.connect(host='127.0.0.1',
port=3306,
user='pythonic',
passwd='pythonic',
db='test',
charset='utf8')
cursor = connect.cursor()
sql = 'select * from user'
result = cursor.execute(sql)
print(result) # 事实上执行 cursor.execute() 方法的时候就会自动返回受影响行数,可以用一个变量进行接收
print(cursor.rowcount) # 获取受影响行数
print(cursor.fetchone()) # 获取 1 条数据
print(cursor.fetchmany(3)) # 获取 3 条数据
print(cursor.fetchall()) # 获取剩下全部数据
cursor.close()
connect.close()
- 示例输出结果
6
6
(1, 'name1') # 结果为元组
((2, 'name2'), (3, 'name3'), (4, 'name4'))
((5, 'name5'), (6, 'name6'))
通过以上示例可以发现,fetch*() 方法返回的结果是元组或二维元组
- 示例
import pymysql
connect = pymysql.connect(host='127.0.0.1',
port=3306,
user='pythonic',
passwd='pythonic',
db='test',
charset='utf8')
cursor = connect.cursor()
sql = 'select * from user'
cursor.execute(sql)
for row in cursor.fetchall():
print(f'userid:{row[0]} -- username:{row[1]}')
# print('userid:%s -- username:%s' % row)
cursor.close()
connect.close()
- 示例输出结果
userid:1 -- username:name1
userid:2 -- username:name2
userid:3 -- username:name3
userid:4 -- username:name4
userid:5 -- username:name5
userid:6 -- username:name6
- 更新数据库(insert/update/delete)
-
pymysql 默认情况下会开启事务
-
示例
import pymysql
connect = pymysql.connect(host='127.0.0.1',
port=3306,
user='pythonic',
passwd='pythonic',
db='test',
charset='utf8')
cursor = connect.cursor()
sql_insert = 'insert into user values(7, "name7")'
sql_update = 'update user set username="66" where userid="6"'
sql_delete ='delete from user where userid<3'
cursor.execute(sql_insert)
print(cursor.rowcount)
cursor.execute(sql_update)
print(cursor.rowcount)
cursor.execute(sql_delete)
print(cursor.rowcount)
connect.commit()
cursor.close()
connect.close()
- 示例输出结果
1
1
2
执行程序后结果数据
- 示例
import pymysql
connect = pymysql.connect(host='127.0.0.1',
port=3306,
user='pythonic',
passwd='pythonic',
db='test',
charset='utf8')
cursor = connect.cursor()
sql_insert = 'insert into user values(7, "name7")'
sql_update = 'update user set username="66" where userid="6"'
sql_delete ='delete from user where userd<3' # 错误的 sql 语句,没有 userd 这个字段
try:
cursor.execute(sql_insert)
print(cursor.rowcount)
cursor.execute(sql_update)
print(cursor.rowcount)
cursor.execute(sql_delete) # 执行错误的 sql 语句会出现异常
print(cursor.rowcount)
connect.commit()
except Exception as e:
print(e)
connect.rollback() # 捕获到异常并回滚事务
cursor.close()
connect.close()
- 示例输出结果
1
1
(1054, "Unknown column 'userd' in 'where clause'")
执行错误的 sql 语句并进行 rollback 后结果并没有发生变化
- 示例[关于事务]
import pymysql
connect = pymysql.connect(host='127.0.0.1',
port=3306,
user='pythonic',
passwd='pythonic',
db='test',
charset='utf8',
autocommit=True) # 可以通过传递关键字参数的方法开启或关闭事务
# 可以通过connect.autocommit() 方法开启或关闭事务
# 默认 False,事务为开启状态,改为 True 可以关闭事务
# connect.autocommit(True)
cursor = connect.cursor()
sql_insert = 'insert into user values(7, "name7")'
cursor.execute(sql_insert)
print(cursor.rowcount)
# connect.commit() # 关闭事务后不需要显式的调用 connect.commit() 来进行提交
cursor.close()
connect.close()
- 示例输出结果
1
执行关闭事务程序后结果数据
五、实例演示 -- 模拟银行转账
- 账户 1 给账户 2 转账 100 元
- 示例
import sys
import pymysql
connect = pymysql.connect(host='127.0.0.1',
port=3306,
user='pythonic',
passwd='pythonic',
db='test',
charset='utf8',
autocommit=False)
class TransferMoney(object):
def __init__(self, conn):
self.conn = conn
def check_account_available(self, accountid):
"""
检查账号是否正常
Args:
accountid: 账号 id
Returns:
None
"""
cursor = self.conn.cursor()
try:
sql = 'select * from account where accountid=%s' % accountid
cursor.execute(sql)
result = cursor.fetchall()
print(f'check_account_available: {sql}')
if len(result) != 1:
raise Exception('账号 %s 不存在' % accountid)
finally:
cursor.close()
# with self.conn.cursor as cursor:
# sql = 'select * from account where sccountid=%s' % accountid
# cursor.execute(sql)
# result = cursor.fetchall()
# if len(result) != 1:
# raise Exception('账号 %s 不存在' % accountid)
def have_enough_money(self, accountid, money):
"""
判断账号是否有足够的钱
Args:
accountid: 账号 id
money: 需要判断是否满足的金额
Returns:
None
"""
cursor = self.conn.cursor()
try:
sql = 'select * from account where accountid=%s and money>%s' % (accountid, money)
cursor.execute(sql)
result = cursor.fetchall()
print(f'have_enough_money: {sql}')
if len(result) != 1:
raise Exception('账号 %s 余额不足' % accountid)
finally:
cursor.close()
def reduce_money(self, accountid, money):
"""
减款
Args:
accountid: 减款的账号 id
money: 减款金额
Returns:
None
"""
cursor = self.conn.cursor()
try:
sql = 'update account set money=money-%s where accountid=%s' % (money, accountid)
cursor.execute(sql)
print(f'reduce_money: {sql}')
if cursor.rowcount != 1:
raise Exception('账号 %s 减款失败' % accountid)
finally:
cursor.close()
def add_money(self, accountid, money):
"""
加款
Args:
accountid: 加款的账号 id
money: 加款金额
Returns:
None
"""
cursor = self.conn.cursor()
try:
sql = 'update account set money=money+%s where accountid=%s' % (money, accountid)
cursor.execute(sql)
print(f'add_money: {sql}')
if cursor.rowcount != 1:
raise Exception('账号 %s 加款失败' % accountid)
finally:
cursor.close()
def transfer(self, source_accountid, target_accountid, money):
"""
转账
Args:
source_accountid: 转账发起者账号 id
target_accountid: 转账接收者账账号 id
money: 转账金额
Returns:
None
"""
try:
self.check_account_available(source_accountid)
self.check_account_available(target_accountid)
self.have_enough_money(source_accountid, money)
self.reduce_money(source_accountid, money)
self.add_money(target_accountid, money)
self.conn.commit()
except Exception as e:
self.conn.rollback()
raise e
if __name__ == '__main__':
source_accountid = sys.argv[1] # 接收命令行参数
target_accountid = sys.argv[2]
money = sys.argv[3]
transfer_money = TransferMoney(connect)
try:
transfer_money.transfer(source_accountid, target_accountid, money)
except Exception as e:
print(f'转账出错:{e}')
connect.rollback()
- 配置命令行参数
- 示例输出结果
check_account_available: select * from account where accountid=1
check_account_available: select * from account where accountid=2
have_enough_money: select * from account where accountid=1 and money>100
reduce_money: update account set money=money-100 where accountid=1
add_money: update account set money=money+100 where accountid=2
运行程序后成功转账后的结果数据
- 此时如果对结果数据再次运行程序,会输出转账失败
check_account_available: select * from account where accountid=1
check_account_available: select * from account where accountid=2
have_enough_money: select * from account where accountid=1 and money>100
转账出错:账号 1 余额不足
网友评论