美文网首页
peewee与异步操作

peewee与异步操作

作者: yuchanns | 来源:发表于2018-12-18 23:50 被阅读105次

《peewee用法考察》中,我初步掌握peewee的一些常用操作。而做这些准备,是为了在tornado中使用。
tornado是一个异步网络IO非阻塞框架,这意味着涉及到IO阻塞操作,我们都应该以异步的形式去进行。而peewee本身并不是异步的,因此我们还需要引入另外一些库才能更好的契合tornado。

>>> 阅读原文获得最佳体验-yuchanns'Atelier

我们需要什么类库

  • peewee-async
  • aiomysql

peewee-async是一个基于asyncio的封装成的库,用于进行peewee异步操作。而peewee是基于pymysql进行操作的,pymysql异步操作mysql则需要aiomysql库的支持。
注意:目前支持tornado3.0+的peewee-async属于预发行版本,需要添加-pre参数下载。

Version 0.6.0a is published as pre-release, mind the "a" in version identifier. That means in order to install it you should specify --pre flag for pip.

pip install -pre peewee-async
pip install aiomysql

与tornado结合:同步阻塞下的情况

# -*- coding: utf-8 -*-
from tornado.web import Application, RequestHandler
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from models import *
import sys


class MainHandler(RequestHandler):
    def get(self):
        uid = self.get_argument('uid', 10000)
        query = User.select(User.uname).where(User.uid == uid).get()
        self.write('uname:%s' % query.uname)


class TestHandler(RequestHandler):
    def get(self):
        self.write('test here')


def main():
    app = Application([
        (r'/', MainHandler),
        (r'/test', TestHandler),
    ])
    try:
        server = HTTPServer(app)
        server.listen(8000)
        IOLoop.current().start()
    except KeyboardInterrupt:
        print('Exit')


if __name__ == '__main__':
    sys.exit(main())

启动mysql交互界面,锁住user表的读写操作,模拟硬盘IO阻塞。

use peewee;
lock table user write;

然后先后访问localhost:8000/和localhost:8000/test,我们会发现两个访问都阻塞在请求状态,直到我们进行

unlock table;

解锁写锁之后才能获得数据。

与tornado结合:异步硬盘IO非阻塞下的情况

为了使用peewee-async,首先我们需要对models.py文件一点修改:

# -*- coding: utf-8 -*-

from peewee import *
"""引入peewee_async相关类库"""
from peewee_async import MySQLDatabase as AsyncMySQLDatabase, Manager

# database = MySQLDatabase('peewee', **{'charset': 'utf8', 'use_unicode': True, 'host': 'localhost', 'port': 3306, 'user': 'root', 'password': ''})
"""将数据库连接改成peewee_async.MySQLDatabase"""
database = AsyncMySQLDatabase('peewee', **{'charset': 'utf8', 'use_unicode': True, 'host': 'localhost', 'port': 3306, 'user': 'root', 'password': '87260333as'})


class UnknownField(object):
    def __init__(self, *_, **__): pass


class BaseModel(Model):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        """将事务改成atomic_async"""
        self.trans = database.atomic_async
        """添加一个Manager类"""
        self.object = Manager(database)

    class Meta:
        database = database

# 下略……

peewee-async官方文档写得很简略,使用方法也不难,只是在peewee原有的基础上多加了一点嵌套操作。
比如,查询操作:(其中async/await是python3.5+异步操作的新特性,若不明白请自行查阅官方文档)

class MainHandler(RequestHandler):
    """使用async关键字标识异步操作"""
    async def get(self):
        user = User()
        uname = self.get_argument('uname', 'j')
        uid = self.get_argument('uid', 10002)
        try:
            """使用await关键字等待阻塞操作"""
            """单条查询使用object.get()方法将原本的peewee操作包裹起来"""
            row = await user.object.get(
                User.select(User.uname).where(User.uid == uid)
            )
            """多条查询"""
            query = await user.object.execute(
                User.select(User.uname).where(User.uname.startswith(uname))
            )
        except User.DoesNotExist:
            """当查不到数据的时候会抛出DoesNotExist错误"""
            self.write('查无数据')
        else:
            self.write('row uname: %s and query unames:%s' % (row.uname, ', '.join([row.uname for row in query])))

启动进程,当我们再次锁住user表写锁,然后先后访问localhost:8000/和localhost:8000/test,就会发现,虽然第一个请求还在阻塞状态,第二个请求却已经成功看到屏幕上打印出来的"test here"文字。而当我们解锁user表之后,第一个请求也能顺利完成。

其余常规操作

插入/更新数据

async def post():
    """md5加密密码"""
    m = hashlib.md5()
    m.update('123456'.encode())
    pwd = m.hexdigest()
    """头像"""
    avartar = 'https://avatars2.githubusercontent.com/u/25029451'
    user = User()
    data_source = [
        (avartar, 'Doreis', pwd),
        (avartar, 'Dorein', pwd),
        (avartar, 'Doreiv', pwd),
    ]
    field = (User.avartar, User.uname, User.password)
    try:
        """插入多条数据"""
        await user.object.execute(
            User.insert_many(data_source, field)
        )
        result = await user.object.get(User.select(User.uid, User.integral).where(User.uname == 'Doreis'))
        """使用async关键字开启事务"""
        async with user.trans():
            """更新数据"""
            await user.object.execute(User.update(
                integral=result.integral+100
            ).where(User.uid == result.uid))
            """插入单条数据"""
            await user.object.execute(Integral.insert(
                uid=result.uid,
                change=100,
                total=result.integral + 100
            ))
            self.write('insert complete')
    except Exception as e:
        print(repr(e))
        self.write('insert failed')

jion查询

async def get(self):
   user = User()
   try:
       """join查询"""
       query_join = await user.object.execute(
           Balance.select(
               User.uname, Balance.change,
               Balance.total, Balance.addtime
           ).join(
               User, JOIN.RIGHT_OUTER,
               on=(Balance.uid == User.uid)
           ).where(
               Balance.uid == 10012
           ).order_by(-Balance.addtime)
       )
       for row_join in query_join:
           print('%s:%.2f:%.2f:%s' % (
               row_join.user.uname, row_join.change,
               row_join.total, row_join.addtime
           ))
   except User.DoesNotExist:
       """当查不到数据的时候会抛出DoesNotExist错误"""
       self.write('查无数据')
   else:
       self.write('query unames:%s' % (row.uname, ', '.join([row.uname for row in query])))

union查询

关于union操作,peewee_async原本并不支持。我开了个issue询问了作者,作者表示做出一些修改就可以支持。

Not sure if select() coroutine could handle that but you can try it peewee_async.select(union_query))
If it works, then we can just add handling ModelCompoundSelectQuery to execute() via select()

所以,首先,我们对peewee_async库的源文件peewee_async.py中的
execute()做出一点修改:

async def execute(query):
    """Execute *SELECT*, *INSERT*, *UPDATE* or *DELETE* query asyncronously.

    :param query: peewee query instance created with ``Model.select()``,
                  ``Model.update()`` etc.
    :return: result depends on query type, it's the same as for sync
        ``query.execute()``
    """
    """在这里,添加ModelCompoundSelectQuery类"""
    if isinstance(query, (peewee.Select, peewee.ModelCompoundSelectQuery)):
        coroutine = select
    elif isinstance(query, peewee.Update):
        coroutine = update
    elif isinstance(query, peewee.Insert):
        coroutine = insert
    elif isinstance(query, peewee.Delete):
        coroutine = delete
    else:
        coroutine = raw_query

    return (await coroutine(query))

这样在使用execute()方法时,peewee_async就能识别union。

"""union查询"""
b_query = (Balance.select(
    Balance.b.alias('id'), Balance.change,
    Balance.total, Balance.addtime
))
i_query = (Integral.select(
    Integral.i.alias('id'), Integral.change,
    Integral.total, Integral.addtime
))
query_union = await user.object.execute(
    (b_query + i_query).order_by(SQL('addtime desc')).limit(2).offset(1)
)

聚合查询

"""聚合函数"""
fn_max = await user.object.scalar(User.select(fn.MAX(User.balance)))

删除操作

await user.object.execute(User.delete().where(User.uid == 10012))

简单的操作先记录到这里,其余操作未完待续~

相关文章

  • peewee与异步操作

    在《peewee用法考察》中,我初步掌握peewee的一些常用操作。而做这些准备,是为了在tornado中使用。t...

  • python 操作 mysql

    python 操作 mysql python ORM peewee 安装:pip install peewee p...

  • peewee 数据库操作

    peewee 数据库操作

  • Peewee中文文档

    Peewee中文文档【一】:安装与测试 Peewee中文文档【二】:快速开始 Peewee中文文档【三】:应用实例...

  • SQLAlchemy选型实战

    peewee 与sqlalchemy选型对比 peewee 在github5000星sqlalchemy在gith...

  • Peewee 使用

    Peewee系列:Peewee 使用Peewee使用之事务Peewee批量插入数据Peewee 使用(二)——增删...

  • Peewee使用之事务

    Peewee系列:Peewee 使用Peewee使用之事务Peewee批量插入数据Peewee 使用(二)——增删...

  • Peewee批量插入数据

    Peewee系列:Peewee 使用Peewee使用之事务Peewee批量插入数据Peewee 使用(二)——增删...

  • IOS多线程总结

    目录 简述 NSThread GCD操作与队列异步操作并行队列同步操作并行队列同步操作串行队列异步操作串行队列队列...

  • 异步编程方法

    前言 同步与异步最直观的理解:等到操作执行完成才返回执行结果的是同步操作;反之,则是异步操作 传统的异步编程方法:...

网友评论

      本文标题:peewee与异步操作

      本文链接:https://www.haomeiwen.com/subject/qkuckqtx.html