美文网首页
Tornado基础: 异步与WebSocket

Tornado基础: 异步与WebSocket

作者: 木叶苍蓝 | 来源:发表于2023-05-23 17:46 被阅读0次

    认识异步

    同步

    我们用两个函数来模拟两个客户端请求,并依次进行处理:

    # coding:utf-8
    
    def req_a():
        " 模拟请求a "
        print("开始处理请求 req_a")
        print("完成处理请求 req_a")
    
    
    def req_b():
        " 模拟请求b "
        print("开始处理请求 req_b")
        print("完成处理请求 req_b")
    
    
    def main():
        " 模拟 tornado 框架,处理两个请求 "
        req_a()
        req_b()
    
    if __name__ == "__main__":
        main()
    

    执行结果

    开始处理请求req_a
    完成处理请求req_a
    开始处理请求req_b
    完成处理请求req_b
    

    同步是按部就班的依次执行,始终按照同一步调执行,上一个步骤未执行完不会执行下一步。
    如果在处理请求 req_a 时需要执行一个耗时的工作(如IO),其执行过程如何?

    # coding:utf-8
    import time
    
    
    def long_io():
        " 模拟耗时 IO 操作 "
        print("开始执行 IO 操作")
        time.sleep(5)
        print("完成 IO 操作")
        return "IO result"
    
    
    def req_a():
        print("开始处理请求 req_a ")
        ret = long_io()
        print("ret: %s" % ret)
        print("完成处理请求 req_a")
    
    
    def req_b():
        print("开始处理请求 req_b")
        print("完成处理请求 req_b")
    
    
    def main():
        req_a()
        req_b()
    
    
    if __name__ == "__main__":
        main()
    

    执行过程:

    开始处理请求req_a
    开始执行IO操作
    完成IO操作
    完成处理请求req_a
    开始处理请求req_b
    完成处理请求req_b
    

    在上面的测试中,我们看到耗时的操作会将代码执行阻塞住,即 req_a 未处理完 req_b 是无法执行的。

    异步

    对于耗时的过程,我们将其交给别人(其他的线程)去执行,而我们继续往下处理,当别人执行完耗时操作后再将结果反馈给我们,这就是我们所说的异步。我们用容易理解的线程机制来实现异步。

    回调写法实现原理
    import time
    import thread
    
    def long_io(callback):
        """ 将耗时的操作交给另一个线程来处理 """
        def fun(cb):
            """ 耗时操作 """
            print("开始执行 IO 操作")
            time.sleep(5)
            print("完成 IO 操作,并执行回调函数")
            cb("IO result")  # 执行回调函数
        thread.start_new_thread(fun, (callback,))  # 开启线程执行耗时操作
    
    
    def on_finish(ret):
        """ 回调函数 """
        print("开始执行回调函数 on_finish")
        print("ret: %s" % ret)
        print("完成执行回调函数 on_finish")
    
    
    def req_a():
        print("开始处理请求 req_a")
        long_io(on_finish)
        print("离开处理请求 req_a")
    
    
    def req_b():
        print("开始处理请求 req_b")
        time.sleep(2)
        print("完成处理请求 req_b")
    
    
    def main():
        req_a()
        req_b()
        while 1:  # 添加此句防止程序退出,保证线程可以执行完
            pass
    
    if __name__ == "__main__":
        main()
    

    执行过程

    开始处理请求req_a
    离开处理请求req_a
    开始处理请求req_b
    开始执行IO操作
    完成处理请求req_b
    完成IO操作,并执行回调函数
    开始执行回调函数on_finish
    ret: io result
    完成执行回调函数on_finish
    

    异步的特点是程序存在多个步调,即本属于同一个过程的代码可能在不同的步调上同时执行。

    协程写法实现原理

    在使用回调函数写异步程序时,需将本属于一个执行逻辑(处理请求a)的代码拆分成两个函数 req_a 和 on_finish,这与同步程序的写法相差很大。而同步程序更便于理解业务逻辑,所以我们能否用同步代码的写法来编写异步程序?

    import time
    import thread
    
    gen = None  # 全局生成器,供 long_io 使用
    
    def long_io():
        def fun():
            print("开始执行 IO 操作")
            global gen
            time.sleep(5)
            try:
                print("完成IO操作,并 send 结果唤醒挂起程序继续执行")
                gen.send("IO result")  # 使用 send 返回结果并唤醒程序继续执行
            except StopIteration:  # 捕获生成器完成迭代,防止程序退出
                pass
    
        thread.start_new_thread(fun, ())
    
    
    def req_a():
        print("开始处理请求 req_a")
        ret = yield long_io()
        print("ret: %s" % ret)
        print("完成处理请求 req_a")
    
    
    def req_b():
        print("开始处理请求 req_b")
        time.sleep(2)
        print("完成处理请求 req_b")
    
    
    def main():
        global gen
        gen = req_a()
        gen.next()
        req_b()
        while 1:
            pass
    
    
    if __name__ == "__main__":
        main()
    

    执行过程

    开始处理请求req_a
    开始处理请求req_b
    开始执行IO操作
    完成处理请求req_b
    完成IO操作,并send结果唤醒挂起程序继续执行
    ret: io result
    完成处理请求req_a
    
    升级版本

    我们在上面编写出的版本虽然 req_a 的编写方式很类似于同步代码,但是在 main 中调用 req_a 的时候却不能将其简单的视为普通函数,而是需要当成生成器对待。

    现在,我们尝试修改,让 req_a 与 main 的编写都类似与同步代码。

    # coding:utf-8
    
    import time
    import thread
    
    gen = None  # 全局生成器,供 long_io 使用
    
    def gen_coroutine(f):
        def wrapper(*args, **kwargs):
            global gen
            gen = f()
            gen.next()
        return wrapper
    
    def long_io():
        def fun():
            print("开始执行 IO 操作")
            global gen
            time.sleep(5)
            try:
                print("完成 IO 操作,并 send 结果唤醒挂起程序继续执行")
                gen.send("IO result")
            except StopIteration:  # 捕获生成器完成迭代,防止程序退出
                pass  
        thread.start_new_thread(fun, ())
    
    
    @gen_coroutine
    def req_a():
        print("开始处理请求 req_a")
        ret = yield long_io()
        print("ret: %s " % ret)
        print("完成处理请求 req_a")
    
    def req_b():
        print("开始处理请求 req_b")
        time.sleep(2)
        print("开始处理请求 req_b")
    
    def main():
        req_a()
        req_b()
        while 1:
            pass
    
    if __name__ == "__main__":
        main()
    

    执行结果

    开始处理请求req_a
    开始处理请求req_b
    开始执行IO操作
    完成处理请求req_b
    完成IO操作,并send结果唤醒挂起程序继续执行
    ret: io result
    完成处理请求req_a
    

    ####### 最终版本
    刚刚完成的版本依然不理想,因为存在一个全局变量来供 long_io 使用,我们现在再次改写程序,消除全局变量 gen。

    # coding:utf-8
    
    import time
    import thread
    
    def gen_coroutine(f):
        def wrapper(*args, **kwargs):
            gen_f = f()  # gen_f 为生成器 req_a
            r = gen_f.next()  # r 为生成器 long_io
            def fun(g):
                ret = g.next()  # 执行生成器 long_io
                try:
                    gen_f.send(ret)
                except StopIteration:
                    pass
    
            thread.start_new_thread(fun, (r,))
        return wrapper
    
    def long_io():
        print("开始执行 IO 操作")
        time.sleep(5)
        print("完成 IO 操作,yield 回操作结果")
        yield "io result"
    
    @gen_coroutine
    def req_a():
        print("开始处理请求 req_a")
        ret = yield long_io()
        print("ret: %s" % ret)
        print("完成处理请求 req_a")
    
    def req_b():
        print("开始处理请求 req_b")
        time.sleep(2)
        print("完成处理请求 req_b")
    
    def main():
        req_a()
        req_b()
    
        while 1:
            pass
    
    if __name__ == "__main__":
        main()
    
    执行过程
    开始处理请求req_a
    开始处理请求req_b
    开始执行IO操作
    完成处理请求req_b
    完成IO操作,yield回操作结果
    ret: io result
    完成处理请求req_a
    

    这个最终版本就是理解 Tornado 异步编程原理的最简单模型。但是,Tornado 实现异步的机制不是线程,而是 epoll,即将异步过程交给 epoll 执行并进行监视回调。
    需要注意的一点是,我们实现的版本严格意义上来说,不能算是协程,因为两个程序的挂起与唤醒是在两个线程上实现的,而 Tornado 利用 epoll 来实现异步,程序的挂起与唤醒始终在一个线程上,由 Tornado 自己来调度,属于意义上的协程。虽然如此,但并不妨碍我们理解 Tornado 异步编程原理。

    Tornado 异步

    因为 epoll 主要是用来解决网络 IO 的并发问题,所以 Tornado 的异步编程也主要体现在网络 IO 的异步上,即异步 Web 请求。

    tornado.httpclient.AsyncHTTPClient

    Tornado 提供了一个异步 Web 请求客户端 tornado.httpclient.AsyncHTTPClient 用来进行异步 Web 请求。

    fetch(request, callback=None)

    用于执行一个 web 请求 request,并异步返回一个 tornado.httpclient.HTTPResponse 响应。
    request 可以是一个 url,也可以是一个 tornado.httpclient.HTTPRequest 对象。如果是 url,fetch 会自己构造一个 HTTPRequest 对象。
    ####### HTTPRequest
    HTTP 请求类,HTTPRequest 的构造函数可以接收众多构造参数,最常用的如下:

    • url(string): 要访问的 url,此参数必传,除此之外均可为可选参数。
    • method(string):HTTP 访问方式,如 "GET" 或 “POST”,默认是 GET
    • headers(HTTPHeader or dict):附加的 HTTP 协议头
    • body:HTTP 请求的请求体
    HTTPResponse

    HTTP 响应类,其常用属性如下:

    • code: HTTP 状态码,如 200 或 404
    • reason: 状态码描述信息
    • body: 响应体字符串
    • error: 异常(可有可无)

    WebSocket

    WebSocket 是 HTML5 规范中的新提出来的客户端-服务器通讯协议,协议本身使用新的 ws://URL 格式
    WebSocket 是独立的,创建在 TCP 上的协议,和 HTTP 的唯一关联是使用 HTTP 协议的 101 状态码进行协议切换,使用的 TCP 端口是 80 ,可以用于绕过大多数防火墙的限制。
    WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端直接向客户端推送数据而不需客户端进行请求,两者之间可以创建持久性的连接,并允许进行双向传送。
    目前常见的浏览器如 Chrome、IE、Firefox、Safari、Opera 等都支持 WebSocket,同时需要服务端程序支持 WebSocket。

    Tornado 的 WebSocket 模块

    Tornado 提供支持 WebSocket 的模块是 tornado.websocket ,其中提供了一个 WebSocketHandler 类用于处理通讯。

    WebSocketHandler.open()

    当一个 WebSocket 连接建立后被调用。

    WebSocketHandler.on_message(message)

    当客户端发送消息 message 过来时被调用,注意此方法必须被重写。

    WebSocketHandler.on_close()

    当 WebSocket 连接关闭后调用。

    WebSocketHandler.write_message(message, binary=False)

    向客户端发送消息 messagea,message 可以是字符串或字典(字典会被转为 json 字符串)。若 binary 为 False,则 message 以 utf8 编码发送;二进制模式(binary=True)时,可发送任何字节码。

    WebSocketHandler.close()

    关闭 WebSocket 连接。

    WebSocketHandler.check_origin(origin)

    判断源 origin,对于符合条件(返回判断结果为 True)的请求源 origin 允许其连接,否则返回 403。可以重写此方法来解决 WebSocket 的跨域请求(如始终 return True)。

    前端 JavaScript 编写
    var ws = new WebSocket("ws://127.0.0.1:8888/websocket"); // 新建一个ws连接
    ws.onopen = function() {  // 连接建立好后的回调
       ws.send("Hello, world");  // 向建立的连接发送消息
    };
    ws.onmessage = function (evt) {  // 收到服务器发送的消息后执行的回调
       alert(evt.data);  // 接收的消息内容在事件参数evt的data属性中
    };
    

    相关文章

      网友评论

          本文标题:Tornado基础: 异步与WebSocket

          本文链接:https://www.haomeiwen.com/subject/yqutedtx.html