How RedisSpider's Scheduling Queue Is Implemented, with Source Code

Author: Python之战 | Published 2019-03-11 22:31

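    A regular (non-distributed) Scrapy crawler cannot share its request queue with other crawlers, so it cannot be distributed. RedisSpider (from the scrapy-redis package) uses Redis as an intermediary store so that multiple crawlers on multiple hosts can communicate. Its scheduling queue is implemented in the package's internal queue.py file (deduplication is handled separately, in dupefilter.py). queue.py provides a queue, a stack and a priority queue, and under the coordination of the scheduler these let the crawlers work together in a distributed way.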
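    Which queue class the scheduler uses is chosen in the project settings. The fragment below is a sketch of a settings.py that switches a project to the scrapy-redis scheduler, using setting names as documented in the scrapy-redis README; the Redis URL is a placeholder assumption. It also shows that deduplication is configured on its own, through DUPEFILTER_CLASS, rather than in queue.py.

```python
# settings.py (sketch): route scheduling through Redis so several hosts share one queue.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# Deduplication lives in scrapy_redis/dupefilter.py, not in queue.py.
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# One of the classes defined in queue.py; PriorityQueue is the scrapy-redis default.
# Alternatives: "scrapy_redis.queue.FifoQueue" or "scrapy_redis.queue.LifoQueue".
SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.PriorityQueue"

# Placeholder address of the shared Redis server (assumption: adjust to your deployment).
REDIS_URL = "redis://localhost:6379"
```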

    Source code of queue.py:

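    # Note: scrapy.utils.reqser is deprecated since Scrapy 2.6, which added
    # Request.to_dict() and scrapy.utils.request.request_from_dict() as replacements.
    from scrapy.utils.reqser import request_to_dict, request_from_dict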
    
    from . import picklecompat
    
    class Base(object):
        """Per-spider base queue class"""
    
        def __init__(self, server, spider, key, serializer=None):
            """Initialize per-spider redis queue.
    
            Parameters
            ----------
            server : StrictRedis
                Redis client instance.
            spider : Spider
                Scrapy spider instance.
            key: str
                Redis key where to put and get messages.
            serializer : object
                Serializer object with ``loads`` and ``dumps`` methods.
    
            """
            if serializer is None:
                # Backward compatibility.
                # TODO: deprecate pickle.
                serializer = picklecompat
            if not hasattr(serializer, 'loads'):
                raise TypeError("serializer does not implement 'loads' function: %r"
                                % serializer)
            if not hasattr(serializer, 'dumps'):
                raise TypeError("serializer does not implement 'dumps' function: %r"
                                % serializer)
    
            self.server = server
            self.spider = spider
            self.key = key % {'spider': spider.name}
            self.serializer = serializer
    
        def _encode_request(self, request):
            """Encode a request object"""
            obj = request_to_dict(request, self.spider)
            return self.serializer.dumps(obj)
    
        def _decode_request(self, encoded_request):
            """Decode a request previously encoded"""
            obj = self.serializer.loads(encoded_request)
            return request_from_dict(obj, self.spider)
    
        def __len__(self):
            """Return the length of the queue"""
            raise NotImplementedError
    
        def push(self, request):
            """Push a request"""
            raise NotImplementedError
    
        def pop(self, timeout=0):
            """Pop a request"""
            raise NotImplementedError
    
        def clear(self):
            """Clear queue/stack"""
            self.server.delete(self.key)
    
    class FifoQueue(Base):
        """Per-spider FIFO queue"""
    
        def __len__(self):
            """Return the length of the queue"""
            return self.server.llen(self.key)
    
        def push(self, request):
            """Push a request"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """Pop a request"""
            if timeout > 0:
                data = self.server.brpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.rpop(self.key)
            if data:
                return self._decode_request(data)
    
    class PriorityQueue(Base):
        """Per-spider priority queue abstraction using redis' sorted set"""
    
        def __len__(self):
            """Return the length of the queue"""
            return self.server.zcard(self.key)
    
        def push(self, request):
            """Push a request"""
            data = self._encode_request(request)
            score = -request.priority
            # We don't use zadd method as the order of arguments change depending on
            # whether the class is Redis or StrictRedis, and the option of using
            # kwargs only accepts strings, not bytes.
            self.server.execute_command('ZADD', self.key, score, data)
    
        def pop(self, timeout=0):
            """
            Pop a request
            timeout is not supported in this queue class
            """
            # use atomic range/remove using multi/exec
            pipe = self.server.pipeline()
            pipe.multi()
            pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
            results, count = pipe.execute()
            if results:
                return self._decode_request(results[0])
    
    class LifoQueue(Base):
        """Per-spider LIFO queue."""
    
        def __len__(self):
            """Return the length of the stack"""
            return self.server.llen(self.key)
    
        def push(self, request):
            """Push a request"""
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            """Pop a request"""
            if timeout > 0:
                data = self.server.blpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
            else:
                data = self.server.lpop(self.key)
    
            if data:
                return self._decode_request(data)
    
    # TODO: Deprecate the use of these names.
    SpiderQueue = FifoQueue
    SpiderStack = LifoQueue
    SpiderPriorityQueue = PriorityQueue
    
    

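    The file defines one base class, Base, which declares the common interface (__len__, push, pop, clear) plus the request encode/decode helpers, and three subclasses, FifoQueue, PriorityQueue and LifoQueue, which are also exported under the older names SpiderQueue, SpiderPriorityQueue and SpiderStack. The file also imports picklecompat, the default serializer, which handles compatible encoding of the data stored in Redis.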
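    Base accepts any serializer object that has loads and dumps, and falls back to picklecompat when none is given. A minimal sketch of that contract in plain Python follows. PickleSerializer only approximates what picklecompat does (pickle with the highest protocol), JsonSerializer is a hypothetical alternative, and request_dict is made-up data standing in for request_to_dict() output.

```python
import json
import pickle


class PickleSerializer:
    """Approximation of picklecompat: pickle using the highest protocol."""

    @staticmethod
    def dumps(obj):
        return pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def loads(data):
        return pickle.loads(data)


class JsonSerializer:
    """Hypothetical alternative; only works when every value is JSON-compatible."""

    @staticmethod
    def dumps(obj):
        return json.dumps(obj).encode("utf-8")

    @staticmethod
    def loads(data):
        return json.loads(data.decode("utf-8"))


def check_serializer(serializer):
    """The same duck-typed check that Base.__init__ performs."""
    if not hasattr(serializer, "loads"):
        raise TypeError("serializer does not implement 'loads' function: %r" % serializer)
    if not hasattr(serializer, "dumps"):
        raise TypeError("serializer does not implement 'dumps' function: %r" % serializer)
    return serializer


# Hypothetical dict, shaped loosely like request_to_dict() output.
request_dict = {"url": "https://example.com/page/1", "method": "GET", "priority": 0}

for serializer in (PickleSerializer, JsonSerializer):
    blob = check_serializer(serializer).dumps(request_dict)
    print(serializer.__name__, type(blob).__name__, serializer.loads(blob) == request_dict)

try:
    check_serializer(object())  # has neither loads nor dumps
except TypeError as exc:
    print(exc)  # reports the missing 'loads' function
```

    A JSON serializer keeps stored values readable when inspecting Redis, but a real request dict carries the body as bytes, which json.dumps rejects without extra handling; pickle has no such restriction for ordinary Python objects.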

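    The three subclasses schedule Requests first-in-first-out, last-in-first-out and highest-priority-first, respectively. They store Request objects as follows: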

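    SpiderQueue (FifoQueue): lpush(self.key, self._encode_request(request)), i.e. the serialized Request is pushed onto the head of the Redis list stored under key. pop() takes from the other end (rpop, or brpop when a timeout is given), so Requests come out first-in-first-out.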

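    SpiderStack (LifoQueue): lpush(self.key, self._encode_request(request)), stored the same way as above. The difference is that pop() takes from the same end it pushes to (lpop, or blpop when a timeout is given), so Requests come out last-in-first-out.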

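    SpiderPriorityQueue (PriorityQueue): execute_command('ZADD', self.key, score, data), i.e. the serialized Request is added to a Redis sorted set under key with score = -request.priority. pop() reads and removes the lowest-scored member (zrange plus zremrangebyrank inside a MULTI/EXEC transaction), so the highest-priority Request is scheduled first.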
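    The three disciplines can be checked without a Redis server. The sketch below replaces Redis with a tiny in-memory FakeRedis class (an illustration only, not redis-py) that mimics just LPUSH, RPOP, LPOP, ZADD and the ZRANGE/ZREMRANGEBYRANK pair, and it pushes plain strings instead of serialized Requests. The queue classes are simplified versions of the ones in queue.py.

```python
class FakeRedis:
    """In-memory stand-in for the few Redis commands queue.py relies on (not redis-py)."""

    def __init__(self):
        self.lists = {}  # key -> list; index 0 is the head (left end)
        self.zsets = {}  # key -> {member: score}

    def lpush(self, key, value):
        self.lists.setdefault(key, []).insert(0, value)

    def rpop(self, key):
        items = self.lists.get(key)
        return items.pop() if items else None

    def lpop(self, key):
        items = self.lists.get(key)
        return items.pop(0) if items else None

    def zadd(self, key, score, member):
        self.zsets.setdefault(key, {})[member] = score

    def zpop_lowest(self, key):
        """Mimics ZRANGE key 0 0 followed by ZREMRANGEBYRANK key 0 0 in one transaction."""
        zset = self.zsets.get(key)
        if not zset:
            return None
        # Redis orders by score and breaks ties by member; the same rule is used here.
        member = min(zset, key=lambda m: (zset[m], m))
        del zset[member]
        return member


class FifoQueue:
    def __init__(self, server, key):
        self.server = server
        self.key = key

    def push(self, item):
        self.server.lpush(self.key, item)  # add at the head

    def pop(self):
        return self.server.rpop(self.key)  # take from the tail: first in, first out


class LifoQueue(FifoQueue):
    def pop(self):
        return self.server.lpop(self.key)  # take from the head: last in, first out


class PriorityQueue(FifoQueue):
    def push(self, item, priority=0):
        self.server.zadd(self.key, -priority, item)  # score = -priority, as in queue.py

    def pop(self):
        return self.server.zpop_lowest(self.key)  # lowest score = highest priority


server = FakeRedis()
fifo = FifoQueue(server, "demo:fifo")
lifo = LifoQueue(server, "demo:lifo")
prio = PriorityQueue(server, "demo:prio")

for url in ("a", "b", "c"):
    fifo.push(url)
    lifo.push(url)
prio.push("low", priority=-10)
prio.push("normal", priority=0)
prio.push("high", priority=10)

print([fifo.pop() for _ in range(3)])  # ['a', 'b', 'c']
print([lifo.pop() for _ in range(3)])  # ['c', 'b', 'a']
print([prio.pop() for _ in range(3)])  # ['high', 'normal', 'low']
```

    Negating the priority is what makes the sorted set, which Redis returns in ascending score order, hand back the highest-priority Request first. Reading and removing the first member inside MULTI/EXEC, as queue.py does, keeps two workers from popping the same Request.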

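    When the same spider runs on several hosts, self.key = key % {'spider': spider.name} gives every instance with the same spider name the same Redis key, so they all read from and write to one shared scheduling queue. That is what turns spiders on different hosts into a single distributed crawler.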
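    The effect of the key template can be sketched with two queue objects standing in for the same spider on two hosts. Both share one in-memory FakeRedis here; in a real deployment each host would hold its own Redis client connected to the same server. The template "%(spider)s:requests" is assumed to match scrapy-redis' default SCHEDULER_QUEUE_KEY, and SharedFifoQueue and the spider names are hypothetical.

```python
class FakeRedis:
    """In-memory stand-in for one shared Redis server (only LPUSH and RPOP)."""

    def __init__(self):
        self.lists = {}

    def lpush(self, key, value):
        self.lists.setdefault(key, []).insert(0, value)

    def rpop(self, key):
        items = self.lists.get(key)
        return items.pop() if items else None


# Assumed to match scrapy-redis' default SCHEDULER_QUEUE_KEY.
QUEUE_KEY = "%(spider)s:requests"


class SharedFifoQueue:
    def __init__(self, server, spider_name, key=QUEUE_KEY):
        self.server = server
        self.key = key % {"spider": spider_name}  # same templating as Base.__init__

    def push(self, url):
        self.server.lpush(self.key, url)

    def pop(self):
        return self.server.rpop(self.key)


server = FakeRedis()
host_a = SharedFifoQueue(server, "myspider")     # spider "myspider" on host A
host_b = SharedFifoQueue(server, "myspider")     # same spider name on host B
other = SharedFifoQueue(server, "otherspider")   # a different spider

host_a.push("https://example.com/1")
host_a.push("https://example.com/2")
print(host_a.key, host_b.key)  # myspider:requests myspider:requests
print(host_b.pop())            # https://example.com/1  (pushed by host A)
print(other.pop())             # None  (different key, different queue)
print(host_a.pop())            # https://example.com/2
```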

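    On distributed crawlers, an earlier article, 《三种分布式爬虫系统的架构方式》 (Three Architectures for Distributed Crawler Systems), described three architectural approaches; this article examines the one RedisSpider uses. Interested readers can try implementing the other two.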

          Article link: https://www.haomeiwen.com/subject/iiaipqtx.html