美文网首页
Python ORM框架--SQLAlchemy

Python ORM框架--SQLAlchemy

作者: 无剑_君 | 来源:发表于2019-12-09 15:33 被阅读0次

一、SQLAlchemy简介

  SQLAlchemy是Python编程语言下的一款开源软件。提供了SQL工具包及对象关系映射(ORM)工具,使用MIT许可证发行。
  SQLAlchemy“采用简单的Python语言,为高效和高性能的数据库访问设计,实现了完整的企业级持久模型”。SQLAlchemy的理念是,SQL数据库的量级和性能重要于对象集合;而对象集合的抽象又重要于表和行。因此,SQLAlchemy采用了类似于Java里Hibernate的数据映射模型,而不是其他ORM框架采用的Active Record模型。不过,Elixir和declarative等可选插件可以让用户使用声明语法。
  SQLAlchemy首次发行于2006年2月,并迅速地在Python社区中最广泛使用的ORM工具之一,不亚于Django的ORM框架。


SQLAlchemy

官网:https://www.sqlalchemy.org/
文档地址:https://docs.sqlalchemy.org/en/13/orm/tutorial.html

二、模块安装

pip install pymysql
pip3 install sqlalchemy
模块安装
注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成,
官网doc:http://docs.sqlalchemy.org/en/latest/core/expression_api.html

三、数据库连接

1. 创建数据库

创建数据库

使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。

# _*_ coding:utf-8 _*_

# 导入模块
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

#   创建连接,允许溢出5个连接 用户名:root 密码:1234 地址:localhost 端口:3306
engine=create_engine("mysql+pymysql://root:1234@192.168.71.130:3306/BigData", max_overflow=5)
session = sessionmaker(bind=engine)
print(engine)
print(session)

运行结果:

Engine(mysql+pymysql://root:***@192.168.71.130:3306/BigData)
sessionmaker(class_='Session', bind=Engine(mysql+pymysql://root:***@192.168.71.130:3306/BigData), autoflush=True, autocommit=False, expire_on_commit=True)

四、DDL操作

1. 新建表

# _*_ coding:utf-8 _*_

# 导入模块
from sqlalchemy import String, Column, Integer
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

#   创建连接,允许溢出5个连接 用户名:root 密码:1234 地址:localhost 端口:3306
# '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'
engine = create_engine("mysql+pymysql://root:1234@192.168.71.130:3306/BigData", max_overflow=5)
# mysql 8.0 使用以下连接方式
# engine = create_engine("mysql+mysqlconnector://root:1234@127.0.0.1:3306/BigData", max_overflow=5)
# 创建DBSession类型
session = sessionmaker(bind=engine)
# declarative_base函数实例化一个数据库表的基类,之后所有的数据库表都要继承这个基类。
Base = declarative_base()


# 必须继承declaraive_base得到的那个基类
class Users(Base):
    # 必须要有__tablename__来指出这个类对应什么表,这个表可以暂时在库中不存在,SQLAlchemy会帮我们创建这个表
    __tablename__ = "Users"
    # Column类创建一个字段
    Sno = Column(String(10), primary_key=True)
    # nullable就是决定是否not null,unique就是决定是否unique,这里假定没人重名,
    # 设置index可以让系统自动根据这个字段为基础建立索引
    Sname = Column(String(20), nullable=False, unique=True,
                   index=True)
    Ssex = Column(String(2), nullable=False)
    Sage = Column(Integer, nullable=False)
    Sdept = Column(String(20))

    def __repr__(self):
        return "<Student>{}:{}".format(self.Sname, self.Sno)

# 这就是为什么表类一定要继承Base,因为Base会通过一些方法来通过引擎初始化数据库结构。
# 不继承Base自然就没有办法和数据库发生联系了。
Base.metadata.create_all(engine)

说明:
__repr__()是 Python 类中的一个特殊方法,由于 object 类己提供了该方法,而所有的 Python 类都是 object 类的子类,因此所有的 Python 对象都具有__repr__()方法。

2. 删除表

# _*_ coding:utf-8 _*_

# 导入模块
from sqlalchemy import String, Column, Integer
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

#   创建连接,允许溢出5个连接 用户名:root 密码:1234 地址:localhost 端口:3306
# engine = create_engine("mysql+pymysql://root:1234@localhost:3306/BigData", max_overflow=5)
# mysql 8.0 使用以下连接方式
engine = create_engine("mysql+mysqlconnector://root:1234@127.0.0.1:3306/BigData", max_overflow=5)
session = sessionmaker(bind=engine)
# declarative_base函数实例化一个数据库表的基类,之后所有的数据库表都要继承这个基类。
Base = declarative_base()

# 必须继承declaraive_base得到的那个基类
class Users(Base):
    # 必须要有__tablename__来指出这个类对应什么表,这个表可以暂时在库中不存在,SQLAlchemy会帮我们创建这个表
    __tablename__ = "Users"
    # Column类创建一个字段
    Sno = Column(String(10), primary_key=True)
    # nullable就是决定是否not null,unique就是决定是否unique,这里假定没人重名,
    # 设置index可以让系统自动根据这个字段为基础建立索引
    Sname = Column(String(20), nullable=False, unique=True,
                   index=True)
    Ssex = Column(String(2), nullable=False)
    Sage = Column(Integer, nullable=False)
    Sdept = Column(String(20))

    def __repr__(self):
        return "<Student>{}:{}".format(self.Sname, self.Sno)

# 删除表
Base.metadata.drop_all(engine)

五、DML操作

1. 创建表

# _*_ coding:utf-8 _*_

# 导入模块
from sqlalchemy import String, Column, Integer, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

#   创建连接,允许溢出5个连接 用户名:root 密码:1234 地址:localhost 端口:3306
# engine = create_engine("mysql+pymysql://root:1234@localhost:3306/BigData", max_overflow=5)
# mysql 8.0 使用以下连接方式
engine = create_engine("mysql+mysqlconnector://root:1234@127.0.0.1:3306/BigData", max_overflow=5)
session = sessionmaker(bind=engine)
# declarative_base函数实例化一个数据库表的基类,之后所有的数据库表都要继承这个基类。
Base = declarative_base()

# 多对一:假设多个员工可以属于一个部门,而多个部门不能有同一个员工
class Dep(Base):
    __tablename__='dep'
    id=Column(Integer,primary_key=True,autoincrement=True)
    dname=Column(String(64),nullable=False,index=True)

class Emp(Base):
    __tablename__='emp'
    id=Column(Integer,primary_key=True,autoincrement=True)
    ename=Column(String(32),nullable=False,index=True)
    dep_id=Column(Integer,ForeignKey('dep.id'))

def init_db():
    Base.metadata.create_all(engine)

def drop_db():
    Base.metadata.drop_all(engine)

# 删除表
drop_db()
# 创建表
init_db()
# 创建会话
Session=sessionmaker(bind=engine)
session=Session()

2. 添加记录

# _*_ coding:utf-8 _*_

# 导入模块
from sqlalchemy import String, Column, Integer, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

#   创建连接,允许溢出5个连接 用户名:root 密码:1234 地址:localhost 端口:3306
# engine = create_engine("mysql+pymysql://root:1234@localhost:3306/BigData", max_overflow=5)
# mysql 8.0 使用以下连接方式
engine = create_engine("mysql+mysqlconnector://root:1234@127.0.0.1:3306/BigData", max_overflow=5)
session = sessionmaker(bind=engine)
# declarative_base函数实例化一个数据库表的基类,之后所有的数据库表都要继承这个基类。
Base = declarative_base()


# 多对一:假设多个员工可以属于一个部门,而多个部门不能有同一个员工
class Dep(Base):
    __tablename__ = 'dep'
    id = Column(Integer, primary_key=True, autoincrement=True)
    dname = Column(String(64), nullable=False, index=True)


class Emp(Base):
    __tablename__ = 'emp'
    id = Column(Integer, primary_key=True, autoincrement=True)
    ename = Column(String(32), nullable=False, index=True)
    dep_id = Column(Integer, ForeignKey('dep.id'))


# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 添加单条记录,无需指定id,因其是自增长的
row_obj = Dep(dname='销售')
# 添加对象
session.add(row_obj)

# 添加多条记录
# session.add_all([Dep(dname='技术'), Dep(dname='运营'), Dep(dname='人事'), ])

# 必须提交事务,否则不会添加
session.commit()

3. 修改记录

# _*_ coding:utf-8 _*_

# 导入模块
from sqlalchemy import String, Column, Integer, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

#   创建连接,允许溢出5个连接 用户名:root 密码:1234 地址:localhost 端口:3306
# engine = create_engine("mysql+pymysql://root:1234@localhost:3306/BigData", max_overflow=5)
# mysql 8.0 使用以下连接方式
engine = create_engine("mysql+mysqlconnector://root:1234@127.0.0.1:3306/BigData", max_overflow=5)
session = sessionmaker(bind=engine)
# declarative_base函数实例化一个数据库表的基类,之后所有的数据库表都要继承这个基类。
Base = declarative_base()


# 多对一:假设多个员工可以属于一个部门,而多个部门不能有同一个员工
class Dep(Base):
    __tablename__ = 'dep'
    id = Column(Integer, primary_key=True, autoincrement=True)
    dname = Column(String(64), nullable=False, index=True)


class Emp(Base):
    __tablename__ = 'emp'
    id = Column(Integer, primary_key=True, autoincrement=True)
    ename = Column(String(32), nullable=False, index=True)
    dep_id = Column(Integer, ForeignKey('dep.id'))


# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 更新id>0的数据
# session.query(Dep).filter(Dep.id > 0).update({'dname':'大数据啊大数据'})
# False - 不对session进行同步,直接进行delete or update操作。
# session.query(Dep).filter(Dep.id > 0).update({'dname':Dep.dname+'_BigData'},synchronize_session=False)
# 在delete or update操作之前,用query中的条件直接对session的identity_map中的objects进行eval操作,将符合条件的记录下来。
session.query(Dep).filter(Dep.id > 0).update({'id':Dep.id*100},synchronize_session='evaluate')

# 必须提交事务,否则不会添加
session.commit()

synchronize_session说明:
主要是session与数据库数据是否存在的关系。

4. 删除记录

#先查询,后删除
session.query(Emp).filter(Emp.id == 11).delete()
session.commit()


sqlalchemy session 执行 delete 时 synchronize_session 策略:
False:不同步 session,如果被删除的 objects 已经在 session 中存在,在 session commit 或者 expire_all 之前,这些被删除的对象都存在 session 中。不同步可能会导致获取被删除 objects 时出错。
fatch:删除之前从 db 中匹配被删除的对象并保存在 session 中,然后再从 session 中删除,这样做是为了让 session 的对象管理 identity_map 得知被删除的对象究竟是哪些以便更新引用关系。
evaluate:默认值。根据当前的 query criteria 扫描 session 中的 objects,如果不能正确执行则抛出错误,这句话也可以理解为,如果 session 中原本就没有这些被删除的 objects,扫描当然不会发生匹配,相当于匹配未正确执行。

六、查询

1. 简单查询

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

#查所有,取所有字段
res=session.query(Dep).all()
# 读取数据
for row in res:print(row.id,row.dname)

# 查所有,取所有字段
res=session.query(Dep.dname).order_by(Dep.id).all()
# 读取数据
for row in res:print(row.dname)

# 查询第一条记录
res=session.query(Dep.dname).first()
# 返回为元组
print(res)

# 条件查询
# 逗号分隔,默认为and
res=session.query(Dep).filter(Dep.id > 1,Dep.id <1000)
print([(row.id,row.dname) for row in res])

2. 复杂查询

1) 初始化数据
# _*_ coding:utf-8 _*_

# 导入模块
from sqlalchemy import String, Column, Integer, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

#   创建连接,允许溢出5个连接 用户名:root 密码:1234 地址:localhost 端口:3306
# engine = create_engine("mysql+pymysql://root:1234@localhost:3306/BigData", max_overflow=5)
# mysql 8.0 使用以下连接方式
engine = create_engine("mysql+mysqlconnector://root:1234@127.0.0.1:3306/BigData", max_overflow=5)
session = sessionmaker(bind=engine)
# declarative_base函数实例化一个数据库表的基类,之后所有的数据库表都要继承这个基类。
Base = declarative_base()

# 多对一:假设多个员工可以属于一个部门,而多个部门不能有同一个员工
class Dep(Base):
    __tablename__ = 'dep'
    id = Column(Integer, primary_key=True, autoincrement=True)
    dname = Column(String(64), nullable=False, index=True)


class Emp(Base):
    __tablename__ = 'emp'
    id = Column(Integer, primary_key=True, autoincrement=True)
    ename = Column(String(32), nullable=False, index=True)
    dep_id = Column(Integer, ForeignKey('dep.id'))

def init_db():
    Base.metadata.create_all(engine)

def drop_db():
    Base.metadata.drop_all(engine)

drop_db()
init_db()

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

session.add_all([
    Dep(dname='技术'),
    Dep(dname='销售'),
    Dep(dname='运营'),
    Dep(dname='人事'),
])

session.add_all([
    Emp(ename='',dep_id=1),
    Emp(ename='李浩楠 ',dep_id=1),
    Emp(ename='廖佳宇',dep_id=1),
    Emp(ename='英雄宇',dep_id=2),
    Emp(ename='王  浩',dep_id=3),
    Emp(ename='程  超',dep_id=4),
    Emp(ename='牛文娟',dep_id=2),
    Emp(ename='杜泽瑜',dep_id=4),
    Emp(ename='狗捉猫',dep_id=3)
])

# 提交
session.commit()

2)条件查询
#   filter_by只能传参数等值条件
sql=session.query(Emp).filter_by(ename='狗捉猫')
res=sql.all() # sql语句的执行结果
for row in res:print(row.id,row.ename)

#   and 条件
res=session.query(Emp).filter(Emp.id>0).all()
for row in res:print(row.id,row.ename)

#   between 查询
res=session.query(Emp).filter(Emp.id.between(1,3)).all()
for row in res:print(row.id,row.ename)

#   in 查询
res=session.query(Emp).filter(Emp.id.in_([1,3,9,2])).all()
for row in res:print(row.id,row.ename)

#   ~代表取反,转换成sql就是关键字not
res=session.query(Emp).filter(~Emp.id.in_([1,3,9,2]))
for row in res:print(row.id,row.ename)

3)and or 查询
from sqlalchemy import and_,or_
# and 与 or 查询
# and 查询
res=session.query(Emp).filter(and_(Emp.id > 0,Emp.ename=='林海峰')).all()
for row in res:print(row.id,row.ename)

# or 查询
res=session.query(Emp).filter(or_(Emp.id < 2,Emp.ename=='狗捉猫')).all()
for row in res:print(row.id,row.ename)

# or + and 查询
res=session.query(Emp).filter(
    or_(
        Emp.dep_id == 3,
        and_(Emp.id > 1,Emp.ename=='狗捉猫'),
        Emp.ename != ''
    )
).all()
for row in res:print(row.id,row.ename)

4)like 查询
# like 查询
res=session.query(Emp).filter(Emp.ename.like('%王_%')).all()
for row in res:print(row.id,row.ename)
# like 取反
res=session.query(Emp).filter(~Emp.ename.like('%陈_%')).all()
for row in res:print(row.id,row.ename)

5)limit 查询
# limit 查询  limit:可以限制每次查询的时候只查询几条数据。 取3条记录
res=session.query(Emp).limit(3).all()
for row in res:print(row.id,row.ename)

# offset 查询 从第5和记录开始,取3条记录
res=session.query(Emp).limit(3).offset(5).all()
for row in res:print(row.id,row.ename)

# slice 查询 只对list有效 slice(起始值,结束值) 取2到5之间记录,不包括2
res= session.query(Emp).slice(2,5).all()
for row in res:print(row.id,row.ename)

# 切片 取0-5,间隔为2,就是取奇数条数据
res=session.query(Emp)[0:5:2]
for row in res:print(row.id,row.ename)
6)排序
# 排序
res=session.query(Emp).order_by(Emp.dep_id.desc()).all()
for row in res:print(row.id,row.ename)
res=session.query(Emp).order_by(Emp.dep_id.desc(),Emp.id.asc()).all()
for row in res:print(row.id,row.ename)
7)分组
from sqlalchemy.sql import func
# 分组, 主要是聚合函数
res=session.query(Emp.dep_id).group_by(Emp.dep_id).all()
res=session.query(
    func.max(Emp.dep_id),
    func.min(Emp.dep_id),
    func.sum(Emp.dep_id),
    func.avg(Emp.dep_id),
    func.count(Emp.dep_id),
).group_by(Emp.dep_id).all()

for row in res:print(row[0],row[1],row[2],row[3],row[4])

# 分组,部门数, 部门人数最多的有多少人,id是哪个
res=session.query(
    Emp.dep_id,
    func.count(1),
).group_by(Emp.dep_id).having(func.count(1) > 2).all()
for row in res:print(row.dep_id,row[1])

8)笛卡尔积查询
# 笛卡尔积   select * from emp,dep;
res=session.query(Emp,Dep).all()
for row in res:
    emp_tb=row[0]
    dep_tb=row[1]
    print(emp_tb.id,emp_tb.ename,dep_tb.id,dep_tb.dname)

9)多表查询
res=session.query(Emp,Dep).filter(Emp.dep_id==Dep.id).all()
for row in res:
    emp_tb=row[0]
    dep_tb=row[1]
    print(emp_tb.id,emp_tb.ename,dep_tb.id,dep_tb.dname)

10)连接查询
#内连接 join默认为内连接,SQLAlchemy会自动帮我们通过foreign key字段去找关联关系
# 该连接只有Emp表数据
res=session.query(Emp).join(Dep)
for row in res:
    print(row.id,row.ename,row.dep_id)

# 内连接
res=res=session.query(Emp.id,Emp.ename,Emp.dep_id,Dep.dname).join(Dep).all()
for row in res:
    print(row.id,row.ename,row.dep_id,row.dname)

# 左连接:isouter=True
res=session.query(Emp.id,Emp.ename,Emp.dep_id,Dep.dname).join(Dep,isouter=True).all()
for row in res:
    print(row.id,row.ename,row.dep_id,row.dname)

# 右连接:同左连接,只是把两个表的位置换一下
res=session.query(Dep.id,Dep.dname,Emp.dep_id,Emp.ename).join(Emp,isouter=True).all()
for row in res:
    print(row.id,row.ename,row.dep_id,row.dname)

11)合并查询
from sqlalchemy import or_

# 合并查询
q1=session.query(Emp.id,Emp.ename).filter(Emp.id > 0,Emp.id < 5)
q2=session.query(Emp.id,Emp.ename).filter(
    or_(
        Emp.ename.like('%海%'),
        Emp.ename.like('%昊%'),
    )
)
res1=q1.union(q2) #组合+去重
res2=q1.union_all(q2) #组合,不去重

print([i.ename for i in q1.all()])
print([i.ename for i in q2.all()])
print([i.ename for i in res1.all()])
print([i.ename for i in res2.all()]) 

12)子查询

有三种形式的子查询,注意:子查询的sql必须用括号包起来,尤其在形式三中需要注意这一点

#示例:查出id大于2的员工,当做子查询的表使用
#原生SQL:
# select * from (select * from emp where id > 2);
res=session.query(
    session.query(Emp).filter(Emp.id > 8).subquery()
).all()

for row in res:
    print(row.id,row.ename,row.dep_id)

#示例:#查出销售部门的员工姓名
#原生SQL:
# select ename from emp where dep_id in (select id from dep where dname='销售');
#ORM:
res=session.query(Emp.ename).filter(Emp.dep_id.in_(
    session.query(Dep.id).filter_by(dname='销售'),    #  传的是参数
    # session.query(Dep.id).filter(Dep.dname=='销售') #   传的是表达式
)).all()

for row in res:
    print(row[0])

#示例:查询所有的员工姓名与部门名
#原生SQL:
# select ename as 员工姓名,(select dname from dep where id = emp.dep_id) as 部门名 from emp;

#ORM: SELECT dep.dname FROM dep, emp WHERE dep.id = emp.dep_id
sub_sql=session.query(Dep.dname).filter(Dep.id==Emp.dep_id)
sub_sql.as_scalar()
# as_scalar的功能就是把上面的sub_sql加上了括号
res=session.query(Emp.ename,sub_sql.as_scalar()).all()

for row in res:
    print(row[0],row[1])

七、正查、反查

1. 创建表

# _*_ coding:utf-8 _*_

# 导入模块
from sqlalchemy import String, Column, Integer, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 导入关系
from sqlalchemy.orm import relationship

#   创建连接,允许溢出5个连接 用户名:root 密码:1234 地址:localhost 端口:3306
# engine = create_engine("mysql+pymysql://root:1234@localhost:3306/BigData", max_overflow=5)
# mysql 8.0 使用以下连接方式
engine = create_engine("mysql+mysqlconnector://root:1234@127.0.0.1:3306/BigData", max_overflow=5)
session = sessionmaker(bind=engine)
# declarative_base函数实例化一个数据库表的基类,之后所有的数据库表都要继承这个基类。
Base = declarative_base()

# 多对一:假设多个员工可以属于一个部门,而多个部门不能有同一个员工
class Dep(Base):
    __tablename__='dep'
    id=Column(Integer,primary_key=True,autoincrement=True)
    dname=Column(String(64),nullable=False,index=True)

class Emp(Base):
    __tablename__='emp'
    id=Column(Integer,primary_key=True,autoincrement=True)
    ename=Column(String(32),nullable=False,index=True)
    dep_id=Column(Integer,ForeignKey('dep.id'))

    # 在ForeignKey所在的类内添加relationship的字段,注意:
    # 1:Dep是类名
    # 2:depart字段不会再数据库表中生成字段
    # 3:depart用于Emp表查询Dep表(正向查询),而xxoo用于Dep表查询Emp表(反向查询),
    # backref参数则对关系提供反向引用的声明。
    depart=relationship('Dep',backref='reverse')

def init_db():
    Base.metadata.create_all(engine)

def drop_db():
    Base.metadata.drop_all(engine)

drop_db()
init_db()

2. 初始化数据

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

session.add_all([
    Dep(dname='技术'),
    Dep(dname='销售'),
    Dep(dname='运营'),
    Dep(dname='人事'),
])

session.add_all([
    Emp(ename='',dep_id=1),
    Emp(ename='李浩楠 ',dep_id=1),
    Emp(ename='廖佳宇',dep_id=1),
    Emp(ename='英雄宇',dep_id=2),
    Emp(ename='王  浩',dep_id=3),
    Emp(ename='程  超',dep_id=4),
    Emp(ename='牛文娟',dep_id=2),
    Emp(ename='杜泽瑜',dep_id=4),
    Emp(ename='狗捉猫',dep_id=3)
])

# 提交
session.commit()

3. 正向、反向查询

# 示例:查询员工名与其部门名
res=session.query(Emp.ename,Dep.dname).join(Dep)
# 查询员工名与其部门名(正向查)
res=session.query(Emp)
for row in res:
    # depart 在创建Emp表中定义为关系
    print(row.ename,row.id,row.depart.dname)

#查询部门名以及该部门下的员工(反向查)
res=session.query(Dep)
for row in res:
    # 在创建Emp表中定义为关系
    print(row.dname, [r.ename for r in row.aabb])

七、事务

  SQLAlchemy 的 session 是用于管理数据库操作的一个像容器一样的东西. 模型实例对象本身独立存在, 而要让其修改(创建)生效, 则需要把它们加入某个 session 。同时你也可以把模型实例对象从 session 中去除。

被 session 管理的实例对象:
session.commit() 直接将数据库内存中的数据提交到数据库,此操作会内部调用session.flush(),其余的事务可以访问最新的数据;
session.rollback()是回滚当前事务的变更数据操作;
session.flush() 的作用是在事务管理内与数据库发生交互, 对应的实例状态被反映到数据库,比如自增 ID 被填充上值,但是数据库中当前事务的数据值并未更新上;相当于预提交,等于提交到数据库内存,还未写入数据库文件;
session.merge(obj)查询更新操作;就是更新之前先查询,如果没有自动插入;

  1. 简易嵌套
# 向emp表插入的一条数据
t1 = Emp(ename='测试1', dep_id=2)
t2 = Emp(ename='测试2', dep_id=3)

session.add(t2)
# 创建一个子嵌套事务,第一个commit只是将子事务的数据托管到父事务,并未提交到数据库
session.begin_nested()
session.add(t1)
session.commit()

# 父事务执行提交,才真正将t1,t2提交到数据库
session.commit()
  1. 复杂事务嵌套
# 向emp表插入的一条数据
t1 = Emp(ename='测试1', dep_id=2)
t2 = Emp(ename='测试2', dep_id=3)

session.add(t2)

try:
    # 开启嵌套事务
    with session.begin_nested():
        session.add(t1)
except Exception as e:
    print(e)
    # 异常回滚
    session.rollback()

# 事务提交
session.commit()

注意:begin_nested创建一个嵌套事务,事务的提交最后由外层的commit执行,with执行完毕,内层session自动托管到外层事务上。

八、执行原生sql语句

# _*_ coding:utf-8 _*_
# 导入模块
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
# 创建引擎
engine = create_engine("mysql+pymysql://root:1234@192.168.71.130:3306/BigData", max_overflow=5,)
Session = scoped_session(sessionmaker(bind=engine,autocommit=True))

# 添加
sql = "insert into goods (name,price,number,remark) values (:name,:price,:number,:remark)"
data = {"name": "可乐", "price": 2.55, "number": 10, "remark": "可口可乐"}
# 执行sql
cusor = Session().execute(sql, data)

# 删除记录
sql="delete from goods where id=5"
cusor = Session().execute(sql)

# 修改记录
sql="update goods set name=:name where id=5"
cusor = Session().execute(sql, {"name":"农夫山泉水"})

# 查询数据
sql="select * from goods"
cusor = Session().execute(sql)
# 获取结果集
result=cusor.fetchall()
for goods in result:
    print(goods.name)

九、常见问题:

  1. ModuleNotFoundError: No module named 'pymysql'
    安装
pip install pymysql
  1. Warning: (1366, "Incorrect string value: '\xD6\xD0\xB9\xFA\xB1\xEA...' for column 'VARIABLE_VALUE' at row 1")
    result = self._query(query)
    安装
pip install mysql-connector

更改配置:

engine = create_engine("mysql+mysqlconnector://root:1234@localhost:3306/BigData", max_overflow=5)
  1. mysql.connector.errors.NotSupportedError: Authentication plugin 'caching_sha2_password' is not supported
    原因:
    安装的是Mysql8版本,不支持,请安装以下模块。
pip install MySQL-connector-python 

更改配置:

engine = create_engine("mysql+mysqlconnector://root:1234@localhost:3306/BigData", max_overflow=5)

相关文章

网友评论

      本文标题:Python ORM框架--SQLAlchemy

      本文链接:https://www.haomeiwen.com/subject/qallgctx.html