美文网首页
为爬虫搭建一个ProxyPool

为爬虫搭建一个ProxyPool

作者: LinxsCoding | 来源:发表于2018-05-31 21:11 被阅读210次

前言

在日常写数据采集脚本的时候,我们总会遇到各种反爬手段,比如:headers反爬,ip反爬等。上一篇我们知道了Cookies池的搭建过程,通过Cookies池可以登录站点抓取设置登录限制的页面数据。但是仅仅有Cookies池是不够的,因为你Cookies的数量再多,也只是单节点,有一些网站如58会设置IP反爬,你的频率快了之后,ip可能会有被封的风险。那我限制下速度不行吗? 限制了速度,其实也就降低了效率,还有这并不一定能有效绕过IP反爬。所以,我们就有必要去构建和维护一个代理池

Step 1 ---- 实现思路

  • 获取代理从各个代理网站抓取免费代理
  • 存储代理选定一种数据结构(这里采用redis数据库中的有序集合,原因稍后说)用来存储抓取下来的代理
  • 检测代理设置一个测试URL,然后使用代理去请求,根据响应的状态码判断代理是否可用。
  • 接口提供给爬虫一个接口,返回一个随机代理(这里使用flask)



Step2 ---- 模块构成及其功能

代理池有5个模块构成:存储模块,获取模块,检测模块,接口模块,调度模块。下图展示的是代理池的结构

proxy.jpg
  • 获取模块

    从多个站点中抓取代理存放到数据库中,我们要测试是否有用,就必须设计一个筛选可用代理的规则(这里我使用Python3网络爬虫开发实战一书作者的方法)。

    筛选规则:在获取代理后,在将它写入数据库前给它设置一个初始score,如果检测到该代理可用,就讲它设置为100,不可用减一;当减到0时,从数据库中删除。然后设置一个随机函数,用于返回随机代理

  • 存储模块

    获取到一个代理之后,我们需要对其设置score(设置score的目的是用于筛选出可用代理)。这里我们选用redis中的有序集合,是因为这个数据结构里面带有score字段,我们可以根据score字段进行排序,而且集合内所有元素不重复。

  • 检测模块

    拿出每一个代理进行请求测试,根据代理筛选规则设置代理分数

  • 接口模块

    在检测模块运行完之后,代理池中会留下两种代理。一种是可用代理(score=100),另一种是不可用代理(score<100,失败次数太多或是代理不稳定),通过flask构建一个爬虫接口返回score=100的代理。

  • 调度模块

    如果构建一个代理池,你是先采集代理 --> 存储代理 --> 检测代理这样的顺序执行的话,那效率就有点低了,因为需要等待上一个任务完成之后才能进行下一个任务。因此,这里我们采用多进程和异步来实现对各个模块的调度(异步的语法要Python3.5以上才支持。)

Step3 ---- 各个模块实现

-- 存储模块 --



class RedisClient(object):
    def __init__(self):
        '''
        decode_responses参数设置为true,写入的value值为str,否则为字节型
        '''
        self.db = StrictRedis(host=HOST,port=PORT,password=PASSWD,decode_responses=True)
        pass


    def random(self):
        result = self.db.zrangebyscore(REDIS_KEY,MAX_SCORE,MAX_SCORE)
        if len(result):
            return choice(result)
            
        else:
            result = self.db.zrevrange(REDIS_KEY,MIN_SCORE,MAX_SCORE)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    

    def add(self,proxy,score=INITIAL_SCORE):
        if not self.db.zscore(REDIS_KEY,proxy):
            return self.db.zadd(REDIS_KEY,score,proxy)
        
    

    def decrease(self,proxy):
        # 获取proxy对应的分数
        score = self.db.zscore(REDIS_KEY,proxy)
        if score and score > MIN_SCORE:
            print("代理 {} , 分数 {} , 减1".format(proxy,score))
            return self.db.zincrby(REDIS_KEY,proxy,-1)
            
        else:
            print("代理 {} , 当前分数 {} , 移除".format(proxy,score))
            self.db.zrem(REDIS_KEY,proxy)
            
        
    

    def exists(self,proxy):
        return not self.db.zscore(REDIS_KEY,proxy) == None
        
    

    def max(self,proxy):
        print("代理 {} , 可用 , 设置为{}".format(proxy,MAX_SCORE))
        return self.db.zadd(REDIS_KEY,MAX_SCORE,proxy)v

-- 获取模块 --


class ProxyMetaClass(type):
    def __new__(cls,name,bases,attrs):
        attrs['__CrawleFunc__'] = []
        count = 0
        for k,v in attrs.items():
            if 'crawle' in k:
                attrs['__CrawleFunc__'].append(k)
                count += 1
        attrs['__CrawleCount__'] = count
        return type.__new__(cls,name,bases,attrs)



class Crawler(object,metaclass=ProxyMetaClass):
    def get_proxies(self,callack):
        proxies = []
        if callack == 'crawle_daili66':
            return self.process_daili66(callack)
        else:
            for proxy in eval("self.{}()".format(callack)):
                print("成功获取代理 {}".format(proxy))
                proxies.append(proxy)
            return proxies
    

    def process_daili66(self,callback):
        proxies = []
        for page_count in range(1,11):
            for proxy in eval("self.{}(page_count={})".format(callback,page_count)):
                print("成功获取代理 {}".format(proxy))
                proxies.append(proxy)
        return proxies


    def crawle_xiciproxy(self):
        start_url = "http://www.xicidaili.com/"
        html = get_pages(start_url,"xici")
        if html:
            return parse_xiciproxy(html)
        
    

    def crawle_daili66(self,page_count=1):
        start_url = "http://www.66ip.cn/{}.html"
        sleep(5)
        html = get_pages(start_url.format(page_count),website="66")
        if html:
            print("第{}页代理".format(page_count))
            return parse_66(html)
    

    def crawle_guobanjia(self):
        start_url = "http://www.goubanjia.com/"
        html = get_pages(start_url,'goubanjia')
        if html:
            return parse_guobanjia(html)
        pass


-- 检测模块 --


class CheckUp(object):
    def __init__(self):
        self.db = RedisClient()
    


    def run(self):
        # get all proxies
        # get event loop
        # split proxies to serval part
        # pack a task list
        # call run to exec asynic

        print("Start test")
        try:
            proxies = self.db.all()
            loop = asyncio.get_event_loop()
            for i in range(0,len(proxies),BATCH_SIZE):
                tasks_proxies = proxies[i:i + BATCH_SIZE]
                tasks = [self.check_single_proxy(proxy) for proxy in tasks_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                sleep(3)
        except Exception as e:
            _ = e
            print(e.args)
            print("CheckUp occurs Error")

    async def check_single_proxy(self,proxy):
        connection = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=connection) as session:
            # 测试代理可用性



-- 接口模块 --



@app.route("/")
def index():
    return "<h2>Welcome to Proxy Pool System</h2>"
    



def get_conn():
    if not hasattr(g,"redis"):
        g.redis = RedisClient()
    return g.redis

@app.route('/count')
def get_count():
    conn = get_conn()
    return conn.count()
    


@app.route('/random')
def random():
    conn = get_conn()
    return conn.random()



-- 调度模块 --



class Scheduler(object):
    def getter(self,cylcle=CYCLE_GETTER):
        getter = Getter()
        while True:
            print("Start to get proxy")
            getter.run()
            sleep(cylcle)
            
    

    def checkup(self,cycle=CYCLE_CHECKUP):
        check = CheckUp()
        while True:
            print("Start to checkup proxy")
            check.run()
            sleep(cycle)
    

    def api(self):
        app.run(API_HOST,API_PORT)
        
    
    def run(self):
        print("Proxy Pool start run")
        if API_PROCSS:
            api_process = multiprocessing.Process(target=self.api)
            api_process.start()

        
        if GETTER_PROCESS:
            getter_process = multiprocessing.Process(target=self.getter)
            getter_process.start()

        
        if CHECKUP_PROCESS:
            checkup_process = multiprocessing.Process(target=self.checkup)
            checkup_process.start()
        


最后

使用代理池我们可以随机更换IP(不一定都有用)来对抗IP反爬,这对于爬取大规模数据是必要的,也是对一个爬虫工程师最基本的条件

最后运行一下,可以看到如下结果

运行.png 代理池.png

具体代码已发布在github上:点我传送门

相关文章

网友评论

      本文标题:为爬虫搭建一个ProxyPool

      本文链接:https://www.haomeiwen.com/subject/hhecsftx.html