美文网首页
异步 IO,多路复用学习+生成器/协程

异步 IO,多路复用学习+生成器/协程

作者: vckah | 来源:发表于2018-07-19 14:55 被阅读0次

    异步 IO:遇到 IO 请求不等待,IO 请求完成后自动调用回调函数即可。
    IO 多路复用:监听多个 socket 对象,当其有数据时,自动通知。有 select,poll 和 epoll 模型。

    # How to write a server when the sockets are non-blocking: poll accept()
    # and recv() in a busy loop and treat BlockingIOError as "nothing yet".
    import socket
    sk = socket.socket()

    sk.bind(('127.0.0.1', 8000))
    sk.setblocking(False)
    sk.listen()
    conn_l = []     # every client connection that is still open
    del_conn = []   # connections whose peer closed during the current pass
    while True:
        try:
            conn, addr = sk.accept()
            print('建立连接了')
            # Whether an accepted socket inherits the listener's non-blocking
            # mode is platform-dependent, so set it explicitly; otherwise
            # recv() below could block the whole loop.
            conn.setblocking(False)
            conn_l.append(conn)
        except BlockingIOError:
            # No pending connection: use the time to poll every client.
            for con in conn_l:
                try:
                    msg = con.recv(1024)
                    if msg == b'':
                        # Empty read means the peer closed; do not reply to it.
                        del_conn.append(con)
                        continue
                    print(msg)
                    con.send(b'byebye')
                except BlockingIOError:
                    # This client has nothing to read right now.
                    pass
            for con in del_conn:
                con.close()
                conn_l.remove(con)
            del_conn.clear()
    

    select 模型

    # select-based server: let select() report which sockets are readable
    # instead of polling each one.
    import socket
    import select
    sk = socket.socket()
    sk.bind(('127.0.0.1', 8000))
    sk.setblocking(False)
    sk.listen()

    # The listening socket must be watched too, otherwise no connection is
    # ever accepted and select() has nothing to wait on.
    read_lst = [sk]
    while True:
        r_lst, w_lst, x_lst = select.select(read_lst, [], [])
        for i in r_lst:
            if i is sk:
                # Listener is readable: a new client is waiting.
                conn, addr = i.accept()
                read_lst.append(conn)
            else:
                ret = i.recv(1024)
                if ret == b'':
                    # Empty read means the peer closed the connection.
                    i.close()
                    read_lst.remove(i)
                    continue
                print(ret)
    # rlist 表示有人给我发送数据
    # wlist 表示我已经和别人建立连接
    # 对象必须有 fileno 方法,只要对 socket 对象进行一次封装即可
    
    import socket
    import select
    
    
    class HttpReqeust:
        """Bundle a socket with the host it targets and the callback for its response.

        Exposes fileno() by delegating to the wrapped socket, so an instance
        can be handed to select.select() in place of the socket itself.
        """

        def __init__(self, sk, host, callback):
            self.socket = sk
            self.host = host
            self.callback = callback

        def fileno(self):
            """Return the file descriptor of the wrapped socket."""
            return self.socket.fileno()
    
    
    class AsyncRequest:
        """Run several HTTP requests concurrently in one thread using select().

        Each request is an object exposing .socket, .host, .callback and
        fileno(). A request's callback receives the raw response bytes once
        the peer closes the connection (or the connection fails).
        """

        def __init__(self):
            self.conn = []          # requests that have not been completed yet
            self.connection = []    # requests whose connect has not been confirmed yet

        def add_request(self, host, callback):
            """Start a non-blocking connect to host:80 and queue the request."""
            sk = socket.socket()
            sk.setblocking(0)
            try:
                sk.connect((host, 80))
            except BlockingIOError:
                # Expected for a non-blocking connect that is still in progress.
                pass

            request = HttpReqeust(sk, host, callback)
            self.conn.append(request)
            self.connection.append(request)

        # Backward-compatible alias for the original, misspelled method name.
        add_reqeust = add_request

        def _finish(self, request, data):
            """Deliver data to the request's callback, then close and forget the request."""
            request.callback(data)
            request.socket.close()
            self.conn.remove(request)
            if request in self.connection:
                self.connection.remove(request)

        def run(self):
            """Drive all queued requests until every one has been completed."""
            received = {}   # request -> bytearray of response bytes read so far
            while self.conn:
                rlist, wlist, _ = select.select(
                    self.conn, self.connection, self.conn, 0.05)
                for w in wlist:
                    # Writable means the connect attempt has finished: send the request.
                    tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                    self.connection.remove(w)
                    try:
                        w.socket.send(bytes(tpl, encoding="utf-8"))
                    except OSError:
                        # The connect failed; report whatever was read (normally nothing).
                        self._finish(w, bytes(received.pop(w, b"")))
                for r in rlist:
                    if r not in self.conn:
                        # Already completed by the write branch above.
                        continue
                    buf = received.setdefault(r, bytearray())
                    finished = False
                    while True:
                        try:
                            chunk = r.socket.recv(8096)
                        except BlockingIOError:
                            # Nothing more to read for now; wait for the next select().
                            break
                        except OSError:
                            finished = True
                            break
                        if not chunk:
                            # Empty read: the peer closed, the response is complete.
                            finished = True
                            break
                        buf += chunk
                    if finished:
                        self._finish(r, bytes(received.pop(r)))
    
    def f1(data):
        """Callback for the first request; receives the response bytes and ignores them."""
        pass


    def f2(data):
        """Callback for the second request; receives the response bytes and ignores them."""
        pass


    # NOTE(review): the host values are empty strings here; presumably the real
    # host names were elided and must be filled in before running.
    url_list = [
        {'host': '', 'callback': f1},
        {'host': '', 'callback': f2},
    ]
    req = AsyncRequest()
    for item in url_list:
        req.add_request(item['host'], item['callback'])

    req.run()
    
     #  协程 + 异步 IO ----> 1 个线程发送 N 个 Http 请求
     #      - asyncio 不支持 http,只支持 socket 请求,封装字符串,只需要封装 http 数据包
     #      - aiohttp 模块,封装了 http 数据包  asyncio + aiohttp
     #      - requests   asyncio + requests
     #      - gevent + requests --> grequests
     #      - Twisted   -> scrapy 基于 Twisted  defer 对象 getPage reactor
     #      - tornado 
     #      gevent - Twisted > Tornado > asyncio
    

    新篇章

    基于 select( poll, epoll) + 回调 + 事件循环

    import socket
    from urllib.parse import urlparse
    from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
    
    # Shared state for performing HTTP requests with a selector.
    stop = False                    # flag the event loop checks before each iteration
    urls = list()                   # URLs still being fetched
    selector = DefaultSelector()    # selector on which sockets and callbacks are registered
    
    class Fetcher:
        """Fetch one URL over a non-blocking socket using selector callbacks.

        Uses the module-level selector, urls list and stop flag: a finished
        fetch removes its URL from urls and sets stop once urls is empty.
        """

        def connected(self, key):
            """Send the HTTP request once the socket is writable, then wait for the reply."""
            selector.unregister(key.fd)
            self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
            selector.register(self.client.fileno(), EVENT_READ, self.readable)

        def readable(self, key):
            """Collect response bytes; on end of stream print the body and finish."""
            global stop
            d = self.client.recv(1024)
            if d:
                self.data += d
                return

            # Empty read: the server closed the connection, the response is complete.
            selector.unregister(key.fd)
            # errors="replace" keeps a non-UTF-8 page from crashing the event loop.
            data = self.data.decode("utf8", errors="replace")
            # Split only at the first blank line so a body that itself contains
            # "\r\n\r\n" is not truncated; a missing separator yields an empty body.
            _headers, _sep, html_data = data.partition("\r\n\r\n")
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                stop = True

        def get_url(self, url):
            """Parse url, start a non-blocking connect to port 80 and register for writability."""
            self.spider_url = url
            url = urlparse(url)
            self.host = url.netloc
            self.path = url.path
            self.data = b""
            if self.path == "":
                self.path = "/"

            # Create the socket connection.
            self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.client.setblocking(False)

            try:
                self.client.connect((self.host, 80))
            except BlockingIOError:
                # Expected for a non-blocking connect that is still in progress.
                pass

            # Register: call self.connected once the socket becomes writable.
            selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
    
    def loop():
        """Event loop: wait for ready sockets and invoke their registered callbacks.

        The selector only reports which registered sockets are ready; calling
        the callback stored in key.data is done here. Runs until the
        module-level stop flag becomes true.
        Pattern: callbacks + event loop + select (poll/epoll).
        """
        while not stop:
            for key, _mask in selector.select():
                handler = key.data
                handler(key)
    
    if __name__ == "__main__":
        # Start 20 fetches, run the event loop until all finish, print elapsed seconds.
        import time
        start_time = time.time()
        for index in range(20):
            # NOTE(review): the URL template appears elided ("..."), so format()
            # has no placeholder to fill in; confirm the intended URL pattern.
            url = "...".format(index)
            urls.append(url)
            fetcher = Fetcher()
            fetcher.get_url(url)
        loop()
        print(time.time()-start_time)
    

    这种方式有三个缺点:
    可读性差,共享状态管理困难,异常处理困难
    所以后来 Python 出现了支持协程的生成器,进而出现 asyncio 这样一种异步解决方案。

    生成器

    启动一个生成器有两种方法,一种是调用 next,另一种方法是使用 .send(None)

    def te():
        # a receives the value passed to send(); it is None when resumed via next().
        a = yield 'no sense'
        yield 1
        return 'Ok'
    gen = te()
    print(gen.send(None))   # starts the generator, equivalent to next(gen); prints 'no sense'
    print(next(gen))        # prints 1
    print(next(gen))        # the generator returns here, so this raises StopIteration: Ok
    

    输出

    no sense
    1
    .....  ...  StopIteration: Ok
    

    不过只要将最后一个 next(gen) 调用改为

    # Advance the generator; when it has returned, print its return value
    # (carried on StopIteration.value) instead of letting the exception escape.
    try:
        item = next(gen)
    except StopIteration as finished:
        print(finished.value)
    else:
        print(item)
    

    这样就正常了

    • 生成器的 close() 方法
    def te():
        yield 1
        yield 2
        yield 3
        return 4
    gen = te()
    gen.send(None)
    gen.close()
    # close() raises GeneratorExit inside the generator at the paused yield:
    #   - if the generator does not catch it, the generator simply ends and
    #     the next(gen) below raises StopIteration;
    #   - if the generator catches it and then reaches another yield, close()
    #     itself raises RuntimeError (generator ignored GeneratorExit);
    #   - if the generator catches it and returns without yielding again, the
    #     next(gen) below raises StopIteration.
    next(gen)   # raises StopIteration: the generator is already closed
    

    调用 close 的话,需要处理异常

    def te():
        """Generator that ends cleanly when close() is called on it."""
        try:
            yield 1
        except GeneratorExit:
            # Finish with a plain return: since PEP 479, raising StopIteration
            # inside a generator body is converted into RuntimeError.
            return
    

    GeneratorExit 是继承自 BaseException,它是更基础的类,与 Exception 不同。

    • 生成器的 throw() 方法
      可以向生成器扔一个异常,需要在里面捕获异常
    • yield from
      python 3.3 新加特性
    # Mimics itertools.chain.
    def te(*args, **kwargs):
        """Yield every item of each positional iterable, one iterable after another.

        Keyword arguments are accepted but not used.
        """
        for iterable in args:
            # Delegate to the sub-iterable; for plain iterables this matches a
            # nested "for val in iterable: yield val" loop.
            yield from iterable

    numbers = [1, 2, 3]
    mapping = {'a': 2, 'b': 2}
    chained = te(numbers, mapping, range(5, 10))
    for value in chained:
        print(value)
    # Prints each item in turn: 1, 2, 3, the dict keys a and b, then 5 to 9.
    
    def gen():
        """Sub-generator: yields a single value."""
        yield 1


    # Delegating generator
    def g1(gen):
        """Forward everything the given iterable/generator produces via yield from."""
        yield from gen


    # Caller
    def main():
        """Start the delegating generator and return the first value it yields."""
        # g1 needs the sub-generator object to delegate to.
        g = g1(gen())
        return g.send(None)
    

    相关文章

      网友评论

          本文标题:异步 IO,多路复用学习+生成器/协程

          本文链接:https://www.haomeiwen.com/subject/nzdtmftx.html