美文网首页
基于asyncio实现的异步协程爬虫

基于asyncio实现的异步协程爬虫

作者: 辰辰沉沉沉 | 来源:发表于2019-01-20 12:55 被阅读0次
    前言

    以下内容是看500 lines or less中 A Web Crawler With asyncio Coroutines这个章节后做的一些记录。

    一个最简单的爬虫

    一个非常简单的get请求,爬取获取xkcd.com,

    import socket
    
    
    def crawl():
        sock = socket.socket()
        sock.connect(('xkcd.com', 80))
        request = 'GET / HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'
        sock.send(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:
            response += chunk
            chunk = sock.recv(4096)
    
        print(response)
    

    一个很简单的小的获取页面的方法,甚至说不上是爬虫。在单线程的情况下,这个方法一次只能爬取一个页面,因为socket的connect 和recv两个方法都是阻塞的。一般再配合上多线程使用,多线程再配合上线程池,爬取速度也不慢。

    但是,这就满足了吗?线程这玩意还是蛮昂贵的,多线程搞爬虫总不算那么回事。

    异步爬虫

    简单的Event loop
    import socket
    
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('xkcd.com', 80))
    except BlockingIOError:  # nonblocking的情况下,会抛出这个异常,无视掉就好
        pass
    request = 'GET / HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'
    encoded = request.encode('ascii')
    while True:
        try:
            # 尝试去发送,如果连接还没连接就会抛错,直到连接成功并发送成功,再break出去
            sock.send(encoded)
            break  # Done.
        except OSError as e:
            pass
    
    

    这边我们使用了非阻塞的socket。在C里边,socket会使用errnoEINPROGRESS来判断连接是否完成,但是python呢,我们只能用while True去一遍遍的查看,直到发送成功了再退出。

    上边的方法完全就是扯淡,因为需要不停的遍历,跟最开始阻塞的没啥区别。因此我们引入了Python3.4的DefaultSelector。它提供了select相关的api,可以注册相应的IO事件,在事件完成之后回调。

    import socket
    from selectors import DefaultSelector, EVENT_WRITE
    
    selector = DefaultSelector()
    
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('xkcd.com', 80))
    except BlockingIOError:  # nonblocking的情况下,会抛出这个异常,无视掉就好
        pass
    
    
    def connected():
        selector.unregister(sock.fileno())
        print("connected")
    
    
    selector.register(sock.fileno(), EVENT_WRITE, connected)
    
    

    上边例子就是注册了EVENT_WRITE事件,selector.register方法告诉selector我们在等待EVENT_WRITE的状态。并把connected方法作为data传了进去。

    为了在socket达到预定的状态时,回调connected方法,我们需要一个循环来获取。

    def loop():
        while True:
            events = selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback()
    

    可以理解为,在socket变为可写状态后,会自动加到一个队列里边去,而我们需要做的,就是不断从队列里边读取出来,并做相应的处理。

    至此,我们完成了一个简单的事件循环。可以想象,如要要爬取多个页面,只需要不停的注册到selector里边,再配合loop方法,可以高效率的处理网络请求。

    配以回调的异步爬虫

    首先,我们需要两个set,分别保存需要爬取的url以及一个已经见过的url(用来去重)

    urls_todo = set(["/"])
    seen_urls = set(["/"])
    

    在爬取的过程中,会涉及到很多回调。在连接完成时,我们要回调connected方法,然后发送一个GET请求过去。然后开始等待响应,同样的,这里也需要注册一个回调。为了更方便的管理回调,我们设计一个Fetcher类。

    class Fetcher:
        def __init__(self, url):
            self.response = b''  # Empty array of bytes.
            self.url = url
            self.sock = None
    

    这个类传入一个url,并调用fetch方法启动,完成后,响应会写在response中。

       # Method on Fetcher class.
        def fetch(self):
            self.sock = socket.socket()
            self.sock.setblocking(False)
            try:
                self.sock.connect(('xkcd.com', 80))
            except BlockingIOError:
                pass
    
            # Register next callback.
            selector.register(self.sock.fileno(),
                              EVENT_WRITE,
                              self.connected)
    

    首先,建立一个非阻塞的socket,注册下connected方法。然后,会有一个event loop去处理并回调。

    fetcher = Fetcher('/1/')
    fetcher.fetch()
    
    while True:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)
    

    如上,在连接建立后,便能从selector.select()中获取到已经连接的socket,获取其回调,并执行。

        # Method on Fetcher class.
        def connected(self, key, mask):
            print('connected!')
            selector.unregister(key.fd)
            request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
            self.sock.send(request.encode('ascii'))
    
            # Register the next callback.
            selector.register(key.fd,
                              EVENT_READ,
                              self.read_response)
    

    一般我们在send之前要先检查请求体的大小,看看是否可以一次性发送。但我们这发送的内容确实少,就免了这步了。在最后,注册EVENT_READ,event loop会在socket可读或者已经关闭的时候调用read_response方法。

        # Method on Fetcher class.
        def read_response(self, key, mask):
            global stopped
    
            chunk = self.sock.recv(4096)  # 4k chunk size.
            if chunk:
                self.response += chunk
            else:
                selector.unregister(key.fd)  # Done reading.
                links = self.parse_links()
    
                # Python set-logic:
                for link in links.difference(seen_urls):
                    urls_todo.add(link)
                    Fetcher(link).fetch()  # <- New Fetcher.
    
                seen_urls.update(links)
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
    

    这里我们做了两件事,如果socket中还有没读取完的数据,就读出来加到response中,然后控制权交回给event loop,当然,这种情况下socket依旧是可读状态,会再次调用该回调方法。另外,如果对方已经发送完毕,socket被关闭了,且socket.recv方法已经读不出新的数据了(已经读取完毕),我们便可以对已经保存下来的response做一些处理了。这边做的就是简单的去了个重,然后挨个建立新的fetcher并开始抓取。

    另外,我们加了个全局变量stopped来控制event loop。

    stopped = False
    
    def loop():
        while not stopped:
            events = selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback()
    

    整个程序会在所有页面爬取完成后结束。

    简单的回调异步爬虫到这似乎告一段落了,但是似乎还有哪里不太好。基于回调的方式一直是比较反人类,不友好的。设想一下,如果我们的异步爬虫在parse部分报了错

    Traceback (most recent call last):
      File "loop-with-callbacks.py", line 111, in <module>
        loop()
      File "loop-with-callbacks.py", line 106, in loop
        callback(event_key, event_mask)
      File "loop-with-callbacks.py", line 51, in read_response
        links = self.parse_links()
      File "loop-with-callbacks.py", line 67, in parse_links
        raise Exception('parse error')
    Exception: parse error
    

    光看这个error trace,谁能直接看出来是哪个页面出了错呢?基于回调的方式实现的异步爬虫有一个致命的缺点,那就是非常难debug,非常难以理解。

    Coroutines

    Python 3.4引入了一个叫asyncio,以及一个aiohttp的包,我们可以很简洁的重构之前的代码。

        @asyncio.coroutine
        def fetch(self, url):
            response = yield from self.session.get(url)
            body = yield from response.read()
    

    coroutine是这么一个概念,它就是一个子程序,在执行的过程中,可以在子程序内部终端,然后转而去执行别的子程序,在适当的时候再接回来执行。python中有很多可以做协程的包。在Python3.4的时候引入了一个基于generators、Future以及yield from语法实现的标准协程库,叫asyncio,

    为了更好的理解,接下来将会手把手用generator实现一个协程。

    python generators是如何工作的

    在讨论生成器之前,先看下一般的python方法是如何工作的。

    >>> def foo():
    ...     bar()
    ...
    >>> def bar():
    ...     pass
    

    一般的,python解释器(一个C的程序)会调用PyEval_EvalFrameEx去逐帧执行。

    >>> import dis
    >>> dis.dis(foo)
      2           0 LOAD_GLOBAL              0 (bar)
                  3 CALL_FUNCTION            0 (0 positional, 0 keyword pair)
                  6 POP_TOP
                  7 LOAD_CONST               0 (None)
                 10 RETURN_VALUE
    

    这是foo方法的字节码,foo把bar方法加载到栈中,并调用,而后pop出值,加载None到栈中,最后返回None。

    PyEval_EvalFrameEx遇到CALL_FUNCTION时,他会创建一个新的Python stack frame,然后PyEval_EvalFrameEx递归的去调用新的frame。

    这边有个很有意思的点,python帧栈是分配在堆内存中的,这个帧栈也是指一个很普通的栈结构,这意味着我们可以在方法之外去操作这个栈,在方法之外,我们也可以很方便的去操作其中的某一帧。

    >>> import inspect
    >>> frame = None
    >>> def foo():
    ...     bar()
    ...
    >>> def bar():
    ...     global frame
    ...     frame = inspect.currentframe()
    ...
    >>> foo()
    >>> # The frame was executing the code for 'bar'.
    >>> frame.f_code.co_name
    'bar'
    >>> # Its back pointer refers to the frame for 'foo'.
    >>> caller_frame = frame.f_back
    >>> caller_frame.f_code.co_name
    'foo'
    

    接下来我们来看生成器方法。

    >>> def gen_fn():
    ...     result = yield 1
    ...     print('result of yield: {}'.format(result))
    ...     result2 = yield 2
    ...     print('result of 2nd yield: {}'.format(result2))
    ...     return 'done'
    ...     
    

    当解释器看到yield时,它会知道gen_fn方法是一个生成器方法,然后会给他打一个标记

    >>> # The generator flag is bit position 5.
    >>> generator_bit = 1 << 5
    >>> bool(gen_fn.__code__.co_flags & generator_bit)
    True
    

    然后,在调用生成器方法时,解释器通过这个标记发现这不是一个普通的方法,于是乎,创建了一个生成器。

    >>> gen = gen_fn()
    >>> type(gen)
    <class 'generator'>
    

    每个生成器对象都指向同样额代码,但是分别拥有自己的帧。这个帧并不在任何真正的栈上,二十在堆内存中等待被使用。

    这个帧有个“最后个指令”的指针,指向他最近一次执行的命令。最一开始,这个指针的值是-1,意味着生成器没有开始。

    >>> gen.gi_frame.f_lasti
    -1
    

    当我们调用了send后,生成器首次执行到yield,然后暂停,返回一个

    >>> gen.send(None)
    1
    

    现在生成器对象的指令指针的位置离已开有隔了3字节,完成了编译好的56字节python代码的一部分。

    >>> gen.gi_frame.f_lasti
    3
    >>> len(gen.gi_code.co_code)
    56
    

    生成器能在任何时候被唤醒,因为它的帧并不在栈中,而是在堆上。它在调用层级中的位置并不固定,不需要执行先入后出的顺序。

    接着我们发送一个hello过去,hello就成了yield表达式的值。

    >>> gen.send('hello')
    result of yield: hello
    2
    

    这时候,在这个帧中包含了本地变量result

    >>> gen.gi_frame.f_locals
    {'result': 'hello'}
    

    其他由gen_fn产生的生成器也拥有自己的栈帧以及自己的本地变量。

    当再一次调用send方法,生成器到达第二个yield,并抛出StopIteration

    >>> gen.send('goodbye')
    result of 2nd yield: goodbye
    Traceback (most recent call last):
      File "<input>", line 1, in <module>
    StopIteration: done
    

    基于生成器的协程

    如我们上边说的,一个生成器可以暂停,然后在任何我们想要的地方继续执行,并接受一个值,返回一个值,用来做协程再好不过。

    接下来,我们要建立一个简易版的“asyncio”。

    首先需要一个future对象,我们希望的future是这么一个存在:一个生成器在暂停的时候yield一个future,然后在future获取到值(set_result)的时候继续这个生成器。

    class Future:
        def __init__(self):
            self.result = None
            self._callbacks = []
    
        def add_done_callback(self, fn):
            self._callbacks.append(fn)
    
        def set_result(self, result):
            self.result = result
            for fn in self._callbacks:
                fn(self)
    

    future初始化时会是一个"pending"状态,在调用了set_result方法后继续执行生成器。

    接下来我们要用future来改造fetch方法。原本的方法如下:

    class Fetcher:
        def fetch(self):
            self.sock = socket.socket()
            self.sock.setblocking(False)
            try:
                self.sock.connect(('xkcd.com', 80))
            except BlockingIOError:
                pass
            selector.register(self.sock.fileno(),
                              EVENT_WRITE,
                              self.connected)
    
        def connected(self, key, mask):
            print('connected!')
            # And so on....
    

    原本的方法会先创建连接,注册一个connected的回调,然后会在连接创建后回调。我们用future和生成器对其改造。

        def fetch(self):
            sock = socket.socket()
            sock.setblocking(False)
            try:
                sock.connect(('xkcd.com', 80))
            except BlockingIOError:
                pass
    
            f = Future()
    
            def on_connected():
                f.set_result(None)
    
            selector.register(sock.fileno(),
                              EVENT_WRITE,
                              on_connected)
            yield f
            selector.unregister(sock.fileno())
            print('connected!')
    

    因为包含了yield,该fetch方法是一个生成器。另外我们创建了一个future,并把它yield出来。我们希望这个生成器暂停,一直到连接创建成功后再继续,那么谁来做继续生成器的操作呢?我们引入了Task对象。

    class Task:
        def __init__(self, coro):
            self.coro = coro
            f = Future()
            f.set_result(None)
            self.step(f)
    
        def step(self, future):
            try:
                next_future = self.coro.send(future.result)
            except StopIteration:
                return
    
            next_future.add_done_callback(self.step)
    
    # Begin fetching http://xkcd.com/353/
    fetcher = Fetcher('/353/')
    Task(fetcher.fetch())
    
    loop()
    

    在给fetch生成的生成器发送None后,fetch生成器启动,然后yield一个future,也就是Task中的next_future。当socket连接创建成功后,事件循环会调用on_connected方法,然后触发future中的回调,也就是step方法,从而继续fetch生成器。

    用yield from来重构协程

    在socket连接王城后,我们可以继续发送get请求以及获取响应,这些都可以在同一个生成器方法中实现。

        def fetch(self):
            # ... connection logic from above, then:
            sock.send(request.encode('ascii'))
    
            while True:
                f = Future()
    
                def on_readable():
                    f.set_result(sock.recv(4096))
    
                selector.register(sock.fileno(),
                                  EVENT_READ,
                                  on_readable)
                chunk = yield f
                selector.unregister(sock.fileno())
                if chunk:
                    self.response += chunk
                else:
                    # Done reading.
                    break
    

    这个方法显得很长,因为我们把发的过程,分段收的过程都放在同一个方法中了,显得很乱,用yield from语法可以让方法看上去美观很多。

    这边我们廷议两个常用的方法,一个是read方法,可以获取一定大小的数据,另一个是read_all方法,可以获取全部数据。

    def read(sock):
        f = Future()
    
        def on_readable():
            f.set_result(sock.recv(4096))
    
        selector.register(sock.fileno(), EVENT_READ, on_readable)
        chunk = yield f  # Read one chunk.
        selector.unregister(sock.fileno())
        return chunk
        
    
    def read_all(sock):
        response = []
        # Read whole response.
        chunk = yield from read(sock)
        while chunk:
            response.append(chunk)
            chunk = yield from read(sock)
    
        return b''.join(response)
    

    以及使用这两个方法重构fetch

    class Fetcher:
        def fetch(self):
             # ... connection logic from above, then:
            sock.send(request.encode('ascii'))
            self.response = yield from read_all(sock)
    

    总结一下,我们上边实现了最最最简单版本的future和task,并且也知道了为什么asyncio在并发I/O上会比多线程版本有更好的性能,以及比回调版本更加清晰易懂。当然,这只是最简单的基础版,真正标准库中的asyncio实现了更多的东西,也更容易去写。

    对于一个熟悉asyncio的人来说,实现上边的功能会更加简单。

        @asyncio.coroutine
        def fetch(self, url):
            response = yield from self.session.get(url)
            body = yield from response.read()
    

    接下来,真正回到主题,实现一个异步的爬虫。

    爬虫

    接下来我们要用asyncio来实现一个异步的爬虫。

    首先描述下这个爬虫会做哪些事。首先爬虫爬取第一个页面,并解析其中的链接,加入到队列中,再继续爬取队列中的链接。另外还需要考虑以下几点:限制并发数量,让服务器和客户端压力不至于太大;在爬取完一个页面后,立马从队列中获取下个要爬去的页面;但队列中剩余url数量小于并发数量时,一部分worker需要先暂停,等到有足够数量url后继续;在全部爬取完成后退出。

    想象下,如果是一个多线程爬虫,我们会如何去做?可能我们会使用一个标准库的队列,然后把url都丢到队列中去,然后记录下任务的数量;当线程完成爬取后,调用task_done方法。主线程使用Queue.join()方法阻塞,直到所有的worker都结束。

    协程爬虫也是类似的,使用的是asyncio中的queue

    try:
        from asyncio import JoinableQueue as Queue
    except ImportError:
        # In Python 3.5, asyncio.JoinableQueue is
        # merged into Queue.
        from asyncio import Queue
    

    另外,我们会有一个crawler的类,并有crawl方法,然后在事件驱动中启动crawl

    loop = asyncio.get_event_loop()
    
    crawler = crawling.Crawler('http://xkcd.com',
                               max_redirect=10)
    
    loop.run_until_complete(crawler.crawl())
    

    爬虫最开始输入起始url和最大重定向数,以元祖的方式存在,并放到队列中

    class Crawler:
        def __init__(self, root_url, max_redirect):
            self.max_tasks = 10
            self.max_redirect = max_redirect
            self.q = Queue()
            self.seen_urls = set()
    
            # aiohttp's ClientSession does connection pooling and
            # HTTP keep-alives for us.
            self.session = aiohttp.ClientSession(loop=loop)
    
            # Put (URL, max_redirect) in the queue.
            self.q.put((root_url, self.max_redirect))
    

    现在在队列中未完成的任务数只有一个,我们用事件驱动方式启动

    loop.run_until_complete(crawler.crawl())
    

    crawl方法会启动workers,类似与多线程中的主线程,会用join方法阻塞,直到所有任务完成。

        @asyncio.coroutine
        def crawl(self):
            """Run the crawler until all work is done."""
            workers = [asyncio.Task(self.work())
                       for _ in range(self.max_tasks)]
    
            # When all work is done, exit.
            yield from self.q.join()
            for w in workers:
                w.cancel()
    

    worker会从队列中获取url,爬取,解析。每个worker都会运行worker方法。

        @asyncio.coroutine
        def work(self):
            while True:
                url, max_redirect = yield from self.q.get()
    
                # Download page and add new links to self.q.
                yield from self.fetch(url, max_redirect)
                self.q.task_done()
    

    python看到work方法包含yield from,会返回一个生成器。所以在crawl方法中,调用self.work方法诗词,并没有真正执行方法,而是创建了十个生成器。并且每个生成器都包装为一个Task。一个Task会获取生成器yield出的future,并用send的方式驱动生成器,以及触发future(和我们之前的future类似)。

    worker用以下方式去从队列中获取url

        url, max_redirect = yield from self.q.get()
    

    queue的get方法本身也是一个协程:如果队列为空会暂停,直到有新的任务进入队列。

    每当一个任务完成,都会调用task_done方法,该方法会把queue内部的未完成任务数减一。当所有的页面都爬取完毕,队列中任务为零,join方法继续执行,crawl方法也会继续,然后关闭所有的worker。

    接下来说说重定向。重定向可能导致A页面指向B,又重定向到C的情况,形成一条链 A—>B —>C,而这个B或者C可能使我们曾经访问过的。aiohttp会自动重定向,这样我们可能爬到重复的页面。因此,我们需要禁止自动重定向,并手动重定向并去重。

      @asyncio.coroutine
        def fetch(self, url, max_redirect):
            # Handle redirects ourselves.
            response = yield from self.session.get(
                url, allow_redirects=False)
    
            try:
                if is_redirect(response):
                    if max_redirect > 0:
                        next_url = response.headers['location']
                        if next_url in self.seen_urls:
                            # We have been down this path before.
                            return
    
                        # Remember we have seen this URL.
                        self.seen_urls.add(next_url)
    
                        # Follow the redirect. One less redirect remains.
                        self.q.put_nowait((next_url, max_redirect - 1))
                 else:
                     links = yield from self.parse_links(response)
                     # Python set-logic:
                     for link in links.difference(self.seen_urls):
                        self.q.put_nowait((link, self.max_redirect))
                    self.seen_urls.update(links)
            finally:
                # Return connection to pool.
                yield from response.release()
    

    如果是多线程版本,上述代码会变得异常复杂,需要考虑线程安全的问题:A线程判断url没爬过,不在seen_urls中,正准备把url插入seen_urls中,B线程也刚好爬到了相同链接,同样判断url没爬过,也加入到seen_urls中,导致一个url出现了两次,解决这个问题可能需要引入锁之类的,而协程代码就简单许多,无需考虑线程安全的问题。

    当fetch方法发现了新的链接,加入到seen_urls中,并加入到queue中,queue会把其内部维护的未完成任务数加一,然后继续执行主协程,join方法也会因为还有任务没完成会继续阻塞。如果没有新的链接,且queue也已经为空,最后一次调用task_done后,队列内部维护的计数器变为零,join方法不再阻塞。

    class Queue:
        def __init__(self):
            self._join_future = Future()
            self._unfinished_tasks = 0
            # ... other initialization ...
    
        def put_nowait(self, item):
            self._unfinished_tasks += 1
            # ... store the item ...
    
        def task_done(self):
            self._unfinished_tasks -= 1
            if self._unfinished_tasks == 0:
                self._join_future.set_result(None)
    
        @asyncio.coroutine
        def join(self):
            if self._unfinished_tasks > 0:
                yield from self._join_future
    

    之前被join阻塞的主协程继续执行,并最终结束。

    整个协程起始于crawl

    loop.run_until_complete(self.crawler.crawl())
    

    crawl是个生成器,为了驱动这个生成器,asyncio用task来包装了一下

    class EventLoop:
        def run_until_complete(self, coro):
            """Run until the coroutine is done."""
            task = Task(coro)
            task.add_done_callback(stop_callback)
            try:
                self.run_forever()
            except StopError:
                pass
    
    class StopError(BaseException):
        """Raised to stop the event loop."""
    
    def stop_callback(future):
        raise StopError
    

    当一个task完成,会抛出StopError,作为event loop的一个退出信号。

    那么,具体什么是task呢?看add_done_callback方法有点像我们之前说的future。确实,它就是future的子类。

    class Task(Future):
        """A coroutine wrapped in a Future."""
    

    一般情况下,future会在被调用set_result的时候被激活,而task会在协程退出的时候激活。

        # Method of class Task.
        def step(self, future):
            try:
                next_future = self.coro.send(future.result)
            except CancelledError:
                self.cancelled = True
                return
            except StopIteration as exc:
    
                # Task resolves itself with coro's return
                # value.
                self.set_result(exc.value)
                return
    
            next_future.add_done_callback(self.step)
    

    task.add_done_callback(stop_callback)就意味着将会在task完成的时候调用stop_callback,抛出StopError,致使整个循环结束。

    结论

    其实讲了那么多,感觉跟爬虫确实没什么关系,大多是跟作者走了一遍asyncio协程的一些思想。仔细读了好几遍,结果写下来还是不伦不类的,既不像翻译也不像是总结,自己水平还是次了点。

    相关文章

      网友评论

          本文标题:基于asyncio实现的异步协程爬虫

          本文链接:https://www.haomeiwen.com/subject/vojxjqtx.html