scrapy框架本身是不支持分布式的, 但是我们可以考虑将scrapy的scheduler改写成一个共享组件, 交给一个状态管理器进行管理, 从而实现对不同服务器上的爬虫程序进行协调去重。考虑到性能问题, 一般这些需要共享的数据会存储到redis上。
一、准备工作
首先, 我们需要在git上下载一个项目scrapy-redis:
git clone https://github.com/rolando/scrapy-redis
然后在我们python环境下安装一个python连接redis的接口:
pip install redis
新建一个项目, 之后用Pycharm打开这个项目:
scrapy startproject ScrapyRedisTest
之后将scrapy-redis中src目录下的文件夹复制到ScrapyRedisTest这个项目根目录下。
二、编写spider脚本
不同以往我们用命令直接生成脚本的方式, 这次我们在spiders目录下新建一个空白文件, 之后编写如下一段代码:
from scrapy_redis.spiders import RedisSpider
class MySpider(RedisSpider):
name = '<spider_name>'
redis_key = '<spider_name>:<start_urls>'
def parse(self,response):
pass
其中的name就是这个爬虫名称, redis_key是用于区分该爬虫与其它爬虫的识别码, 一般由spider_name:start_urls组成。parse之后的使用逻辑和以往一样, 没有区别。
三、设置settings
之后我们需要重新设置settings, 用scrapy_redis的一些组件来填写一些参数
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
ITEM_PIPELINES = {
'scrapy_redis.pipelines.RedisPipeline': 300,
}
当在settings中修改好参数, 在spider中编写好逻辑就可以启动爬虫了。
如果我们机器上已经启动了redis服务, 那么启动爬虫程序之后, 爬虫就会打印日志, 正在监听127.0.0.1:6379, 在正好是redis所在的端口, 这时我们执行redis-cli打开redis客户端, 执行以下命令就可以为该爬虫传入一个start_urls:
127.0.0.1:6379> lpush <redis_key> <start_urls>
之后, 我们的爬虫程序就会开始以该urls为start_urls执行程序。
四、配置connection.py
scrapy_redis目录下有个文件connection.py, 主要在这个文件中进行连接redis的配置:
#scrapy_redis/connection.py
...
SETTINGS_PARAMS_MAP = {
'REDIS_URL': 'url',
'REDIS_HOST': 'host',
'REDIS_PORT': 'port',
'REDIS_ENCODING': 'encoding',
}
...
五、bloomfilter实现去重
首先我们在项目ScrapyRedisTest下新建一个utils文件夹, 在该文件夹下新建一个bloomfilter.py, 编写以下代码:
#scrapy_redis/utils/bloomfilter.py
import mmh3
import redis
import math
import time
class PyBloomFilter():
#内置100个随机种子
SEEDS = [543, 460, 171, 876, 796, 607, 650, 81, 837, 545, 591, 946, 846, 521, 913, 636, 878, 735, 414, 372,
344, 324, 223, 180, 327, 891, 798, 933, 493, 293, 836, 10, 6, 544, 924, 849, 438, 41, 862, 648, 338,
465, 562, 693, 979, 52, 763, 103, 387, 374, 349, 94, 384, 680, 574, 480, 307, 580, 71, 535, 300, 53,
481, 519, 644, 219, 686, 236, 424, 326, 244, 212, 909, 202, 951, 56, 812, 901, 926, 250, 507, 739, 371,
63, 584, 154, 7, 284, 617, 332, 472, 140, 605, 262, 355, 526, 647, 923, 199, 518]
#capacity是预先估计要去重的数量
#error_rate表示错误率
#conn表示redis的连接客户端
#key表示在redis中的键的名字前缀
def __init__(self, capacity=1000000000, error_rate=0.00000001, conn=None, key='BloomFilter'):
self.m = math.ceil(capacity*math.log2(math.e)*math.log2(1/error_rate)) #需要的总bit位数
self.k = math.ceil(math.log1p(2)*self.m/capacity) #需要最少的hash次数
self.mem = math.ceil(self.m/8/1024/1024) #需要的多少M内存
self.blocknum = math.ceil(self.mem/512) #需要多少个512M的内存块,value的第一个字符必须是ascii码,所有最多有256个内存块
self.seeds = self.SEEDS[0:self.k]
self.key = key
self.N = 2**31-1
self.redis = conn
print(self.mem)
print(self.k)
def add(self, value):
name = self.key + "_" + str(ord(value[0])%self.blocknum)
hashs = self.get_hashs(value)
for hash in hashs:
self.redis.setbit(name, hash, 1)
def is_exist(self, value):
name = self.key + "_" + str(ord(value[0])%self.blocknum)
hashs = self.get_hashs(value)
exist = True
for hash in hashs:
exist = exist & self.redis.getbit(name, hash)
return exist
def get_hashs(self, value):
hashs = list()
for seed in self.seeds:
hash = mmh3.hash(value, seed)
if hash >= 0:
hashs.append(hash)
else:
hashs.append(self.N - hash)
return hashs
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)
conn = redis.StrictRedis(connection_pool=pool)
if __name__ == "__main__":
start = time.time()
bf = PyBloomFilter(conn=conn)
bf.add('www.jobbole.com')
bf.add('www.zhihu.com')
print(bf.is_exist('www.zhihu.com'))
print(bf.is_exist('www.lagou.com'))
接下来, 我们重写scrapy_redis下的dupefilter.py文件:
#scrapy_redis/dupefilter.py
...
from ScrapyRedisTest.utils.bloomfilter import conn,PyBloomFilter
class RFPDupeFilter(BaseDupeFilter):
...
def __init__(self, server, key, debug=False):
...
self.bf = PyBloomFilter(conn = conn,key = key)
def request_seen(self,request):
fp = self.request_fingerprint(request)
if self.bf.is_exist(fp):
return True
else:
self.bf.add(fp)
return False
网友评论