美文网首页python大法攻略
Scrapy+redis分布式爬虫(七、分布式爬虫)

Scrapy+redis分布式爬虫(七、分布式爬虫)

作者: 眼君 | 来源:发表于2020-09-16 00:15 被阅读0次

scrapy框架本身是不支持分布式的, 但是我们可以考虑将scrapy的scheduler改写成一个共享组件, 交给一个状态管理器进行管理, 从而实现对不同服务器上的爬虫程序进行协调去重。考虑到性能问题, 一般这些需要共享的数据会存储到redis上。

一、准备工作

首先, 我们需要在git上下载一个项目scrapy-redis:

git clone https://github.com/rolando/scrapy-redis

然后在我们python环境下安装一个python连接redis的接口:

pip install redis

新建一个项目, 之后用Pycharm打开这个项目:

scrapy startproject ScrapyRedisTest

之后将scrapy-redis中src目录下的文件夹复制到ScrapyRedisTest这个项目根目录下。

二、编写spider脚本

不同以往我们用命令直接生成脚本的方式, 这次我们在spiders目录下新建一个空白文件, 之后编写如下一段代码:

from scrapy_redis.spiders import RedisSpider

class MySpider(RedisSpider):
    name = '<spider_name>'
    redis_key = '<spider_name>:<start_urls>'
    def parse(self,response):
        pass

其中的name就是这个爬虫名称, redis_key是用于区分该爬虫与其它爬虫的识别码, 一般由spider_name:start_urls组成。parse之后的使用逻辑和以往一样, 没有区别。

三、设置settings

之后我们需要重新设置settings, 用scrapy_redis的一些组件来填写一些参数

SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300,
}

当在settings中修改好参数, 在spider中编写好逻辑就可以启动爬虫了。

如果我们机器上已经启动了redis服务, 那么启动爬虫程序之后, 爬虫就会打印日志, 正在监听127.0.0.1:6379, 在正好是redis所在的端口, 这时我们执行redis-cli打开redis客户端, 执行以下命令就可以为该爬虫传入一个start_urls:

127.0.0.1:6379> lpush <redis_key> <start_urls>

之后, 我们的爬虫程序就会开始以该urls为start_urls执行程序。

四、配置connection.py

scrapy_redis目录下有个文件connection.py, 主要在这个文件中进行连接redis的配置:

#scrapy_redis/connection.py
...
SETTINGS_PARAMS_MAP = {
    'REDIS_URL': 'url',
    'REDIS_HOST': 'host',
    'REDIS_PORT': 'port',
    'REDIS_ENCODING': 'encoding',
}
...

五、bloomfilter实现去重
首先我们在项目ScrapyRedisTest下新建一个utils文件夹, 在该文件夹下新建一个bloomfilter.py, 编写以下代码:

#scrapy_redis/utils/bloomfilter.py
import mmh3
import redis
import math
import time


class PyBloomFilter():
    #内置100个随机种子
    SEEDS = [543, 460, 171, 876, 796, 607, 650, 81, 837, 545, 591, 946, 846, 521, 913, 636, 878, 735, 414, 372,
             344, 324, 223, 180, 327, 891, 798, 933, 493, 293, 836, 10, 6, 544, 924, 849, 438, 41, 862, 648, 338,
             465, 562, 693, 979, 52, 763, 103, 387, 374, 349, 94, 384, 680, 574, 480, 307, 580, 71, 535, 300, 53,
             481, 519, 644, 219, 686, 236, 424, 326, 244, 212, 909, 202, 951, 56, 812, 901, 926, 250, 507, 739, 371,
             63, 584, 154, 7, 284, 617, 332, 472, 140, 605, 262, 355, 526, 647, 923, 199, 518]

    #capacity是预先估计要去重的数量
    #error_rate表示错误率
    #conn表示redis的连接客户端
    #key表示在redis中的键的名字前缀
    def __init__(self, capacity=1000000000, error_rate=0.00000001, conn=None, key='BloomFilter'):
        self.m = math.ceil(capacity*math.log2(math.e)*math.log2(1/error_rate))      #需要的总bit位数
        self.k = math.ceil(math.log1p(2)*self.m/capacity)                           #需要最少的hash次数
        self.mem = math.ceil(self.m/8/1024/1024)                                    #需要的多少M内存
        self.blocknum = math.ceil(self.mem/512)                                     #需要多少个512M的内存块,value的第一个字符必须是ascii码,所有最多有256个内存块
        self.seeds = self.SEEDS[0:self.k]
        self.key = key
        self.N = 2**31-1
        self.redis = conn
        print(self.mem)
        print(self.k)

    def add(self, value):
        name = self.key + "_" + str(ord(value[0])%self.blocknum)
        hashs = self.get_hashs(value)
        for hash in hashs:
            self.redis.setbit(name, hash, 1)

    def is_exist(self, value):
        name = self.key + "_" + str(ord(value[0])%self.blocknum)
        hashs = self.get_hashs(value)
        exist = True
        for hash in hashs:
            exist = exist & self.redis.getbit(name, hash)
        return exist

    def get_hashs(self, value):
        hashs = list()
        for seed in self.seeds:
            hash = mmh3.hash(value, seed)
            if hash >= 0:
                hashs.append(hash)
            else:
                hashs.append(self.N - hash)
        return hashs


pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)
conn = redis.StrictRedis(connection_pool=pool)

if __name__ == "__main__":
    start = time.time()
    bf = PyBloomFilter(conn=conn)
    bf.add('www.jobbole.com')
    bf.add('www.zhihu.com')
    print(bf.is_exist('www.zhihu.com'))
    print(bf.is_exist('www.lagou.com'))

接下来, 我们重写scrapy_redis下的dupefilter.py文件:

#scrapy_redis/dupefilter.py
...
from ScrapyRedisTest.utils.bloomfilter import conn,PyBloomFilter

class RFPDupeFilter(BaseDupeFilter):
    ...
    def __init__(self, server, key, debug=False):
        ...
        self.bf = PyBloomFilter(conn = conn,key = key)
    
    def request_seen(self,request):
        fp = self.request_fingerprint(request)
        if self.bf.is_exist(fp):
            return True
        else:
            self.bf.add(fp)
            return False

相关文章

网友评论

    本文标题:Scrapy+redis分布式爬虫(七、分布式爬虫)

    本文链接:https://www.haomeiwen.com/subject/otozektx.html