美文网首页python大法攻略
Scrapy+redis分布式爬虫(七、分布式爬虫)

Scrapy+redis分布式爬虫(七、分布式爬虫)

作者: 眼君 | 来源:发表于2020-09-16 00:15 被阅读0次

    scrapy框架本身是不支持分布式的, 但是我们可以考虑将scrapy的scheduler改写成一个共享组件, 交给一个状态管理器进行管理, 从而实现对不同服务器上的爬虫程序进行协调去重。考虑到性能问题, 一般这些需要共享的数据会存储到redis上。

    一、准备工作

    首先, 我们需要在git上下载一个项目scrapy-redis:

    git clone https://github.com/rolando/scrapy-redis
    

    然后在我们python环境下安装一个python连接redis的接口:

    pip install redis
    

    新建一个项目, 之后用Pycharm打开这个项目:

    scrapy startproject ScrapyRedisTest
    

    之后将scrapy-redis中src目录下的文件夹复制到ScrapyRedisTest这个项目根目录下。

    二、编写spider脚本

    不同以往我们用命令直接生成脚本的方式, 这次我们在spiders目录下新建一个空白文件, 之后编写如下一段代码:

    from scrapy_redis.spiders import RedisSpider
    
    class MySpider(RedisSpider):
        name = '<spider_name>'
        redis_key = '<spider_name>:<start_urls>'
        def parse(self,response):
            pass
    

    其中的name就是这个爬虫名称, redis_key是用于区分该爬虫与其它爬虫的识别码, 一般由spider_name:start_urls组成。parse之后的使用逻辑和以往一样, 没有区别。

    三、设置settings

    之后我们需要重新设置settings, 用scrapy_redis的一些组件来填写一些参数

    SCHEDULER = "scrapy_redis.scheduler.Scheduler"
    DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
    ITEM_PIPELINES = {
        'scrapy_redis.pipelines.RedisPipeline': 300,
    }
    

    当在settings中修改好参数, 在spider中编写好逻辑就可以启动爬虫了。

    如果我们机器上已经启动了redis服务, 那么启动爬虫程序之后, 爬虫就会打印日志, 正在监听127.0.0.1:6379, 在正好是redis所在的端口, 这时我们执行redis-cli打开redis客户端, 执行以下命令就可以为该爬虫传入一个start_urls:

    127.0.0.1:6379> lpush <redis_key> <start_urls>
    

    之后, 我们的爬虫程序就会开始以该urls为start_urls执行程序。

    四、配置connection.py

    scrapy_redis目录下有个文件connection.py, 主要在这个文件中进行连接redis的配置:

    #scrapy_redis/connection.py
    ...
    SETTINGS_PARAMS_MAP = {
        'REDIS_URL': 'url',
        'REDIS_HOST': 'host',
        'REDIS_PORT': 'port',
        'REDIS_ENCODING': 'encoding',
    }
    ...
    

    五、bloomfilter实现去重
    首先我们在项目ScrapyRedisTest下新建一个utils文件夹, 在该文件夹下新建一个bloomfilter.py, 编写以下代码:

    #scrapy_redis/utils/bloomfilter.py
    import mmh3
    import redis
    import math
    import time
    
    
    class PyBloomFilter():
        #内置100个随机种子
        SEEDS = [543, 460, 171, 876, 796, 607, 650, 81, 837, 545, 591, 946, 846, 521, 913, 636, 878, 735, 414, 372,
                 344, 324, 223, 180, 327, 891, 798, 933, 493, 293, 836, 10, 6, 544, 924, 849, 438, 41, 862, 648, 338,
                 465, 562, 693, 979, 52, 763, 103, 387, 374, 349, 94, 384, 680, 574, 480, 307, 580, 71, 535, 300, 53,
                 481, 519, 644, 219, 686, 236, 424, 326, 244, 212, 909, 202, 951, 56, 812, 901, 926, 250, 507, 739, 371,
                 63, 584, 154, 7, 284, 617, 332, 472, 140, 605, 262, 355, 526, 647, 923, 199, 518]
    
        #capacity是预先估计要去重的数量
        #error_rate表示错误率
        #conn表示redis的连接客户端
        #key表示在redis中的键的名字前缀
        def __init__(self, capacity=1000000000, error_rate=0.00000001, conn=None, key='BloomFilter'):
            self.m = math.ceil(capacity*math.log2(math.e)*math.log2(1/error_rate))      #需要的总bit位数
            self.k = math.ceil(math.log1p(2)*self.m/capacity)                           #需要最少的hash次数
            self.mem = math.ceil(self.m/8/1024/1024)                                    #需要的多少M内存
            self.blocknum = math.ceil(self.mem/512)                                     #需要多少个512M的内存块,value的第一个字符必须是ascii码,所有最多有256个内存块
            self.seeds = self.SEEDS[0:self.k]
            self.key = key
            self.N = 2**31-1
            self.redis = conn
            print(self.mem)
            print(self.k)
    
        def add(self, value):
            name = self.key + "_" + str(ord(value[0])%self.blocknum)
            hashs = self.get_hashs(value)
            for hash in hashs:
                self.redis.setbit(name, hash, 1)
    
        def is_exist(self, value):
            name = self.key + "_" + str(ord(value[0])%self.blocknum)
            hashs = self.get_hashs(value)
            exist = True
            for hash in hashs:
                exist = exist & self.redis.getbit(name, hash)
            return exist
    
        def get_hashs(self, value):
            hashs = list()
            for seed in self.seeds:
                hash = mmh3.hash(value, seed)
                if hash >= 0:
                    hashs.append(hash)
                else:
                    hashs.append(self.N - hash)
            return hashs
    
    
    pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)
    conn = redis.StrictRedis(connection_pool=pool)
    
    if __name__ == "__main__":
        start = time.time()
        bf = PyBloomFilter(conn=conn)
        bf.add('www.jobbole.com')
        bf.add('www.zhihu.com')
        print(bf.is_exist('www.zhihu.com'))
        print(bf.is_exist('www.lagou.com'))
    

    接下来, 我们重写scrapy_redis下的dupefilter.py文件:

    #scrapy_redis/dupefilter.py
    ...
    from ScrapyRedisTest.utils.bloomfilter import conn,PyBloomFilter
    
    class RFPDupeFilter(BaseDupeFilter):
        ...
        def __init__(self, server, key, debug=False):
            ...
            self.bf = PyBloomFilter(conn = conn,key = key)
        
        def request_seen(self,request):
            fp = self.request_fingerprint(request)
            if self.bf.is_exist(fp):
                return True
            else:
                self.bf.add(fp)
                return False
    

    相关文章

      网友评论

        本文标题:Scrapy+redis分布式爬虫(七、分布式爬虫)

        本文链接:https://www.haomeiwen.com/subject/otozektx.html