Web Crawlers: Maintaining a Proxy Pool, Part 4 (the Scheduler Module)

Author: 八戒无戒 | Published 2019-09-29 00:43

    This series documents the maintenance of a pool of free proxies. It consists of four modules:
    Getter module: scrapes the latest free proxies published on various free-proxy sites, fetches and parses them
    Storage module: saves the proxies collected by the getter module into a Redis database
    Tester module: checks whether each proxy stored in Redis is still usable and adjusts its score accordingly
    Scheduler module: ties the getter, storage, and tester modules together and wraps them up

    Main topics involved:

    • metaclasses
    • operating Redis from Python with the redis library
    • the requests library
    • pyquery
    • basic use of the aiohttp asynchronous HTTP framework
    • multithreading and multiprocessing

    The Scheduler Module

    This installment also adds a web API, so that a random usable proxy can be fetched simply by requesting a URL (a short client-side sketch follows the code below).

    # -*- coding: utf-8 -*-
    """
    __author__ = 'bingo'
    __date__ = '2019/9/7'
    # code is far away from bugs with the god animal protecting
        I love animals. They taste delicious.
                 ┏┓   ┏┓
                ┏┛┻━━━┛┻┓
                ┃     ☃ ┃
                ┃  ┳┛  ┗┳  ┃
                ┃      ┻   ┃
                ┗━┓      ┏━┛
                    ┃    ┗━━━┓
                    ┃  神兽保 ┣┓
                    ┃ 永无BUG┏┛
                    ┗ ┓┏━┳┓┏┛
                      ┃┫┫  ┃┫┫
                      ┗┻┛  ┗┻┛
    """
    import random
    import asyncio
    import requests
    import time
    import redis
    import aiohttp
    from pyquery import PyQuery as pq
    from redis import ResponseError
    from requests import ConnectTimeout
    from concurrent.futures import ThreadPoolExecutor
    from multiprocessing import Process
    from flask import Flask
    
    # Scheduler module
    class CrawlSchedule(object):
        __CRAWL_GETTER_CYCLE__ = 300
        __CRAWL_TESTER_CYCLE__ = 20
        __getter_enable__ = True
        __tester_enable__ = True
        __api_enable__ = True
    
        def schedule_getter(self, cycle=__CRAWL_GETTER_CYCLE__):
            """
            Run the proxy getter every `cycle` seconds.
            :param cycle: sleep interval between runs, in seconds
            :return:
            """
            crawl_getter = CrawlerGetter()
            while True:
                crawl_getter.run()
                time.sleep(cycle)
    
        def schedule_tester(self, cycle=__CRAWL_TESTER_CYCLE__):
            """
            Run the proxy tester every `cycle` seconds.
            :param cycle: sleep interval between runs, in seconds
            :return:
            """
            crawl_tester = CrawlTester()
            while True:
                crawl_tester.run()
                time.sleep(cycle)
    
        def schedule_api(self):
            """
            Start the web API.
            :return:
            """
            return API.run()
    
        def run(self, getter_enable=__getter_enable__, test_enable=__tester_enable__, api_enable=__api_enable__):
            print("代理池开始运行")
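            # run each component in its own process so that a blocking loop in one cannot stall the others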
            if getter_enable:
                getter_process = Process(target=self.schedule_getter)
                getter_process.start()
            if test_enable:
                tester_process = Process(target=self.schedule_tester)
                tester_process.start()
            if api_enable:
                api_process = Process(target=self.schedule_api)
                api_process.start()
    
    # Web API: visit a route to get a random proxy directly
    class API():
        app = Flask(__name__)
        redis_db = ProxyRedisClient()
    
        @classmethod
        def get_proxy(cls):
            return cls.redis_db.random()
    
        @classmethod
        def all_proxy_count(cls):
            return str(cls.redis_db.count())
    
        @classmethod
        def run(cls):
            cls.app.add_url_rule('/random/', view_func=cls.get_proxy)
            cls.app.add_url_rule('/count/', view_func=cls.all_proxy_count)
            return cls.app.run(port=8000)
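
    With the API process running, any client can pull a proxy from the pool over plain HTTP. A minimal client-side sketch (assuming the scheduler has been started locally and the API is listening on port 8000, as configured above):

    import requests

    # ask the pool's web API for a random high-scoring proxy and for the current pool size
    proxy = requests.get("http://127.0.0.1:8000/random/", timeout=5).text.strip()
    total = requests.get("http://127.0.0.1:8000/count/", timeout=5).text.strip()
    print("got proxy %s out of %s candidates" % (proxy, total))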
    
    

    At this point all four modules of the proxy pool are complete: instantiate a CrawlSchedule() object and call its run() method, and the pool will maintain itself from then on.

    Full source code

    # -*- coding: utf-8 -*-
    """
    __author__ = 'bingo'
    __date__ = '2019/9/7'
    # code is far away from bugs with the god animal protecting
        I love animals. They taste delicious.
                 ┏┓   ┏┓
                ┏┛┻━━━┛┻┓
                ┃     ☃ ┃
                ┃  ┳┛  ┗┳  ┃
                ┃      ┻   ┃
                ┗━┓      ┏━┛
                    ┃    ┗━━━┓
                    ┃  神兽保 ┣┓
                    ┃ 永无BUG┏┛
                    ┗ ┓┏━┳┓┏┛
                      ┃┫┫  ┃┫┫
                      ┗┻┛  ┗┻┛
    """
    import random
    import asyncio
    import requests
    import time
    import redis
    import aiohttp
    from pyquery import PyQuery as pq
    from redis import ResponseError
    from requests import ConnectTimeout
    from concurrent.futures import ThreadPoolExecutor
    from multiprocessing import Process
    from flask import Flask
    
    
    # Getter module
    class ProxyMeta(type):
        def __new__(cls, name, bases, attrs):
            crawl_count = 0
            attrs["__CrawlFunc__"] = []
    
            # collect every attribute whose name starts with "crawl_"; each entry is stored as a
            # call expression string such as "self.crawl_66daili()" that CrawlerGetter.run() later eval()s
            for k, v in attrs.items():
                if k.startswith("crawl_"):
                    func = "self.{}()".format(k)
                    attrs["__CrawlFunc__"].append(func)
                    crawl_count += 1
    
            # record how many crawl functions the getter class defines
            attrs["__CrawlFuncCount__"] = crawl_count
            return type.__new__(cls, name, bases, attrs)
    
    
    class CrawlerGetter(object, metaclass=ProxyMeta):
    
        def __init__(self):
            headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0'}
            self.headers = headers
            self.proxy_count = 0
            self.db_client = ProxyRedisClient()
    
        def get_page(self, url, encoding):
            try:
                res = requests.get(url, headers=self.headers, timeout=2.5)
                if res.status_code == 200:
                    res.encoding = encoding
                    html = res.text
                    return html
                else:
                    return None
            except requests.RequestException:
                # any request failure (connect/read timeout, connection reset, ...) is treated as "no page"
                return None
    
        def crawl_66daili(self):
            """
            66代理
            :return:
            """
            i = 0
            url = "http://www.66ip.cn/{page}.html"
            for page in range(1, 11):
                html = self.get_page(url.format(page=page), 'gbk')
                if html:
                    p = pq(html)
                    doc = p(".containerbox table tr:gt(0)")
                    for item in doc.items():
                        proxy_ip = item("td:first-child").text()
                        proxy_port = item("td:nth-child(2)").text()
                        if proxy_ip and proxy_port:
                            proxy = ":".join([proxy_ip, proxy_port])
                            i += 1
                            print("【66代理%s】:%s" % (i, proxy))
                            self.proxy_count += 1
                            yield proxy
                        else:
                            pass
                else:
                    print("【66代理】获取代理失败page:%s" % page)
                    continue
    
        def crawl_iphai(self):
            """
            ip海代理
            :return:
            """
            i = 0
            urls = ["http://www.iphai.com/free/ng", "http://www.iphai.com/free/wg"]
            for url in urls:
                html = self.get_page(url, 'utf8')
                if html:
                    p = pq(html)
                    doc = p(".table-responsive table tr:gt(0)")
                    for item in doc.items():
                        proxy_ip = item("td:first-child").text()
                        proxy_port = item("td:nth-child(2)").text()
                        if proxy_ip and proxy_port:
                            proxy = ":".join([proxy_ip, proxy_port])
                            i += 1
                            print("【IP海代理%s】:%s" % (i, proxy))
                            self.proxy_count += 1
                            yield proxy
                        else:
                            pass
                else:
                    print("【IP海代理】获取代理失败: %s" % url)
                    continue
    
        def crawl_qiyun(self):
            """
            齐云代理
            :return:
            """
            i = 0
            url = "http://www.qydaili.com/free/?action=china&page={page}"
            for page in range(1, 11):
                html = self.get_page(url.format(page=page), "utf8")
                if html:
                    p = pq(html)
                    doc = p(".table tbody tr")
                    for item in doc.items():
                        proxy_ip = item("td:first-child").text()
                        proxy_port = item("td:nth-child(2)").text()
                        if proxy_ip and proxy_port:
                            proxy = ":".join([proxy_ip, proxy_port])
                            i += 1
                            print("【齐云代理%s】:%s" % (i, proxy))
                            self.proxy_count += 1
                            yield proxy
                        else:
                            pass
                else:
                    print("【齐云代理】获取代理失败page:%s" % page)
                    continue
    
        def crawl_89daili(self):
            """
            89免费代理
            :return:
            """
            i = 0
            url = "http://www.89ip.cn/index_{page}.html"
            for page in range(1, 21):
                html = self.get_page(url.format(page=page), "utf8")
                if html:
                    p = pq(html)
                    doc = p(".layui-table tbody tr")
                    for item in doc.items():
                        proxy_ip = item("td:first-child").text()
                        proxy_port = item("td:nth-child(2)").text()
                        if proxy_ip and proxy_port:
                            proxy = ":".join([proxy_ip, proxy_port])
                            i += 1
                            print("【89免费代理%s】:%s" % (i, proxy))
                            self.proxy_count += 1
                            yield proxy
                        else:
                            pass
                else:
                    print("【89免费代理】获取代理失败page:%s" % page)
                    continue
    
        def crawl_kuaidaili(self):
            """
            快代理
            :return:
            """
            i = 0
            url = "https://www.kuaidaili.com/free/inha/{page}/"
            for page in range(1, 11):
                html = self.get_page(url.format(page=page), "utf8")
                if html:
                    p = pq(html)
                    doc = p("table tbody tr")
                    for item in doc.items():
                        proxy_ip = item("td:first-child").text()
                        proxy_port = item("td:nth-child(2)").text()
                        if proxy_ip and proxy_port:
                            proxy = ":".join([proxy_ip, proxy_port])
                            i += 1
                            print("【快代理%s】:%s" % (i, proxy))
                            self.proxy_count += 1
                            yield proxy
                        else:
                            pass
                else:
                    print("【快代理】获取代理失败page:%s" % page)
                    continue
    
        def crawl_yundaili(self):
            """
            云代理
            :return:
            """
            i = 0
            url = "http://www.ip3366.net/free/?stype=1&page={page}"
            for page in range(1, 8):
                html = self.get_page(url.format(page=page), "gb2312")
                if html:
                    p = pq(html)
                    doc = p("table tbody tr")
                    for item in doc.items():
                        proxy_ip = item("td:first-child").text()
                        proxy_port = item("td:nth-child(2)").text()
                        if proxy_ip and proxy_port:
                            proxy = ":".join([proxy_ip, proxy_port])
                            i += 1
                            print("【云代理%s】:%s" % (i, proxy))
                            self.proxy_count += 1
                            yield proxy
                        else:
                            pass
                else:
                    print("【云代理】获取代理失败page:%s" % page)
                    continue
    
        def crawl_xicidaili(self):
            """
            西刺代理
            :return:
            """
            i = 0
            url = "https://www.xicidaili.com/nn/{page}"
            for page in range(1, 6):
                html = self.get_page(url.format(page=page), "utf8")
                if html:
                    p = pq(html)
                    doc = p(".proxies table tr:gt(0)")
                    for item in doc.items():
                        proxy_ip = item("td:nth-child(2)").text()
                        proxy_port = item("td:nth-child(3)").text()
                        if proxy_ip and proxy_port:
                            proxy = ":".join([proxy_ip, proxy_port])
                            i += 1
                            print("【西刺代理%s】:%s" % (i, proxy))
                            self.proxy_count += 1
                            yield proxy
                        else:
                            pass
                else:
                    print("【西刺代理】获取代理失败page:%s" % page)
                    continue
    
        def run(self):
            """
            Evaluate each site-specific crawl function into a proxy generator and drain the results into Redis via a thread pool.
            :return:
            """
            crawl_funcs_list = []
            try:
                executor = ThreadPoolExecutor(max_workers=10)
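                # __CrawlFunc__ holds strings such as "self.crawl_66daili()" built by ProxyMeta;
                # eval() turns each string into the generator returned by that crawl method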
                for crawl_func_name in self.__CrawlFunc__:
                    crawl_funcs_list.append(eval(crawl_func_name))
                for crawl_func in crawl_funcs_list:
                    executor.submit(self.to_redis_db, crawl_func)
                executor.shutdown()
            except Exception as e:
                print("ERROR:", e)
    
        def to_redis_db(self, generation):
            """
            Consume a generator that yields proxy addresses and add each one to the Redis pool.
            :param generation: generator of "ip:port" strings
            :return:
            """
            proxies_generation = generation
            for proxy in proxies_generation:
                self.db_client.add(proxy)
    
    
    # Storage module
    class ProxyRedisClient(object):
        __INIT__SCORE__ = 10
        __MAX__SCORE__ = 100
        __REDIS__KEY__ = "proxies"
    
        def __init__(self):
            self.host = "localhost"
            self.port = 6379
            self.password = None
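            # decode_responses=True makes redis-py return str rather than bytes; the pool lives in db 10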
            self.db = redis.StrictRedis(host=self.host, port=self.port, password=self.password, decode_responses=True,
                                        db=10)
    
        def add(self, proxy, score=__INIT__SCORE__):
            """
            Add a proxy to the pool.
            :param proxy: proxy address, "ip:port"
            :param score: score; new proxies start at 10
            :return: True or False
            """

            # zadd() adds a member to the sorted set; proxies already present are skipped
            if not self.exists(proxy):
                self.db.zadd(self.__REDIS__KEY__, {proxy: score})
                return True
            else:
                print("代理【{}】重复".format(proxy))
                return False
    
        def max(self, proxy, score=__MAX__SCORE__):
            """
            Set the proxy's score to the maximum (the proxy is added if it is not already present).
            :param proxy: proxy address
            :param score: score to assign, the maximum value of 100
            :return: result of zadd
            """
            return self.db.zadd(self.__REDIS__KEY__, {proxy: score})
    
        def random(self):
            """
            Return a random proxy, preferring proxies with the highest score.
            :return:
            """
            # zrevrangebyscore() returns members whose score falls in the given range, highest score first
            # zrevrange() returns members ordered by score, highest first; here it takes the top-ranked proxies (indices 0 to 10)
            result = self.db.zrevrangebyscore(self.__REDIS__KEY__, 100, 100)
            if result:
                return random.choice(result)
            else:
                result = self.db.zrevrange(self.__REDIS__KEY__, 0, 10)
                if result:
                    return random.choice(result)
                else:
                    raise ResponseError
    
        def increase(self, proxy):
            """
            Raise the proxy's score by 3 if it is below the maximum of 100.
            :return:
            """
            # zscore() returns the member's score, or None if the member does not exist
            # zincrby() increments (or decrements) the member's score by the given amount
            proxy_score = self.db.zscore(self.__REDIS__KEY__, proxy)
            if not proxy_score:
                return
            if proxy_score < 100:
                print("当前代理【%s】分数【%s】:+3" % (proxy, proxy_score))
                self.db.zincrby(self.__REDIS__KEY__, 3, proxy)
            else:
                print("当前代理【%s】分数【%s】:已为最大分数" % (proxy, proxy_score))
                pass
    
        def decrease(self, proxy):
            """
            Lower the proxy's score by 5; if it is already at or below the initial score, remove it instead.
            :return:
            """
            proxy_score = self.db.zscore(self.__REDIS__KEY__, proxy)
            if not proxy_score:
                return
            if proxy_score > self.__INIT__SCORE__:
                print("当前代理【%s】分数【%s】:-5" % (proxy, proxy_score))
                self.db.zincrby(self.__REDIS__KEY__, -5, proxy)
            else:
                print("当前代理【%s】分数【%s】:移除" % (proxy, proxy_score))
                self.db.zrem(self.__REDIS__KEY__, proxy)
    
        def exists(self, proxy):
            """
            Check whether a proxy is already in the pool.
            :param proxy: proxy address
            :return: True or False
            """
            # zscore() returns the member's score, or None if the member does not exist
            r = self.db.zscore(self.__REDIS__KEY__, proxy)
            if r:
                return True
            else:
                return False
    
        def count(self):
            """
            Return the total number of proxies in the pool.
            :return:
            """
            # zcount() counts the members whose score lies in the given range
            return self.db.zcount(self.__REDIS__KEY__, 0, 100)
    
        def all(self):
            """
            Return every proxy in the pool.
            :return:
            """
            # zrangebyscore() returns members whose score lies in the given range, lowest score first
            return self.db.zrangebyscore(self.__REDIS__KEY__, 0, self.__MAX__SCORE__)
    
    
    # Tester module
    class CrawlTester(object):
    
        def __init__(self):
            self.test_url = "https://www.baidu.com"
            self.buffer_test_size = 100
            self.db_client = ProxyRedisClient()
    
        async def test_single_proxy(self, proxy):
            """
            Asynchronously test a single proxy by requesting Baidu through it.
            :param proxy: proxy address, "ip:port"
            :return:
            """
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36'}
            async with aiohttp.ClientSession(headers=headers) as session:
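                # give each proxy two attempts; a 200 response marks it valid and bumps its score to the maximum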
                for i in range(2):
                    try:
                        real_proxy = 'http://' + proxy
                        print("正在测试proxy【{}】".format(proxy))
                        async with session.get(self.test_url, proxy=real_proxy, verify_ssl=False, timeout=10) as response:
                            response_status = response.status
                            if response_status in [200,]:
                                print("proxy有效【{}】,设置为最大分数".format(proxy))
                                self.db_client.max(proxy)
                                break
                            else:
                                if i == 1:
                                    print("proxy无效【{}】".format(proxy))
                                    self.db_client.decrease(proxy)
                                else:
                                    print("proxy重新测试第{}次【{}】".format(i+1, proxy))
                                    continue
                    except Exception:  # timeouts, connection errors, etc. count as a failed attempt
                        if i==1:
                            self.db_client.decrease(proxy)
                        else:
                            print("proxy重新测试第{}次【{}】".format(i + 1, proxy))
                            continue
    
        def run(self):
            """
            Start the tester; proxies are checked with asynchronous requests in batches of 100.
            :return:
            """
            print("测试器开启运行")
            loop = asyncio.get_event_loop()
            proxies_list = self.db_client.all()
            try:
                for i in range(0, len(proxies_list), self.buffer_test_size):
                    test_proxies = proxies_list[i: i+self.buffer_test_size]
                    tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                    loop.run_until_complete(asyncio.wait(tasks))
                    time.sleep(2)
            except Exception as e:
                print("测试器发生错误,ERROR: {}".format(e))
    
    
    # Scheduler module
    class CrawlSchedule(object):
        __CRAWL_GETTER_CYCLE__ = 300
        __CRAWL_TESTER_CYCLE__ = 20
        __getter_enable__ = True
        __tester_enable__ = True
        __api_enable__ = True
    
        def schedule_getter(self, cycle=__CRAWL_GETTER_CYCLE__):
            """
            Run the proxy getter every `cycle` seconds.
            :param cycle: sleep interval between runs, in seconds
            :return:
            """
            crawl_getter = CrawlerGetter()
            while True:
                crawl_getter.run()
                time.sleep(cycle)
    
        def schedule_tester(self, cycle=__CRAWL_TESTER_CYCLE__):
            """
            Run the proxy tester every `cycle` seconds.
            :param cycle: sleep interval between runs, in seconds
            :return:
            """
            crawl_tester = CrawlTester()
            while True:
                crawl_tester.run()
                time.sleep(cycle)
    
        def schedule_api(self):
            """
            Start the web API.
            :return:
            """
            return API.run()
    
        def run(self, getter_enable=__getter_enable__, test_enable=__tester_enable__, api_enable=__api_enable__):
            print("代理池开始运行")
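            # run each component in its own process so that a blocking loop in one cannot stall the others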
            if getter_enable:
                getter_process = Process(target=self.schedule_getter)
                getter_process.start()
            if test_enable:
                tester_process = Process(target=self.schedule_tester)
                tester_process.start()
            if api_enable:
                api_process = Process(target=self.schedule_api)
                api_process.start()
    
    # Web API: visit a route to get a random proxy directly
    class API():
        app = Flask(__name__)
        redis_db = ProxyRedisClient()
    
        @classmethod
        def get_proxy(cls):
            return cls.redis_db.random()
    
        @classmethod
        def all_proxy_count(cls):
            return str(cls.redis_db.count())
    
        @classmethod
        def run(cls):
            cls.app.add_url_rule('/random/', view_func=cls.get_proxy)
            cls.app.add_url_rule('/count/', view_func=cls.all_proxy_count)
            return cls.app.run(port=8000)
    
    if __name__ == "__main__":
        proxy_pool = CrawlSchedule()
        proxy_pool.run(getter_enable=True, test_enable=True, api_enable=True)
    
    

    Running results

    [Screenshot: 效果.png]
    We can also get a random proxy by visiting 127.0.0.1:8000/random/:
    [Screenshot: image.png]
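
    For a crawler, the address returned by the /random/ route can be dropped straight into the proxies argument of requests. A minimal sketch (http://httpbin.org/ip is only an illustrative test target, not part of the original project):

    import requests

    # fetch a random proxy from the pool's API and route a request through it
    proxy = requests.get("http://127.0.0.1:8000/random/", timeout=5).text.strip()
    proxies = {"http": "http://" + proxy, "https": "http://" + proxy}
    resp = requests.get("http://httpbin.org/ip", proxies=proxies, timeout=10)
    print(resp.text)  # should report the proxy's IP rather than our own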
