Getting Started with Scrapy (Part 2)

    Pausing and resuming crawls

    https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/jobs.html

    To enable persistence support, you only need to set a job directory through the JOBDIR setting. This directory is used to store all the request data needed to keep the state of a single job (i.e. a spider run). Note that this directory must not be shared by different spiders, nor even by different jobs/runs of the same spider: it holds the state of one single job.

    scrapy crawl somespider -s JOBDIR=crawls/somespider-1
    

    You can then stop the spider safely at any time (press Ctrl-C or send a signal), and resume it later with the same command:

    scrapy crawl somespider -s JOBDIR=crawls/somespider-1
    
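    When JOBDIR is set, Scrapy also persists the spider's state attribute (a dict) between runs, which is handy for keeping counters or checkpoints across pauses. A minimal sketch (the items_count key is just an illustrative name):

    import scrapy


    class PersistentSpider(scrapy.Spider):
        name = 'persistent'
        start_urls = ['http://example.com/']

        def parse(self, response):
            # self.state is saved into JOBDIR on pause and restored on resume
            self.state['items_count'] = self.state.get('items_count', 0) + 1
            yield {'url': response.url, 'seen_so_far': self.state['items_count']}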

    How deduplication works

    In the Scrapy source, open scrapy/dupefilters.py; here is part of it:

    class RFPDupeFilter(BaseDupeFilter):
        """Request Fingerprint duplicates filter"""
    
        def __init__(self, path=None, debug=False):
            self.file = None
            self.fingerprints = set()
            self.logdupes = True
            self.debug = debug
            self.logger = logging.getLogger(__name__)
            if path:
                self.file = open(os.path.join(path, 'requests.seen'), 'a+')
                self.file.seek(0)
                self.fingerprints.update(x.rstrip() for x in self.file)
    
        @classmethod
        def from_settings(cls, settings):
            debug = settings.getbool('DUPEFILTER_DEBUG')
            return cls(job_dir(settings), debug)
    
        def request_seen(self, request):
            fp = self.request_fingerprint(request)
            if fp in self.fingerprints:
                return True
            self.fingerprints.add(fp)
            if self.file:
                self.file.write(fp + os.linesep)
    
        def request_fingerprint(self, request):
            return request_fingerprint(request)
    
        def close(self, reason):
            if self.file:
                self.file.close()
    
        def log(self, request, spider):
            if self.debug:
                msg = "Filtered duplicate request: %(request)s"
                self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            elif self.logdupes:
                msg = ("Filtered duplicate request: %(request)s"
                       " - no more duplicates will be shown"
                       " (see DUPEFILTER_DEBUG to show all duplicates)")
                self.logger.debug(msg, {'request': request}, extra={'spider': spider})
                self.logdupes = False
    
            spider.crawler.stats.inc_value('dupefilter/filtered', spider=spider)
    

    It defines a request_seen method, which is called from scrapy/core/scheduler.py:

    class Scheduler(object):
        ...
    
        def enqueue_request(self, request):
            if not request.dont_filter and self.df.request_seen(request):
                self.df.log(request, self.spider)
                return False
            dqok = self._dqpush(request)
            if dqok:
                self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
            else:
                self._mqpush(request)
                self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
            self.stats.inc_value('scheduler/enqueued', spider=self.spider)
            return True
        ...
    

    Back to request_seen for a closer look:

        def request_seen(self, request):
            fp = self.request_fingerprint(request)
            if fp in self.fingerprints:
                return True
            self.fingerprints.add(fp)
            if self.file:
                self.file.write(fp + os.linesep)
    
        # the `request_fingerprint` used here is `from scrapy.utils.request import request_fingerprint`
        def request_fingerprint(self, request):
            return request_fingerprint(request)
    

    In scrapy/utils/request.py, this function hashes the request and produces the digest (fp.hexdigest()):

    def request_fingerprint(request, include_headers=None):
        """
        Return the request fingerprint.
    
        The request fingerprint is a hash that uniquely identifies the resource the
        request points to. For example, take the following two urls:
    
        http://www.example.com/query?id=111&cat=222
        http://www.example.com/query?cat=222&id=111
    
        Even though those are two different URLs both point to the same resource
        and are equivalent (ie. they should return the same response).
    
        Another example are cookies used to store session ids. Suppose the
        following page is only accesible to authenticated users:
    
        http://www.example.com/members/offers.html
    
        Lot of sites use a cookie to store the session id, which adds a random
        component to the HTTP Request and thus should be ignored when calculating
        the fingerprint.
    
        For this reason, request headers are ignored by default when calculating
        the fingeprint. If you want to include specific headers use the
        include_headers argument, which is a list of Request headers to include.
    
        """
        if include_headers:
            include_headers = tuple(to_bytes(h.lower())
                                     for h in sorted(include_headers))
        cache = _fingerprint_cache.setdefault(request, {})
        if include_headers not in cache:
            fp = hashlib.sha1()
            fp.update(to_bytes(request.method))
            fp.update(to_bytes(canonicalize_url(request.url)))
            fp.update(request.body or b'')
            if include_headers:
                for hdr in include_headers:
                    if hdr in request.headers:
                        fp.update(hdr)
                        for v in request.headers.getlist(hdr):
                            fp.update(v)
            cache[include_headers] = fp.hexdigest()
        return cache[include_headers]
    

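    A quick way to see the canonicalization at work (a minimal sketch; the example.com URLs are placeholders): because canonicalize_url sorts the query parameters, two URLs that differ only in parameter order yield the same fingerprint.

    from scrapy import Request
    from scrapy.utils.request import request_fingerprint

    r1 = Request('http://www.example.com/query?id=111&cat=222')
    r2 = Request('http://www.example.com/query?cat=222&id=111')

    # same resource, different parameter order -> identical fingerprints
    print(request_fingerprint(r1) == request_fingerprint(r2))  # True
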
    So the dedup fingerprint is essentially sha1(method + canonicalized url + body), plus any headers explicitly listed in include_headers (headers are ignored by default).
    In practice, the proportion of requests this actually removes as duplicates is not that large.
    If you need to compute your own dedup fingerprint, implement your own filter and configure it.
    The filter below deduplicates on the URL alone:

    from scrapy.dupefilters import RFPDupeFilter


    class SeenURLFilter(RFPDupeFilter):
        """A dupe filter that only considers the URL."""

        def __init__(self, path=None, debug=False):
            self.urls_seen = set()
            RFPDupeFilter.__init__(self, path, debug)

        def request_seen(self, request):
            if request.url in self.urls_seen:
                return True
            self.urls_seen.add(request.url)
            return False

    Don't forget to configure it:

    DUPEFILTER_CLASS = 'scraper.custom_filters.SeenURLFilter'
    

    Telnet

    https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/telnetconsole.html

    Scrapy runs a telnet service that exposes some performance metrics. Connect to port 6023 with the telnet command and you get a Python prompt running inside the crawler process. Be careful: if you run a blocking operation there, such as time.sleep(), the running crawler stalls, because the console shares the crawler's event loop. The built-in est() function prints an engine status report.
    Open a terminal and run the following:

    $ telnet localhost 6023
     
    >>> est()
    ...
    len(engine.downloader.active) : 16
    ...
    len(engine.slot.scheduler.mqs) : 4475
    ...
    len(engine.scraper.slot.active) : 115
    engine.scraper.slot.active_size : 117760
    engine.scraper.slot.itemproc_size : 105
    

    We ignored the dqs metric here. If you have enabled persistence support, i.e. set JOBDIR, you will also see a non-zero dqs value (len(engine.slot.scheduler.dqs)); in that case add dqs to mqs for the analysis that follows.

    • mqs
      means there are many requests waiting in the scheduler (4475 requests). That is fine.
    • len(engine.downloader.active)
      means 16 requests are currently being downloaded. That matches our CONCURRENT_REQUESTS setting, so that is fine too.
    • len(engine.scraper.slot.active)
      tells us that 115 responses are being processed in the scraper; their combined size, from engine.scraper.slot.active_size, is about 115 KB.
    • engine.scraper.slot.itemproc_size
      shows that 105 Items are currently in the pipelines, so roughly 10 responses are still being handled in the spider callbacks. Overall, the downloader is clearly the bottleneck: a large backlog of requests (mqs) is queued in front of it and it is fully utilized, while downstream the workload is more or less steady (you can confirm this by calling est() a few times).

    Another source of information is the stats object, which is normally printed when the crawl finishes. In the telnet console you can fetch it at any time as a dict via stats.get_stats(), and pretty-print it with the p() function:

    >>> p(stats.get_stats())
    {'downloader/request_bytes': 558330,
    ...
        'item_scraped_count': 2485,
    ...}
    
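    The telnet console is a built-in extension that is enabled by default. Its port range and bind address can be adjusted in settings.py; the values below are the documented defaults, shown here for illustration:

    TELNETCONSOLE_ENABLED = True
    TELNETCONSOLE_PORT = [6023, 6073]  # the first free port in this range is used
    TELNETCONSOLE_HOST = '127.0.0.1'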

    Stats collection

    https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/stats.html

    scrapy/statscollectors.py

    """
    Scrapy extension for collecting scraping stats
    """
    import pprint
    import logging
    
    logger = logging.getLogger(__name__)
    
    
    class StatsCollector(object):
    
        def __init__(self, crawler):
            self._dump = crawler.settings.getbool('STATS_DUMP')
            self._stats = {}
    
        def get_value(self, key, default=None, spider=None):
            return self._stats.get(key, default)
    
        def get_stats(self, spider=None):
            return self._stats
    
        def set_value(self, key, value, spider=None):
            self._stats[key] = value
    
        def set_stats(self, stats, spider=None):
            self._stats = stats
    
        def inc_value(self, key, count=1, start=0, spider=None):
            d = self._stats
            d[key] = d.setdefault(key, start) + count
    
        def max_value(self, key, value, spider=None):
            self._stats[key] = max(self._stats.setdefault(key, value), value)
    
        def min_value(self, key, value, spider=None):
            self._stats[key] = min(self._stats.setdefault(key, value), value)
    
        def clear_stats(self, spider=None):
            self._stats.clear()
    
        def open_spider(self, spider):
            pass
    
        def close_spider(self, spider, reason):
            if self._dump:
                logger.info("Dumping Scrapy stats:\n" + pprint.pformat(self._stats),
                            extra={'spider': spider})
            self._persist_stats(self._stats, spider)
    
        def _persist_stats(self, stats, spider):
            pass
    
    
    class MemoryStatsCollector(StatsCollector):
    
        def __init__(self, crawler):
            super(MemoryStatsCollector, self).__init__(crawler)
            self.spider_stats = {}
    
        def _persist_stats(self, stats, spider):
            self.spider_stats[spider.name] = stats
    
    
    class DummyStatsCollector(StatsCollector):
    
        def get_value(self, key, default=None, spider=None):
            return default
    
        def set_value(self, key, value, spider=None):
            pass
    
        def set_stats(self, stats, spider=None):
            pass
    
        def inc_value(self, key, count=1, start=0, spider=None):
            pass
    
        def max_value(self, key, value, spider=None):
            pass
    
        def min_value(self, key, value, spider=None):
            pass
    

    Collecting 404 pages

    class JobboleSpider(scrapy.Spider):
        name = 'jobbole'
        allowed_domains = ['blog.jobbole.com']
        start_urls = ['http://blog.jobbole.com/all-posts/']
    
    
        # collect the 404 URLs and how many there are
        handle_httpstatus_list = [404,]
    
        def __init__(self):
            self.fail_urls = []
            super(JobboleSpider, self).__init__()
    
    
        def parse(self, response):
    
            if response.status == 404:
                self.fail_urls.append(response.url)
                self.crawler.stats.inc_value('failed_url')
            ...
    

    Signals

    https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/signals.html

    Process fail_urls when the spider closes:

        # requires: from scrapy import signals
        # and a dispatcher, e.g. from pydispatch import dispatcher (PyDispatcher, which Scrapy uses internally)
        def __init__(self):
            self.fail_urls = []
            super(JobboleSpider, self).__init__()
            dispatcher.connect(self.handle_spider_closed, signal=signals.spider_closed)

        def handle_spider_closed(self, spider, reason):
            # spider_closed handlers receive the spider and the close reason (not a response)
            self.crawler.stats.set_value('failed_urls', ','.join(self.fail_urls))
            ...
    

    Extensions

    https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/extensions.html

    The scrapy/extensions package contains a number of example extensions.

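    As an illustration, here is a minimal custom extension, loosely modelled on the example in the extensions documentation (the MYEXT_ENABLED / MYEXT_ITEMCOUNT settings and the class name are made up for this sketch):

    import logging

    from scrapy import signals
    from scrapy.exceptions import NotConfigured

    logger = logging.getLogger(__name__)


    class ItemCountLogger(object):
        """Log a line every `item_count` scraped items."""

        def __init__(self, item_count=1000):
            self.item_count = item_count
            self.items_scraped = 0

        @classmethod
        def from_crawler(cls, crawler):
            if not crawler.settings.getbool('MYEXT_ENABLED'):
                raise NotConfigured
            ext = cls(crawler.settings.getint('MYEXT_ITEMCOUNT', 1000))
            crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
            return ext

        def item_scraped(self, item, spider):
            self.items_scraped += 1
            if self.items_scraped % self.item_count == 0:
                logger.info("scraped %d items", self.items_scraped)

    Enable it (like any custom extension) through the EXTENSIONS setting, e.g. EXTENSIONS = {'myproject.extensions.ItemCountLogger': 500}, plus MYEXT_ENABLED = True.
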
    Distributed crawling

    http://doc.scrapy.org/en/master/topics/practices.html#distributed-crawls

    Scrapy does not provide any built-in facility for distributed (multi-server) crawls, but there are several ways to distribute a crawl, depending on how you want to split the work.
    If you have many spiders, the simplest way to spread the load is to run several Scrapyd instances and distribute the spiders among them.
    If instead you want to run a single (large) spider across many machines, you can partition the URLs to crawl and send one partition to each spider. For example:
    First, prepare the list of URLs to crawl and split it into files:

    http://somedomain.com/urls-to-crawl/spider1/part1.list
    http://somedomain.com/urls-to-crawl/spider1/part2.list
    http://somedomain.com/urls-to-crawl/spider1/part3.list
    

    Then start the spider on 3 different Scrapyd servers. The spider receives a (spider) argument part indicating which partition to crawl:

    curl http://scrapy1.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=1
    curl http://scrapy2.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=2
    curl http://scrapy3.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=3
    
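    A sketch of what such a spider could look like (the domain and file layout follow the example above; how the part argument is consumed is an assumption for illustration):

    import scrapy


    class Spider1(scrapy.Spider):
        name = 'spider1'

        def __init__(self, part=None, *args, **kwargs):
            super(Spider1, self).__init__(*args, **kwargs)
            # `part` arrives as a spider argument via `-d part=N` when scheduling through Scrapyd
            self.part = part

        def start_requests(self):
            url = 'http://somedomain.com/urls-to-crawl/spider1/part%s.list' % self.part
            yield scrapy.Request(url, callback=self.parse_url_list)

        def parse_url_list(self, response):
            # one URL per line in the .list file
            for line in response.text.splitlines():
                if line.strip():
                    yield scrapy.Request(line.strip(), callback=self.parse)

        def parse(self, response):
            ...  # normal extraction logic goes here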

    Distributed crawling with scrapy-redis

    https://github.com/rmax/scrapy-redis
    Redis command reference:
    http://redisdoc.com/

    pip install scrapy-redis
    

    Scrapy is a general-purpose crawling framework, but it has no built-in support for distributed crawling. Scrapy-redis makes distributed Scrapy crawls easier by providing a set of Redis-based components (components only).

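    A minimal settings.py sketch for wiring these components into a project, assuming a Redis instance on localhost (adjust REDIS_URL to your environment):

    # Use the scrapy-redis scheduler and dupefilter instead of Scrapy's defaults
    SCHEDULER = "scrapy_redis.scheduler.Scheduler"
    DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

    # Keep the request queue and the dupefilter set in Redis between runs
    SCHEDULER_PERSIST = True

    # Optionally push scraped items into a Redis list as well
    ITEM_PIPELINES = {
        'scrapy_redis.pipelines.RedisPipeline': 300,
    }

    REDIS_URL = 'redis://localhost:6379'
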
    Scrapy-redis provides the following four components (which means these four parts of Scrapy are modified accordingly):

    • Scheduler
    • Duplication Filter
    • Item Pipeline
    • Base Spider
    (architecture diagram)

    As the diagram above shows, scrapy-redis adds Redis to the Scrapy architecture and builds the following components on top of Redis's features:

    • Scheduler
      Scrapy turned Python's collections.deque into its own Scrapy queue (https://github.com/scrapy/queuelib/blob/master/queuelib/queue.py), but multiple Scrapy spiders cannot share a single Scrapy queue, which is why Scrapy by itself does not support distributed crawling. scrapy-redis solves this by replacing the Scrapy queue with a Redis-backed queue: the requests to crawl are stored on one redis-server, so several spiders can read from the same database.

    The part of Scrapy that deals directly with this pending-request queue is the Scheduler. It pushes new requests into the queue and pops the next request to crawl, and it organizes the pending requests as a dict keyed by priority, for example:

        {
            priority 0 : queue 0
            priority 1 : queue 1
            priority 2 : queue 2
        }
    

    A request's priority decides which queue it is pushed into, and queues with smaller priority values are popped first. Managing this priority-keyed dict of queues requires a set of scheduler methods, and the original Scheduler can no longer be used as-is, so it is replaced by the scrapy-redis scheduler component.

    • Duplication Filter
      Scrapy implements request deduplication with a set: the fingerprints of requests already sent are kept in a set, and each new request's fingerprint is checked against it. If the fingerprint is already in the set, the request has been sent before; otherwise the request goes ahead. The core duplicate check is implemented like this:
        def request_seen(self, request):
            # self.fingerprints is the set of fingerprints already seen
            fp = self.request_fingerprint(request)
    
            # this is the core duplicate check
            if fp in self.fingerprints:
                return True
            self.fingerprints.add(fp)
            if self.file:
                self.file.write(fp + os.linesep)
    

    In scrapy-redis, deduplication is handled by the Duplication Filter component, which leans on the fact that a Redis set never holds duplicates. The scrapy-redis scheduler takes each request from the engine, adds the request's fingerprint to a Redis set to check for duplicates, and pushes the requests that are not duplicates into the Redis request queue.

    When the engine asks for a request (on behalf of the spider), the scheduler pops one from the Redis request queue according to priority and returns it to the engine, which hands it to the spider to process.

    • Item Pipeline
      The engine passes the scraped Items (returned by the spider) to the Item Pipeline; the scrapy-redis Item Pipeline stores them in a Redis items queue.
      With this modified Item Pipeline it is easy to pull items from the items queue by key, so the item-processing step can run as its own cluster of workers.

    • Base Spider
      Instead of Scrapy's plain Spider class, the rewritten RedisSpider inherits from both Spider and RedisMixin; RedisMixin is the class that reads URLs from redis.
      When a spider derived from RedisSpider is created, setup_redis is called; it connects to the redis database and then hooks up two signals:

    one for when the spider is idle: spider_idle is called, which calls schedule_next_requests to keep the spider alive and then raises the DontCloseSpider exception;

    one for when an item is scraped: item_scraped is called, which also calls schedule_next_requests to fetch the next request.

    scrapy-redis source code walkthrough

    There is not much source code in scrapy-redis; the bulk of the work is done by the redis and scrapy libraries, and the project itself mostly acts as glue between the two. Below we look at what each source file implements and, finally, how the pieces add up to a distributed crawler.

    connection.py

    Instantiates the Redis connection according to the settings. It is used by the dupefilter and the scheduler; in short, everything that reads from or writes to Redis goes through this module.

    import six
    
    from scrapy.utils.misc import load_object
    
    from . import defaults
    
    # connect to the Redis database
    # Shortcut maps 'setting name' -> 'parameter name'.
    SETTINGS_PARAMS_MAP = {
        'REDIS_URL': 'url',
        'REDIS_HOST': 'host',
        'REDIS_PORT': 'port',
        'REDIS_ENCODING': 'encoding',
    }
    
    
    def get_redis_from_settings(settings):
        """Returns a redis client instance from given Scrapy settings object.
    
        This function uses ``get_client`` to instantiate the client and uses
        ``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You
        can override them using the ``REDIS_PARAMS`` setting.
    
        Parameters
        ----------
        settings : Settings
            A scrapy settings object. See the supported settings below.
    
        Returns
        -------
        server
            Redis client instance.
    
        Other Parameters
        ----------------
        REDIS_URL : str, optional
            Server connection URL.
        REDIS_HOST : str, optional
            Server host.
        REDIS_PORT : str, optional
            Server port.
        REDIS_ENCODING : str, optional
            Data encoding.
        REDIS_PARAMS : dict, optional
            Additional client parameters.
    
        """
        params = defaults.REDIS_PARAMS.copy()
        params.update(settings.getdict('REDIS_PARAMS'))
        # XXX: Deprecate REDIS_* settings.
        for source, dest in SETTINGS_PARAMS_MAP.items():
            val = settings.get(source)
            if val:
                params[dest] = val
    
        # Allow ``redis_cls`` to be a path to a class.
        if isinstance(params.get('redis_cls'), six.string_types):
            params['redis_cls'] = load_object(params['redis_cls'])
    
        return get_redis(**params)
    
    
    # Backwards compatible alias.
    from_settings = get_redis_from_settings
    
    
    def get_redis(**kwargs):
        """Returns a redis client instance.
    
        Parameters
        ----------
        redis_cls : class, optional
            Defaults to ``redis.StrictRedis``.
        url : str, optional
            If given, ``redis_cls.from_url`` is used to instantiate the class.
        **kwargs
            Extra parameters to be passed to the ``redis_cls`` class.
    
        Returns
        -------
        server
            Redis client instance.
    
        """
        redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
        url = kwargs.pop('url', None)
        if url:
            return redis_cls.from_url(url, **kwargs)
        else:
            return redis_cls(**kwargs)
    
    defaults.py

    The default scrapy-redis configuration:

    import redis
    
    
    # For standalone use.
    DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'
    
    PIPELINE_KEY = '%(spider)s:items'
    
    REDIS_CLS = redis.StrictRedis
    REDIS_ENCODING = 'utf-8'
    # Sane connection defaults.
    # socket timeout, connect timeout, retries, etc.
    REDIS_PARAMS = {
        'socket_timeout': 30,
        'socket_connect_timeout': 30,
        'retry_on_timeout': True,
        'encoding': REDIS_ENCODING,
    }
    
    SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
    SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
    SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
    SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
    
    START_URLS_KEY = '%(name)s:start_urls'
    START_URLS_AS_SET = False
    
    dupefilter.py

    Performs request deduplication, quite cleverly, using a Redis set. Note, however, that the scheduler does not schedule requests from the dupefilter key maintained by this module; scheduling uses the queues implemented in queue.py.

    A request that is not a duplicate is pushed into the queue and popped when it is scheduled.

    import logging
    import time
    
    from scrapy.dupefilters import BaseDupeFilter
    from scrapy.utils.request import request_fingerprint
    
    from . import defaults
    from .connection import get_redis_from_settings
    
    
    logger = logging.getLogger(__name__)
    
    
    # TODO: Rename class to RedisDupeFilter.
    class RFPDupeFilter(BaseDupeFilter):
        """Redis-based request duplicates filter.
    
        This class can also be used with default Scrapy's scheduler.
    
        """
    
        logger = logger
    
        def __init__(self, server, key, debug=False):
            """Initialize the duplicates filter.
    
            Parameters
            ----------
            server : redis.StrictRedis
                The redis server instance.
            key : str
                Redis key Where to store fingerprints.
            debug : bool, optional
                Whether to log filtered requests.
    
            """
            self.server = server
            self.key = key
            self.debug = debug
            self.logdupes = True
    
        @classmethod
        def from_settings(cls, settings):
            """Returns an instance from given settings.
    
            This uses by default the key ``dupefilter:<timestamp>``. When using the
            ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
            it needs to pass the spider name in the key.
    
            Parameters
            ----------
            settings : scrapy.settings.Settings
    
            Returns
            -------
            RFPDupeFilter
                A RFPDupeFilter instance.
    
    
            """
            server = get_redis_from_settings(settings)
            # XXX: This creates one-time key. needed to support to use this
            # class as standalone dupefilter with scrapy's default scheduler
            # if scrapy passes spider on open() method this wouldn't be needed
            # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
            key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
            debug = settings.getbool('DUPEFILTER_DEBUG')
            return cls(server, key=key, debug=debug)
    
        @classmethod
        def from_crawler(cls, crawler):
            """Returns instance from crawler.
    
            Parameters
            ----------
            crawler : scrapy.crawler.Crawler
    
            Returns
            -------
            RFPDupeFilter
                Instance of RFPDupeFilter.
    
            """
            return cls.from_settings(crawler.settings)
    
        def request_seen(self, request):
            """Returns True if request was already seen.
    
            Parameters
            ----------
            request : scrapy.http.Request
    
            Returns
            -------
            bool
    
            """
            fp = self.request_fingerprint(request)
            # This returns the number of values added, zero if already exists.
            added = self.server.sadd(self.key, fp)
            return added == 0
    
        def request_fingerprint(self, request):
            """Returns a fingerprint for a given request.
    
            Parameters
            ----------
            request : scrapy.http.Request
    
            Returns
            -------
            str
    
            """
            return request_fingerprint(request)
    
        @classmethod
        def from_spider(cls, spider):
            settings = spider.settings
            server = get_redis_from_settings(settings)
            dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)
            key = dupefilter_key % {'spider': spider.name}
            debug = settings.getbool('DUPEFILTER_DEBUG')
            return cls(server, key=key, debug=debug)
    
        def close(self, reason=''):
            """Delete data on close. Called by Scrapy's scheduler.
    
            Parameters
            ----------
            reason : str, optional
    
            """
            self.clear()
    
        def clear(self):
            """Clears fingerprints data."""
            self.server.delete(self.key)
    
        def log(self, request, spider):
            """Logs given request.
    
            Parameters
            ----------
            request : scrapy.http.Request
            spider : scrapy.spiders.Spider
    
            """
            if self.debug:
                msg = "Filtered duplicate request: %(request)s"
                self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            elif self.logdupes:
                msg = ("Filtered duplicate request %(request)s"
                       " - no more duplicates will be shown"
                       " (see DUPEFILTER_DEBUG to show all duplicates)")
                self.logger.debug(msg, {'request': request}, extra={'spider': spider})
                self.logdupes = False
    

    This file may look involved, but it simply re-implements the request dedup that Scrapy already has. When Scrapy runs on a single machine, checking the in-memory (or locally persisted) request queue is enough to tell whether a request URL has already been issued or is being scheduled; a local lookup suffices. In a distributed run, the schedulers on every host have to consult the same request pool in the same database to decide whether a request is a duplicate.

    The Redis-based dedup is implemented by subclassing BaseDupeFilter and overriding its methods. As the source shows, scrapy-redis reuses Scrapy's own fingerprint function, request_fingerprint, which was covered in the dedup section earlier.

    The class connects to Redis and inserts fingerprints into a Redis set under a key that is the same for every instance of a given spider (Redis is a key-value store, so the same key always reaches the same value; using "spider name + dupefilter" as the key means crawler instances on different hosts, as long as they run the same spider, all hit the same set, and that set is their shared URL dedup pool). If sadd returns 0, the fingerprint already exists in the set (sets hold no duplicates), so request_seen returns True; if it returns 1, a new fingerprint was just added, so the request is not a duplicate, request_seen returns False, and the new fingerprint has already been stored as a side effect. The dupefilter is used by the scheduler class: every request is checked before it enters scheduling, and duplicates are simply discarded instead of wasting resources.

    picklecompat.py

    """A pickle wrapper module with protocol=-1 by default."""
    
    try:
        import cPickle as pickle  # PY2
    except ImportError:
        import pickle
    
    
    def loads(s):
        return pickle.loads(s)
    
    
    def dumps(obj):
        return pickle.dumps(obj, protocol=-1)
    

    This implements two functions, loads and dumps: in other words, a serializer.

    Redis cannot store arbitrary Python objects (keys must be strings, and values can only be strings, lists of strings, sets of strings, hashes and the like), so everything has to be serialized before it is stored.

    The serializer used here is Python's pickle module, which works on both py2 and py3. It is mainly used later by the scheduler to serialize Request objects.

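    A tiny round-trip sketch of how the scheduler uses this serializer (the dict mimics what request_to_dict produces; the field values are illustrative):

    from scrapy_redis import picklecompat

    req_dict = {'url': 'http://example.com/', 'method': 'GET', 'priority': 0}
    blob = picklecompat.dumps(req_dict)   # bytes, ready to be stored in Redis
    assert picklecompat.loads(blob) == req_dict
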
    pipelines.py

    This is what enables distributed item processing: it stores Items in Redis. Because it needs to read the settings, it uses from_crawler().

    from scrapy.utils.misc import load_object
    from scrapy.utils.serialize import ScrapyJSONEncoder
    from twisted.internet.threads import deferToThread
    
    from . import connection, defaults
    
    
    default_serialize = ScrapyJSONEncoder().encode
    
    
    class RedisPipeline(object):
        """Pushes serialized item into a redis list/queue
    
        Settings
        --------
        REDIS_ITEMS_KEY : str
            Redis key where to store items.
        REDIS_ITEMS_SERIALIZER : str
            Object path to serializer function.
    
        """
    
        def __init__(self, server,
                     key=defaults.PIPELINE_KEY,
                     serialize_func=default_serialize):
            """Initialize pipeline.
    
            Parameters
            ----------
            server : StrictRedis
                Redis client instance.
            key : str
                Redis key where to store items.
            serialize_func : callable
                Items serializer function.
    
            """
            self.server = server
            self.key = key
            self.serialize = serialize_func
    
        @classmethod
        def from_settings(cls, settings):
            params = {
                'server': connection.from_settings(settings),
            }
            if settings.get('REDIS_ITEMS_KEY'):
                params['key'] = settings['REDIS_ITEMS_KEY']
            if settings.get('REDIS_ITEMS_SERIALIZER'):
                params['serialize_func'] = load_object(
                    settings['REDIS_ITEMS_SERIALIZER']
                )
    
            return cls(**params)
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls.from_settings(crawler.settings)
    
        def process_item(self, item, spider):
            return deferToThread(self._process_item, item, spider)
    
        def _process_item(self, item, spider):
            key = self.item_key(item, spider)
            data = self.serialize(item)
            self.server.rpush(key, data)
            return item
    
        def item_key(self, item, spider):
            """Returns redis key based on given spider.
    
            Override this function to use a different key depending on the item
            and/or spider.
    
            """
            return self.key % {'spider': spider.name}
    

    This file implements an item pipeline in exactly the same sense as a regular Scrapy item pipeline. It reads the key we configured as REDIS_ITEMS_KEY from the settings, serializes each item and pushes it onto the corresponding Redis value, which is a list (each item becomes one element of that list). Storing the extracted items this way makes later processing convenient, whether you process them centrally on one server or let each node keep its own.

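    A sketch of a stand-alone consumer that drains this list, assuming the default '%(spider)s:items' key from defaults.py and the default JSON serializer ('myspider' is a placeholder spider name):

    import json

    import redis

    r = redis.StrictRedis(host='localhost', port=6379)

    while True:
        # BLPOP blocks until an item is available and returns a (key, value) pair
        _, data = r.blpop('myspider:items')
        item = json.loads(data)
        print(item)
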
    queue.py

    This file implements several container classes that talk to Redis; when they do, requests are encoded and decoded (serialized and deserialized).

    from scrapy.utils.reqser import request_to_dict, request_from_dict
    
    from . import picklecompat
    
    
    class Base(object):
        """Per-spider base queue class"""
    
        def __init__(self, server, spider, key, serializer=None):
            """Initialize per-spider redis queue.
    
            Parameters
            ----------
            server : StrictRedis
                Redis client instance.
            spider : Spider
                Scrapy spider instance.
            key: str
                Redis key where to put and get messages.
            serializer : object
                Serializer object with ``loads`` and ``dumps`` methods.
    
            """
            if serializer is None:
                # Backward compatibility.
                # TODO: deprecate pickle.
                serializer = picklecompat
            if not hasattr(serializer, 'loads'):
                raise TypeError("serializer does not implement 'loads' function: %r"
                                % serializer)
            if not hasattr(serializer, 'dumps'):
                raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                                % serializer)
    
            self.server = server
            self.spider = spider
            self.key = key % {'spider': spider.name}
            self.serializer = serializer
    
        def _encode_request(self, request):
            """Encode a request object"""
            obj = request_to_dict(request, self.spider)
            return self.serializer.dumps(obj)
    
        def _decode_request(self, encoded_request):
            """Decode an request previously encoded"""
            obj = self.serializer.loads(encoded_request)
            return request_from_dict(obj, self.spider)
    
        def __len__(self):
            """Return the length of the queue"""
            raise NotImplementedError
    
        def push(self, request):
            """Push a request"""
            raise NotImplementedError
    
        def pop(self, timeout=0):
            """Pop a request"""
            raise NotImplementedError
    
        def clear(self):
            """Clear queue/stack"""
            self.server.delete(self.key)
    
    # first-in, first-out: a queue
    class FifoQueue(Base):
        """Per-spider FIFO queue"""
    
        def __len__(self):
            """Return the length of the queue"""
            return self.server.llen(self.key)
    
        # push at the head, pop from the tail
        def push(self, request):
            """Push a request"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """Pop a request"""
            if timeout > 0:
                data = self.server.brpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.rpop(self.key)
            if data:
                return self._decode_request(data)
    
    # priority queue, backed by a Redis sorted set
    class PriorityQueue(Base):
        """Per-spider priority queue abstraction using redis' sorted set"""
    
        def __len__(self):
            """Return the length of the queue"""
            return self.server.zcard(self.key)
    
        def push(self, request):
            """Push a request"""
            data = self._encode_request(request)
            score = -request.priority
            # We don't use zadd method as the order of arguments change depending on
            # whether the class is Redis or StrictRedis, and the option of using
            # kwargs only accepts strings, not bytes.
            self.server.execute_command('ZADD', self.key, score, data)
    
        def pop(self, timeout=0):
            """
            Pop a request
            timeout not support in this queue class
            """
            # use atomic range/remove using multi/exec
            pipe = self.server.pipeline()
            pipe.multi()
            pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
            results, count = pipe.execute()
            if results:
                return self._decode_request(results[0])
    
    # last-in, first-out: a stack
    class LifoQueue(Base):
        """Per-spider LIFO queue."""
    
        def __len__(self):
            """Return the length of the stack"""
            return self.server.llen(self.key)
    
        # push at the head, pop from the head
        def push(self, request):
            """Push a request"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """Pop a request"""
            if timeout > 0:
                data = self.server.blpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.lpop(self.key)
    
            if data:
                return self._decode_request(data)
    
    
    # TODO: Deprecate the use of these names.
    SpiderQueue = FifoQueue
    SpiderStack = LifoQueue
    SpiderPriorityQueue = PriorityQueue
    
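    Which of these containers the scheduler uses is chosen with SCHEDULER_QUEUE_CLASS; PriorityQueue is the default, as seen in defaults.py above:

    # pick one of the three queue implementations
    SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
    # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'
    # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'
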
    scheduler.py

    This extension replaces Scrapy's built-in scheduler (it is the class named in the SCHEDULER setting) and is what implements the distributed scheduling of the crawl. The data structures it relies on are the queues implemented in queue.py.

    The two kinds of distribution that scrapy-redis offers, distributed crawling and distributed item processing, are implemented by the scheduler module and the pipelines module respectively; the other modules described above play supporting roles for these two.

    import importlib
    import six
    
    from scrapy.utils.misc import load_object
    
    from . import connection, defaults
    
    
    # TODO: add SCRAPY_JOB support.
    class Scheduler(object):
        """Redis-based scheduler
    
        Settings
        --------
        SCHEDULER_PERSIST : bool (default: False)
            Whether to persist or clear redis queue.
        SCHEDULER_FLUSH_ON_START : bool (default: False)
            Whether to flush redis queue on start.
        SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
            How many seconds to wait before closing if no message is received.
        SCHEDULER_QUEUE_KEY : str
            Scheduler redis key.
        SCHEDULER_QUEUE_CLASS : str
            Scheduler queue class.
        SCHEDULER_DUPEFILTER_KEY : str
            Scheduler dupefilter redis key.
        SCHEDULER_DUPEFILTER_CLASS : str
            Scheduler dupefilter class.
        SCHEDULER_SERIALIZER : str
            Scheduler serializer.
    
        """
    
        def __init__(self, server,
                     persist=False,
                     flush_on_start=False,
                     queue_key=defaults.SCHEDULER_QUEUE_KEY,
                     queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                     dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                     dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                     idle_before_close=0,
                     serializer=None):
            """Initialize scheduler.
    
            Parameters
            ----------
            server : Redis
                The redis server instance.
            persist : bool
                Whether to flush requests when closing. Default is False.
            flush_on_start : bool
                Whether to flush requests on start. Default is False.
            queue_key : str
                Requests queue key.
            queue_cls : str
                Importable path to the queue class.
            dupefilter_key : str
                Duplicates filter key.
            dupefilter_cls : str
                Importable path to the dupefilter class.
            idle_before_close : int
                Timeout before giving up.
    
            """
            if idle_before_close < 0:
                raise TypeError("idle_before_close cannot be negative")
    
            self.server = server
            self.persist = persist
            self.flush_on_start = flush_on_start
            self.queue_key = queue_key
            self.queue_cls = queue_cls
            self.dupefilter_cls = dupefilter_cls
            self.dupefilter_key = dupefilter_key
            self.idle_before_close = idle_before_close
            self.serializer = serializer
            self.stats = None
    
        def __len__(self):
            return len(self.queue)
    
        @classmethod
        def from_settings(cls, settings):
            kwargs = {
                'persist': settings.getbool('SCHEDULER_PERSIST'),
                'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
                'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
            }
    
            # If these values are missing, it means we want to use the defaults.
            optional = {
                # TODO: Use custom prefixes for this settings to note that are
                # specific to scrapy-redis.
                'queue_key': 'SCHEDULER_QUEUE_KEY',
                'queue_cls': 'SCHEDULER_QUEUE_CLASS',
                'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
                # We use the default setting name to keep compatibility.
                'dupefilter_cls': 'DUPEFILTER_CLASS',
                'serializer': 'SCHEDULER_SERIALIZER',
            }
            for name, setting_name in optional.items():
                val = settings.get(setting_name)
                if val:
                    kwargs[name] = val
    
            # Support serializer as a path to a module.
            if isinstance(kwargs.get('serializer'), six.string_types):
                kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
    
            server = connection.from_settings(settings)
            # Ensure the connection is working.
            server.ping()
    
            return cls(server=server, **kwargs)
    
        @classmethod
        def from_crawler(cls, crawler):
            instance = cls.from_settings(crawler.settings)
            # FIXME: for now, stats are only supported from this constructor
            instance.stats = crawler.stats
            return instance
    
        def open(self, spider):
            self.spider = spider
    
            try:
                self.queue = load_object(self.queue_cls)(
                    server=self.server,
                    spider=spider,
                    key=self.queue_key % {'spider': spider.name},
                    serializer=self.serializer,
                )
            except TypeError as e:
                raise ValueError("Failed to instantiate queue class '%s': %s",
                                 self.queue_cls, e)
    
            self.df = load_object(self.dupefilter_cls).from_spider(spider)
    
            if self.flush_on_start:
                self.flush()
            # notice if there are requests already in the queue to resume the crawl
            if len(self.queue):
                spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
    
        def close(self, reason):
            if not self.persist:
                self.flush()
    
        def flush(self):
            self.df.clear()
            self.queue.clear()
    
        def enqueue_request(self, request):
            if not request.dont_filter and self.df.request_seen(request):
                self.df.log(request, self.spider)
                return False
            if self.stats:
                self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
            self.queue.push(request)
            return True
    
        def next_request(self):
            block_pop_timeout = self.idle_before_close
            request = self.queue.pop(block_pop_timeout)
            if request and self.stats:
                self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
            return request
    
        def has_pending_requests(self):
            return len(self) > 0
    

    This file rewrites the scheduler class to replace scrapy.core.scheduler's original scheduler. The logic is largely unchanged; the main difference is that Redis is used as the storage backend, so that all the crawlers are scheduled in one place. The scheduler is responsible for scheduling each spider's requests. When it is initialized, it reads the queue and dupefilter classes from the settings (usually the defaults shown above) and the keys they use (typically the spider name plus 'requests' or 'dupefilter', so different instances of the same spider share the same data). Each time a request is to be scheduled, enqueue_request is called: the scheduler asks the dupefilter whether it is a duplicate and, if not, pushes it into the queue container (FIFO, LIFO or priority, configurable in the settings). When it is time to dispatch, next_request is called: the scheduler pops a request from the queue and sends it to the spider so it can be crawled.

    spiders.py

    The spider implemented here reads the URLs to crawl from redis, crawls them, and if the crawl produces more URLs it keeps going until all the requests are done; then it reads more URLs from redis and repeats the cycle.

    Analysis: this spider uses the signals.spider_idle signal to monitor the crawler's state. When the crawler is idle, new requests built with make_requests_from_url(url) are returned to the engine and passed on to the scheduler.

    from scrapy import signals
    from scrapy.exceptions import DontCloseSpider
    from scrapy.spiders import Spider, CrawlSpider
    
    from . import connection, defaults
    from .utils import bytes_to_str
    
    
    class RedisMixin(object):
        """Mixin class to implement reading urls from a redis queue."""
        redis_key = None
        redis_batch_size = None
        redis_encoding = None
    
        # Redis client placeholder.
        server = None
    
        def start_requests(self):
            """Returns a batch of start requests from redis."""
            return self.next_requests()
    
        def setup_redis(self, crawler=None):
            """Setup redis connection and idle signal.
    
            This should be called after the spider has set its crawler object.
            """
            if self.server is not None:
                return
    
            if crawler is None:
                # We allow optional crawler argument to keep backwards
                # compatibility.
                # XXX: Raise a deprecation warning.
                crawler = getattr(self, 'crawler', None)
    
            if crawler is None:
                raise ValueError("crawler is required")
    
            settings = crawler.settings
    
            if self.redis_key is None:
                self.redis_key = settings.get(
                    'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
                )
    
            self.redis_key = self.redis_key % {'name': self.name}
    
            if not self.redis_key.strip():
                raise ValueError("redis_key must not be empty")
    
            if self.redis_batch_size is None:
                # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
                self.redis_batch_size = settings.getint(
                    'REDIS_START_URLS_BATCH_SIZE',
                    settings.getint('CONCURRENT_REQUESTS'),
                )
    
            try:
                self.redis_batch_size = int(self.redis_batch_size)
            except (TypeError, ValueError):
                raise ValueError("redis_batch_size must be an integer")
    
            if self.redis_encoding is None:
                self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)
    
            self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                             "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
                             self.__dict__)
    
            self.server = connection.from_settings(crawler.settings)
            # The idle signal is called when the spider has no requests left,
            # that's when we will schedule new requests from redis queue
            crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
    
        def next_requests(self):
            """Returns a request to be scheduled or none."""
            use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
            fetch_one = self.server.spop if use_set else self.server.lpop
            # XXX: Do we need to use a timeout here?
            found = 0
            # TODO: Use redis pipeline execution.
            while found < self.redis_batch_size:
                data = fetch_one(self.redis_key)
                if not data:
                    # Queue empty.
                    break
                req = self.make_request_from_data(data)
                if req:
                    yield req
                    found += 1
                else:
                    self.logger.debug("Request not made from data: %r", data)
    
            if found:
                self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
    
        def make_request_from_data(self, data):
            """Returns a Request instance from data coming from Redis.
    
            By default, ``data`` is an encoded URL. You can override this method to
            provide your own message decoding.
    
            Parameters
            ----------
            data : bytes
                Message from redis.
    
            """
            url = bytes_to_str(data, self.redis_encoding)
            return self.make_requests_from_url(url)
    
        def schedule_next_requests(self):
            """Schedules a request if available"""
            # TODO: While there is capacity, schedule a batch of redis requests.
            for req in self.next_requests():
                self.crawler.engine.crawl(req, spider=self)
    
        def spider_idle(self):
            """Schedules a request if available, otherwise waits."""
            # XXX: Handle a sentinel to close the spider.
            self.schedule_next_requests()
            raise DontCloseSpider
    
    
    class RedisSpider(RedisMixin, Spider):
        """Spider that reads urls from redis queue when idle.
    
        Attributes
        ----------
        redis_key : str (default: REDIS_START_URLS_KEY)
            Redis key where to fetch start URLs from..
        redis_batch_size : int (default: CONCURRENT_REQUESTS)
            Number of messages to fetch from redis on each attempt.
        redis_encoding : str (default: REDIS_ENCODING)
            Encoding to use when decoding messages from redis queue.
    
        Settings
        --------
        REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
            Default Redis key where to fetch start URLs from..
        REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
            Default number of messages to fetch from redis on each attempt.
        REDIS_START_URLS_AS_SET : bool (default: False)
            Use SET operations to retrieve messages from the redis queue. If False,
            the messages are retrieve using the LPOP command.
        REDIS_ENCODING : str (default: "utf-8")
            Default encoding to use when decoding messages from redis queue.
    
        """
    
        @classmethod
        def from_crawler(self, crawler, *args, **kwargs):
            obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
            obj.setup_redis(crawler)
            return obj
    
    
    class RedisCrawlSpider(RedisMixin, CrawlSpider):
        """Spider that reads urls from redis queue when idle.
    
        Attributes
        ----------
        redis_key : str (default: REDIS_START_URLS_KEY)
            Redis key where to fetch start URLs from..
        redis_batch_size : int (default: CONCURRENT_REQUESTS)
            Number of messages to fetch from redis on each attempt.
        redis_encoding : str (default: REDIS_ENCODING)
            Encoding to use when decoding messages from redis queue.
    
        Settings
        --------
        REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
            Default Redis key where to fetch start URLs from..
        REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
            Default number of messages to fetch from redis on each attempt.
        REDIS_START_URLS_AS_SET : bool (default: True)
            Use SET operations to retrieve messages from the redis queue.
        REDIS_ENCODING : str (default: "utf-8")
            Default encoding to use when decoding messages from redis queue.
    
        """
    
        @classmethod
        def from_crawler(self, crawler, *args, **kwargs):
            obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
            obj.setup_redis(crawler)
            return obj
    

    The changes to the spider are not large either. Through the connect interface the spider is bound to the spider_idle signal; at initialization, setup_redis establishes the connection to redis, after which next_requests pops start URLs from redis using the key defined by REDIS_START_URLS_KEY (REDIS_START_URLS_AS_SET only chooses between a list and a set). Note that this start-URL pool is not the same thing as the request queue described above: the queue is used for scheduling, while the start-URL pool only holds entry URLs. Both live in redis, but under different keys, so think of them as different tables. From a handful of start URLs the spider can discover many new URLs, which go to the scheduler for dedup and scheduling. When the spider's scheduling pool runs out of URLs, the spider_idle signal fires, which triggers the spider's next_requests again, and a new batch of URLs is read from the redis start-URL pool.

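    A minimal usage sketch (the spider and key names are placeholders): define a RedisSpider with a redis_key, start as many copies of it as you like on different machines, then push start URLs into that key from anywhere.

    from scrapy_redis.spiders import RedisSpider


    class MySpider(RedisSpider):
        name = 'myspider'
        redis_key = 'myspider:start_urls'

        def parse(self, response):
            yield {'url': response.url,
                   'title': response.css('title::text').extract_first()}

    Feed it entry URLs from any machine, for example with redis-cli:

    redis-cli lpush myspider:start_urls http://example.com/
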
    utils.py

    py2/py3 string compatibility helper:

    import six
    
    
    def bytes_to_str(s, encoding='utf-8'):
        """Returns a str if a bytes object is given."""
        if six.PY3 and isinstance(s, bytes):
            return s.decode(encoding)
        return s
    

    Summary

    By rewriting the scheduler and spider classes, this project makes scheduling, the spiders and redis work together; the new dupefilter and queue classes connect duplicate filtering and the scheduling containers to redis. Because the crawler processes on every host all talk to the same redis database, scheduling and deduplication are managed in one place, which is what makes the crawler distributed. When a spider is initialized, a matching scheduler object is initialized with it; by reading the settings, the scheduler configures its scheduling container (queue) and its dedup tool (dupefilter). Whenever the spider produces a request, the Scrapy core hands it to that spider's scheduler object, which checks it against redis for duplicates and, if it is new, adds it to the scheduling pool in redis. When the scheduling conditions are met, the scheduler pops a request from the redis pool and sends it to the spider to crawl. Once the spider has crawled all the URLs currently available and the scheduler sees that the spider's redis pool is empty, the spider_idle signal fires; the spider then reads the start-URL pool in redis directly, takes a fresh batch of entry URLs and the whole cycle starts over.

    What Scrapy-Redis schedules are Request objects, which carry a fair amount of data (not just the URL but also the callback, headers and so on). The likely consequences are a slower crawl and heavy use of Redis storage, so to keep things efficient you need reasonably capable hardware, especially for the host running Redis.

    Bloom Filter

    https://piaosanlang.gitbooks.io/spiders/09day/section9.1.html

    https://pypi.org/project/pybloomfiltermmap3/#description

    https://pypi.org/project/pybloom_live

    Deduplication in scrapy-redis

    scrapy_redis deduplicates with a Redis set; what gets deduplicated is the request fingerprint, computed exactly as described in the dedup section earlier.

     def request_seen(self, request):
            fp = self.request_fingerprint(request)
            # This returns the number of values added, zero if already exists.
            added = self.server.sadd(self.key, fp)
            return added == 0
    

    To optimize this with a Bloom filter, you can replace the dedup function request_seen:

    def request_seen(self, request):
        fp = self.request_fingerprint(request)
        if self.bf.isContains(fp):    # already present in the filter
            return True
        else:
            self.bf.insert(fp)
            return False
    

    Here self.bf is an instance of the BloomFilter class below:

    # encoding=utf-8
    
    import redis
    from hashlib import md5
    
    
    class SimpleHash(object):
        def __init__(self, cap, seed):
            self.cap = cap
            self.seed = seed
    
        def hash(self, value):
            ret = 0
            for i in range(len(value)):
                ret += self.seed * ret + ord(value[i])
            return (self.cap - 1) & ret
    
    
    class BloomFilter(object):
        def __init__(self, host='localhost', port=6379, db=0, blockNum=1, key='bloomfilter'):
            """
            :param host: the host of Redis
            :param port: the port of Redis
            :param db: which db in Redis
            :param blockNum: one blockNum for about 90,000,000; if you have more strings for filtering, increase it.
            :param key: the key's name in Redis
            """
            self.server = redis.Redis(host=host, port=port, db=db)
            self.bit_size = 1 << 31  # Redis strings hold at most 512 MB; we use 2^31 bits = 2^28 bytes = 256 MB
            self.seeds = [5, 7, 11, 13, 31, 37, 61]
            self.key = key
            self.blockNum = blockNum
            self.hashfunc = []
            for seed in self.seeds:
                self.hashfunc.append(SimpleHash(self.bit_size, seed))
    
        def isContains(self, str_input):
            if not str_input:
                return False
            m5 = md5()
            m5.update(str_input.encode('utf-8'))  # encode to bytes for hashing (Python 3)
            str_input = m5.hexdigest()
            ret = True
            name = self.key + str(int(str_input[0:2], 16) % self.blockNum)
            for f in self.hashfunc:
                loc = f.hash(str_input)
                ret = ret & self.server.getbit(name, loc)
            return ret
    
        def insert(self, str_input):
            m5 = md5()
            m5.update(str_input.encode('utf-8'))  # encode to bytes for hashing (Python 3)
            str_input = m5.hexdigest()
            name = self.key + str(int(str_input[0:2], 16) % self.blockNum)
            for f in self.hashfunc:
                loc = f.hash(str_input)
                self.server.setbit(name, loc, 1)
    
    
    if __name__ == '__main__':
        # The first run prints "not exists!"; subsequent runs print "exists!"
        bf = BloomFilter()
        if bf.isContains('http://www.baidu.com'):   # check whether the string is already present
            print('exists!')
        else:
            print('not exists!')
            bf.insert('http://www.baidu.com')
    

    A Bloom filter backed by Redis combines the Bloom filter's ability to deduplicate at very large scale with Redis's persistence, and being Redis-based also makes it easy to share the filter across distributed machines.

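    One way to wire this into scrapy-redis (an integration sketch, not part of the scrapy-redis API: it subclasses the RFPDupeFilter shown earlier and swaps the Redis set lookup for the BloomFilter above, assumed to live at myproject.bloomfilter):

    from scrapy_redis.dupefilter import RFPDupeFilter

    from myproject.bloomfilter import BloomFilter  # the class defined above (hypothetical module path)


    class BloomRFPDupeFilter(RFPDupeFilter):
        """Dupefilter that checks request fingerprints against a Redis-backed Bloom filter."""

        def __init__(self, server, key, debug=False):
            super(BloomRFPDupeFilter, self).__init__(server, key, debug)
            # the dupefilter key doubles as the Bloom filter's key prefix
            self.bf = BloomFilter(key=key)

        def request_seen(self, request):
            fp = self.request_fingerprint(request)
            if self.bf.isContains(fp):
                return True
            self.bf.insert(fp)
            return False

    Point DUPEFILTER_CLASS (or SCHEDULER_DUPEFILTER_CLASS) at this class in settings.py to activate it.
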
    Deploying Scrapy with scrapyd

    https://github.com/scrapy/scrapyd

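    The basic workflow looks roughly like this ('default' and 'myproject' are placeholder target/project names; the [deploy] target is configured in the project's scrapy.cfg):

    pip install scrapyd scrapyd-client
    scrapyd                                  # start the service, listening on http://localhost:6800
    scrapyd-deploy default -p myproject      # run from the project directory to upload an egg
    curl http://localhost:6800/schedule.json -d project=myproject -d spider=somespider
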
    Extras

    How to avoid crawl loops

    In Scrapy's default configuration, deduplication is based on the URL, which is enough for most sites. Some sites, however, do very aggressive SEO: to get crawlers to fetch more, they generate links dynamically based on the request, which can send a crawler into huge numbers of random pages or even an endless loop.
    There are two ways to deal with this:
    (1) In settings.py, set an upper bound on crawl depth (a global setting, enforced by DepthMiddleware):

    DEPTH_LIMIT = 20
    

    (2) In parse, inspect the response and decide for yourself (a per-spider check):

    def parse(self, response):
        if response.meta['depth'] > 100:
            print ('Loop?')
    
