A standalone (non-distributed) Scrapy crawler cannot share its request queue between processes, so it cannot crawl cooperatively across machines. RedisSpider uses Redis as an intermediary store so that multiple spiders on multiple hosts can communicate. The shared scheduling queue is implemented in scrapy-redis's internal queue.py file, which provides a FIFO queue, a stack (LIFO), and a priority queue; coordinated by the scheduler, these queues are what make distributed cooperative crawling work.
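To see where these queues plug in before reading the source, here is a minimal settings sketch for enabling scrapy-redis; the Redis URL is an assumption to adapt to your own deployment:

# settings.py -- minimal scrapy-redis configuration (sketch)

# Replace Scrapy's scheduler and dupefilter with the Redis-backed ones.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# Which queue discipline from queue.py to use; PriorityQueue is the default.
SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.PriorityQueue"

# Keep the queue between runs so other hosts can pick up where one left off.
SCHEDULER_PERSIST = True

# Every host must point at the same Redis instance (address assumed here).
REDIS_URL = "redis://localhost:6379"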
The source of queue.py:
from scrapy.utils.reqser import request_to_dict, request_from_dict

from . import picklecompat


class Base(object):
    """Per-spider base queue class"""

    def __init__(self, server, spider, key, serializer=None):
        """Initialize per-spider redis queue.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        spider : Spider
            Scrapy spider instance.
        key: str
            Redis key where to put and get messages.
        serializer : object
            Serializer object with ``loads`` and ``dumps`` methods.

        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer does not implement 'dumps' function: %r"
                            % serializer)

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode a request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)


class FifoQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            # BRPOP returns a (key, value) tuple, or None on timeout.
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)


class PriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        # Negate the priority: sorted sets return ascending scores,
        # so higher-priority requests must get lower scores.
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not supported in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])


class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)
        if data:
            return self._decode_request(data)


# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue
SpiderStack = LifoQueue
SpiderPriorityQueue = PriorityQueue
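The queue classes are normally driven by the scrapy-redis scheduler, but they can also be exercised by hand. A minimal sketch, assuming a local Redis and a made-up DemoSpider:

import redis
from scrapy import Request, Spider
from scrapy_redis.queue import FifoQueue


class DemoSpider(Spider):
    name = 'demo'


server = redis.StrictRedis()  # assumes Redis on localhost:6379
q = FifoQueue(server, DemoSpider(), '%(spider)s:requests')

q.push(Request('http://example.com'))  # serialized and LPUSHed to 'demo:requests'
print(len(q))                          # 1
req = q.pop()                          # RPOPed and decoded back into a Request
print(req.url)                         # http://example.com
q.clear()                              # deletes the Redis key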
The file defines one base class, Base, that declares the interface, and three subclasses that implement it: SpiderQueue, SpiderStack, and SpiderPriorityQueue (aliases for FifoQueue, LifoQueue, and PriorityQueue). It also imports the serializer module picklecompat, which handles encoding compatibility when requests are stored in Redis.
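picklecompat itself is only a thin wrapper around pickle; roughly (the real module also falls back to cPickle on Python 2), it amounts to:

import pickle


def loads(s):
    return pickle.loads(s)


def dumps(obj):
    # protocol=-1 selects the highest pickle protocol available.
    return pickle.dumps(obj, protocol=-1)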
The three subclasses implement first-in-first-out, last-in-first-out, and highest-priority-first scheduling of Request objects. They store serialized Request objects as follows (a runnable sketch of the three disciplines follows the list):
SpiderQueue: lpush(self.key, self._encode_request(request)), i.e. the serialized Request is pushed onto the left of a Redis list under the spider's key; pop takes from the right (rpop/brpop), giving FIFO order.
SpiderStack: the same lpush storage, but pop takes from the left (lpop/blpop), so Requests come out last-in, first-out.
SpiderPriorityQueue: execute_command('ZADD', self.key, score, data), i.e. the serialized Request is stored in a sorted set under the spider's key with score = -request.priority, implementing priority scheduling.
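To make the three disciplines concrete, here is a sketch that drives a plain redis-py client directly; plain byte strings stand in for serialized Requests, and the demo:* key names are made up:

import redis

r = redis.Redis()  # assumes Redis on localhost:6379

# FIFO (SpiderQueue): push left, pop right.
r.delete('demo:fifo')
for item in (b'a', b'b', b'c'):
    r.lpush('demo:fifo', item)
print([r.rpop('demo:fifo') for _ in range(3)])  # [b'a', b'b', b'c']

# LIFO (SpiderStack): push left, pop left.
r.delete('demo:lifo')
for item in (b'a', b'b', b'c'):
    r.lpush('demo:lifo', item)
print([r.lpop('demo:lifo') for _ in range(3)])  # [b'c', b'b', b'a']

# Priority (SpiderPriorityQueue): sorted set; the lowest score pops first,
# so negated priorities put the highest-priority member in front.
r.delete('demo:prio')
r.zadd('demo:prio', {b'low': -1, b'mid': -5, b'high': -10})
print(r.zrange('demo:prio', 0, -1))             # [b'high', b'mid', b'low']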
When the same spider runs on multiple hosts, self.key = key % {'spider': spider.name} means that identical spider names yield an identical Redis key, so every instance of that spider schedules from one and the same queue; this shared queue is what makes the crawl distributed across hosts.
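As an illustration, the default key template in scrapy-redis is '%(spider)s:requests', so any host running a spider named 'books' (a name assumed for the example) computes the same key:

SCHEDULER_QUEUE_KEY = '%(spider)s:requests'  # scrapy-redis default template

# Host A and host B both run a spider named 'books':
key_on_host_a = SCHEDULER_QUEUE_KEY % {'spider': 'books'}
key_on_host_b = SCHEDULER_QUEUE_KEY % {'spider': 'books'}
assert key_on_host_a == key_on_host_b == 'books:requests'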
On distributed crawlers more generally, three architectural approaches were discussed earlier in "Three Architectures for Distributed Crawler Systems"; what is shown here validates the RedisSpider approach, and interested readers can implement the other two.