scrapy_redis.scheduler取代了scrapy自带的scheduler调度,scheduler实现队列、url去重、Request管理的功能, 负责调度各个spider的request请求,scheduler初始化时,通过settings文件读取queue和dupefilters的类型(一般就用上边默认的),配置queue和dupefilters使用的key(一般就是spider name加上queue或者dupefilters,这样对于同一种spider的不同实例,就会使用相同的数据块了)
scrapy_redis.scheduler源码如下:
import importlib
import six
from scrapy.utils.misc import load_object
from . import connection, defaults
# TODO: add SCRAPY_JOB support.
class Scheduler(object):
"""Redis-based scheduler
Settings
--------
SCHEDULER_PERSIST : bool (default: False)
Whether to persist or clear redis queue.
SCHEDULER_FLUSH_ON_START : bool (default: False)
Whether to flush redis queue on start.
SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
How many seconds to wait before closing if no message is received.
SCHEDULER_QUEUE_KEY : str
Scheduler redis key.
SCHEDULER_QUEUE_CLASS : str
Scheduler queue class.
SCHEDULER_DUPEFILTER_KEY : str
Scheduler dupefilter redis key.
SCHEDULER_DUPEFILTER_CLASS : str
Scheduler dupefilter class.
SCHEDULER_SERIALIZER : str
Scheduler serializer.
"""
def __init__(self, server,
persist=False,
flush_on_start=False,
queue_key=defaults.SCHEDULER_QUEUE_KEY,
queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
idle_before_close=0,
serializer=None):
"""Initialize scheduler.
Parameters
----------
server : Redis
The redis server instance.
persist : bool
Whether to flush requests when closing. Default is False.
flush_on_start : bool
Whether to flush requests on start. Default is False.
queue_key : str
Requests queue key.
queue_cls : str
Importable path to the queue class.
dupefilter_key : str
Duplicates filter key.
dupefilter_cls : str
Importable path to the dupefilter class.
idle_before_close : int
Timeout before giving up.
"""
if idle_before_close < 0:
raise TypeError("idle_before_close cannot be negative")
self.server = server
self.persist = persist
self.flush_on_start = flush_on_start
self.queue_key = queue_key
self.queue_cls = queue_cls
self.dupefilter_cls = dupefilter_cls
self.dupefilter_key = dupefilter_key
self.idle_before_close = idle_before_close
self.serializer = serializer
self.stats = None
def __len__(self):
return len(self.queue)
@classmethod
def from_settings(cls, settings):
kwargs = {
'persist': settings.getbool('SCHEDULER_PERSIST'),
'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
}
# If these values are missing, it means we want to use the defaults.
optional = {
# TODO: Use custom prefixes for this settings to note that are
# specific to scrapy-redis.
'queue_key': 'SCHEDULER_QUEUE_KEY',
'queue_cls': 'SCHEDULER_QUEUE_CLASS',
'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
# We use the default setting name to keep compatibility.
'dupefilter_cls': 'DUPEFILTER_CLASS',
'serializer': 'SCHEDULER_SERIALIZER',
}
for name, setting_name in optional.items():
val = settings.get(setting_name)
if val:
kwargs[name] = val
# Support serializer as a path to a module.
if isinstance(kwargs.get('serializer'), six.string_types):
kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
server = connection.from_settings(settings)
# Ensure the connection is working.
server.ping()
return cls(server=server, **kwargs)
@classmethod
def from_crawler(cls, crawler):
instance = cls.from_settings(crawler.settings)
# FIXME: for now, stats are only supported from this constructor
instance.stats = crawler.stats
return instance
def open(self, spider):
self.spider = spider
try:
self.queue = load_object(self.queue_cls)(
server=self.server,
spider=spider,
key=self.queue_key % {'spider': spider.name},
serializer=self.serializer,
)
except TypeError as e:
raise ValueError("Failed to instantiate queue class '%s': %s",
self.queue_cls, e)
try:
self.df = load_object(self.dupefilter_cls)(
server=self.server,
key=self.dupefilter_key % {'spider': spider.name},
debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
)
except TypeError as e:
raise ValueError("Failed to instantiate dupefilter class '%s': %s",
self.dupefilter_cls, e)
if self.flush_on_start:
self.flush()
# notice if there are requests already in the queue to resume the crawl
if len(self.queue):
spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
def close(self, reason):
if not self.persist:
self.flush()
def flush(self):
self.df.clear()
self.queue.clear()
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
if self.stats:
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
self.queue.push(request)
return True
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
def has_pending_requests(self):
return len(self) > 0
在爬虫开始运行时,读取配置文件配置,并创建Request队列对象和Request的url去重对象。
def open(self, spider):
self.spider = spider
try:
self.queue = load_object(self.queue_cls)(
server=self.server,
spider=spider,
key=self.queue_key % {'spider': spider.name},
serializer=self.serializer,
)
except TypeError as e:
raise ValueError("Failed to instantiate queue class '%s': %s",
self.queue_cls, e)
try:
self.df = load_object(self.dupefilter_cls)(
server=self.server,
key=self.dupefilter_key % {'spider': spider.name},
debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
)
except TypeError as e:
raise ValueError("Failed to instantiate dupefilter class '%s': %s",
self.dupefilter_cls, e)
if self.flush_on_start:
self.flush()
# notice if there are requests already in the queue to resume the crawl
if len(self.queue):
spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
调度器实现了两个重要的方法,一个是入队列一个是出队列;当一个Request提交给调度器后,会根据是启用重复下载来去重,然后放入队列中:
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
if self.stats:
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
self.queue.push(request)
return True
另一个重要方法是出队列next_request,当调度完成时,next_request被调用,scheduler就通过queue容器的接口,取出一个request,把他发送给相应的spider,让spider进行爬取工作。
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
当爬虫关闭时,会根据persist参数来决定是否清空队列,该参数说明:persist : bool Whether to flush requests when closing. Default is False.默认是False,但往往在配置文件中配置SCHEDULER_PERSIST = True(不清空)
def close(self, reason):
if not self.persist:
self.flush()
def flush(self):
self.df.clear()
self.queue.clear()
image
网友评论