Tornado Application Notes 03: Coroutine and Asynchronous Examples

Author: simplue | Published 2017-06-14 20:50

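This section works through asynchronous scenarios that come up in everyday development and gives Tornado coroutine and asynchronous examples for each. With small changes, the code can be used in real projects. The section does not explain the underlying mechanisms; that analysis is left to the next section. The examples are written for Python 2 and Tornado 4.x, which were current at the time of writing. Several APIs used below were removed in Tornado 6.0: `tornado.web.asynchronous`, `tornado.gen.Task`, and `callback=` arguments.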

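Common asynchronous examples

  • Non-blocking sleep
  • Handling blocking operations with a thread pool
  • Asynchronous HTTP requests
  • IOLoop events (timeouts, callbacks)
  • Streaming output on a long-lived connection (RequestHandler.flush)
  • Background periodic tasks
  • Loops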
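Non-blocking sleep
# Each of the three alternatives below sleeps asynchronously for 2 seconds, then responds "i sleep 2s"
# (they are alternatives: keep only one NonBlockSleep class)
import tornado.gen
import tornado.ioloop
import tornado.web


# Recommended: `tornado.gen.sleep` is tornado.gen's wrapper around an IOLoop timeout
class NonBlockSleep(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        yield tornado.gen.sleep(2)
        self.finish("i sleep 2s")


# Essentially the same as the first approach, written at a lower level:
# `gen.Task` passes a `callback` argument to `add_timeout` for us
class NonBlockSleep(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        io_loop = tornado.ioloop.IOLoop.current()
        yield tornado.gen.Task(io_loop.add_timeout, io_loop.time() + 2)
        self.finish("i sleep 2s")


# Using an asynchronous callback
class NonBlockSleep(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        io_loop = tornado.ioloop.IOLoop.current()
        io_loop.add_timeout(io_loop.time() + 2, callback=self.awake)

    def awake(self):
        self.finish("i sleep 2s")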
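On Python 3, Tornado 5.0 and later run the IOLoop on top of asyncio, and `tornado.gen.sleep` plays the same role as `asyncio.sleep`. The sketch below uses plain asyncio on Python 3.7+, not Tornado, to show why a non-blocking sleep matters. Three simulated requests each sleep 0.2 s, and they finish in about 0.2 s in total because their sleeps overlap; with `time.sleep` they would run one after another. `handle_request` is a hypothetical name.

```python
import asyncio
import time


async def handle_request(name: str, delay: float) -> str:
    # Hypothetical handler: a non-blocking sleep hands control back to the
    # event loop instead of freezing it.
    await asyncio.sleep(delay)
    return "%s slept %.1fs" % (name, delay)


async def serve_concurrently() -> tuple:
    start = time.monotonic()
    # Three "requests" share one event loop, so their sleeps overlap.
    results = await asyncio.gather(
        *(handle_request("req%d" % i, 0.2) for i in range(3))
    )
    return results, time.monotonic() - start


results, elapsed = asyncio.run(serve_concurrently())
print(results)
print("total %.2fs" % elapsed)  # roughly 0.2s rather than 0.6s
```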
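Handling blocking operations with a thread pool

This requires the `futures` package, a Python 2 backport of `concurrent.futures` that is already in the standard library on Python 3. Install it with pip install futures.

Single task, no callback, result of the blocking operation is needed

Two ways to run a computation without blocking and output the result when it completes. If you don't need the result, just drop the `yield` and the `@coroutine` decorator.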

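import concurrent.futures
import time

import tornado.concurrent
import tornado.gen
import tornado.web


# Using `submit`: the lower-level way, without Tornado's wrapper
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    # Create the pool once, as a class attribute. A property that builds a new
    # ThreadPoolExecutor on every access creates a fresh pool (and thread) per
    # request, so the size limit of 2 never applies.
    # `tornado.concurrent.futures` is simply the `concurrent.futures` module,
    # so `concurrent.futures.ThreadPoolExecutor(2)` is equivalent.
    executor = tornado.concurrent.futures.ThreadPoolExecutor(2)

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        s = time.time()
        result = yield self.executor.submit(self._calculate, 1)
        used_time = time.time() - s
        self.finish('calculate completed, used %.3f s, result is %s' % (used_time, result))

    def _calculate(self, num=0):
        for i in xrange(100000000):
            num += 1
        return num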


# Using `run_on_executor`: the recommended approach
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    executor = concurrent.futures.ThreadPoolExecutor(2)

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        s = time.time()
        result = yield self._calculate(1)
        used_time = time.time() - s
        self.finish('calculate completed, used %.3f s, result is %s' % (used_time, result))

    @tornado.concurrent.run_on_executor
    def _calculate(self, num=0):
        for i in xrange(100000000):
            num += 1
        return num
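For comparison, Tornado 5.0+ also offers `IOLoop.run_in_executor(executor, func, *args)`, which mirrors asyncio's `loop.run_in_executor`. The sketch below uses plain asyncio on Python 3.7+, not the Tornado API. It runs a blocking function in a two-thread pool while a ticker coroutine keeps running, which shows that the event loop is not frozen. `blocking_io` and `ticker` are hypothetical names, and `time.sleep` stands in for blocking I/O.

For CPU-bound loops like `_calculate` above, note that Python threads share the GIL. The computation therefore does not run in parallel with other Python code.

```python
import asyncio
import concurrent.futures
import time

# One shared pool, created once rather than per request.
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=2)


def blocking_io(seconds: float) -> str:
    # Stand-in for a blocking call (database driver, file I/O, ...).
    time.sleep(seconds)
    return "done after %.1fs" % seconds


async def ticker(stop: asyncio.Event, ticks: list) -> None:
    # Keeps running while the blocking call sits in a worker thread.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    ticks: list = []
    ticker_task = asyncio.create_task(ticker(stop, ticks))
    # Hand the blocking function to the pool and await its result.
    result = await loop.run_in_executor(EXECUTOR, blocking_io, 0.3)
    stop.set()
    await ticker_task
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```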
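Single task, with callback, result of the blocking operation is needed: a clumsy low-level implementation
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    # `submit` below needs a thread pool available as `self.executor`
    executor = concurrent.futures.ThreadPoolExecutor(2)

    @tornado.web.asynchronous
    def get(self, *args, **kwargs):
        future = self.executor.submit(self._calculate, 1)
        # Run `result_callback` on the IOLoop thread once the future is done
        tornado.ioloop.IOLoop.current().add_future(future, self.result_callback)

    # Callback to run after the blocking operation
    def block_callback(self):
        print 'after block func callback'

    # Receives the result of the blocking operation
    def result_callback(self, future):
        tornado.ioloop.IOLoop.current().add_callback(self.block_callback)
        self.finish('the calculate result is |%s|' % future.result())

    def _calculate(self, num=0):
        for i in xrange(100000000):
            num += 1
        return num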
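The point of `IOLoop.add_future` in the example above is that `result_callback` runs on the IOLoop thread. A `concurrent.futures.Future`'s own `add_done_callback` behaves differently: it runs the callback in whichever thread completes the future, normally the worker. The stdlib-only sketch below (Python 3.7+) shows that difference. It also shows one way to hop back to the event loop thread, using asyncio's `call_soon_threadsafe`. This illustrates the idea and is not Tornado's implementation. `blocking_calculate` is a hypothetical stand-in for `_calculate`.

```python
import asyncio
import concurrent.futures
import threading
import time

EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker")


def blocking_calculate(num: int) -> int:
    time.sleep(0.05)  # simulate a slow, blocking step
    return num * 2


async def main() -> dict:
    loop = asyncio.get_running_loop()
    seen = {}
    finished = asyncio.Event()

    def on_loop_thread(fut: concurrent.futures.Future) -> None:
        # Runs on the event loop thread, like a callback registered via add_future.
        seen["loop_cb_thread"] = threading.current_thread().name
        seen["result"] = fut.result()
        finished.set()

    def on_done(fut: concurrent.futures.Future) -> None:
        # Runs in the thread that completed the future (here, a worker).
        seen["done_cb_thread"] = threading.current_thread().name
        # Marshal the real work back onto the loop thread.
        loop.call_soon_threadsafe(on_loop_thread, fut)

    future = EXECUTOR.submit(blocking_calculate, 21)
    future.add_done_callback(on_done)
    await finished.wait()
    return seen


seen = asyncio.run(main())
print(seen)
```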

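Multiple tasks, with callbacks, results of the blocking operations are needed
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    # One shared pool; a property returning a new pool would create one per call
    executor = concurrent.futures.ThreadPoolExecutor(2)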

    @property
    def io_loop(self):
        '''
        When using run_on_executor and attaching a callback to the future, the
        `self.io_loop` attribute must be set.
        `run_on_executor` also lets you rename the `io_loop` and `executor`
        attributes it looks up, like this:
            @property
            def my_io_loop(self):
                return tornado.ioloop.IOLoop.current()

            @property
            def my_executor(self):
                return self.application.executor

            @tornado.concurrent.run_on_executor(io_loop='my_io_loop', executor='my_executor')
            def block_func(self, *args, **kwargs):
                pass

        Pass `callback` as an ordinary keyword argument when calling the decorated
        function. `run_on_executor` pops it before calling the function, so it
        causes no error.
        '''
        return tornado.ioloop.IOLoop.current()

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        s = time.time()

        calculate_result, sleep_result = yield [
            self._calculate(2, callback=self.executor_callback),
            self._sleep(3),
        ]
        '''
        Alternative using a dict:
        multi_task_result = yield {
            'calculate': self._calculate(1),
            'sleep': self._sleep(3),
        }

        calculate_result, sleep_result = multi_task_result['calculate'], multi_task_result['sleep']
        '''
        print sleep_result
        used_time = time.time() - s
        self.finish('calculate and sleep completed used %.3f s, %s, the calculate result is %s' %
                    (used_time, sleep_result, calculate_result))

    def executor_callback(self, future_result):
        print 'future is done, and the result is |%s|.' % future_result

    @tornado.concurrent.run_on_executor
    def _calculate(self, num=0):
        for i in xrange(100000000):
            num += 1
        return num

    @tornado.concurrent.run_on_executor
    def _sleep(self, seconds=0):
        time.sleep(seconds)
        return 'sleep used %s seconds' % seconds
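Yielding a list (or dict) of futures, as above, waits for all of them concurrently. The elapsed time is therefore close to the slowest task rather than the sum of all tasks. Tornado also exposes this behavior explicitly as `tornado.gen.multi`. In plain asyncio (Python 3.7+, used here instead of Tornado) the counterpart is `asyncio.gather`. It only accepts positional awaitables, so the dict form needs a small helper. `gather_dict`, `fake_calculate` and `fake_sleep` are hypothetical, not library functions.

```python
import asyncio
import time


async def fake_calculate(num: int) -> int:
    await asyncio.sleep(0.2)  # stands in for work done elsewhere
    return num + 1


async def fake_sleep(seconds: float) -> str:
    await asyncio.sleep(seconds)
    return "sleep used %s seconds" % seconds


async def gather_dict(tasks: dict) -> dict:
    # Hypothetical helper: await a dict of awaitables concurrently, keeping the keys.
    values = await asyncio.gather(*tasks.values())
    return dict(zip(tasks.keys(), values))


async def main() -> tuple:
    start = time.monotonic()
    # List form: results come back in the same order as the awaitables.
    calc, slept = await asyncio.gather(fake_calculate(1), fake_sleep(0.3))
    # Dict form: results are looked up by key.
    by_key = await gather_dict({"calculate": fake_calculate(2), "sleep": fake_sleep(0.3)})
    return calc, slept, by_key, time.monotonic() - start


calc, slept, by_key, elapsed = asyncio.run(main())
print(calc, slept, by_key, "%.2fs" % elapsed)
```

Each `gather` takes about 0.3 s, the slowest member, so the two together take roughly 0.6 s instead of the 1.0 s a sequential run would need.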

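Asynchronous HTTP requests
import tornado.gen
import tornado.httpclient
import tornado.ioloop
import tornado.web


# Asynchronous callback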
class AsyncFetch(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        http_client = tornado.httpclient.AsyncHTTPClient()
        http_client.fetch("http://www.baidu.com", callback=self.on_response)

    def on_response(self, response):
        r = response
        # body, status code, request time, headers
        print r.body, r.code, r.request_time
        print {k: v for k, v in r.headers.items()}
        self.finish('fetch completed')


# Coroutine
class AsyncFetch(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        http_client = tornado.httpclient.AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        self.on_response(response)
        self.finish('fetch completed')

    def on_response(self, response):
        print response


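# Low-level implementation
class AsyncFetch(tornado.web.RequestHandler):
    def get(self, *args, **kwargs):
        # Keep the request open after get() returns; this is what
        # @tornado.web.asynchronous does internally
        self._auto_finish = False

        # configure() normally belongs in startup code: AsyncHTTPClient instances
        # are cached per IOLoop, so once a client exists, calling configure()
        # again does not change it
        tornado.httpclient.AsyncHTTPClient.configure(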
            None,
            defaults=dict(
                user_agent="MyUserAgent"
            ),
            max_clients=20,
        )
        client = tornado.httpclient.AsyncHTTPClient()

        fetch_future = client.fetch('http://www.baidu.com', request_timeout=2)
        # Either line below registers a callback for when the future is done; Tornado recommends `add_future`
        tornado.ioloop.IOLoop.current().add_future(fetch_future, callback=self.on_response)
        # fetch_future.add_done_callback(self.on_response)

    def on_response(self, future):
        http_response = future.result()
        print http_response
        result = dict(http_response.headers)
        result.update({'content': http_response.body})
        # raise ValueError  # in the exception case,
        self.finish(result)
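The three versions above differ mainly in how the result is delivered: through a `callback=` argument, through a yielded future, or through a future with a done callback. The stdlib-only sketch below (Python 3.7+) shows the bridge between the first two styles, roughly what the coroutine machinery does for you. A callback-based API is wrapped in a future that a coroutine can await, and `asyncio.wait_for` plays the role of `request_timeout`. `fetch_with_callback` and `fetch` are hypothetical. The "network" is simulated with `loop.call_later`, so no real HTTP request is made.

```python
import asyncio


def fetch_with_callback(url: str, delay: float, callback) -> None:
    # Hypothetical callback-style API: "responds" after `delay` seconds.
    loop = asyncio.get_running_loop()
    loop.call_later(delay, callback, "response from %s" % url)


def fetch(url: str, delay: float) -> asyncio.Future:
    # Wrap the callback API in a future so a coroutine can await it.
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_response(body: str) -> None:
        if not future.done():  # it may already have been cancelled by a timeout
            future.set_result(body)

    fetch_with_callback(url, delay, on_response)
    return future


async def main() -> tuple:
    fast = await asyncio.wait_for(fetch("http://example.com", 0.05), timeout=1)
    try:
        await asyncio.wait_for(fetch("http://example.com/slow", 0.5), timeout=0.1)
        slow = "no timeout"
    except asyncio.TimeoutError:
        # Handle the error explicitly so the request can still be finished.
        slow = "timed out"
    return fast, slow


fast, slow = asyncio.run(main())
print(fast)
print(slow)
```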

IOLoop events (timeouts, callbacks)
import time

import tornado.ioloop
import tornado.web


class IOLoopCallback(tornado.web.RequestHandler):
    def get(self, *args, **kwargs):
        print time.time()

        io_loop = tornado.ioloop.IOLoop.current()

        # Timed task: hand it to the IOLoop to run 3 seconds from now
        io_loop.add_timeout(io_loop.time() + 3, callback=self.callback_timeout)

        # Callback: hand it to the IOLoop to run on the next loop iteration
        io_loop.add_callback(self.callback_next_loop, None)

        # time.sleep blocks the IOLoop, so the deadline given to `add_timeout` is
        # not a guarantee: while the loop is blocked, no callback can run on time
        # time.sleep(4)  # blocking experiment

    def callback_timeout(self):
        print 'callback_timeout at the time %s' % time.time()

    def callback_next_loop(self, useless=None):
        print 'callback_next_loop at the time %s' % time.time()
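On Tornado 5+ the IOLoop delegates to asyncio, and `add_callback` and `add_timeout` roughly map onto asyncio's `call_soon` and `call_later`. The plain-asyncio sketch below (Python 3.7+, not Tornado) checks the two points made in the comments above. First, a "next loop" callback runs before a timer. Second, a blocking `time.sleep` in the loop thread pushes the timer past its deadline. The 0.1 s and 0.3 s timings are illustrative assumptions.

```python
import asyncio
import time


async def main() -> list:
    loop = asyncio.get_running_loop()
    events = []
    done = asyncio.Event()
    start = loop.time()

    def callback_timeout() -> None:
        events.append(("timeout", loop.time() - start))
        done.set()

    def callback_next_loop() -> None:
        events.append(("next_loop", loop.time() - start))

    # Timer due 0.1 s from now (like add_timeout).
    loop.call_later(0.1, callback_timeout)
    # Plain callback for the next loop iteration (like add_callback).
    loop.call_soon(callback_next_loop)

    # Blocking the loop thread: nothing else can run during this sleep,
    # so the 0.1 s timer cannot fire until it returns.
    time.sleep(0.3)

    await done.wait()
    return events


events = asyncio.run(main())
print(events)  # the timeout fires at about 0.3 s, not 0.1 s
```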

Streaming output on a long-lived connection (RequestHandler.flush)
class Flush(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        self.write('<h1>sleeping...</h1>')
        self.flush()
        yield tornado.gen.sleep(2)
        self.finish('<h1>awake</h1>')
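`flush()` sends what has been written so far instead of buffering it until `finish()`. As a result, the client sees the first chunk about 2 s before the last one. The sketch below reproduces that timing with a tiny local asyncio TCP server (Python 3.7+) rather than Tornado, so that it runs without dependencies. `writer.write` followed by `await writer.drain()` roughly stands in for `write()` plus `flush()`. The delay is shortened, and the response is raw bytes rather than a full HTTP response.

```python
import asyncio
import time


async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Server side: send the first chunk right away, then "sleep", then finish.
    writer.write(b"<h1>sleeping...</h1>")
    await writer.drain()          # roughly analogous to RequestHandler.flush()
    await asyncio.sleep(0.3)      # shortened from the 2 s above
    writer.write(b"<h1>awake</h1>")
    await writer.drain()
    writer.close()                # roughly analogous to finish()


async def main() -> list:
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    start = time.monotonic()
    arrivals = []
    while True:
        chunk = await reader.read(1024)
        if not chunk:  # server closed the connection
            break
        arrivals.append((chunk, time.monotonic() - start))
    writer.close()
    server.close()
    await server.wait_closed()
    return arrivals


arrivals = asyncio.run(main())
for chunk, t in arrivals:
    print("%.2fs %r" % (t, chunk))
```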
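Background periodic tasks

Approach 1:

import time

import tornado.gen
import tornado.ioloop


@tornado.gen.coroutine
def do_something(func_name):
    print 'from %s\n do_something at %s' % (func_name, int(time.time()))


@tornado.gen.coroutine
def minute_loop1():
    """The period is actually (60 + n) s, where n is the run time of `do_something`; not a strict 60 s"""
    while True:
        yield do_something(minute_loop1.__name__)
        yield tornado.gen.sleep(60)  # start the timer and wait for it to finish


@tornado.gen.coroutine
def minute_loop2():
    """A fairly strict 60 s period, as long as `do_something` takes less than 60 s"""
    while True:
        sleep = tornado.gen.sleep(60)  # start the timer
        yield do_something(minute_loop2.__name__)  # run the periodic coroutine task
        yield sleep  # "wait" for the timer to finish


# How to start them (the IOLoop must then be running, e.g. IOLoop.current().start())
tornado.ioloop.IOLoop.current().spawn_callback(minute_loop1)
tornado.ioloop.IOLoop.current().spawn_callback(minute_loop2)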
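The difference between `minute_loop1` and `minute_loop2` is whether the timer starts before or after the work. The sketch below uses plain asyncio (Python 3.7+) rather than Tornado and measures both loops. It assumes a 0.1 s interval and a simulated 0.05 s task. The first loop's period comes out at about interval + work, the second's at about max(interval, work).

One difference from Tornado matters here. `asyncio.sleep` returns a coroutine that only starts its timer when awaited, so the "start the timer first" trick needs `asyncio.ensure_future`. `tornado.gen.sleep`, by contrast, returns a future whose timer starts immediately.

```python
import asyncio
import time


async def do_something() -> None:
    await asyncio.sleep(0.05)  # simulated work taking n = 0.05 s


async def loop1(interval: float, runs: int) -> list:
    # Work first, then sleep: period is about interval + n.
    stamps = []
    for _ in range(runs):
        stamps.append(time.monotonic())
        await do_something()
        await asyncio.sleep(interval)
    return stamps


async def loop2(interval: float, runs: int) -> list:
    # Start the timer, do the work, then wait for the timer:
    # period is about max(interval, n).
    stamps = []
    for _ in range(runs):
        stamps.append(time.monotonic())
        timer = asyncio.ensure_future(asyncio.sleep(interval))  # timer starts now
        await do_something()
        await timer
    return stamps


def average_period(stamps: list) -> float:
    gaps = [b - a for a, b in zip(stamps, stamps[1:])]
    return sum(gaps) / len(gaps)


async def main() -> tuple:
    s1, s2 = await asyncio.gather(loop1(0.1, 5), loop2(0.1, 5))
    return average_period(s1), average_period(s2)


p1, p2 = asyncio.run(main())
print("loop1 period %.3fs, loop2 period %.3fs" % (p1, p2))
```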

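Approach 2:

import time

import tornado.ioloop

# tornado.ioloop.PeriodicCallback(callback, callback_time, io_loop=None)

# Note that `callback_time` is in milliseconds. `PeriodicCallback` is generally not used
# for coroutine tasks.
# Also, if a `callback` takes longer than `callback_time`, the invocations that fell due
# in the meantime are skipped, and the next one runs at the next point on the original
# schedule.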

COUNT = 0

def periodic_callback_print():
    global COUNT
    if COUNT < 3:
        COUNT += 1
        time.sleep(2)  # deliberately longer than callback_time, to show skipped runs
        print 'i have been called back %s times and now is %s' % (COUNT, int(time.time()))

ms_loop_time = 1000

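# Usage: the task must be created before it can be started
# Create the task
periodic_schedules_one = tornado.ioloop.PeriodicCallback(periodic_callback_print, ms_loop_time)
# Start it (callbacks only fire while the IOLoop itself is running)
periodic_schedules_one.start()
# Check its state
assert periodic_schedules_one.is_running()
# Stop it (in practice this happens later, e.g. at shutdown)
periodic_schedules_one.stop()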
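Tornado's documentation describes the skipping rule above as later invocations being skipped to get back on schedule. The rule can be sketched with a small scheduler on plain asyncio (Python 3.7+). This is a simplified re-implementation for illustration, not Tornado's code, and it rests on three assumptions. `SkippingPeriodic` is a hypothetical class. Its interval is in seconds rather than milliseconds. The callback blocks for 1.5 intervals to force skips.

```python
import asyncio
import time


class SkippingPeriodic:
    """Hypothetical fixed-schedule timer: missed ticks are skipped, not queued."""

    def __init__(self, callback, interval: float) -> None:
        self.callback = callback
        self.interval = interval  # seconds here (PeriodicCallback uses milliseconds)
        self._handle = None
        self._next_deadline = 0.0

    def start(self) -> None:
        loop = asyncio.get_running_loop()
        self._next_deadline = loop.time() + self.interval
        self._handle = loop.call_at(self._next_deadline, self._run)

    def stop(self) -> None:
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None

    def is_running(self) -> bool:
        return self._handle is not None

    def _run(self) -> None:
        self.callback()
        if self._handle is None:  # stop() was called inside the callback
            return
        loop = asyncio.get_running_loop()
        now = loop.time()
        # Move along the original schedule, skipping deadlines already missed.
        while self._next_deadline <= now:
            self._next_deadline += self.interval
        self._handle = loop.call_at(self._next_deadline, self._run)


async def demo() -> list:
    starts = []
    t0 = time.monotonic()

    def slow_callback() -> None:
        starts.append(time.monotonic() - t0)
        time.sleep(0.15)  # blocks longer than the 0.1 s interval

    periodic = SkippingPeriodic(slow_callback, 0.1)
    periodic.start()
    assert periodic.is_running()
    await asyncio.sleep(0.7)
    periodic.stop()
    return starts


starts = asyncio.run(demo())
gaps = [b - a for a, b in zip(starts, starts[1:])]
print(["%.2f" % s for s in starts])  # runs about 0.2 s apart, never back to back
```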

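Loops/iteration

Before Python 3.5, looping inside a coroutine is awkward, because the loop condition has to be separated from the yielded result. The example below, which uses Motor (an asynchronous MongoDB driver), shows the pattern. Python 3.5+ adds `async for`, which supports asynchronous iteration directly.

import motor
from tornado import gen

db = motor.MotorClient().test

# Before Python 3.5
@gen.coroutine
def loop_example(collection):
    cursor = collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()
        ...

# Python 3.5+
async def loop_example(collection):
    cursor = collection.find()
    async for doc in cursor:
        ...

# called as, e.g., loop_example(db.collection)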
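To see what `async for` needs from an object like a Motor cursor, here is a stdlib-only sketch (Python 3.7+) of a cursor that fetches documents in batches. `FakeCursor` is hypothetical and does not follow Motor's API beyond the two patterns above. It supports both styles: an awaitable `fetch_next` plus `next_object()`, and the `__aiter__`/`__anext__` protocol that `async for` uses.

```python
import asyncio


class FakeCursor:
    """Hypothetical cursor that 'fetches' documents in batches of `batch_size`."""

    def __init__(self, docs: list, batch_size: int = 2) -> None:
        self._pending = list(docs)
        self._buffer: list = []
        self._batch_size = batch_size

    async def _fill(self) -> bool:
        # Simulate a network round trip whenever the local buffer is empty.
        if not self._buffer and self._pending:
            await asyncio.sleep(0.01)
            self._buffer = self._pending[: self._batch_size]
            self._pending = self._pending[self._batch_size:]
        return bool(self._buffer)

    @property
    def fetch_next(self):
        # Old style: `while (await cursor.fetch_next): doc = cursor.next_object()`
        return self._fill()

    def next_object(self):
        return self._buffer.pop(0)

    # New style: the async iteration protocol used by `async for`.
    def __aiter__(self):
        return self

    async def __anext__(self):
        if await self._fill():
            return self._buffer.pop(0)
        raise StopAsyncIteration


async def old_style(cursor: FakeCursor) -> list:
    docs = []
    while (await cursor.fetch_next):
        docs.append(cursor.next_object())
    return docs


async def new_style(cursor: FakeCursor) -> list:
    return [doc async for doc in cursor]


docs = [{"_id": i} for i in range(5)]
print(asyncio.run(old_style(FakeCursor(docs))))
print(asyncio.run(new_style(FakeCursor(docs))))
```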

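That's all for this section. The next section analyzes parts of the Tornado source code behind its coroutines and asynchronous execution.

NEXT ===> Tornado Application Notes 04: A Brief Look at the Source Code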

Article link: https://www.haomeiwen.com/subject/htpeqxtx.html