美文网首页
一篇入门SQLAlchemy

一篇入门SQLAlchemy

作者: 苏尚君 | 来源:发表于2019-05-06 20:18 被阅读0次

    changelog

    • [2019-05-06 16:49] 创建文档

    背景

    近期团队内部拟培训 SQLAlchemy,但我觉得组织培训太费资源了,只是入门教程的话,还是用文档+代码说话吧。所以有了这篇文章,相信工程师们一点就通,不懂的请留言/互相探讨切磋/自行根据文档尝试/google尝试等。

    历史

    有关关系型数据库和编程,我们知道几点:

    1. SQL 是一种编程语言,专用于数据库查询;Python 也是一种编程语言。
    2. 不同的编程语言需要由不同的后端来执行。SQL 需要有相应的数据库后端来执行,Python 则需要相应的解释器来执行。
    3. Python 解释器可以直接调用 SQL 执行后端提供的 API,但稍微了解数据库的同学就知道,涉及到的细节操作还挺多的(例如编码、网络等)。所有了若干 Python 包,用户可以直接用这些包提供的高级一点的 API 来和数据库打交道(而不用处理许多细节)。
    4. 准确地说,SQL 更像一种规范,即「官话」、「普通话」,有许多方言比如「北京话」、「东北话」等——MySQL,PostgreSQL,Oracle 等等,它们的共性也有各自的特性,如语法、如数据类型。
    5. 不同的方言需要不同的 Python 包来「翻译」和执行,比如 PostgreSQL 常用的 Python 后端是 psycopg2

    由上可知,我们在服务某些客户时将遇到这种情况:(不同的)客户的数据存储在不同的数据库中,如 A 客户有 MySQL,B 客户有 Oracle,我们给他们提供的服务相同,只是数据库不同;但我们希望只写一套 Python 代码,通过少数配置项的修改(而非逐一检查所有 SQL 并修改!),就能使这套代码同时运行在两个数据库上。这时候,我们的 ORM 就出场了。

    本文仅简单介绍 SQLAlchemy(以下简称 SA)

    基本用法

    截止文档撰写时,1.3 版本的 SA 是最新的稳定版,因此本文档主要基于 1.3 版撰写而成,主要参考材料是官方文档 https://docs.sqlalchemy.org/en/13/和项目实践

    套路:4步法

    对于初学者而言,最核心的文档就是这篇 https://docs.sqlalchemy.org/en/13/orm/tutorial.html,根据需要查询 API 即可。SA 使用套路主要就是以下 4 步:

    1. 照着关系型数据库的表结构,定义一个映射(mapping)关系 M
    2. 创建一个引擎(engine)和一个会话(session)
    3. 将会话(session)绑定到引擎(engine)上
    4. 使用会话(session)和映射(M)进行查询

    定义映射

    有 2 种定义方式:https://docs.sqlalchemy.org/en/13/orm/mapping_styles.html

    • 声明法(declarative)
    • 经典法(classical)

    除非有特殊需要,否则一般使用声明法即可,就像写一个类一样简单。

    以下是一个示例:

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, String, Integer
    from sqlalchemy.dialects.postgresql import JSONB
    
    Base = declarative_base()
    class PaginationCache(Base):
        __tablename__ = 'pagination_cache'    # 对应数据库中的表名
        token = Column('token', String(50), primary_key=True)
        page = Column('page', Integer, primary_key=True)
        content = Column('content', JSONB)
    
    Base.metadata.create_all(sa_engine['db_user'])    # 建表
    

    关于定义映射所需要的数据类型,请见下一小节。

    数据类型

    一般只要了解基本数据类型即可:https://docs.sqlalchemy.org/en/13/core/type_basics.html

    from sqlalchemy.types import Date, String, Float, Numeric, Text
    

    一些特殊的数据类型需要从方言中导入:https://docs.sqlalchemy.org/en/13/core/type_basics.html#vendor-specific-types

    from sqlalchemy.dialects.postgresql import ARRAY
    

    更复杂的内容请自行查阅文档(一般用不上):https://docs.sqlalchemy.org/en/13/core/types.html

    创建引擎与会话

    基本的用法教程中都有,具体参数请自行查阅文档。给出一份示例代码并简要解释若干点:

    对于 engine:

    1. 第 1 个位置参数是数据库协议,具体请见 https://docs.sqlalchemy.org/en/13/core/engines.html
    2. creator 参数接受一个函数,该函数返回一个数据库连接。强调:creator 接受的是「函数」,而不是 「函数返回值」
    3. pool_size 限制了同时允许存在的连接数,设为 0 表示不限制连接数
    4. (可选)echo 为 True/"debug" 时,用于打印日志。详情看文档 https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine
    5. (可选)json_serializer 不是必要的:当连接 Postgres 且需要使用 JSON/JSONB 类型时,可以将一个自定义的函数传递给该参数,从而在数据传递进数据库前通过该函数进行 dump 操作 https://docs.sqlalchemy.org/en/13/dialects/postgresql.html#sqlalchemy.dialects.postgresql.JSON
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    import psycopg2  # 如果要使用下面的 pg 连接辅助函数 connect
    import json
    import datetime
    import decimal
    import mysql
    
    
    def make_sqlalchemy_handlers(db_choice):
        creator_table = {
            "postgresql://": lambda x=db_choice: pg_connect(x),
            "mysql+mysqlconnector://": lambda x=db_choice: mysql_connect(x)
        }
        kwargs = {
            "pool_size": 0,
            "json_serializer": json_stringify,
            "creator": creator_table[protocol],
            "encoding": "utf8",
            "pool_recycle": 600 # https://docs.sqlalchemy.org/en/13/core/pooling.html#setting-pool-recycle
    #        "echo": True
        }
        engine = create_engine(
                protocol,
                **kwargs)
        Session = sessionmaker(autoflush=False)
        Session.configure(bind=engine)
        sa_session = Session()
        return engine, sa_session
        
    
    # 用 psycopg2 连接中文数据库的辅助函数
    def pg_connect(db_choice):
        dbname = config.get(db_choice, 'database')
        host = config.get(db_choice, 'host')
        port = config.get(db_choice, 'port')
        user = config.get(db_choice, 'user')
        password = config.get(db_choice, 'password')
        conn = psycopg2.connect(
            "dbname='%(dbname)s' user='%(user)s' password='%(password)s' host='%(host)s' port='%(port)s'"
            % {'dbname': dbname, 'user': user, 'password': password, 'host': host, 'port': port}
        )
        return conn
    
    
    def mysql_connect(db_choice):
        dbname = config.get(db_choice, 'database')
        host = config.get(db_choice, 'host')
        port = config.get(db_choice, 'port')
        user = config.get(db_choice, 'user')
        password = config.get(db_choice, 'password')
        conn = mysql.connector.connect(
                user=user, password=password,
                host=host, port=port,
                database=dbname, buffered=True)
        return conn
    
    
    def json_default(o):
        if isinstance(o, datetime.datetime):
            return o.strftime('%Y-%m-%d %H:%M:%S')
        elif isinstance(o, datetime.date):
            return o.isoformat()
        elif isinstance(o, decimal.Decimal):
            return float(o)
    
    
    def json_stringify(some_dict):
        return json.dumps(some_dict,
                          default=json_default,
                          ensure_ascii=False,
                          indent=4
                          )
    

    对于 session: https://docs.sqlalchemy.org/en/13/orm/session_api.html#sqlalchemy.orm.session.Session.init

    1. autoflush 保证每次查询时能够自动把新加入的对象刷入数据库
    2. bind 用来绑定会话(session)与引擎(engine)

    查询

    以上面的 session 和类为例,如下的方法创建了 2 个查询对象

    # 同时查多个列
    orm_query_obj_0 = sa_session['db_user'].query(
                    PaginationCache.content, PaginationCache.page).filter(
                    PaginationCache.token == token)
    
    # 同时有多个筛选条件
    orm_query_obj_1 = sa_session['db_user'].query(PaginationCache.content).filter(
                    PaginationCache.token == token, PaginationCache.page > 2)
    

    先对上述代码做点解释:

    • sa_session['db_user'] 是从上面的 make_sqlalchemy_handlers 返回的 session
    • 在用声明法定义映射关系的前提下(即写了一个类来表达关系型数据库的表结构):

    构建好查询对象 q 后:

    • 直接 print(q) 会打印出这个查询对象 q 对应的 SQL;但具体的常量值会以占位符的形式出现而不会打印填充了值以后的SQL。这是对应于上面的 join 查询的查询对象的打印结果: SELECT A.x AS A_x, B.y AS B_y FROM A, B WHERE A.x = B.y AND A.x = %(A_x)s
    • 常规筛选:(更多 API 请查阅文档 https://docs.sqlalchemy.org/en/13/orm/query.html
      • q.filter(): 相当于 SQL WHERE
      • q.offset(): 偏移 SQL OFFSET
      • q.limit(): 限制数量 SQL LIMIT
      • q.order_by(): 排序 SQL ORDER BY
    • 获取结果:(更多 API 请查阅文档 https://docs.sqlalchemy.org/en/13/orm/query.html
      • q.all(): 所有结果列表(哪怕是空列表)
      • q.one(): 拿到 1 条结果,若拿不到(查不到)则抛出异常
      • q.first(): 拿到 1 条结果,若拿不到(查不到)则返回 None

    写入

    基本套路就是:

    • 把值传递给定义好的映射关系,创建若干对象
    • 把对象传给 ORM 会话,由会话提交操作
    • 提交操作失败时,可以使用「回滚」(rollback)操作

    有 2 种方法:

    • 批量添加,一次提交
    • 逐个添加,逐次提交

    示例代码如下:

    # 批量添加对象,一次性提交
    for i in range(pages):
        new_page = PaginationCache(
                        token=token,
                        page=i,
                        content=[{"a": i, "b": i*2}, {"a": i+1, "b": (i+1)*2}])
        new_pages.append(new_page)
        
    sa_session['db_user'].add_all(new_pages)
    sa_session['db_user'].commit()
    
    
    
    # 逐个添加对象,逐次提交;附上事务回滚示例
    for i in range(pages):
        new_page = PaginationCache(
                        token=token,
                        page=i,
                        content=[{"a": i, "b": i*2}, {"a": i+1, "b": (i+1)*2}])
        sa_session['db_user'].add(new_page)
        try:
            sa_session['db_user'].commit()
        except:
            print('Error when commit pagination cache.')
            print(traceback.format_exc())        # 打印错误栈
            sa_session['db_user'].rollback()     # 消除这次提交,回滚状态
    

    稍微进阶

    每当交互时才创建连接会话/连接上下文

    在上面最初的版本中,会话在一开始就创建了,然后多次使用:

    # in db.py
    def make_sqlalchemy_handlers(db_choice):
        # ...blabla...
        Session = sessionmaker(autoflush=False)
        Session.configure(bind=engine)
        sa_session = Session()
        return engine, sa_session
        
    engine, session = make_sqlalchemy_handlers('postgresql://')
    
    # in a.py
    from db import session
    orm_query_obj_0 = session.query(*args_0).all()
    
    # in b.py
    from db import session
    orm_query_obj_1 = session.query(*args_1).all()
    

    上述这种做法在特别简单的系统可能不会出大问题;但只要系统运行时间变长,系统复杂程度提高,那么这么做就有风险了。至少有 1 点风险:

    • 执行动作 A,通过会话 S 与数据库进行交互,过程中出错;出错后未回滚或更新会话 S,于是在动作 A 之后的动作都将延续这种脏状态,便一直无法正常访问数据库——除非重启整个应用。

    鉴于此,可以考虑做一个简单的上下文管理器:

    # db.py
    
    import contextlib
    from sqlalchemy.orm import sessionmaker
    
    # 已经定义了 make_sqlalchemy_handlers,同上 ...
    
    sa_engine = dict()
    sa_session_maker = dict()
    for db_section in ['db_a', 'db_b', 'db_c']:
        engine_t, session_t = make_sqlalchemy_handlers(db_section)
        sa_engine[db_section] = engine_t
        sa_session_maker[db_section] = session_t
    
    
    @contextlib.contextmanager
    def db_context(choice):
        """
            Context manager for a transaction
            ref:
    
            https://docs.sqlalchemy.org/en/13/orm/session_basics.html
            https://docs.python.org/3.5/library/contextlib.html#contextlib.contextmanager
        """
        session = sa_session_maker[choice]()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.close()      
    

    这样以后每次需要和数据库打交道时,创建连接上下文,不需要的时候退出上下文、连接自然释放了:

    from db import db_context
    
    with db_context('db_a') as sa_session:
        query = sa_session.query(*view_columns).filter(*filters)
        result = query.all()
    

    需要注意的是,建立会话也就是建立连接(大致相同,严格来说本句话可能还要斟酌。。。),较消耗资源(众所周知,在耗时上,CPU<内存<IO,建立连接就是在IO层面的操作),所以在这种为了安全性而设计的写法之下,需要仔细考虑建立会话的时机。

    创建约束

    参考 Table ConfigurationUNIQUE Constraint ,延续声明式的建模方式,要在建表时对某几个字段组合的唯一性进行限制时,可在名为 __table_args__ 的类变量中加入限制,例如:

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, String, Date, Integer, UniqueConstraint
    
    Base = declarative_base()
    
    class MyTable(Base):
        __tablename__ = 'mytable'
        
        col1 = Column('col1', String(20))
        col2 = Column('col2', Date)
        col3 = Column('col3', Integer)
        # 其他字段定义……
        
        __table_args = (
          UniqueConstraint('col1', 'col2', 'col3', name='uix_state'),
        )
    

    这里有个额外建议注意的点:注上述表定义中,字段名字面量被重复使用。因此这里建议在表格定义前将这些字面量存为变量(常量):

    # blabla
    
    _COLNAME_MyTable_col1 = 'col1'
    _COLNAME_MyTable_col2 = 'col2'
    _COLNAME_MyTable_col3= 'col3'
    
    # blabla
    class MyTable(Base):
        __tablename__ = 'mytable'
        
        col1 = Column(_COLNAME_MyTable_col1, String(20))
        col2 = Column(_COLNAME_MyTable_col2, Date)
        col3 = Column(_COLNAME_MyTable_col3, Integer)
        # 其他字段定义……
        
        __table_args = (
          UniqueConstraint(
            _COLNAME_MyTable_col1, 
            _COLNAME_MyTable_col2,
            _COLNAME_MyTable_col3,
            name='uix_state'
          ),
        )
    

    这么做是为了避免哪一天修改了某个字段名字面量,该字段正好被 UniqueConstraint 使用,但修改者又忘了去修改 UniqueConstraint 中的对应字面量,也就是下面这种超出用户预期的示例情况:修改了 col1 的命名为 col110,但在 UniqueConstraint 中,该字段仍然命名为 col1

    # blabla
    
    class MyTable(Base):
        __tablename__ = 'mytable'
        
        col1 = Column('col110', String(20))
        col2 = Column('col2', Date)
        col3 = Column('col3', Integer)
        # 其他字段定义……
        
        __table_args = (
          UniqueConstraint('col1', 'col2', 'col3', name='uix_state'),
        )
    

    相关文章

      网友评论

          本文标题:一篇入门SQLAlchemy

          本文链接:https://www.haomeiwen.com/subject/jdfjoqtx.html