美文网首页我爱编程
Python 操作 MySQL

Python 操作 MySQL

作者: 江湖十年 | 来源:发表于2018-05-27 21:17 被阅读114次

    一. Python DB API

    官方文档:https://www.python.org/dev/peps/pep-0249/

    Python DB API Python DB API 流程

    二、数据库连接对象 connection

    建立方法(两种方式是一样的):
    ①pymysql.connections
    ②pymysql.connect

    常用参数:

    • host:MySQL 服务器地址 -- 字符串
    • port:MySQL 服务器端口号 -- 数字
    • user:用户名 -- 字符串
    • passwd:密码 -- 字符串
    • db:数据库名 -- 字符串
    • charset:连接编码 -- 字符串

    connection 对象常用方法:

    • cursor():使用该连接创建并返回游标
    • commit():提交当前事务
    • rollback():回滚当前事务
    • close():关闭连接
    • 示例:
    import pymysql
    
    
    connect = pymysql.connect(host='127.0.0.1',
                              port=3306,
                              user='pythonic',
                              passwd='pythonic',
                              db='test',
                              charset='utf8')
    cursor = connect.cursor()
    
    print(cursor)
    print(connect)
    
    cursor.close()
    connect.close()
    

    示例输出结果:

    <pymysql.cursors.Cursor object at 0x02FD9C10>
    <pymysql.connections.Connection object at 0x02FD9CD0>
    

    三、数据库游标对象 cursor

    游标对象用于执行查询和获取结果
    cursor 对象常用方法:

    • execute(op[,args]):执行一个数据库命令
    • fetchone():获取结果集下一行
    • fetchmany(size):获取结果集的下 size 行
    • fetchall():获取结果集中剩下的所有行
    • rowcount:最近一次执行 execute 返回的数据行数或影响行数
    • close():关闭游标对象
    cursor 对象 execute 方法 cursor 对象 fetch* 方法和 rowcount 属性

    四、实例演示

    1. 查询数据库(select)
    查询数据库
    • 示例
    数据库原始数据
    import pymysql
    
    
    connect = pymysql.connect(host='127.0.0.1',
                              port=3306,
                              user='pythonic',
                              passwd='pythonic',
                              db='test',
                              charset='utf8')
    cursor = connect.cursor()
    
    sql = 'select * from user'
    result = cursor.execute(sql)
    print(result)  # 事实上执行 cursor.execute() 方法的时候就会自动返回受影响行数,可以用一个变量进行接收
    print(cursor.rowcount)  # 获取受影响行数
    print(cursor.fetchone())  # 获取 1 条数据
    print(cursor.fetchmany(3))  # 获取 3 条数据
    print(cursor.fetchall())  # 获取剩下全部数据
    
    cursor.close()
    connect.close()
    
    • 示例输出结果
    6
    6
    (1, 'name1')  # 结果为元组
    ((2, 'name2'), (3, 'name3'), (4, 'name4'))
    ((5, 'name5'), (6, 'name6'))
    

    通过以上示例可以发现,fetch*() 方法返回的结果是元组或二维元组

    • 示例
    import pymysql
    
    
    connect = pymysql.connect(host='127.0.0.1',
                              port=3306,
                              user='pythonic',
                              passwd='pythonic',
                              db='test',
                              charset='utf8')
    cursor = connect.cursor()
    
    sql = 'select * from user'
    cursor.execute(sql)
    for row in cursor.fetchall():
        print(f'userid:{row[0]} -- username:{row[1]}')
        # print('userid:%s -- username:%s' % row)
    
    cursor.close()
    connect.close()
    
    • 示例输出结果
    userid:1 -- username:name1
    userid:2 -- username:name2
    userid:3 -- username:name3
    userid:4 -- username:name4
    userid:5 -- username:name5
    userid:6 -- username:name6
    
    1. 更新数据库(insert/update/delete)
    更新数据库 事务
    • pymysql 默认情况下会开启事务

    • 示例

    原始数据
    import pymysql
    
    
    connect = pymysql.connect(host='127.0.0.1',
                              port=3306,
                              user='pythonic',
                              passwd='pythonic',
                              db='test',
                              charset='utf8')
    cursor = connect.cursor()
    
    sql_insert = 'insert into user values(7, "name7")'
    sql_update = 'update user set username="66" where userid="6"'
    sql_delete ='delete from user where userid<3'
    
    cursor.execute(sql_insert)
    print(cursor.rowcount)
    
    cursor.execute(sql_update)
    print(cursor.rowcount)
    
    cursor.execute(sql_delete)
    print(cursor.rowcount)
    
    connect.commit()
    
    cursor.close()
    connect.close()
    
    • 示例输出结果
    1
    1
    2
    
    执行程序后结果数据
    • 示例
    原始数据
    import pymysql
    
    
    connect = pymysql.connect(host='127.0.0.1',
                              port=3306,
                              user='pythonic',
                              passwd='pythonic',
                              db='test',
                              charset='utf8')
    cursor = connect.cursor()
    
    sql_insert = 'insert into user values(7, "name7")'
    sql_update = 'update user set username="66" where userid="6"'
    sql_delete ='delete from user where userd<3'  # 错误的 sql 语句,没有 userd 这个字段
    
    try:
        cursor.execute(sql_insert)
        print(cursor.rowcount)
    
        cursor.execute(sql_update)
        print(cursor.rowcount)
    
        cursor.execute(sql_delete)  # 执行错误的 sql 语句会出现异常
        print(cursor.rowcount)
    
        connect.commit()
    except Exception as e:
        print(e)
        connect.rollback()  # 捕获到异常并回滚事务
    
    cursor.close()
    connect.close()
    
    • 示例输出结果
    1
    1
    (1054, "Unknown column 'userd' in 'where clause'")
    
    执行错误的 sql 语句并进行 rollback 后结果并没有发生变化
    • 示例[关于事务]
    原始数据
    import pymysql
    
    
    connect = pymysql.connect(host='127.0.0.1',
                              port=3306,
                              user='pythonic',
                              passwd='pythonic',
                              db='test',
                              charset='utf8',
                              autocommit=True)  # 可以通过传递关键字参数的方法开启或关闭事务
    # 可以通过connect.autocommit() 方法开启或关闭事务
    # 默认 False,事务为开启状态,改为 True 可以关闭事务
    # connect.autocommit(True)
    cursor = connect.cursor()
    
    sql_insert = 'insert into user values(7, "name7")'
    
    cursor.execute(sql_insert)
    print(cursor.rowcount)
    
    # connect.commit()  # 关闭事务后不需要显式的调用 connect.commit() 来进行提交
    
    
    cursor.close()
    connect.close()
    
    • 示例输出结果
    1
    
    执行关闭事务程序后结果数据

    五、实例演示 -- 模拟银行转账

    • 账户 1 给账户 2 转账 100 元
    账户 1 给账户 2 转账 100 元
    • 示例
    原始数据
    import sys
    import pymysql
    
    
    connect = pymysql.connect(host='127.0.0.1',
                              port=3306,
                              user='pythonic',
                              passwd='pythonic',
                              db='test',
                              charset='utf8',
                              autocommit=False)
    
    
    class TransferMoney(object):
    
        def __init__(self, conn):
            self.conn = conn
    
        def check_account_available(self, accountid):
            """
                检查账号是否正常
            Args:
                accountid: 账号 id
            Returns:
                None
            """
            cursor = self.conn.cursor()
            try:
                sql = 'select * from account where accountid=%s' % accountid
                cursor.execute(sql)
                result = cursor.fetchall()
                print(f'check_account_available: {sql}')
                if len(result) != 1:
                    raise Exception('账号 %s 不存在' % accountid)
            finally:
                cursor.close()
    
            # with self.conn.cursor as cursor:
            #     sql = 'select * from account where sccountid=%s' % accountid
            #     cursor.execute(sql)
            #     result = cursor.fetchall()
            #     if len(result) != 1:
            #         raise Exception('账号 %s 不存在' % accountid)
    
        def have_enough_money(self, accountid, money):
            """
                判断账号是否有足够的钱
            Args:
                accountid: 账号 id
                money: 需要判断是否满足的金额
            Returns:
                None
            """
            cursor = self.conn.cursor()
            try:
                sql = 'select * from account where accountid=%s and money>%s' % (accountid, money)
                cursor.execute(sql)
                result = cursor.fetchall()
                print(f'have_enough_money: {sql}')
                if len(result) != 1:
                    raise Exception('账号 %s 余额不足' % accountid)
            finally:
                cursor.close()
    
        def reduce_money(self, accountid, money):
            """
                减款
            Args:
                accountid: 减款的账号 id 
                money: 减款金额
            Returns:
                None
            """
            cursor = self.conn.cursor()
            try:
                sql = 'update account set money=money-%s where accountid=%s' % (money, accountid)
                cursor.execute(sql)
                print(f'reduce_money: {sql}')
                if cursor.rowcount != 1:
                    raise Exception('账号 %s 减款失败' % accountid)
            finally:
                cursor.close()
    
        def add_money(self, accountid, money):
            """
                加款
            Args:
                accountid: 加款的账号 id 
                money: 加款金额
            Returns:
                None
            """
            cursor = self.conn.cursor()
            try:
                sql = 'update account set money=money+%s where accountid=%s' % (money, accountid)
                cursor.execute(sql)
                print(f'add_money: {sql}')
                if cursor.rowcount != 1:
                    raise Exception('账号 %s 加款失败' % accountid)
            finally:
                cursor.close()
    
        def transfer(self, source_accountid, target_accountid, money):
            """
                转账
            Args:
                source_accountid: 转账发起者账号 id
                target_accountid: 转账接收者账账号 id
                money: 转账金额
            Returns:
                None
            """
            try:
                self.check_account_available(source_accountid)
                self.check_account_available(target_accountid)
                self.have_enough_money(source_accountid, money)
                self.reduce_money(source_accountid, money)
                self.add_money(target_accountid, money)
                self.conn.commit()
            except Exception as e:
                self.conn.rollback()
                raise e
    
    
    if __name__ == '__main__':
        source_accountid = sys.argv[1]  # 接收命令行参数
        target_accountid = sys.argv[2]
        money = sys.argv[3]
        transfer_money = TransferMoney(connect)
        try:
            transfer_money.transfer(source_accountid, target_accountid, money)
        except Exception as e:
            print(f'转账出错:{e}')
            connect.rollback()
    
    • 配置命令行参数
    配置命令行参数
    • 示例输出结果
    check_account_available: select * from account where accountid=1
    check_account_available: select * from account where accountid=2
    have_enough_money: select * from account where accountid=1 and money>100
    reduce_money: update account set money=money-100 where accountid=1
    add_money: update account set money=money+100 where accountid=2
    
    运行程序后成功转账后的结果数据
    • 此时如果对结果数据再次运行程序,会输出转账失败
    check_account_available: select * from account where accountid=1
    check_account_available: select * from account where accountid=2
    have_enough_money: select * from account where accountid=1 and money>100
    转账出错:账号 1 余额不足
    

    总结

    总结

    相关文章

      网友评论

        本文标题:Python 操作 MySQL

        本文链接:https://www.haomeiwen.com/subject/fskbjftx.html