Scrapy Data Flow Analysis (3)

Author: Len_8030 | Published 2018-06-26 22:04

    The previous article gave a brief overview of Scrapy's startup, where we saw that scrapy.crawler.CrawlerProcess is the class that actually launches the crawl behind the scenes. This article dives into CrawlerProcess and analyzes Scrapy's scheduling logic.
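    Before diving in, here is the usual way CrawlerProcess is driven from a script. This is a minimal sketch, assuming a project with a spider class MySpider (the import path is just a placeholder):

    from scrapy.crawler import CrawlerProcess
    from scrapy.utils.project import get_project_settings

    from myproject.spiders.my_spider import MySpider  # hypothetical spider class

    process = CrawlerProcess(get_project_settings())
    process.crawl(MySpider)   # schedule a crawl (implemented in CrawlerRunner, see below)
    process.start()           # run the reactor; blocks until all crawls finish

    With that usage in mind, here is the class itself: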

    class CrawlerProcess(CrawlerRunner):
        
        def __init__(self, settings=None, install_root_handler=True):
            super(CrawlerProcess, self).__init__(settings)
            install_shutdown_handlers(self._signal_shutdown)
            configure_logging(self.settings, install_root_handler)
            log_scrapy_info(self.settings)
    
        def _signal_shutdown(self, signum, _):
            install_shutdown_handlers(self._signal_kill)
            signame = signal_names[signum]
            logger.info("Received %(signame)s, shutting down gracefully. Send again to force ",
                        {'signame': signame})
            reactor.callFromThread(self._graceful_stop_reactor)
    
        def _signal_kill(self, signum, _):
            install_shutdown_handlers(signal.SIG_IGN)
            signame = signal_names[signum]
            logger.info('Received %(signame)s twice, forcing unclean shutdown',
                        {'signame': signame})
            reactor.callFromThread(self._stop_reactor)
    
        def start(self, stop_after_crawl=True):
    
            if stop_after_crawl:
                d = self.join()
                # Don't start the reactor if the deferreds are already fired
                if d.called:
                    return
                d.addBoth(self._stop_reactor)
    
            reactor.installResolver(self._get_dns_resolver())
            tp = reactor.getThreadPool()
            tp.adjustPoolsize(maxthreads=self.settings.getint('REACTOR_THREADPOOL_MAXSIZE'))
            reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
            reactor.run(installSignalHandlers=False)  # blocking call
    
        def _get_dns_resolver(self):
            if self.settings.getbool('DNSCACHE_ENABLED'):
                cache_size = self.settings.getint('DNSCACHE_SIZE')
            else:
                cache_size = 0
            return CachingThreadedResolver(
                reactor=reactor,
                cache_size=cache_size,
                timeout=self.settings.getfloat('DNS_TIMEOUT')
            )
    
        def _graceful_stop_reactor(self):
            d = self.stop()
            d.addBoth(self._stop_reactor)
            return d
    
        def _stop_reactor(self, _=None):
            try:
                reactor.stop()
            except RuntimeError:  # raised if already stopped or in shutdown stage
                pass
    

    Let's see what methods this class provides:

    1. __init__ calls the parent class initializer, installs the shutdown signal handlers, configures logging, and on its last line logs the Scrapy startup/settings info.
    2. _signal_shutdown and _graceful_stop_reactor handle a signal by stopping the crawl gracefully (every crawler gets to finish what it is doing).
    3. The other pair, _signal_kill and _stop_reactor, force an immediate, unclean shutdown.
    4. _get_dns_resolver builds a caching DNS resolver to speed up requests.
    5. The remaining method, start, mainly sets up and runs the reactor event loop.

    So CrawlerProcess mostly handles startup and cleanup. The scheduling logic must therefore live in its parent class, so let's look at scrapy.crawler.CrawlerRunner:

    class CrawlerRunner(object):
        """
        This is a convenient helper class that keeps track of, manages and runs
        crawlers inside an already setup Twisted `reactor`_.
    
        The CrawlerRunner object must be instantiated with a
        :class:`~scrapy.settings.Settings` object.
    
        This class shouldn't be needed (since Scrapy is responsible of using it
        accordingly) unless writing scripts that manually handle the crawling
        process. See :ref:`run-from-script` for an example.
        """
        crawlers = property(
            lambda self: self._crawlers,
            doc="Set of :class:`crawlers <scrapy.crawler.Crawler>` started by "
                ":meth:`crawl` and managed by this class."
        )
    
        def __init__(self, settings=None):
            if isinstance(settings, dict) or settings is None:
                settings = Settings(settings)
            self.settings = settings
            self.spider_loader = _get_spider_loader(settings)
            self._crawlers = set()
            self._active = set()
    
        @property
        def spiders(self):
            warnings.warn("CrawlerRunner.spiders attribute is renamed to "
                          "CrawlerRunner.spider_loader.",
                          category=ScrapyDeprecationWarning, stacklevel=2)
            return self.spider_loader
    
        def crawl(self, crawler_or_spidercls, *args, **kwargs):
            
            crawler = self.create_crawler(crawler_or_spidercls)
            return self._crawl(crawler, *args, **kwargs)
    
        def _crawl(self, crawler, *args, **kwargs):
            self.crawlers.add(crawler)
            d = crawler.crawl(*args, **kwargs)
            self._active.add(d)
    
            def _done(result):
                self.crawlers.discard(crawler)
                self._active.discard(d)
                return result
    
            return d.addBoth(_done)
    
        def create_crawler(self, crawler_or_spidercls):
    
            if isinstance(crawler_or_spidercls, Crawler):
                return crawler_or_spidercls
            return self._create_crawler(crawler_or_spidercls)
    
        def _create_crawler(self, spidercls):
            if isinstance(spidercls, six.string_types):
                spidercls = self.spider_loader.load(spidercls)
            return Crawler(spidercls, self.settings)
    
        def stop(self):
            """
            Stops simultaneously all the crawling jobs taking place.
    
            Returns a deferred that is fired when they all have ended.
            """
            return defer.DeferredList([c.stop() for c in list(self.crawlers)])
    
        @defer.inlineCallbacks
        def join(self):
            """
            join()
    
            Returns a deferred that is fired when all managed :attr:`crawlers` have
            completed their executions.
            """
            while self._active:
                yield defer.DeferredList(self._active)
    

    The author's docstring says: "This is a convenient helper class that keeps track of, manages and runs crawlers inside an already setup Twisted reactor", so we are in the right place. Start with __init__: it stores the settings, builds a spider_loader, and creates two sets. The spider_loader, as its name suggests, loads spiders: depending on the argument type, the spider name we pass in is resolved to a spider class, which is then wrapped in a scrapy.crawler.Crawler instance. The _active set holds the crawls that are currently running.
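    For comparison with CrawlerProcess, this is roughly how CrawlerRunner is used when you manage the Twisted reactor yourself, as the docstring's run-from-script scenario describes. A minimal sketch, with MySpider again a placeholder:

    from twisted.internet import reactor
    from scrapy.crawler import CrawlerRunner
    from scrapy.utils.project import get_project_settings

    from myproject.spiders.my_spider import MySpider  # hypothetical spider class

    runner = CrawlerRunner(get_project_settings())
    d = runner.crawl(MySpider)               # returns a Deferred
    d.addBoth(lambda _: reactor.stop())      # stop the reactor once the crawl ends
    reactor.run()                            # CrawlerRunner does not run it for us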
    Now the crawling actually begins. crawl(self, crawler_or_spidercls, *args, **kwargs) first creates a scrapy.crawler.Crawler instance (create_crawler and _create_crawler exist for exactly this step), and then calls this method:

    def _crawl(self, crawler, *args, **kwargs):
            self.crawlers.add(crawler)
            d = crawler.crawl(*args, **kwargs)
            self._active.add(d)
    
            def _done(result):
                self.crawlers.discard(crawler)
                self._active.discard(d)
                return result
    
            return d.addBoth(_done)
    

    Part of this code just updates the two sets to track crawler state. The important line is d = crawler.crawl(*args, **kwargs), which calls scrapy.crawler.Crawler's crawl method and returns a Deferred. That is another entry point, so let's step inside:

        @defer.inlineCallbacks
        def crawl(self, *args, **kwargs):
            assert not self.crawling, "Crawling already taking place"
            self.crawling = True
    
            try:
                self.spider = self._create_spider(*args, **kwargs)
                self.engine = self._create_engine()
                start_requests = iter(self.spider.start_requests())
                yield self.engine.open_spider(self.spider, start_requests)
                yield defer.maybeDeferred(self.engine.start)
            except Exception:
                # In Python 2 reraising an exception after yield discards
                # the original traceback (see https://bugs.python.org/issue7563),
                # so sys.exc_info() workaround is used.
                # This workaround also works in Python 3, but it is not needed,
                # and it is slower, so in Python 3 we use native `raise`.
                if six.PY2:
                    exc_info = sys.exc_info()
    
                self.crawling = False
                if self.engine is not None:
                    yield self.engine.close()
    
                if six.PY2:
                    six.reraise(*exc_info)
                raise
    

    Finally we see the component names from the architecture overview. From the top: the crawling state is checked and then flipped, and the real work begins: a spider and an engine are created, the spider's start_requests() (by default built from the start_urls we configured) is turned into an iterator of requests, and both are passed to engine.open_spider.
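    As a reminder of what start_requests yields, here is a hypothetical spider sketch; the default scrapy.Spider.start_requests simply produces one Request per URL in start_urls, and that iterator is what reaches engine.open_spider:

    import scrapy

    class ExampleSpider(scrapy.Spider):   # hypothetical spider, for illustration only
        name = 'example'
        start_urls = ['https://example.com/']

        def start_requests(self):
            # Roughly what the default implementation does: one Request per start URL
            for url in self.start_urls:
                yield scrapy.Request(url, callback=self.parse, dont_filter=True)

        def parse(self, response):
            self.logger.info('got %s', response.url)

    Next, let's analyze the engine.open_spider function: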

        @defer.inlineCallbacks
        def open_spider(self, spider, start_requests=(), close_if_idle=True):
            assert self.has_capacity(), "No free spider slot when opening %r" % \
                spider.name
            logger.info("Spider opened", extra={'spider': spider})
            nextcall = CallLaterOnce(self._next_request, spider)
            scheduler = self.scheduler_cls.from_crawler(self.crawler)
            start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
            slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
            self.slot = slot
            self.spider = spider
            yield scheduler.open(spider)
            yield self.scraper.open_spider(spider)
            self.crawler.stats.open_spider(spider)
            yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
            slot.nextcall.schedule()
            slot.heartbeat.start(5)
    

    The first step builds a CallLaterOnce object. This object is crucial: it is the key to generating requests in a loop. Here is the class:

    class CallLaterOnce(object):
    
        def __init__(self, func, *a, **kw):
            self._func = func
            self._a = a
            self._kw = kw
            self._call = None
        def schedule(self, delay=0):
            if self._call is None:
                self._call = reactor.callLater(delay, self)
        def cancel(self):
            if self._call:
                self._call.cancel()
        def __call__(self):
            self._call = None
            return self._func(*self._a, **self._kw)
    

    On construction it is given a function (here _next_request). schedule() registers a delayed call with the reactor whose callback is the CallLaterOnce object itself, so when the reactor fires it, __call__ runs and invokes the wrapped function, i.e. _next_request. Because __call__ resets self._call to None, every later call to schedule() arms exactly one new delayed call. In other words, each time schedule() is called, _next_request runs once more on a subsequent reactor iteration, which is how the engine keeps its request loop going.
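    A tiny standalone demo makes the pattern easier to see. This is an illustrative sketch, not Scrapy engine code; it only assumes that CallLaterOnce is importable from scrapy.utils.reactor, where Scrapy defines the class shown above:

    from twisted.internet import reactor
    from scrapy.utils.reactor import CallLaterOnce

    state = {'ticks': 0}

    def tick():
        state['ticks'] += 1
        print('tick', state['ticks'])
        if state['ticks'] < 3:
            nextcall.schedule()   # re-arm: this is how the engine keeps looping
        else:
            reactor.stop()

    nextcall = CallLaterOnce(tick)
    nextcall.schedule()           # only one pending call at a time; extra calls are no-ops
    reactor.run()

    The engine's nextcall wraps _next_request in exactly this way, so every slot.nextcall.schedule() we meet below means "run _next_request once more". Here is the _next_request method: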

    def _next_request(self, spider):
    
        slot = self.slot
        if not slot:
            return
        if self.paused:
            return
        # Whether to back out depends on the following conditions:
        # 1. whether the slot is closing
        # 2. whether the engine is stopping
        # 3. whether the scraper is holding more active responses than the configured limit
        # 4. whether the downloader has more requests in flight than the configured limit
        while not self._needs_backout(spider):
            # On the first pass the scheduler is empty, so the first requests always
            # come from start_requests; on later passes requests are taken from here
            if not self._next_request_from_scheduler(spider):
                break
        # If there are still start_requests left and no need to back out
        if slot.start_requests and not self._needs_backout(spider):
            try:
                # Get the next seed request
                request = next(slot.start_requests)
            except StopIteration:
                slot.start_requests = None
            except Exception:
                slot.start_requests = None
                logger.error('Error while obtaining start requests',
                             exc_info=True, extra={'spider': spider})
            else:
                # No actual download happens here; the request is only pushed
                # into the scheduler's queue
                self.crawl(request, spider)
        if self.spider_is_idle(spider) and slot.close_if_idle:
            self._spider_idle(spider)
    

    I have added some comments above. The code that actually fires off requests lives in _next_request_from_scheduler; let's look at it:

    def _next_request_from_scheduler(self, spider):
        slot = self.slot
        # Get the next request from the scheduler
        request = slot.scheduler.next_request()
        if not request:
            return
        # Download it
        d = self._download(request, spider)
        # Register the callback chain
        d.addBoth(self._handle_downloader_output, request, spider)
        d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.remove_request(request))
        d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.nextcall.schedule())
        d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        return d
    

    This code is quite clear, so let's go straight to the _download method:

    def _download(self, request, spider):
        slot = self.slot
        slot.add_request(request)
        def _on_success(response):
            # Callback for a successful download
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                # If the result of the download is a Response, return it
                response.request = request
                logkws = self.logformatter.crawled(request, response, spider)
                logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                self.signals.send_catch_log(signal=signals.response_received, \
                    response=response, request=request, spider=spider)
            return response
        def _on_complete(_):
            # Fired when the download finishes; schedule the next call regardless of outcome
            slot.nextcall.schedule()
            return _
        # Hand the request to the Downloader
        dwld = self.downloader.fetch(request, spider)
        # Register the callbacks
        dwld.addCallbacks(_on_success)
        dwld.addBoth(_on_complete)
        return dwld
    

    This is mostly a thin wrapper around the downloader; the comments above cover it, so let's look straight at downloader.fetch:

    def fetch(self, request, spider):
        def _deactivate(response):
            self.active.remove(request)
            return response
        self.active.add(request)
        # Run the request through the downloader middleware chain
        dfd = self.middleware.download(self._enqueue_request, request, spider)
        # Register the cleanup callback
        return dfd.addBoth(_deactivate)
    

    The third-to-last line calls the downloader middleware manager's download method:

    def download(self, download_func, request, spider):
        @defer.inlineCallbacks
        # Run each registered downloader middleware's process_request
        def process_request(request):
            # Call each middleware's process_request in turn; if it returns None,
            # move on to the next one, otherwise return its result immediately
            for method in self.methods['process_request']:
                response = yield method(request=request, spider=spider)
                assert response is None or isinstance(response, (Response, Request)), \
                        'Middleware %s.process_request must return None, Response or Request, got %s' % \
                        (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
                if response:
                    defer.returnValue(response)
            # If no middleware produced a result, call the download handler (do the actual download)
            defer.returnValue((yield download_func(request=request, spider=spider)))
        # Run each registered downloader middleware's process_response (the logic mirrors process_request)
        @defer.inlineCallbacks
        def process_response(response):
            assert response is not None, 'Received None in process_response'
            if isinstance(response, Request):
                defer.returnValue(response)
            # If any middleware defines process_response, call them in turn
            for method in self.methods['process_response']:
                response = yield method(request=request, response=response,
                                        spider=spider)
                assert isinstance(response, (Response, Request)), \
                    'Middleware %s.process_response must return Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, type(response))
                if isinstance(response, Request):
                    defer.returnValue(response)
            defer.returnValue(response)
        # Run each registered downloader middleware's process_exception
        @defer.inlineCallbacks
        def process_exception(_failure):
            exception = _failure.value
            # If any middleware defines process_exception, call them in turn
            for method in self.methods['process_exception']:
                response = yield method(request=request, exception=exception,
                                        spider=spider)
                assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_exception must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, type(response))
                if response:
                    defer.returnValue(response)
            defer.returnValue(_failure)
        # Chain request processing, exception handling and response processing
        deferred = mustbe_deferred(process_request, request)
        deferred.addErrback(process_exception)
        deferred.addCallback(process_response)
        return deferred
    

    The code is long but the logic is straightforward; see the comments. It runs the chain of methods declared by the downloader middlewares. If everything goes smoothly, that is, every middleware's process_request returns None, execution reaches defer.returnValue((yield download_func(request=request, spider=spider))), and the download_func passed in earlier is self._enqueue_request.
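    To make the middleware contract concrete, here is a hypothetical downloader middleware sketch (the class name and the use_stub meta key are invented for this example). It follows the rules enforced by the asserts above: process_request may return None, a Response or a Request, and process_response must return a Response or a Request:

    from scrapy.http import HtmlResponse

    class StubCacheMiddleware(object):
        def process_request(self, request, spider):
            # Returning a Response here short-circuits download(): download_func is never called
            if request.meta.get('use_stub'):
                return HtmlResponse(url=request.url, body=b'<html></html>', encoding='utf-8')
            return None  # fall through to the next middleware, and finally to the download handler

        def process_response(self, request, response, spider):
            # Returning the response unchanged passes it on to later middlewares and the spider;
            # returning a Request instead would send it back through the engine for re-scheduling
            return response

    When every process_request returns None, the chain falls through to download_func, which here is self._enqueue_request, so let's look at that function: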

        def _enqueue_request(self, request, spider):
            key, slot = self._get_slot(request, spider)
            request.meta['download_slot'] = key
    
            # Callback fired once the request has been handled
            def _deactivate(response):
                slot.active.remove(request)
                return response
    
            slot.active.add(request)
            deferred = defer.Deferred().addBoth(_deactivate)
            slot.queue.append((request, deferred))
            self._process_queue(spider, slot)
            return deferred
    

    We might have expected the actual page download to happen here, but this function is really the download queue: every request that has made it through the downloader middlewares is enqueued here and then picked up by _process_queue. Let's look at that:

        def _process_queue(self, spider, slot):
            if slot.latercall and slot.latercall.active():
                return
    
            # If download_delay is configured, postpone processing of the queue
            now = time()
            delay = slot.download_delay()
            if delay:
                penalty = delay - now + slot.lastseen
                if penalty > 0:
                    slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
                    return
    
            # Process enqueued requests if there are free slots to transfer for this slot
            # Dispatch queued requests for download
            while slot.queue and slot.free_transfer_slots() > 0:
                slot.lastseen = now
                request, deferred = slot.queue.popleft()
                dfd = self._download(slot, request, spider)
                dfd.chainDeferred(deferred)
                # prevent burst if inter-request delays were configured
                if delay:
                    self._process_queue(spider, slot)
                    break
    

    Before dispatching a request, _process_queue honours the configured download delay: slot.download_delay() returns the delay, and if the previous request on this slot was seen too recently, the call is postponed with reactor.callLater.
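    The numbers behind this logic come from ordinary settings. A hedged sketch follows; the setting names are real Scrapy settings, the values are arbitrary examples:

    # settings.py
    DOWNLOAD_DELAY = 2.0                 # what slot.download_delay() is based on
    RANDOMIZE_DOWNLOAD_DELAY = True      # multiply the delay by a random factor (0.5x to 1.5x)
    CONCURRENT_REQUESTS_PER_DOMAIN = 8   # per-slot concurrency, bounds slot.free_transfer_slots()
    CONCURRENT_REQUESTS = 16             # global cap, checked earlier via _needs_backout

    After the delay check, each queued request is handed to the downloader's own _download method: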

        def _download(self, slot, request, spider):
            # The order is very important for the following deferreds. Do not change!
    
            # 1. Create the download deferred
            dfd = mustbe_deferred(self.handlers.download_request, request, spider)
    
            # 2. Notify response_downloaded listeners about the recent download
            # before querying queue for next request
            def _downloaded(response):
                self.signals.send_catch_log(signal=signals.response_downloaded,
                                            response=response,
                                            request=request,
                                            spider=spider)
                return response
            dfd.addCallback(_downloaded)
    
            # 3. After response arrives,  remove the request from transferring
            # state to free up the transferring slot so it can be used by the
            # following requests (perhaps those which came from the downloader
            # middleware itself)
            slot.transferring.add(request)
    
            def finish_transferring(_):
                slot.transferring.remove(request)
                self._process_queue(spider, slot)
                return _
    
            return dfd.addBoth(finish_transferring)
    

    At last the download handler is reached. A few callbacks are attached here so that once a download finishes, the next request waiting in the queue gets processed.
    So how does a finished response end up in the Item Pipeline? After winding back through the reactor's chain of callbacks, the response returns here:

        def _next_request_from_scheduler(self, spider):
            slot = self.slot
            request = slot.scheduler.next_request()
            if not request:
                return
            d = self._download(request, spider)
            # Add a callback, _handle_downloader_output, to process the downloader's output
            d.addBoth(self._handle_downloader_output, request, spider)
            d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                               exc_info=failure_to_exc_info(f),
                                               extra={'spider': spider}))
            d.addBoth(lambda _: slot.remove_request(request))
            d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                               exc_info=failure_to_exc_info(f),
                                               extra={'spider': spider}))
            d.addBoth(lambda _: slot.nextcall.schedule())
            d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                               exc_info=failure_to_exc_info(f),
                                               extra={'spider': spider}))
            return d
    

    See the comment? A callback is attached here to process the response, and it is inside this chain that the Item Pipelines' process_item methods eventually get called:

        def _handle_downloader_output(self, response, request, spider):
            assert isinstance(response, (Request, Response, Failure)), response
            # If the downloader middlewares returned a Request, put it back into the scheduler
            if isinstance(response, Request):
                self.crawl(response, spider)
                return
            # response is a Response or Failure
            d = self.scraper.enqueue_scrape(response, request, spider)
            d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                                exc_info=failure_to_exc_info(f),
                                                extra={'spider': spider}))
            return d
    

    A check is made on what came back (see the comment): if it is a Request it gets re-scheduled, otherwise (a Response or a Failure) it is handed to the scraper. Below is the series of scrape-related functions:

        def enqueue_scrape(self, response, request, spider):
            slot = self.slot
            dfd = slot.add_response_request(response, request)
            def finish_scraping(_):
                slot.finish_response(response, request)
                self._check_if_closing(spider, slot)
                self._scrape_next(spider, slot)
                return _
            dfd.addBoth(finish_scraping)
            dfd.addErrback(
                lambda f: logger.error('Scraper bug processing %(request)s',
                                       {'request': request},
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
            self._scrape_next(spider, slot)
            return dfd
    
        def _scrape_next(self, spider, slot):
            while slot.queue:
                response, request, deferred = slot.next_response_request_deferred()
                self._scrape(response, request, spider).chainDeferred(deferred)
    
        def _scrape(self, response, request, spider):
            """Handle the downloaded response or failure through the spider
            callback/errback"""
            assert isinstance(response, (Response, Failure))
    
            dfd = self._scrape2(response, request, spider) # returns spiders processed output
            dfd.addErrback(self.handle_spider_error, request, response, spider)
            dfd.addCallback(self.handle_spider_output, request, response, spider)
            return dfd
    
        def _scrape2(self, request_result, request, spider):
            """Handle the different cases of request's result been a Response or a
            Failure"""
            if not isinstance(request_result, Failure):
                return self.spidermw.scrape_response(
                    self.call_spider, request_result, request, spider)
            else:
                # FIXME: don't ignore errors in spider middleware
                dfd = self.call_spider(request_result, request, spider)
                return dfd.addErrback(
                    self._log_download_errors, request_result, request, spider)
    
        def handle_spider_output(self, result, request, response, spider):
            if not result:
                return defer_succeed(None)
            it = iter_errback(result, self.handle_spider_error, request, response, spider)
            dfd = parallel(it, self.concurrent_items,
                self._process_spidermw_output, request, response, spider)
            return dfd
    
        def _process_spidermw_output(self, output, request, response, spider):
            """Process each Request/Item (given in the output parameter) returned
            from the given spider
            """
            if isinstance(output, Request):
                self.crawler.engine.crawl(request=output, spider=spider)
            elif isinstance(output, (BaseItem, dict)):
                self.slot.itemproc_size += 1
                # The key step: call the item pipelines' process_item
                dfd = self.itemproc.process_item(output, spider)
                dfd.addBoth(self._itemproc_finished, output, response, spider)
                return dfd
            elif output is None:
                pass
            else:
                typename = type(output).__name__
                logger.error('Spider must return Request, BaseItem, dict or None, '
                             'got %(typename)r in %(request)s',
                             {'request': request, 'typename': typename},
                             extra={'spider': spider})
    

    This follows the same pattern as the downloader middlewares, except that iterators are used here; the comments make it clear enough. One more important function is call_spider, which is the bridge to the spider module: it invokes request.callback, or Spider.parse by default when no callback is set.
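    Since everything funnels into itemproc.process_item, here is a minimal, hypothetical item pipeline sketch (class and field names are made up). Each enabled pipeline's process_item is called in turn; returning the item passes it on, raising DropItem discards it:

    from scrapy.exceptions import DropItem

    class RequireTitlePipeline(object):   # hypothetical pipeline for illustration
        def process_item(self, item, spider):
            # Called by itemproc.process_item for every scraped item
            if not item.get('title'):
                raise DropItem('missing title')
            return item

    A pipeline like this is enabled through the ITEM_PIPELINES setting, e.g. {'myproject.pipelines.RequireTitlePipeline': 300}.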
    At this point one request's life cycle in Scrapy is complete: it is loaded by the scheduler (from memory, disk, a database or redis), handed to the downloader, and once downloaded passed on to the spider and the item pipelines, with the engine coordinating the whole process. Of course Scrapy is far richer than this; this article only covered the simplest data flow, and later articles will look at more advanced features.
