美文网首页
3. 数据操作

3. 数据操作

作者: SingleDiego | 来源:发表于2022-03-16 16:58 被阅读0次

    官方文档:






    本章数据库模型

    本节中的操作将在以下数据库模型里进行。

    from sqlalchemy import create_engine, MetaData
    from sqlalchemy import Table, Column, Integer, String, ForeignKey
    
    engine = create_engine('sqlite:///memory.db', echo=True, future=True)
    metadata_obj = MetaData()
    
    user_table = Table(
        "user_account",
        metadata_obj,
        Column('id', Integer, primary_key=True),
        Column('name', String(30)),
        Column('fullname', String)
    )
    
    address_table = Table(
        "address",
        metadata_obj,
        Column('id', Integer, primary_key=True),
        Column('user_id', ForeignKey('user_account.id'), nullable=False),
        Column('email_address', String, nullable=False)
    )
    

    同时我们会使用 ORM 来定义数据库模型。

    from sqlalchemy import create_engine, MetaData
    from sqlalchemy import Table, Column, Integer, String, ForeignKey
    from sqlalchemy.orm import declarative_base, relationship
    
    engine = create_engine('sqlite:///memory.db', echo=True, future=True)
    Base = declarative_base()
    
    class User(Base):
        __tablename__ = 'user_account'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(30))
        fullname = Column(String)
    
        addresses = relationship("Address", back_populates="user")
    
        def __repr__(self):
            return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
    
    class Address(Base):
        __tablename__ = 'address'
    
        id = Column(Integer, primary_key=True)
        email_address = Column(String, nullable=False)
        user_id = Column(Integer, ForeignKey('user_account.id'))
    
        user = relationship("User", back_populates="addresses")
    
        def __repr__(self):
            return f"Address(id={self.id!r}, email_address={self.email_address!r})"
    






    insert() 表达式

    一个 insert() 例子,stmt 变量列明了要插入数据的表格和要插入的数据。

    from sqlalchemy import insert
    
    stmt = insert(user_table).values(name='spongebob', fullname="Spongebob Squarepants")
    
    with engine.connect() as conn:
        result = conn.execute(stmt)
        conn.commit()
    

    上面的 stmt 变量是一个 Insert 实例。我们把它字符串化已更好地理解。

    >>> print(stmt)
    INSERT INTO user_account (name, fullname) VALUES (:name, :fullname)
    

    字符串化是通过一个数据库对象编译形创建的,我们使用 ClauseElement.compile() 获取这个对象;要想查看绑定的参数,可读取 params 变量。

    >>> compiled = stmt.compile()
    >>> compiled.params
    {'name': 'spongebob', 'fullname': 'Spongebob Squarepants'}
    

    执行上述代码后,会在指定的数据表插入一行数据,改行数据会被默认加上主键 id,其值默认为 1。我们可以使用 CursorResult.inserted_primary_key 来查看。

    >>> result.inserted_primary_key
    (1,)
    
    插入多行
    with engine.connect() as conn:
        result = conn.execute(
            insert(user_table),
            [
                {"name": "sandy", "fullname": "Sandy Cheeks"},
                {"name": "patrick", "fullname": "Patrick Star"}
            ]
        )
        conn.commit()
    
    使用标量子查询集(scalar subquery)插入

    现在我们实现一个复杂点的操作,有下面一组数据:

     [
        {"username": 'spongebob', "email_address": "spongebob@sqlalchemy.org"},
        {"username": 'sandy', "email_address": "sandy@sqlalchemy.org"},
        {"username": 'sandy', "email_address": "sandy@squirrelpower.org"},
    ]
    

    我们需要在 user_table 数据表根据 username 找到对应的行,再把对应的 idemail_address 写入到 address_table 表的 user_idemail_address 列中。

    from sqlalchemy import create_engine, MetaData
    from sqlalchemy import Table, Column, Integer, String, ForeignKey
    from sqlalchemy import insert, select, bindparam
    
    engine = create_engine('sqlite:///memory.db', echo=True, future=True)
    metadata_obj = MetaData()
    
    user_table = Table(
        "user_account",
        metadata_obj,
        Column('id', Integer, primary_key=True),
        Column('name', String(30)),
        Column('fullname', String)
    )
    
    address_table = Table(
        "address",
        metadata_obj,
        Column('id', Integer, primary_key=True),
        Column('user_id', ForeignKey('user_account.id'), nullable=False),
        Column('email_address', String, nullable=False)
    )
    
    scalar_subq = (
        select(user_table.c.id).
        where(user_table.c.name==bindparam('username')).
        scalar_subquery()
    )
    
    with engine.connect() as conn:
        result = conn.execute(
            insert(address_table).values(user_id=scalar_subq),
            [
                {"username": 'spongebob', "email_address": "spongebob@sqlalchemy.org"},
                {"username": 'sandy', "email_address": "sandy@sqlalchemy.org"},
                {"username": 'sandy', "email_address": "sandy@squirrelpower.org"},
            ]
        )
        conn.commit()
    






    select() 表达式

    我们使用 select() 表达式进行 SQL 的查询操作;我们把语句字符串化,让它更好理解。

    from sqlalchemy import select
    
    stmt = select(user_table).where(user_table.c.name == 'spongebob')
    print(stmt)
    
    # 输出结果:
    SELECT user_account.id, user_account.name, user_account.fullname
    FROM user_account
    WHERE user_account.name = :name_1
    

    现在我们执行这个查询语句。

    from sqlalchemy import select
    
    stmt = select(user_table).where(user_table.c.name == 'spongebob')
    
    with engine.connect() as conn:
        for row in conn.execute(stmt):
            print(row)
    
    # 输出结果:
    FROM user_account
    WHERE user_account.name = ?
    2022-03-22 11:02:50,869 INFO sqlalchemy.engine.Engine [generated in 0.00171s] ('spongebob',)
    (1, 'spongebob', 'Spongebob Squarepants')
    2022-03-22 11:02:50,902 INFO sqlalchemy.engine.Engine ROLLBACK
    

    当我们使用 ORM 方法定义数据库对象的时候,我们更适合的方式是使用 Session.execute() 来执行语句。

    from sqlalchemy.orm import Session
    from sqlalchemy import select
    
    stmt = select(User).where(User.name == 'spongebob')
    with Session(engine) as session:
        for row in session.execute(stmt):
            print(row)
    
    # 输出结果
    FROM user_account
    WHERE user_account.name = ?
    INFO sqlalchemy.engine.Engine [generated in 0.00102s] ('spongebob',)
    (User(id=1, name='spongebob', fullname='Spongebob Squarepants'),)
    INFO sqlalchemy.engine.Engine ROLLBACK
    

    两种方法有什么区别呢,我们把它字符串化打印出来。

    >>> print(select(user_table))
    SELECT user_account.id, user_account.name, user_account.fullname
    FROM user_account
    
    >>> print(select(User))
    SELECT user_account.id, user_account.name, user_account.fullname
    FROM user_account
    

    可见二者是等价的。如果要访问表的某个列对象,我们怎么做呢?

    >>> print(select(user_table.c.id))
    SELECT user_account.id
    FROM user_account
    
    >>> print(select(User.id))
    SELECT user_account.id
    FROM user_account
    

    可见前一种方法要使用 Table.c 方法来访问,后一种方法可以直接访问属性名。

    以对象方式显示查询结果

    当使用 ORM 方式时,我们希望数据库中的一行能被封装为一个 Python 对象。

    from sqlalchemy.orm import Session
    from sqlalchemy import select
    
    with Session(engine) as session:
        row = session.execute(select(User)).first()
        print(row)
    
    # 输出结果:
    (User(id=1, name='spongebob', fullname='Spongebob Squarepants'),)
    

    上例中,我们选择了 user_account 的第一行,并根据改行的数据封装成一个 User 对象;注意,row 现在是一个元组。

    >>> row[0]
    User(id=1, name='spongebob', fullname='Spongebob Squarepants')
    

    另一个方法是使用 session.scalars 方法来执行语句。

    with Session(engine) as session:
        user = session.scalars(select(User)).first()
        print(user)
    

    也可以直接获取行的某个数值而不返回一个 User 对象。

    with Session(engine) as session:
        row = session.execute(select(User.name, User.fullname)).first()
        print(row)
    
    # 输出结果:
    ('spongebob', 'Spongebob Squarepants')
    
    多重查询条件
    with Session(engine) as session:
        row = session.execute(
            select(User.name, Address).
            where(User.id==Address.user_id).
            order_by(Address.id)
        ).all()
    
        print(row)
    
    # 输出结果:
    [('spongebob', Address(id=1, email_address='spongebob@sqlalchemy.org')),
    ('sandy', Address(id=2, email_address='sandy@sqlalchemy.org')),
    ('sandy', Address(id=3, email_address='sandy@squirrelpower.org'))]
    

    上例中的查询相当于:

    SELECT user_account.name, address.id, address.email_address, address.user_id
    FROM user_account, address
    WHERE user_account.id = address.user_id ORDER BY address.id
    
    label 方法

    ColumnElement.label() 方法可以为查询结果设定一个“别名”,然后在其他地方调用。

    with Session(engine) as session:
        stmt = (
            select(
                ("Username: " + user_table.c.name).label("username"),
            ).order_by(user_table.c.name)
        )
        
        for row in session.execute(stmt):
            print(f"{row.username}")
    
    # 输出结果:
    Username: patrick
    Username: sandy
    Username: spongebob
    
    text 方法和 literal_column 方法

    text() 方法可以在 select() 表达式中增加一个自定义的字符串作为列对象。

    from sqlalchemy import select, text
    
    stmt = (
        select(
            text("'some phrase'"), user_table.c.name
        ).order_by(user_table.c.name)
    )
    
    
    with engine.connect() as conn:
        print(conn.execute(stmt).all())
    
    # 输出结果:
    [('some phrase', 'patrick'), ('some phrase', 'sandy'), ('some phrase', 'spongebob')]
    

    如果想把这种自定义的字符串和 label() 方法结合一起使用,可以用 literal_column() 方法实施。

    from sqlalchemy import literal_column
    
    stmt = (
        select(
            literal_column("'some phrase'").label("p"), user_table.c.name
        ).order_by(user_table.c.name)
    )
    
    with engine.connect() as conn:
        for row in conn.execute(stmt):
            print(f"{row.p}, {row.name}")
    
    # 输出结果:
    some phrase, patrick
    some phrase, sandy
    some phrase, spongebob
    






    where 字句

    where 子句用于使用查询条件,大多数 Python 运算符,如 ==!=<>=等都支持。

    >>> print(user_table.c.name == 'squidward')
    user_account.name = :name_1
    
    >>> print(address_table.c.user_id > 10)
    address.user_id > :user_id_1
    

    select 方法和 where 字句结合使用。

    >>> print(select(user_table).where(user_table.c.name == 'squidward'))
    SELECT user_account.id, user_account.name, user_account.fullname
    FROM user_account
    WHERE user_account.name = :name_1
    

    多个 where 子句可以并列使用,产生 AND 的效果。

    print(
        select(address_table.c.email_address).
        where(user_table.c.name == 'squidward').
        where(address_table.c.user_id == user_table.c.id)
    )
    
    # 输出结果:
    SELECT address.email_address
    FROM address, user_account
    WHERE user_account.name = :name_1 AND address.user_id = user_account.id
    

    另一种写法:

    print(
        select(address_table.c.email_address).
        where(
            user_table.c.name == 'squidward',
            address_table.c.user_id == user_table.c.id
        )
    )
    

    ANDOR 表达式可以使用 sqlalchemy 的 and_()or_() 方法来实现。

    from sqlalchemy import and_, or_
    
    print(
        select(Address.email_address).
        where(
            and_(
                or_(User.name == 'squidward', User.name == 'sandy'),
                Address.user_id == User.id
            )
        )
    )
    
    # 输出结果:
    SELECT address.email_address
    FROM address, user_account
    WHERE (user_account.name = :name_1 OR user_account.name = :name_2) AND address.user_id = user_account.id
    

    当我们使用 ORM 定义数据库时,还有一个简单的查询方法,就是使用 Select.filter_by() 方法。

    print(
        select(User).filter_by(name='spongebob', fullname='Spongebob Squarepants')
    )
    
    # 输出结果:
    SELECT user_account.id, user_account.name, user_account.fullname
    FROM user_account
    WHERE user_account.name = :name_1 AND user_account.fullname = :fullname_1
    






    join 子句

    要在 sqlalchemy 实现 JOIN 操作(JOIN 的具体含义见:SQL - JOIN
    ),一种方式是使用 Select.join_from()

    print(
        select(user_table.c.name, address_table.c.email_address).
        join_from(user_table, address_table)
    )
    
    # 输出结果:
    SELECT user_account.name, address.email_address
    FROM user_account JOIN address ON user_account.id = address.user_id
    

    另一种方式是使用 Select.join() 方法。

    print(
        select(user_table.c.name, address_table.c.email_address).
        join(address_table)
    )
    
    # 输出结果:
    SELECT user_account.name, address.email_address
    FROM user_account JOIN address ON user_account.id = address.user_id
    

    使用 join() 时候也可以把左表和右表的关系都写清楚。

    print(
        select(user_table.c.name, address_table.c.email_address).
        select_from(user_table).join(address_table)
    )
    

    那么使用 join() 的时候, sqlalchemy 怎么知道我们连接两表的条件是 user_account.id = address.user_id 的呢?这是因为我们的数据库模型设定了外键:ForeignKey('user_account.id')

    如果没有在数据库模型中设定外键,可以给 join() 方法额外添加一个参数来起到 ON 子句的作用。

    print(
        select(address_table.c.email_address).
        select_from(user_table).
        join(address_table, user_table.c.id == address_table.c.user_id)
    )
    
    LEFT OUTER JOIN 与 FULL OUTER JOIN

    当使用 join_from()join() 方法时候默认为 INNER JOIN,如果使用 LEFT OUTER JOINFULL OUTER JOIN 时,可以带上 Select.join.isouterSelect.join.full 参数。

    print(
        select(user_table).join(address_table, isouter=True)
    )
    
    # 输出结果:
    SELECT user_account.id, user_account.name, user_account.fullname
    FROM user_account LEFT OUTER JOIN address ON user_account.id = address.user_id
    
    print(
        select(user_table).join(address_table, full=True)
    )
    
    # 输出结果:
    SELECT user_account.id, user_account.name, user_account.fullname
    FROM user_account FULL OUTER JOIN address ON user_account.id = address.user_id
    

    还有一个 Select.outerjoin() 方法可以实现 LEFT OUTER JOIN

    print(
        select(user_table).outerjoin(address_table)
    )
    
    # 输出结果:
    SELECT user_account.id, user_account.name, user_account.fullname
    FROM user_account LEFT OUTER JOIN address ON user_account.id = address.user_id
    

    注意:SQL 还有一个 RIGHT OUTER JOIN 的子句,但 sqlalchemy 并不直接使用它,要达到该效果可以把左右表调转位置后再使用 LEFT OUTER JOIN






    ORDER BY, GROUP BY, HAVING 子句

    ORDER BY

    使用 Select.order_by() 方法来实现 ORDER BY 子句的功能。

    print(select(user_table).order_by(user_table.c.name))
    
    # 输出结果:
    SELECT user_account.id, user_account.name, user_account.fullname
    FROM user_account ORDER BY user_account.name
    

    要使用升序或降序来排序可以使用 ColumnElement.asc()ColumnElement.desc()

    print(select(user_table).order_by(user_table.c.fullname.desc()))
    
    # 输出结果:
    SELECT user_account.id, user_account.name, user_account.fullname
    FROM user_account ORDER BY user_account.fullname DESC
    
    使用 GROUP BY, HAVING 的聚合函数

    在 SQL 中,聚合函数(aggregate functions)允许将多行的列表达式聚合在一起,产生一个单一的结果。比如:计算数量、计算平均数、最大值、最小值等。

    >>> from sqlalchemy import func
    >>> count_fn = func.count(user_table.c.id)
    >>> print(count_fn)
    count(user_account.id)
    

    使用聚合函数时,GROUP BY 子句定义了分组的依据,HAVING 子句的使用方式与 WHERE 子句类似,用于定义过滤的条件。

    SQLAlchemy 使用 Select.group_by()Select.having() 方法来实现 GROUP BY 子句和 HAVING 子句。

    下面来看一个例子:

    from sqlalchemy import select, func
    
    with engine.connect() as conn:
        result = conn.execute(
            select(User.name, func.count(Address.id).label("count")).
            join(Address).
            group_by(User.name).
            having(func.count(Address.id) > 1)
        )
        print(result.all())
    
    # 查询结果:
    [('sandy', 2)]
    

    这等价于这样的 SQL 语句。

    SELECT user_account.name, count(address.id) AS count
    FROM user_account JOIN address 
    ON user_account.id = address.user_id 
    GROUP BY user_account.name
    HAVING count(address.id) > 1
    
    对聚合结果排序

    使用聚合函数后会产生一个或多个新的列,我们可以对这些新产生的列进行排序操作,使用 label 方法给它们加上自定义的名字会更方便。

    stmt = select(
        Address.user_id,
        func.count(Address.id).label('num_addresses')).\
        group_by("user_id").order_by("user_id", desc("num_addresses")
    )
    print(stmt)
    
    # 输出结果:
    SELECT address.user_id, count(address.id) AS num_addresses
    FROM address GROUP BY address.user_id ORDER BY address.user_id, num_addresses DESC
    






    使用别名(alias)

    有时候我们需要对同一个表的同一列作多次引用,这时候我们就需要使用 SQL 的 AS 子句对该列定一个“别名”。sqlalchemy 中我们使用 FromClause.alias() 方法来实现。

    user_alias_1 = user_table.alias()
    user_alias_2 = user_table.alias()
    
    print(
        select(user_alias_1.c.name, user_alias_2.c.name).
        join_from(user_alias_1, user_alias_2, user_alias_1.c.id > user_alias_2.c.id)
    )
    
    # 输出结果:
    SELECT user_account_1.name, user_account_2.name AS name_1
    FROM user_account AS user_account_1 
    JOIN user_account AS user_account_2 ON user_account_1.id > user_account_2.id
    
    ORM 下的别名

    使用 ORM 时我们用 aliased() 来定义别名。

    from sqlalchemy.orm import aliased
    
    address_alias_1 = aliased(Address)
    address_alias_2 = aliased(Address)
    
    print(
        select(User).
        join_from(User, address_alias_1).
        where(address_alias_1.email_address == 'patrick@aol.com').
        join_from(User, address_alias_2).
        where(address_alias_2.email_address == 'patrick@gmail.com')
    )
    
    # 输出结果:
    SELECT user_account.id, user_account.name, user_account.fullname
    FROM user_account 
    JOIN address AS address_1 ON user_account.id = address_1.user_id 
    JOIN address AS address_2 ON user_account.id = address_2.user_id
    WHERE address_1.email_address = :email_address_1 AND address_2.email_address = :email_address_2
    

    相关文章

      网友评论

          本文标题:3. 数据操作

          本文链接:https://www.haomeiwen.com/subject/cvacdrtx.html