SQLAlchemy 之 SQL 表达式语言指南(下)

作者: 0xE8551CCB | 来源:发表于2017-11-29 22:07 被阅读590次

由于简书限制,无法在一篇文章中包含太多文字,所以分成了上下两个部分

使用原生的 SQL

上一个例子实际上是很少见。想要将一个我们理解的文本 SQL,利用 SQLAlchemy 提供的各个部分,转换成 Python 风格的表达式是有一定难度的。所以,有时候当你知道了文本 SQL,并且对于动态特性支持要求不多,那就可以直接使用。text() 构造器可以用来构建文本 SQL,并保持大部分不会变化。下面,我们创建一个 text() 对象,并执行它:

from sqlalchemy.sql import text

s = text(
    "SELECT user.fullname || ', ' || address.email_address AS title "
    "FROM user, address "
    "WHERE user.id == address.user_id "
    "AND user.name BETWEEN :x AND :y "
    "AND (address.email_address LIKE :e1 "
        "OR address.email_address LIKE :e2)"
)
conn.execute(s, x='m', y='z', e1='%@aol.com', e2='%@msn.com').fetchall()
[('Wendy Williams, wendy@aol.com',)]

可以看到,在 text() 中,我们可以使用 :param 的方式创建绑定的参数,这种定义格式和后端的数据库无关,完全由 SQLAlchemy 解析。要想传递参数值,在 execute() 调用时,传递关键词参数即可。

指定绑定参数的行为

text() 构造器支持使用 TextClause.bindparams() 方法提前构建绑定值:

stmt = text("SELECT * FROM user WHERE user.name BETWEEN :x AND :y")
stmt = stmt.bindparams(x='m', y='z')

参数还支持显式指定类型呢:

from sqlalchemy import bindparam

stmt = stmt.bindparams(bindparam('x', String), bindparam('y', String))
result = conn.execute(stmt, x='m', y='z')

当 Python 这边或者某些特殊场合 SQL 那边处理需要提供指定类型时,绑定参数类型就可以派上用场了。

指定结果列行为

我们也可以使用 TextClause.columns() 方法指定结果列信息;这个方法可以用来基于名称指定返回类型:

stmt = stmt.columns(id=Integer, name=String)

或者可以按照对应位置传递完整的列表达式,不管是否指定了类型。这种情况下,能够在文本 SQL 中显式列出列是非常棒的主义,因为相关列表达式会按照位置自动关联到对应的列:

stmt = text("SELECT id, name FROM users")
stmt = stmt.columns(users.c.id, users.c.name)

当调用了 TextClause.columns() 方法后,我们会得到一个 TextAsFrom 对象,它支持 TextAsFrom.c 全部功能以及其它可选择的操作:

j = stmt.join(addresses, stmt.c.id == addresses.c.user_id)

new_stmt = select([stmt.c.id, addresses.c.id]).\
    select_from(j).where(stmt.c.name == 'x')
str(new_stmt)
'SELECT id, address.id \nFROM (SELECT id, name FROM users) JOIN address ON id = address.user_id \nWHERE name = :name_1'

TextClause.columns() 基于位置的形式在需要将文本 SQL 和现有的 Core 或者 ORM 模型关联时特别有用,因为我们可以直接使用列表达式而不用担心命名冲突或者其它和文本 SQL 中结果列中的其它相关问题:

stmt = text("SELECT user.id, address.id, user.id, user.name, "
            "address.email_address AS email "
            "FROM user JOIN address ON user.id = address.user_id "
            "WHERE user.id = 1").columns(
                users.c.id,
                addresses.c.id,
                addresses.c.user_id,
                users.c.name,
                addresses.c.email_address)
result = conn.execute(stmt)

我们可以看到有三列都叫做 id,但实际上由于我们有关联的列表达式,所有不用担心 id 命名冲突的问题,它会使用真正的列对象作为键。所以我们获取 email_address 列的方式可以这样:

row = result.fetchone()
row[addresses.c.email_address]
'jack@yahoo.com'

相反,如果我们使用字符串列键,我们将会看到冲突列错误提示:

try:
    row['id']
except Exception as err:
    print(err)
Ambiguous column name 'id' in result set column descriptions

需要格外注意的是,当使用 Column 对象从结果集中访问列似乎不太正常,但它实际上是 ORM 使用的唯一机制,这些都是 Query 对象在使用时透明地发生;所以,TextClause.columns() 在 ORM 环境中使用文本 SQL 非常适用。参见使用文本 SQL了解更多。

在更长的语句中使用 text() 片段

我们可以在 select() 对象中使用 text() 生成 SQL 片段,select 对象绝大多数的构建函数都可以将 text() 对象作为参数。下面,我们将在 select() 对象中使用 text()select() 构造器为我们提供了语句的轮廓,而 text() 构造器则在其中提供内容。我们可以在不引用任何预先构建的 Table 元数据的情况下生成一个语句:

s = select([
    text("user.fullname || ', ' || address.email_address AS title")
]). \
    where(
    and_(
        text("user.id = address.user_id"),
        text("user.name BETWEEN 'm' AND 'z'"),
        text(
            "(address.email_address LIKE :x "
            "OR address.email_address LIKE :y)")
    )
).select_from(text('user, address'))

conn.execute(s, x="%@aol.com", y='%@msn.com').fetchall()
[('Wendy Williams, wendy@aol.com',)]

借助 table(), literal_column() 和 column 使用更详细的文本

我们也可以朝着结构层次的反方向进行(与 ORM 对立),只需要为语句中部分关键元素使用 column(), literal_column()table() 就好咯。使用这些构造器,可以使我们获得比 text() 更多的表达能力,因为它们为存储的字符串如何使用提供了更多核心信息,而且不需要深入基于 Table 的元数据。下面的例子中,我们也给两个关键的 literal_column() 对象指定了 String 类型,以便字符串特定的连接可以工作。我们使用 literal_column() 也是为了使用表格式的表达,如 user.fullname,它们都会被如是地渲染;使用 column() 则暗示着一个独立的列名可能会被引用:

from sqlalchemy.sql import table, literal_column

s = select([
    literal_column("user.fullname", String) +
    ', ' +
    literal_column("address.email_address").label("title")
]). \
    where(
    and_(
        literal_column("user.id") == literal_column("address.user_id"),
        text("user.name BETWEEN 'm' AND 'z'"),
        text(
            "(address.email_address LIKE :x OR "
            "address.email_address LIKE :y)")
    )
).select_from(table('user')).select_from(table('address'))

conn.execute(s, x='%@aol.com', y='%@msn.com').fetchone()
('Wendy Williams, wendy@aol.com',)

使用标签实现排序和分组

有时候我们希望能在 ORDER BY 或者 GROUP BY 中使用标签字符串引用被贴了标签的列对象,此外,在 OVER 或者 DISTINCT 等子句中都可能会使用到。实际上,如果在 select() 构造器中,如果有这样的标签,我们就可以在 select.order_by() 或者 select.group_by() 等中直接使用标签字符串。这会直接映射到被贴了标签的列,同时也会防止表达式渲染两次:

from sqlalchemy import func
stmt = select([
    addresses.c.user_id.label('user_id'),
    func.count(addresses.c.id).label('num_addresses')
]).group_by('user_id').order_by('num_addresses')

print(stmt)
conn.execute(stmt).fetchall()
SELECT address.user_id AS user_id, count(address.id) AS num_addresses 
FROM address GROUP BY user_id ORDER BY num_addresses

[(1, 2), (2, 2)]

我们也可以在 asc() 或者 desc() 中使用标签字符串:

from sqlalchemy import func, desc
stmt = select([
    addresses.c.user_id.label('user_id'),
    func.count(addresses.c.id).label('num_addresses')
]).group_by('user_id').order_by(desc('num_addresses'))

print(stmt)
conn.execute(stmt).fetchall()
SELECT address.user_id AS user_id, count(address.id) AS num_addresses 
FROM address GROUP BY user_id ORDER BY num_addresses DESC

[(1, 2), (2, 2)]

注意,上述特性只有在我们使用了 label() 方法创建了特殊命名的标签后才有效。在其它情形中,我们会直接引用 ColumnElement 对象,这样可以让表达式系统自动抉择渲染的最佳方式。下面的例子演示了在想要基于相同的列名出现多次的情况下如何使用 ColumnElement 消除访问歧义:

ula, ulb = users.alias(), users.alias()
stmt = select([ula, ulb]).where(ula.c.name > ulb.c.name).order_by(ula.c.name)
conn.execute(stmt).fetchall()
[(2, 'wendy', 'Wendy Williams', 1, 'Jack', 'Jack Jones')]

使用别名

SQL 中的别名就是和一个「重命名」表或者 SELECT 语句相关,只要使用了 SELECT ... FROM sometable AS someothername 就会出现别名。AS 为表创建了新的名称。别名是非常重要的构造器,它们允许我们使用不同的名称引用任何表或者子查询。对于表而言,我们可以在 FROM 子句中给表创建多个别名。而对于 SELECT 语句,它可以为语句所表示的列提供一个父名称从而可以使用这个名称来引用。

在 SQLAlchemy 中,任何 Tableselect() 构造器或者其它可选的都可以使用 FromClause.alias() 方法创建别名,从而产生 Alias 构造器。举个栗子,我们假设知道用户 jack 有两个不同的邮箱。那我们如何才能根据这两个地址组合定位到 jack 呢?为了达到目的,我们对 address 表使用联合,每次针对一个地址。我们给 address 表创建两个 Alias 构造器,然后在 select() 构造器中使用它们:

a1 = addresses.alias()
a2 = addresses.alias()

s = select([users]). \
    where(and_(
    users.c.id == a1.c.user_id,
    users.c.id == a2.c.user_id,
    a1.c.email_address == 'jack@msn.com',
    a2.c.email_address == 'jack@yahoo.com'
))

# SELECT users.id, users.name, users.fullname
# FROM users, addresses AS addresses_1, addresses AS addresses_2
# WHERE users.id = addresses_1.user_id
#     AND users.id = addresses_2.user_id
#     AND addresses_1.email_address = ?
#     AND addresses_2.email_address = ?
# ('jack@msn.com', 'jack@yahoo.com')

conn.execute(s).fetchall()  
[(1, 'Jack', 'Jack Jones')]

注意 Alias 构造器在最终的 SQL 结果中生成了 addresses_1addresses_2。这些命名的生成是由它们在 SQL 中的位置决定的。如果我们创建的查询中只使用了 a2 这个别名,则最终产生的名称是 addresses_1。这种生成名称的方式也是确定的,也就是说相同的 SQLAlchemy 语句构造器在使用相同的「SQL 方言」时会产生完全相同的 SQL 字符串语句。

因为在外部,我们只需要使用 Alias() 构造器自己,而不用关心它产生的名称。但为了方便调试,我们也可以为它传递一个自定义的名称:

a1 = addresses.alias('a1')

当然,我们也可以对任何可 SELECT 的对象使用 alias,甚至是 SELECT 语句自己。我们可以将 user 表和使用 select() 创建的完整语句的别名进行自联合(self-join)。这里使用的 correlate(None) 是为了防止 SQLAlchemy 尝试将内 user 表和外面的表「关联」起来:

a1 = s.correlate(None).alias()
s = select([users.c.name]).where(users.c.id == a1.c.id)
print(s)
conn.execute(s).fetchall()
SELECT "user".name 
FROM "user", (SELECT "user".id AS id, "user".name AS name, "user".fullname AS fullname 
FROM "user", address AS address_1, address AS address_2 
WHERE "user".id = address_1.user_id AND "user".id = address_2.user_id AND address_1.email_address = :email_address_1 AND address_2.email_address = :email_address_2) AS anon_1 
WHERE "user".id = anon_1.id

[('Jack',)]

使用 Join

构建任何 SELECT 表达式的旅程已经完成一半了。下一个关键点就是学习 JOIN 表达式。我们已经在前面看到联结的示例了,但那只是将两个表放在 select() 的列子句或者 WHERE 子句中而已。如果我们想要生成真正的 JOIN 或者 OUTERJOIN 语句,就需要使用 join()outerjoin() 方法,通常是从联结的左表开始访问:

print(users.join(addresses))
"user" JOIN address ON "user".id = address.user_id

细心的读者会发现更多惊喜!SQLAlchemy 知道如何 JOIN 两张表!它基于我们之前在 address 表中设置的 ForeignKey 自动生成了 ON 条件。这下看起来 join() 才是联结表的大杀器啊。

当然啦,你可以联结任何想要的表达式,比如说,我们可以把所有用户名和邮箱名(去除 @xyz.com 这种后缀)相同的用户联结起来:

print(users.join(addresses, addresses.c.email_address.like(users.c.name + '%')))
"user" JOIN address ON address.email_address LIKE ("user".name || :name_1)

当我们创建 select() 语句时,SQLAlchemy 会查看我们提及的表,然后把他们放在 FROM 子句中。而当我们使用 JOIN 时,我们知道需要什么样的 FROM 子句,所以我们可以充分利用 select_from() 方法:

s = select([users.c.fullname]).select_from(
    users.join(addresses, addresses.c.email_address.like(users.c.name + '%'))
)
print(s)
conn.execute(s).fetchall()
SELECT "user".fullname 
FROM "user" JOIN address ON address.email_address LIKE ("user".name || :name_1)

[('Jack Jones',), ('Jack Jones',), ('Wendy Williams',)]

outerjoin() 方法会创建 LEFT OUTER JOIN 语句,用法和 join() 一致:

s = select([users.c.fullname]).select_from(users.outerjoin(addresses))
print(s)
SELECT "user".fullname 
FROM "user" LEFT OUTER JOIN address ON "user".id = address.user_id

以上就是 outerjoin() 的输出,当然,除非你正在使用 Oracle 9 以前的版本,并且你已经设置了 engine 为 Oracle-specific SQL:

from sqlalchemy.dialects.oracle import dialect as OracleDialect
print(s.compile(dialect=OracleDialect(use_ansi=False)))
SELECT "user".fullname 
FROM "user", address 
WHERE "user".id = address.user_id(+)

如果你看不懂上面 SQL 的含义,不用担心!Oracle DBA 的神秘部落不希望他们的黑魔法为众人所知 :)。

其它重要话题

我们已经介绍了创建 SQL 表达式的概念。剩下的就是各个主题的各个变种了。接下来,我们将罗列出剩余最重要的事情。

绑定参数对象(Bind Parameter Objects)

纵观这些列子,SQLAlchemy 在常量表达式出现时就会忙着生成绑定参数了。你也可以使用自定义的命名来绑定参数,并且重复使用相同的语句。bindparam() 构造器就可以用来生成一个指定名称的绑定参数。SQLAlchemy 总是会在 API 这侧使用命名绑定参数,而数据库方言则会在执行时转换成合适的命名或者占位符形式的语句,下面的例子就是转换成 SQLite 形式的占位符:

from sqlalchemy.sql import bindparam
s = users.select(users.c.name == bindparam('username'))
print(s)
conn.execute(s, username='wendy').fetchall()
SELECT "user".id, "user".name, "user".fullname 
FROM "user" 
WHERE "user".name = :username

[(2, 'wendy', 'Wendy Williams')]

bindparam() 的另一个重要方面就是它可以指定类型。绑定参数的类型会在表达式中决定自己的行为,并且会在发送数据到数据库之前进行相应的处理:

s = users.select(users.c.name.like(bindparam('username', type_=String) + text("'%'")))
print(s)
conn.execute(s, username='wendy').fetchall()
SELECT "user".id, "user".name, "user".fullname 
FROM "user" 
WHERE "user".name LIKE (:username || '%')

[(2, 'wendy', 'Wendy Williams')]

相同名称的 bindparam() 构造器可以使用多次,在执行参数中只需要一个命名值即可:

s = select([users, addresses]). \
    where(
    or_(
        users.c.name.like(
            bindparam('name', type_=String) + text("'%'")),
        addresses.c.email_address.like(
            bindparam('name', type_=String) + text("'@%'"))
    )
). \
    select_from(users.outerjoin(addresses)). \
    order_by(addresses.c.id)

print(s)
conn.execute(s, name='jack').fetchall()
SELECT "user".id, "user".name, "user".fullname, address.id, address.user_id, address.email_address 
FROM "user" LEFT OUTER JOIN address ON "user".id = address.user_id 
WHERE "user".name LIKE (:name || '%') OR address.email_address LIKE (:name || '@%') ORDER BY address.id

[(1, 'Jack', 'Jack Jones', 1, 1, 'jack@yahoo.com'),
 (1, 'Jack', 'Jack Jones', 2, 1, 'jack@msn.com')]

函数

SQL 函数可以用 func 关键字创建,它使用属性访问的方式生成函数:

from sqlalchemy.sql import func
print(func.now())

now()
print(func.concat('x', 'y'))
concat(:concat_1, :concat_2)

所谓的「生成」,是指任何你选择的名称都会创建成 SQL 函数:

print(func.foo_function())
foo_function()

有些名称是 SQLAlchemy 知道的,允许使用一些特殊规则。例如有些是 ANSI 函数,这就意味着它们不会包含 (),如 CURRENT_TIMESTAMP

print(func.current_timestamp())
CURRENT_TIMESTAMP

通常在 SELECT 语句的列上使用函数,当然也可以给它们打标签和指定类型。我们推荐你给函数贴标签,这样可以在结果行中通过标签名称得到具体的结果,当然,如果你需要结果集能够实现 Unicode 转换或者日期转换,就可以指定具体的类型。下面的例子中,我们使用 scalar() 函数读取第一行第一列的内容,然后关闭结果;虽然下面出现了标签,但这种用法下并不重要:

s = select([
    func.max(addresses.c.email_address, type_=String).
        label('maxemail')
])

print(s)
conn.execute(s).scalar()
SELECT max(address.email_address) AS maxemail 
FROM address

'www@www.org'

像 PostgreSQL 和 Orcale 数据库,它们支持函数返回完成的结果集,并可以放到可选单元中,从而在语句中使用。例如,有这么一个 calculate() 数据库函数,它接收参数 xy,然后返回了三列结果,姑且叫作 q, zr,我们可以使用 lexical 列对象和绑定参数:

from sqlalchemy.sql import column

calculate = select([column('q'), column('z'), column('r')]). \
    select_from(
    func.calculate(
        bindparam('x'),
        bindparam('y')
    )
)

calc = calculate.alias()
print(select([users]).where(users.c.id > calc.c.z))
SELECT "user".id, "user".name, "user".fullname 
FROM "user", (SELECT q, z, r 
FROM calculate(:x, :y)) AS anon_1 
WHERE "user".id > anon_1.z

如果我们想要第二次使用 calculate 语句,并且传递不同的绑定参数,我们可以使用 unique_params() 函数,它会创建一份拷贝,并将绑定参数标记为「唯一的」,从而隔离冲突命名。注意到,我们还创建了两个别名:

calc1 = calculate.alias('c1').unique_params(x=17, y=45)
calc2 = calculate.alias('c2').unique_params(x=5, y=12)
s = select([users]).where(users.c.id.between(calc1.c.z, calc2.c.z))

print(s)
s.compile().params 
SELECT "user".id, "user".name, "user".fullname 
FROM "user", (SELECT q, z, r 
FROM calculate(:x_1, :y_1)) AS c1, (SELECT q, z, r 
FROM calculate(:x_2, :y_2)) AS c2 
WHERE "user".id BETWEEN c1.z AND c2.z

{'x_1': 17, 'x_2': 5, 'y_1': 45, 'y_2': 12}

窗函数(Window Functions)

任何 FunctionElement,包括由 func 生成的函数,都可以转换成「窗函数」,这就是 OVER 子句,我们可以使用 FunctionElement.over() 方法创建:

s = select([
    users.c.id,
    func.row_number().over(order_by=users.c.name)
])
print(s)
SELECT "user".id, row_number() OVER (ORDER BY "user".name) AS anon_1 
FROM "user"

FunctionElement.over() 也支持指定范围,可以使用 expression.over.rows 或者 expression.over.range 实现:

s = select([
    users.c.id,
    func.row_number().over(
        order_by=users.c.name,
        rows=(-2, None))
])

print(s)
SELECT "user".id, row_number() OVER (ORDER BY "user".name ROWS BETWEEN :param_1 PRECEDING AND UNBOUNDED FOLLOWING) AS anon_1 
FROM "user"

expression.over.rowsexpression.over.range 都支持传入一个元组,它包含的是一个复值和正值的范围组合,0 表示 CURRENT ROW,而 None 则表示 UNBOUNDED。可以参见 over() 详细了解。

联合体(Unions)以及其它集合操作

有两种形式的联合,即 UNION 和 UNION ALL,我们可以使用模块级函数 union()union_all() 来生成:

from sqlalchemy.sql import union

u = union(
    addresses.select().
        where(addresses.c.email_address == 'foo@bar.com'),
    addresses.select().
        where(addresses.c.email_address.like('%@yahoo.com')),
).order_by(addresses.c.email_address)
print(u)
conn.execute(u).fetchall()
SELECT address.id, address.user_id, address.email_address 
FROM address 
WHERE address.email_address = :email_address_1 UNION SELECT address.id, address.user_id, address.email_address 
FROM address 
WHERE address.email_address LIKE :email_address_2 ORDER BY address.email_address

[(1, 1, 'jack@yahoo.com')]

当然,还有些并非在所有数据库都支持的集合操作函数:intersect(), intersect_all(), except_()except_all()

from sqlalchemy.sql import except_

u = except_(
    addresses.select().
        where(addresses.c.email_address.like('%@%.com')),
    addresses.select().
        where(addresses.c.email_address.like('%@msn.com'))
)

print(u)
conn.execute(u).fetchall()
SELECT address.id, address.user_id, address.email_address 
FROM address 
WHERE address.email_address LIKE :email_address_1 EXCEPT SELECT address.id, address.user_id, address.email_address 
FROM address 
WHERE address.email_address LIKE :email_address_2

[(1, 1, 'jack@yahoo.com'), (4, 2, 'wendy@aol.com')]

这些所谓的「复合」选择所带来的常见问题是它们实际上都是嵌套在括号里面的。SQLite 特别不喜欢以括号开头的语句。所以,当需要将一个「复合」嵌套在另外一个「复合」中时,通常就需要给最外层的「复合」的第一个元素(如果也是「复合」的情况下)使用 .alias().select() 方法。例如,要想将一个 union 和一个 select 放在 except_ 中,SQLite 希望 union 是一个子查询:

u = except_(
    union(
        addresses.select().
            where(addresses.c.email_address.like('%@yahoo.com')),
        addresses.select().
            where(addresses.c.email_address.like('%@msn.com'))
    ).alias().select(),  # 此处应用子查询
    addresses.select(addresses.c.email_address.like('%@msn.com'))
)

print(u)
conn.execute(u).fetchall()
SELECT anon_1.id, anon_1.user_id, anon_1.email_address 
FROM (SELECT address.id AS id, address.user_id AS user_id, address.email_address AS email_address 
FROM address 
WHERE address.email_address LIKE :email_address_1 UNION SELECT address.id AS id, address.user_id AS user_id, address.email_address AS email_address 
FROM address 
WHERE address.email_address LIKE :email_address_2) AS anon_1 EXCEPT SELECT address.id, address.user_id, address.email_address 
FROM address 
WHERE address.email_address LIKE :email_address_3

[(1, 1, 'jack@yahoo.com')]

标量选择(Scalar Selects)

标量选择就是指 SELECT 查询后返回一行中的一列结果。然后,它可以用作一个列表达式。一个标量选择通常是一个相关子查询,它依赖于 SELECT 语句,从而至少获得一个 FROM 子句。

通过调用 as_scalar()label() 方法,可以把 select() 构造器修改成一个列表达式:

stmt = select([func.count(addresses.c.id)]). \
    where(users.c.id == addresses.c.user_id). \
    as_scalar()
print(stmt)
(SELECT count(address.id) AS count_1 
FROM address, "user" 
WHERE "user".id = address.user_id)

上面的 stmt 现在就是一个 ScalarSelect 对象了,而不再是 FromClause 层次中的一部分了;相反,它是 ColumnElement 家族表达式成员之一了。我们可以 stmt 和其它列放在一起使用:

conn.execute(select([users.c.name, stmt])).fetchall()
[('Jack', 2), ('wendy', 2)]

要想给标量选择表达式加个名称,可以使用 SelectBase.label():

stmt = select([func.count(addresses.c.id)]). \
    where(users.c.id == addresses.c.user_id). \
    label('addr_count')

print(select([users.c.name, stmt]))
conn.execute(select([users.c.name, stmt])).fetchall()
SELECT "user".name, (SELECT count(address.id) AS count_1 
FROM address 
WHERE "user".id = address.user_id) AS addr_count 
FROM "user"

[('Jack', 2), ('wendy', 2)]

相关子查询

注意到在上一节的例子中,每个嵌入的 select 中并没有在 FROM 子句中包含 user 表。这是因为 SQLAlchemy 会自动将嵌入的 FROM 对象关联到一个闭合的查询中(如果存在,并且内部 SELECT 语句仍然有至少一个属于自己的 FROM 子句的话)。示例:

stmt = select([addresses.c.user_id]). \
    where(addresses.c.user_id == users.c.id). \
    where(addresses.c.email_address == 'jack@yahoo.com')
enclosing_stmt = select([users.c.name]).where(users.c.id == stmt)
print(enclosing_stmt)
conn.execute(enclosing_stmt).fetchall()
SELECT "user".name 
FROM "user" 
WHERE "user".id = (SELECT address.user_id 
FROM address 
WHERE address.user_id = "user".id AND address.email_address = :email_address_1)

[('Jack',)]

虽然自动相关联可以像预期那样工作,但我们仍然可以控制它。例如,我们希望一条语句只和 address 表相关,而不需要 user 表,即使它们都出现在了闭合的 SELECT 中。我们可以使用 correlate() 方法指定这些可能可以相关联的 FROM 子句:

stmt = select([users.c.id]). \
    where(users.c.id == addresses.c.user_id). \
    where(users.c.name == 'jack'). \
    correlate(addresses)
enclosing_stmt = select(
    [users.c.name, addresses.c.email_address]). \
    select_from(users.join(addresses)). \
    where(users.c.id == stmt)
print(enclosing_stmt)
conn.execute(enclosing_stmt).fetchall()
SELECT "user".name, address.email_address 
FROM "user" JOIN address ON "user".id = address.user_id 
WHERE "user".id = (SELECT "user".id 
FROM "user" 
WHERE "user".id = address.user_id AND "user".name = :name_1)

[]

想要完全禁止相关联,可以传递 Nonecorrelate()

stmt = select([users.c.id]). \
    where(users.c.name == 'wendy'). \
    correlate(None)

enclosing_stmt = select([users.c.name]). \
    where(users.c.id == stmt)
print(enclosing_stmt)
conn.execute(enclosing_stmt).fetchall()
[('wendy',)]

我们还可以通过排除(exclusion)的方式控制关联,使用 Select.correlate_except() 方法即可。例如,我们可以为 user 表写出一条 SELECT 语句,让它和除了 user 以外的 FROM 子句关联:

stmt = select([users.c.id]). \
    where(users.c.id == addresses.c.user_id). \
    where(users.c.name == 'jack'). \
    correlate_except(users)
enclosing_stmt = select(
    [users.c.name, addresses.c.email_address]). \
    select_from(users.join(addresses)). \
    where(users.c.id == stmt)
print(enclosing_stmt)
conn.execute(enclosing_stmt).fetchall()
SELECT "user".name, address.email_address 
FROM "user" JOIN address ON "user".id = address.user_id 
WHERE "user".id = (SELECT "user".id 
FROM "user" 
WHERE "user".id = address.user_id AND "user".name = :name_1)

[]

排序、组合、限制、偏移等

通过给 order_by() 传递列表达式实现排序:

stmt = select([users.c.name]).order_by(users.c.name)
print(stmt)
conn.execute(stmt).fetchall()
SELECT "user".name 
FROM "user" ORDER BY "user".name

[('Jack',), ('wendy',)]

使用 asc()desc() 修改器控制排序方式:

stmt = select([users.c.name]).order_by(users.c.name.desc())
print(stmt)
conn.execute(stmt).fetchall()
SELECT "user".name 
FROM "user" ORDER BY "user".name DESC
    
[('wendy',), ('Jack',)]

分组就是指 GROUP BY 子句,通常和聚合函数一块使用,从而构建分组的行结果。这可以通过 group_by() 方法实现,此外,having() 方法可以帮助我们在聚合结果集上过滤:

stmt = select([users.c.name, func.count(addresses.c.id)]). \
    select_from(users.join(addresses)). \
    group_by(users.c.name)
print(stmt)
print(conn.execute(stmt).fetchall())
print()
stmt = select([users.c.name, func.count(addresses.c.id)]). \
    select_from(users.join(addresses)). \
    group_by(users.c.name). \
    having(func.length(users.c.name) > 4)
print(stmt)
conn.execute(stmt).fetchall()
SELECT "user".name, count(address.id) AS count_1 
FROM "user" JOIN address ON "user".id = address.user_id GROUP BY "user".name
[('Jack', 2), ('wendy', 2)]

SELECT "user".name, count(address.id) AS count_1 
FROM "user" JOIN address ON "user".id = address.user_id GROUP BY "user".name 
HAVING length("user".name) > :length_1

[('wendy', 2)]

通常在 SQL 中使用 DISTINCT 处理重复数据的问题,我们可以使用 Select.distinct() 方法实现:

stmt = select([users.c.name]). \
    where(addresses.c.email_address.
          contains(users.c.name)). \
    distinct()
print(stmt)
conn.execute(stmt).fetchall()
SELECT DISTINCT "user".name 
FROM "user", address 
WHERE (address.email_address LIKE '%%' || "user".name || '%%')

[('Jack',), ('wendy',)]

多数数据库都支持指定提取并返回多少行结果,通过指定「偏移」则可以指定从哪儿开始。常见的数据 PostgreSQL, MySQL 和 SQLite 都支持 LIMIT 和 OFFSET 关键字,但也有些数据库需要用到类似「窗函数」和列 id 列表的方式实现相同的功能。limit()offset() 则为我们提供了下层的抽象,可以不用关心后端数据库具体使用何种方法:

stmt = select([users.c.name, addresses.c.email_address]). \
    select_from(users.join(addresses)). \
    limit(1).offset(1)
print(stmt)
conn.execute(stmt).fetchall()
SELECT "user".name, address.email_address 
FROM "user" JOIN address ON "user".id = address.user_id
 LIMIT :param_1 OFFSET :param_2

[('Jack', 'jack@msn.com')]

插入、更新和删除

我们在前面已经学习了 insert() 的使用方法。insert() 用于生成 INSERT 语句,而 update() 则用于生成 UPDATE 语句。这两个方法都有一个 values() 接口,用于指定并生成 VALUES 或者 SET 子句。

values() 方法接收任何列表达式作为参数值:

stmt = users.update().\
    values(fullname="Fullname: " + users.c.name)
print(stmt)
conn.execute(stmt)
UPDATE "user" SET fullname=(:name_1 || "user".name)

<sqlalchemy.engine.result.ResultProxy at 0x10ee4b128>

当我们使用 insert 或者 update 执行多条记录时,我们希望能够通过命名绑定参数,从而在参数列表中引用。这两个构造器都可以在 excute() 执行时传递字典参数,并自动为传入的名称生成绑定占位符。然而,当我们先搞在组合表达式中使用显式指定的命名参数时,我们需要使用 bindparam() 方法。当在 insert()update() 中使用 bindparam() 时,表的列名就会被保留为「自动」生成的绑定名称。我们可以组合使用隐式的绑定名称和显式的命名参数,示例如下:

stmt = users.insert(). \
    values(name=bindparam('_name') + " .. name")

print(stmt)
conn.execute(stmt, [
    {'id': 4, '_name': 'name1'},
    {'id': 5, '_name': 'name2'},
    {'id': 6, '_name': 'name3'},
])
INSERT INTO "user" (name) VALUES ((:_name || :param_1))

<sqlalchemy.engine.result.ResultProxy at 0x10ef71b70>

我们可以使用 update() 生成 UPDATE 语句。这看起来有点像 INSERT,除了多一个可指定的 WHERE 子句:

stmt = users.update(). \
    where(users.c.name == 'jack'). \
    values(name='ed')
print(stmt)
conn.execute(stmt)
UPDATE "user" SET name=:name WHERE "user".name = :name_1

<sqlalchemy.engine.result.ResultProxy at 0x10e2f37b8>

当我们在一个 execute many 环境中,可能需要在 WHERE 中使用显式命名绑定参数。这同样可以使用 bindparam() 实现:

stmt = users.update(). \
    where(users.c.name == bindparam('oldname')). \
    values(name=bindparam('newname'))
print(stmt)
conn.execute(stmt, [
    {'oldname': 'jack', 'newname': 'ed'},
    {'oldname': 'wendy', 'newname': 'mary'},
    {'oldname': 'jim', 'newname': 'jake'},
])
UPDATE "user" SET name=:newname WHERE "user".name = :oldname

<sqlalchemy.engine.result.ResultProxy at 0x10ef3b6d8>

相关更新(Correleted Updates)

关联更新允许你使用选择自其它表(或者相同表)的结果来更新一张表:

stmt = select([addresses.c.email_address]). \
    where(addresses.c.user_id == users.c.id). \
    limit(1)
print(stmt)
conn.execute(users.update().values(fullname=stmt))
SELECT address.email_address 
FROM address, "user" 
WHERE address.user_id = "user".id
 LIMIT :param_1

<sqlalchemy.engine.result.ResultProxy at 0x10ee2acc0>

多表更新

PostgreSQL,Microsoft SQL 服务器以及 MySQL 都支持指向多个表的 UPDATE 语句。对于 PG 和 MSSQL,使用 UPDATE FROM 语法即可,它会一次更新一张表,但是可以在一个额外的 FROM 子句中引用额外的表,而且可以在 WHERE 子句中直接使用。在 MySQL 中,多个表可以放在一个 UPDATE 语句中,用逗号隔开。SQLAlchemy 的 update() 同时支持这两种模式,只需要在 WHERE 子句中指定多个表即可:

stmt = users.update().\
        values(name='ed wood').\
        where(users.c.id == addresses.c.id).\
        where(addresses.c.email_address.startswith('ed%'))

上述生成的 SQL 如下:

UPDATE users SET name=:name FROM addresses
WHERE users.id = addresses.id AND
addresses.email_address LIKE :email_address_1 || '%'

使用 MySQL 时,每张表的列可以在 SET 子句中直接赋值,只需要给 Update.values() 传递字典即可:

stmt = users.update().\
        values({
            users.c.name:'ed wood',
            addresses.c.email_address:'ed.wood@foo.com'
        }).\
        where(users.c.id == addresses.c.id).\
        where(addresses.c.email_address.startswith('ed%'))

这样在 SET 子句中就显式引用了这些表:

UPDATE users, addresses SET addresses.email_address=%s,
        users.name=%s WHERE users.id = addresses.id
        AND addresses.email_address LIKE concat(%s, '%')

当在不支持的数据库上使用这些特殊功能时,SQLAlchemy 是不会做任何特殊的事情的。当有多张表时,默认会使用 UPDATE FROM 语法,如果数据库不支持相应的语法,则会拒绝相应的语句。

参数顺序更新

update() 构造器默认在生成 SET 子句时,使用原先在 Table 对象中定义的列顺序来渲染的。这个特点很重要,因为这意味着一个特定的 UPDATE 语句和特定列每次会渲染生成相同的语句,这对依赖语句格式的查询缓存系统(无论在服务端还是客户端)有很大影响。由于传递给 Update.values() 的参数是字典键,所以就没有其它可用的固定顺序了。

然而,有些情况下,UPDATE 语句中 SET 子句的参数顺序会非常重要。当使用 MySQL,并且用于更新的列值是基于其它列值时,就需要考虑这种顺序问题。以下语句的最终结果:

UPDATE some_table SET x = y + 10, y = 20

和下面的语句是完全不同的:

UPDATE some_table SET y = 20, x = y + 10

因为在 MySQL 中,每个 SET 子句都是基于每个值执行的,而不是基于整行的,并且当每条 SET 子句执行后,内嵌在行中的值就发生变化了。

为了应付这种特殊情况,我们可以使用 preserve_parameter_order 标志。当使用这种标志时,我们可以提供一个 Python 2-元组的列表作为参赛,传递给 Update.values() 方法:

stmt = some_table.update(preserve_parameter_order=True).\
    values([(some_table.c.y, 20), (some_table.c.x, some_table.c.y + 10)])

上面的 2-元组列表实际上就是原先的字典,只是它是有序的。使用上面的形式,我们可以保证 y 列首先渲染,其次才是 x 列的 SET 子句。

删除

最后,删除是通过调用 delete() 构造器实现的:

conn.execute(addresses.delete())
<sqlalchemy.engine.result.ResultProxy at 0x10e299f60>
conn.execute(users.delete().where(users.c.name > 'm'))
<sqlalchemy.engine.result.ResultProxy at 0x10e916198>

匹配行数

update()delete() 都有关联的「匹配行数」。这个数就是指 WHERE 子句匹配到的行数。注意到「匹配」这个词,它意味着即使没有发生变更的行也会算在内。我们可以用过 rowcount 获得:

result = conn.execute(users.delete())
result.rowcount
1

更多参考

  1. 「表达式语言」参考:SQL 语句和表达式 API
  2. 数据库元数据参考:使用 MetaData 描述数据库
  3. Engine 参考:Engine 配置
  4. 连接参考:使用 Engine 和连接
  5. 类型参考:列和数据类型

版权声明

  1. 本文翻译自 SQLAlchemy Document: SQL Expression Language Tutorial,译文仅供学习参考。
  2. 本文由 Christopher L 发表,采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。请确保你已了解许可协议,并在 转载 时声明。
  3. 本文固定链接: http://blog.chriscabin.com/?p=1608

相关文章

网友评论

    本文标题:SQLAlchemy 之 SQL 表达式语言指南(下)

    本文链接:https://www.haomeiwen.com/subject/cpxmbxtx.html