changelog
- [2019-05-06 16:49] 创建文档
背景
近期团队内部拟培训 SQLAlchemy,但我觉得组织培训太费资源了,只是入门教程的话,还是用文档+代码说话吧。所以有了这篇文章,相信工程师们一点就通,不懂的请留言/互相探讨切磋/自行根据文档尝试/google尝试等。
历史
有关关系型数据库和编程,我们知道几点:
- SQL 是一种编程语言,专用于数据库查询;Python 也是一种编程语言。
- 不同的编程语言需要由不同的后端来执行。SQL 需要有相应的数据库后端来执行,Python 则需要相应的解释器来执行。
- Python 解释器可以直接调用 SQL 执行后端提供的 API,但稍微了解数据库的同学就知道,涉及到的细节操作还挺多的(例如编码、网络等)。所有了若干 Python 包,用户可以直接用这些包提供的高级一点的 API 来和数据库打交道(而不用处理许多细节)。
- 准确地说,SQL 更像一种规范,即「官话」、「普通话」,有许多方言比如「北京话」、「东北话」等——MySQL,PostgreSQL,Oracle 等等,它们的共性也有各自的特性,如语法、如数据类型。
- 不同的方言需要不同的 Python 包来「翻译」和执行,比如 PostgreSQL 常用的 Python 后端是 psycopg2
由上可知,我们在服务某些客户时将遇到这种情况:(不同的)客户的数据存储在不同的数据库中,如 A 客户有 MySQL,B 客户有 Oracle,我们给他们提供的服务相同,只是数据库不同;但我们希望只写一套 Python 代码,通过少数配置项的修改(而非逐一检查所有 SQL 并修改!),就能使这套代码同时运行在两个数据库上。这时候,我们的 ORM 就出场了。
本文仅简单介绍 SQLAlchemy(以下简称 SA)
基本用法
截止文档撰写时,1.3 版本的 SA 是最新的稳定版,因此本文档主要基于 1.3 版撰写而成,主要参考材料是官方文档 https://docs.sqlalchemy.org/en/13/和项目实践
套路:4步法
对于初学者而言,最核心的文档就是这篇 https://docs.sqlalchemy.org/en/13/orm/tutorial.html,根据需要查询 API 即可。SA 使用套路主要就是以下 4 步:
- 照着关系型数据库的表结构,定义一个映射(mapping)关系 M
- 创建一个引擎(engine)和一个会话(session)
- 将会话(session)绑定到引擎(engine)上
- 使用会话(session)和映射(M)进行查询
定义映射
有 2 种定义方式:https://docs.sqlalchemy.org/en/13/orm/mapping_styles.html
- 声明法(declarative)
- 经典法(classical)
除非有特殊需要,否则一般使用声明法即可,就像写一个类一样简单。
以下是一个示例:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer
from sqlalchemy.dialects.postgresql import JSONB
Base = declarative_base()
class PaginationCache(Base):
__tablename__ = 'pagination_cache' # 对应数据库中的表名
token = Column('token', String(50), primary_key=True)
page = Column('page', Integer, primary_key=True)
content = Column('content', JSONB)
Base.metadata.create_all(sa_engine['db_user']) # 建表
关于定义映射所需要的数据类型,请见下一小节。
数据类型
一般只要了解基本数据类型即可:https://docs.sqlalchemy.org/en/13/core/type_basics.html
from sqlalchemy.types import Date, String, Float, Numeric, Text
一些特殊的数据类型需要从方言中导入:https://docs.sqlalchemy.org/en/13/core/type_basics.html#vendor-specific-types
from sqlalchemy.dialects.postgresql import ARRAY
更复杂的内容请自行查阅文档(一般用不上):https://docs.sqlalchemy.org/en/13/core/types.html
创建引擎与会话
基本的用法教程中都有,具体参数请自行查阅文档。给出一份示例代码并简要解释若干点:
对于 engine:
- 第 1 个位置参数是数据库协议,具体请见 https://docs.sqlalchemy.org/en/13/core/engines.html
- creator 参数接受一个函数,该函数返回一个数据库连接。强调:creator 接受的是「函数」,而不是 「函数返回值」!
- pool_size 限制了同时允许存在的连接数,设为 0 表示不限制连接数
- (可选)echo 为 True/"debug" 时,用于打印日志。详情看文档 https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine
- (可选)json_serializer 不是必要的:当连接 Postgres 且需要使用 JSON/JSONB 类型时,可以将一个自定义的函数传递给该参数,从而在数据传递进数据库前通过该函数进行 dump 操作 https://docs.sqlalchemy.org/en/13/dialects/postgresql.html#sqlalchemy.dialects.postgresql.JSON
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import psycopg2 # 如果要使用下面的 pg 连接辅助函数 connect
import json
import datetime
import decimal
import mysql
def make_sqlalchemy_handlers(db_choice):
creator_table = {
"postgresql://": lambda x=db_choice: pg_connect(x),
"mysql+mysqlconnector://": lambda x=db_choice: mysql_connect(x)
}
kwargs = {
"pool_size": 0,
"json_serializer": json_stringify,
"creator": creator_table[protocol],
"encoding": "utf8",
"pool_recycle": 600 # https://docs.sqlalchemy.org/en/13/core/pooling.html#setting-pool-recycle
# "echo": True
}
engine = create_engine(
protocol,
**kwargs)
Session = sessionmaker(autoflush=False)
Session.configure(bind=engine)
sa_session = Session()
return engine, sa_session
# 用 psycopg2 连接中文数据库的辅助函数
def pg_connect(db_choice):
dbname = config.get(db_choice, 'database')
host = config.get(db_choice, 'host')
port = config.get(db_choice, 'port')
user = config.get(db_choice, 'user')
password = config.get(db_choice, 'password')
conn = psycopg2.connect(
"dbname='%(dbname)s' user='%(user)s' password='%(password)s' host='%(host)s' port='%(port)s'"
% {'dbname': dbname, 'user': user, 'password': password, 'host': host, 'port': port}
)
return conn
def mysql_connect(db_choice):
dbname = config.get(db_choice, 'database')
host = config.get(db_choice, 'host')
port = config.get(db_choice, 'port')
user = config.get(db_choice, 'user')
password = config.get(db_choice, 'password')
conn = mysql.connector.connect(
user=user, password=password,
host=host, port=port,
database=dbname, buffered=True)
return conn
def json_default(o):
if isinstance(o, datetime.datetime):
return o.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(o, datetime.date):
return o.isoformat()
elif isinstance(o, decimal.Decimal):
return float(o)
def json_stringify(some_dict):
return json.dumps(some_dict,
default=json_default,
ensure_ascii=False,
indent=4
)
对于 session: https://docs.sqlalchemy.org/en/13/orm/session_api.html#sqlalchemy.orm.session.Session.init
- autoflush 保证每次查询时能够自动把新加入的对象刷入数据库
- bind 用来绑定会话(session)与引擎(engine)
查询
以上面的 session 和类为例,如下的方法创建了 2 个查询对象:
# 同时查多个列
orm_query_obj_0 = sa_session['db_user'].query(
PaginationCache.content, PaginationCache.page).filter(
PaginationCache.token == token)
# 同时有多个筛选条件
orm_query_obj_1 = sa_session['db_user'].query(PaginationCache.content).filter(
PaginationCache.token == token, PaginationCache.page > 2)
先对上述代码做点解释:
-
sa_session['db_user']
是从上面的make_sqlalchemy_handlers
返回的 session - 在用声明法定义映射关系的前提下(即写了一个类来表达关系型数据库的表结构):
- 要查什么内容,只要把待查的若干
映射.列名i0, 映射.列名i1, ...
作为参数传给query(*args)
方法即可- 要进行多表联合(join)查询时,如果不是特别复杂,不一定需要定义 relationship,可以直接用隐式 join 查询:
session.query(A.x, B.y).filter(A.x == B.x, A.x == 'constant')
https://docs.sqlalchemy.org/en/13/orm/tutorial.html#querying-with-joins
- 要进行多表联合(join)查询时,如果不是特别复杂,不一定需要定义 relationship,可以直接用隐式 join 查询:
- 要筛选什么内容,只要把上一小点中的那些参数传给
filter(*args)
方法即可- 筛选条件中的任何比较使用 Python 语言的原生运算符即可,特殊的运算符(如要把几个条件用逻辑运算符连接起来)请查阅文档如 https://docs.sqlalchemy.org/en/13/core/sqlelement.html 有时候;Python 标准库的 operator 可能会有用 https://docs.python.org/3/library/operator.html
- 要查什么内容,只要把待查的若干
构建好查询对象 q 后:
- 直接
print(q)
会打印出这个查询对象q
对应的 SQL;但具体的常量值会以占位符的形式出现而不会打印填充了值以后的SQL。这是对应于上面的 join 查询的查询对象的打印结果:SELECT A.x AS A_x, B.y AS B_y FROM A, B WHERE A.x = B.y AND A.x = %(A_x)s
- 常规筛选:(更多 API 请查阅文档 https://docs.sqlalchemy.org/en/13/orm/query.html )
-
q.filter()
: 相当于 SQL WHERE -
q.offset()
: 偏移 SQL OFFSET -
q.limit()
: 限制数量 SQL LIMIT -
q.order_by()
: 排序 SQL ORDER BY
-
- 获取结果:(更多 API 请查阅文档 https://docs.sqlalchemy.org/en/13/orm/query.html )
-
q.all()
: 所有结果列表(哪怕是空列表) -
q.one()
: 拿到 1 条结果,若拿不到(查不到)则抛出异常 -
q.first()
: 拿到 1 条结果,若拿不到(查不到)则返回 None
-
写入
基本套路就是:
- 把值传递给定义好的映射关系,创建若干对象
- 把对象传给 ORM 会话,由会话提交操作
- 提交操作失败时,可以使用「回滚」(rollback)操作
有 2 种方法:
- 批量添加,一次提交
- 逐个添加,逐次提交
示例代码如下:
# 批量添加对象,一次性提交
for i in range(pages):
new_page = PaginationCache(
token=token,
page=i,
content=[{"a": i, "b": i*2}, {"a": i+1, "b": (i+1)*2}])
new_pages.append(new_page)
sa_session['db_user'].add_all(new_pages)
sa_session['db_user'].commit()
# 逐个添加对象,逐次提交;附上事务回滚示例
for i in range(pages):
new_page = PaginationCache(
token=token,
page=i,
content=[{"a": i, "b": i*2}, {"a": i+1, "b": (i+1)*2}])
sa_session['db_user'].add(new_page)
try:
sa_session['db_user'].commit()
except:
print('Error when commit pagination cache.')
print(traceback.format_exc()) # 打印错误栈
sa_session['db_user'].rollback() # 消除这次提交,回滚状态
稍微进阶
每当交互时才创建连接会话/连接上下文
在上面最初的版本中,会话在一开始就创建了,然后多次使用:
# in db.py
def make_sqlalchemy_handlers(db_choice):
# ...blabla...
Session = sessionmaker(autoflush=False)
Session.configure(bind=engine)
sa_session = Session()
return engine, sa_session
engine, session = make_sqlalchemy_handlers('postgresql://')
# in a.py
from db import session
orm_query_obj_0 = session.query(*args_0).all()
# in b.py
from db import session
orm_query_obj_1 = session.query(*args_1).all()
上述这种做法在特别简单的系统可能不会出大问题;但只要系统运行时间变长,系统复杂程度提高,那么这么做就有风险了。至少有 1 点风险:
- 执行动作 A,通过会话 S 与数据库进行交互,过程中出错;出错后未回滚或更新会话 S,于是在动作 A 之后的动作都将延续这种脏状态,便一直无法正常访问数据库——除非重启整个应用。
鉴于此,可以考虑做一个简单的上下文管理器:
# db.py
import contextlib
from sqlalchemy.orm import sessionmaker
# 已经定义了 make_sqlalchemy_handlers,同上 ...
sa_engine = dict()
sa_session_maker = dict()
for db_section in ['db_a', 'db_b', 'db_c']:
engine_t, session_t = make_sqlalchemy_handlers(db_section)
sa_engine[db_section] = engine_t
sa_session_maker[db_section] = session_t
@contextlib.contextmanager
def db_context(choice):
"""
Context manager for a transaction
ref:
https://docs.sqlalchemy.org/en/13/orm/session_basics.html
https://docs.python.org/3.5/library/contextlib.html#contextlib.contextmanager
"""
session = sa_session_maker[choice]()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
raise e
finally:
session.close()
这样以后每次需要和数据库打交道时,创建连接上下文,不需要的时候退出上下文、连接自然释放了:
from db import db_context
with db_context('db_a') as sa_session:
query = sa_session.query(*view_columns).filter(*filters)
result = query.all()
需要注意的是,建立会话也就是建立连接(大致相同,严格来说本句话可能还要斟酌。。。),较消耗资源(众所周知,在耗时上,CPU<内存<IO,建立连接就是在IO层面的操作),所以在这种为了安全性而设计的写法之下,需要仔细考虑建立会话的时机。
创建约束
参考 Table Configuration 和 UNIQUE Constraint ,延续声明式的建模方式,要在建表时对某几个字段组合的唯一性进行限制时,可在名为 __table_args__
的类变量中加入限制,例如:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Date, Integer, UniqueConstraint
Base = declarative_base()
class MyTable(Base):
__tablename__ = 'mytable'
col1 = Column('col1', String(20))
col2 = Column('col2', Date)
col3 = Column('col3', Integer)
# 其他字段定义……
__table_args = (
UniqueConstraint('col1', 'col2', 'col3', name='uix_state'),
)
这里有个额外建议注意的点:注上述表定义中,字段名字面量被重复使用。因此这里建议在表格定义前将这些字面量存为变量(常量):
# blabla
_COLNAME_MyTable_col1 = 'col1'
_COLNAME_MyTable_col2 = 'col2'
_COLNAME_MyTable_col3= 'col3'
# blabla
class MyTable(Base):
__tablename__ = 'mytable'
col1 = Column(_COLNAME_MyTable_col1, String(20))
col2 = Column(_COLNAME_MyTable_col2, Date)
col3 = Column(_COLNAME_MyTable_col3, Integer)
# 其他字段定义……
__table_args = (
UniqueConstraint(
_COLNAME_MyTable_col1,
_COLNAME_MyTable_col2,
_COLNAME_MyTable_col3,
name='uix_state'
),
)
这么做是为了避免哪一天修改了某个字段名字面量,该字段正好被 UniqueConstraint 使用,但修改者又忘了去修改 UniqueConstraint 中的对应字面量,也就是下面这种超出用户预期的示例情况:修改了 col1 的命名为 col110
,但在 UniqueConstraint 中,该字段仍然命名为 col1
。
# blabla
class MyTable(Base):
__tablename__ = 'mytable'
col1 = Column('col110', String(20))
col2 = Column('col2', Date)
col3 = Column('col3', Integer)
# 其他字段定义……
__table_args = (
UniqueConstraint('col1', 'col2', 'col3', name='uix_state'),
)
网友评论