The Scrapy Distributed Dedup Component: Source Code and How It Works

By Python之战 | Published 2019-03-15 21:09

    scrapy_redis inherits from Scrapy's dedup component and overrides some of its methods. Scrapy's original dedup works inside a single process on a single machine, but distributed crawling means several crawlers on several machines must deduplicate cooperatively, so the same spider running on different machines needs one shared place to record what it has already seen, and that place is a Redis set.
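
    The core idea can be shown with a few lines against a shared Redis server. This is only an illustrative sketch (the host, port, and key name below are made up); the real filter, shown next, wraps exactly this SADD-based check:

    import redis

    # One Redis instance shared by every crawler node; host/port are illustrative.
    server = redis.StrictRedis(host="localhost", port=6379)

    def seen(fingerprint, key="dupefilter:example"):
        """Return True if some node has already recorded this fingerprint."""
        # SADD returns 1 when the member is new and 0 when it already exists,
        # so checking and recording happen in a single atomic Redis command.
        return server.sadd(key, fingerprint) == 0

    print(seen("abc123"))  # False: first sighting, fingerprint is stored
    print(seen("abc123"))  # True: any node asking again sees a duplicate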

    First, look at the source of the scrapy_redis dedup component, dupefilter:

    import logging
    import time
    
    from scrapy.dupefilters import BaseDupeFilter
    from scrapy.utils.request import request_fingerprint
    
    from .connection import get_redis_from_settings
    
    DEFAULT_DUPEFILTER_KEY = "dupefilter:%(timestamp)s"
    
    logger = logging.getLogger(__name__)
    
    # TODO: Rename class to RedisDupeFilter.
    class RFPDupeFilter(BaseDupeFilter):
        """Redis-based request duplicates filter.
        This class can also be used with default Scrapy's scheduler.
        """
    
        logger = logger
    
        def __init__(self, server, key, debug=False):
            """Initialize the duplicates filter.
            Parameters
            ----------
            server : redis.StrictRedis
                The redis server instance.
            key : str
            Redis key where to store fingerprints.
            debug : bool, optional
                Whether to log filtered requests.
            """
            self.server = server
            self.key = key
            self.debug = debug
            self.logdupes = True
    
        @classmethod
        def from_settings(cls, settings):
            """Returns an instance from given settings.
            This uses by default the key ``dupefilter:<timestamp>``. When using the
            ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
            it needs to pass the spider name in the key.
            Parameters
            ----------
            settings : scrapy.settings.Settings
            Returns
            -------
            RFPDupeFilter
                A RFPDupeFilter instance.
            """
            server = get_redis_from_settings(settings)
            # XXX: This creates one-time key. needed to support to use this
            # class as standalone dupefilter with scrapy's default scheduler
            # if scrapy passes spider on open() method this wouldn't be needed
            # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
            key = DEFAULT_DUPEFILTER_KEY % {'timestamp': int(time.time())}
            debug = settings.getbool('DUPEFILTER_DEBUG')
            return cls(server, key=key, debug=debug)
    
        @classmethod
        def from_crawler(cls, crawler):
            """Returns instance from crawler.
            Parameters
            ----------
            crawler : scrapy.crawler.Crawler
            Returns
            -------
            RFPDupeFilter
                Instance of RFPDupeFilter.
            """
            return cls.from_settings(crawler.settings)
    
        def request_seen(self, request):
            """Returns True if request was already seen.
            Parameters
            ----------
            request : scrapy.http.Request
            Returns
            -------
            bool
            """
            fp = self.request_fingerprint(request)
            # This returns the number of values added, zero if already exists.
            added = self.server.sadd(self.key, fp)
            return added == 0
    
        def request_fingerprint(self, request):
            """Returns a fingerprint for a given request.
            Parameters
            ----------
            request : scrapy.http.Request
            Returns
            -------
            str
            """
            return request_fingerprint(request)
    
        def close(self, reason=''):
            """Delete data on close. Called by Scrapy's scheduler.
            Parameters
            ----------
            reason : str, optional
            """
            self.clear()
    
        def clear(self):
            """Clears fingerprints data."""
            self.server.delete(self.key)
    
        def log(self, request, spider):
            """Logs given request.
            Parameters
            ----------
            request : scrapy.http.Request
            spider : scrapy.spiders.Spider
            """
            if self.debug:
                msg = "Filtered duplicate request: %(request)s"
                self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            elif self.logdupes:
                msg = ("Filtered duplicate request %(request)s"
                       " - no more duplicates will be shown"
                       " (see DUPEFILTER_DEBUG to show all duplicates)")
                msg = "Filtered duplicate request: %(request)s"
                self.logger.debug(msg, {'request': request}, extra={'spider': spider})
                self.logdupes = False
    
    

    from_settings and from_crawler need little explanation: they read the settings, connect to Redis, and set the key. The interesting methods are request_seen and request_fingerprint. request_seen calls self.request_fingerprint, which in turn calls request_fingerprint from scrapy.utils.request to generate the request's fingerprint.
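
    For context, this filter is normally switched on through the project settings; the snippet below is a minimal sketch, and the Redis URL is illustrative:

    # settings.py -- minimal sketch of enabling scrapy_redis scheduling and dedup

    # Replace the default scheduler and dupefilter with the Redis-backed ones.
    SCHEDULER = "scrapy_redis.scheduler.Scheduler"
    DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

    # The shared Redis instance every crawler node connects to (illustrative URL).
    REDIS_URL = "redis://localhost:6379"

    # Optional: log every filtered duplicate instead of only the first one.
    DUPEFILTER_DEBUG = True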

    Now look at the request_fingerprint source in scrapy.utils.request:

    """
    This module provides some useful functions for working with
    scrapy.http.Request objects
    """
    
    from __future__ import print_function
    import hashlib
    import weakref
    from six.moves.urllib.parse import urlunparse
    
    from w3lib.http import basic_auth_header
    from scrapy.utils.python import to_bytes, to_native_str
    
    from w3lib.url import canonicalize_url
    from scrapy.utils.httpobj import urlparse_cached
    
    _fingerprint_cache = weakref.WeakKeyDictionary()
    def request_fingerprint(request, include_headers=None):
        """
        Return the request fingerprint.
        The request fingerprint is a hash that uniquely identifies the resource the
        request points to. For example, take the following two urls:
        http://www.example.com/query?id=111&cat=222
        http://www.example.com/query?cat=222&id=111
        Even though those are two different URLs both point to the same resource
        and are equivalent (ie. they should return the same response).
        Another example are cookies used to store session ids. Suppose the
        following page is only accessible to authenticated users:
        http://www.example.com/members/offers.html
        Lot of sites use a cookie to store the session id, which adds a random
        component to the HTTP Request and thus should be ignored when calculating
        the fingerprint.
        For this reason, request headers are ignored by default when calculating
        the fingerprint. If you want to include specific headers use the
        include_headers argument, which is a list of Request headers to include.
        """
        if include_headers:
            include_headers = tuple(to_bytes(h.lower())
                                     for h in sorted(include_headers))
        cache = _fingerprint_cache.setdefault(request, {})
        if include_headers not in cache:
            fp = hashlib.sha1()
            fp.update(to_bytes(request.method))
            fp.update(to_bytes(canonicalize_url(request.url)))
            fp.update(request.body or b'')
            if include_headers:
                for hdr in include_headers:
                    if hdr in request.headers:
                        fp.update(hdr)
                        for v in request.headers.getlist(hdr):
                            fp.update(v)
            cache[include_headers] = fp.hexdigest()
        return cache[include_headers]
    
    def request_authenticate(request, username, password):
        """Autenticate the given request (in place) using the HTTP basic access
        authentication mechanism (RFC 2617) and the given username and password
        """
        request.headers['Authorization'] = basic_auth_header(username, password)
    
    def request_httprepr(request):
        """Return the raw HTTP representation (as bytes) of the given request.
        This is provided only for reference since it's not the actual stream of
        bytes that will be sent when performing the request (that's controlled
        by Twisted).
        """
        parsed = urlparse_cached(request)
        path = urlunparse(('', '', parsed.path or '/', parsed.params, parsed.query, ''))
        s = to_bytes(request.method) + b" " + to_bytes(path) + b" HTTP/1.1\r\n"
        s += b"Host: " + to_bytes(parsed.hostname or b'') + b"\r\n"
        if request.headers:
            s += request.headers.to_string() + b"\r\n"
        s += b"\r\n"
        s += request.body
        return s
    
    def referer_str(request):
        """ Return Referer HTTP header suitable for logging. """
        referrer = request.headers.get('Referer')
        if referrer is None:
            return referrer
        return to_native_str(referrer, errors='replace')
    
    

    request_fingerprint returns a unique fingerprint for a request, and URLs that differ only in the order of their query parameters get the same fingerprint. The fingerprint is a hash over request.method, the canonicalized request.url, and request.body. An optional list argument, include_headers, names fields from request.headers that should also be hashed, so a cookie, for example, can take part in the fingerprint: two requests to the same URL with different values for that header are then treated as different requests and are not deduplicated against each other.
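
    The behaviour described above is easy to verify with a short script; a sketch using the URLs from the docstring:

    from scrapy import Request
    from scrapy.utils.request import request_fingerprint

    # Same resource, query parameters in a different order: identical fingerprints,
    # because the URL is canonicalized (query string sorted) before hashing.
    r1 = Request("http://www.example.com/query?id=111&cat=222")
    r2 = Request("http://www.example.com/query?cat=222&id=111")
    assert request_fingerprint(r1) == request_fingerprint(r2)

    # Headers are ignored by default, so a request that differs only in its Cookie
    # collapses into the same fingerprint -- until the header is listed explicitly.
    r3 = Request("http://www.example.com/query?id=111&cat=222",
                 headers={"Cookie": "session=abc"})
    assert request_fingerprint(r1) == request_fingerprint(r3)
    assert (request_fingerprint(r1, include_headers=["Cookie"])
            != request_fingerprint(r3, include_headers=["Cookie"]))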

    Once a request's fingerprint is obtained, it is checked against the dedup set in Redis: if it is already there, the request is discarded; if not, the fingerprint is added to the set and the scheduler is told that the request may enter the queue. That is the whole dedup process.
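
    The hand-off to the scheduler can be pictured with the sketch below. It is not the actual scrapy_redis.scheduler.Scheduler source, and queue stands in for the Redis-backed request queue; only request_seen and log come from the filter shown above:

    # Illustrative sketch of how a scheduler consults the dupefilter before queueing.
    def enqueue_request(dupefilter, queue, request, spider):
        # Requests created with dont_filter=True bypass dedup entirely.
        if not request.dont_filter and dupefilter.request_seen(request):
            dupefilter.log(request, spider)   # duplicate: drop it (optionally logging)
            return False
        queue.push(request)                   # new fingerprint: request enters the queue
        return True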
