Proxy Pool

Author: GHope | Published 2018-11-08 16:21

    What a proxy pool is for

    It solves the problem of getting your IP blocked when you crawl the same site heavily over a short period. The workflow: scrape free IPs from the major proxy sites → deduplicate them and store them in a Redis sorted set → periodically test each IP, adjust its priority according to your scoring rules, and delete IPs whose score drops to zero (invalid) → expose an API from which crawlers can fetch a proxy.
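
    Deduplication and prioritisation both come for free with the sorted-set type: a member can be stored only once, and its score doubles as its priority. A minimal sketch with redis-py (using the redis-py 2.x calling convention that the rest of the post follows; the hard-coded key mirrors REDIS_KEY from the configuration section below):

    import redis

    r = redis.StrictRedis(decode_responses=True)
    # adding the same member twice never duplicates it, it only updates the score
    # (redis-py 3+ expects a mapping instead: r.zadd('proxies', {'1.2.3.4:8080': 10}))
    r.zadd('proxies', 10, '1.2.3.4:8080')
    r.zadd('proxies', 100, '1.2.3.4:8080')
    print(r.zcard('proxies'))                     # 1 -- stored only once
    print(r.zrangebyscore('proxies', 100, 100))   # members holding the top score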

    Getter module:

    Scheduled fetching

        def schedule_getter(self, cycle=GETTER_CYCLE):
            """
            Fetch proxies on a schedule
            """
            getter = Getter()
            while True:
                print('Start fetching proxies')
                getter.run()
                time.sleep(cycle)
    

    At each interval, check whether the pool still has room before fetching more proxies

        def is_over_threshold(self):
            """
            Check whether the pool has reached its size limit
            """
            return self.redis.count() >= POOL_UPPER_THRESHOLD

        def run(self):
            print('Getter is running')
            if not self.is_over_threshold():
                for callback_label in range(self.crawler.__CrawlFuncCount__):
                    callback = self.crawler.__CrawlFunc__[callback_label]
                    # fetch proxies from one crawl_ method at a time
                    proxies = self.crawler.get_proxies(callback)
                    sys.stdout.flush()
                    for proxy in proxies:
                        self.redis.add(proxy)
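
    run() iterates over __CrawlFunc__ and __CrawlFuncCount__, two attributes the post never defines. A minimal sketch of how they can be produced, assuming a metaclass that collects every method whose name starts with crawl_ (details may differ from the original project):

    class ProxyMetaclass(type):
        def __new__(mcs, name, bases, attrs):
            # record the names of all crawl_* methods so Getter.run() can call them one by one
            attrs['__CrawlFunc__'] = [k for k, v in attrs.items()
                                      if k.startswith('crawl_') and callable(v)]
            attrs['__CrawlFuncCount__'] = len(attrs['__CrawlFunc__'])
            return type.__new__(mcs, name, bases, attrs)


    class Crawler(object, metaclass=ProxyMetaclass):
        # the crawl_* generators shown below would be defined on this class

        def get_proxies(self, callback):
            # callback is a method name collected by the metaclass; call it and
            # gather every proxy the generator yields
            proxies = []
            for proxy in getattr(self, callback)():
                print('Got proxy', proxy)
                proxies.append(proxy)
            return proxies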
    

    Concrete crawlers for a few of the proxy sources

        def crawl_daili66(self, page_count=4):
            """
            Crawl proxies from 66ip.cn
            :param page_count: number of pages to crawl
            :return: proxy
            """
            start_url = 'http://www.66ip.cn/{}.html'
            urls = [start_url.format(page) for page in range(1, page_count + 1)]
            for url in urls:
                print('Crawling', url)
                html = get_page(url)
                if html:
                    doc = pq(html)
                    trs = doc('.containerbox table tr:gt(0)').items()
                    for tr in trs:
                        ip = tr.find('td:nth-child(1)').text()
                        port = tr.find('td:nth-child(2)').text()
                        yield ':'.join([ip, port])

        def crawl_proxy360(self):
            """
            Crawl proxies from Proxy360
            :return: proxy
            """
            start_url = 'http://www.proxy360.cn/Region/China'
            print('Crawling', start_url)
            html = get_page(start_url)
            if html:
                doc = pq(html)
                lines = doc('div[name="list_proxy_ip"]').items()
                for line in lines:
                    ip = line.find('.tbBottomLine:nth-child(1)').text()
                    port = line.find('.tbBottomLine:nth-child(2)').text()
                    yield ':'.join([ip, port])
    

    More free-proxy sources to add (a template for plugging one in follows this list):
    http://www.goubanjia.com/free/gngn/index.shtml
    http://www.ip181.com/
    http://www.ip3366.net/free/?stype=1
    http://www.kxdaili.com/ipList/
    https://premproxy.com/proxy-by-country/
    http://www.kuaidaili.com/free/inha/
    http://www.xicidaili.com/nn/
    http://www.ip3366.net/?stype=1
    http://www.iphai.com/
    http://www.data5u.com/free/gngn/index.shtml
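
    To plug in one of these sources, add another crawl_-prefixed generator to the crawler class and the metaclass sketched above will pick it up automatically. A template sketch; the URL pattern and CSS selectors are placeholders, not the real markup of any site listed here:

        def crawl_example_source(self, page_count=4):
            """
            Template for a new proxy source (placeholder URL and selectors)
            :param page_count: number of pages to crawl
            :return: proxy
            """
            # get_page and pq (pyquery) are the same helpers used by the crawlers above
            start_url = 'http://proxy.example.com/free/{}'  # placeholder URL
            for page in range(1, page_count + 1):
                html = get_page(start_url.format(page))
                if html:
                    doc = pq(html)
                    # adapt these selectors to the actual page structure of the target site
                    for row in doc('table tr:gt(0)').items():
                        ip = row.find('td:nth-child(1)').text()
                        port = row.find('td:nth-child(2)').text()
                        if ip and port:
                            yield ':'.join([ip, port])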

    Storage module:

    Store valid IPs in a Redis sorted set

        def add(self, proxy, score=INITIAL_SCORE):
            """
            Add a proxy and give it the initial score
            :param proxy: proxy
            :param score: score
            :return: result of the add
            """
            if not re.match(r'\d+\.\d+\.\d+\.\d+:\d+', proxy):
                print('Malformed proxy', proxy, '- discarded')
                return
            if not self.db.zscore(REDIS_KEY, proxy):
                # redis-py 2.x argument order (score, member); redis-py 3+ expects zadd(name, {member: score})
                return self.db.zadd(REDIS_KEY, score, proxy)
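
    The add(), max(), decrease(), random(), count() and batch() methods all live on a RedisClient class whose constructor the post does not show. A minimal sketch, assuming redis-py and the connection settings from the configuration section below:

    import redis

    class RedisClient(object):
        def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
            # decode_responses=True makes Redis return str rather than bytes
            self.db = redis.StrictRedis(host=host, port=port, password=password,
                                        decode_responses=True)

        def count(self):
            """Total number of proxies in the pool."""
            return self.db.zcard(REDIS_KEY)

        def batch(self, start, stop):
            """Proxies ranked from start (inclusive) to stop (exclusive), for batch testing."""
            # zrevrange takes inclusive indices, hence stop - 1
            return self.db.zrevrange(REDIS_KEY, start, stop - 1)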
    

    Tester module:

    Scoring rules: a score of 100 marks a proxy as usable. The tester loops over every proxy on a schedule: a proxy that passes the test is set to 100, a failing proxy loses one point, and a proxy whose score drops to 0 is removed. Newly fetched proxies start at a score of 10 and then follow the same rule: set to 100 on a successful test, decremented on failure, removed at 0.

    Test whether the fetched IPs actually work

    import asyncio
    import sys
    import time

    import aiohttp
    from aiohttp import ClientError


    class Tester(object):
        def __init__(self):
            self.redis = RedisClient()

        async def test_single_proxy(self, proxy):
            """
            Test a single proxy
            :param proxy: proxy to test
            :return:
            """
            # verify_ssl=False is the aiohttp 2.x spelling; aiohttp 3+ prefers ssl=False
            conn = aiohttp.TCPConnector(verify_ssl=False)
            async with aiohttp.ClientSession(connector=conn) as session:
                try:
                    if isinstance(proxy, bytes):
                        proxy = proxy.decode('utf-8')
                    real_proxy = 'http://' + proxy
                    print('Testing', proxy)
                    async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response:
                        if response.status in VALID_STATUS_CODES:
                            self.redis.max(proxy)
                            print('Proxy works', proxy)
                        else:
                            self.redis.decrease(proxy)
                            print('Invalid response status', response.status, 'IP', proxy)
                except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError):
                    self.redis.decrease(proxy)
                    print('Proxy request failed', proxy)
        
        def run(self):
            """
            Main test loop
            :return:
            """
            print('Tester is running')
            try:
                count = self.redis.count()
                print('Proxies currently in the pool:', count)
                for i in range(0, count, BATCH_TEST_SIZE):
                    start = i
                    stop = min(i + BATCH_TEST_SIZE, count)
                    print('Testing proxies', start + 1, 'to', stop)
                    test_proxies = self.redis.batch(start, stop)
                    loop = asyncio.get_event_loop()
                    # note: since Python 3.8 asyncio.wait() expects tasks, so wrap the
                    # coroutines with asyncio.ensure_future() on newer versions
                    tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                    loop.run_until_complete(asyncio.wait(tasks))
                    sys.stdout.flush()
                    time.sleep(5)
            except Exception as e:
                print('Tester error', e.args)
    

    When a proxy passes the test: set its score to the maximum and write it back to Redis

        def max(self, proxy):
            """
            Set a proxy's score to MAX_SCORE
            :param proxy: proxy
            :return: result of the update
            """
            print('Proxy', proxy, 'works, setting score to', MAX_SCORE)
            return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)
    

    When a proxy fails the test: decrease its score by one, and remove the proxy once its score falls to the configured minimum

        def decrease(self, proxy):
            """
            Decrease a proxy's score by one; remove it once it falls to the minimum
            :param proxy: proxy
            :return: the updated score
            """
            score = self.db.zscore(REDIS_KEY, proxy)
            if score and score > MIN_SCORE:
                print('Proxy', proxy, 'current score', score, '- decreasing by 1')
                # redis-py 2.x argument order (member, amount); redis-py 3+ expects zincrby(name, amount, member)
                return self.db.zincrby(REDIS_KEY, proxy, -1)
            else:
                print('Proxy', proxy, 'current score', score, '- removing')
                return self.db.zrem(REDIS_KEY, proxy)
    

    API module:

    Expose the proxies through a Flask API

    from flask import Flask, g
    
    from .db import RedisClient
    
    __all__ = ['app']
    
    app = Flask(__name__)
    
    
    def get_conn():
        if not hasattr(g, 'redis'):
            g.redis = RedisClient()
        return g.redis
    
    
    @app.route('/')
    def index():
        return '<h2>Welcome to Proxy Pool System</h2>'
    
    
    @app.route('/random')
    def get_proxy():
        """
        Get a proxy
        :return: a random proxy
        """
        conn = get_conn()
        return conn.random()


    @app.route('/count')
    def get_counts():
        """
        Get the count of proxies
        :return: total number of proxies in the pool
        """
        conn = get_conn()
        return str(conn.count())
    
    
    if __name__ == '__main__':
        app.run()
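
    A quick way for a spider to consume the API, assuming the service listens on API_PORT from the configuration below (a bare app.run() defaults to port 5000):

    import requests

    PROXY_POOL_URL = 'http://127.0.0.1:5555/random'  # adjust host/port to your deployment

    def get_random_proxy():
        # returns one proxy string such as '1.2.3.4:8080'
        r = requests.get(PROXY_POOL_URL)
        if r.status_code == 200:
            return r.text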
    

    Prefer proxies with higher scores

        def random(self):
            """
            Get a valid proxy at random: first try the proxies holding the maximum score;
            if there are none, fall back to the top-ranked ones; otherwise raise an error
            :return: a random proxy
            """
            result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
            if len(result):
                return choice(result)
            else:
                result = self.db.zrevrange(REDIS_KEY, 0, 100)
                if len(result):
                    return choice(result)
                else:
                    raise PoolEmptyError
    

    Configuration

    # Redis host
    REDIS_HOST = '127.0.0.1'

    # Redis port
    REDIS_PORT = 6379

    # Redis password; use None if there is none
    REDIS_PASSWORD = None

    REDIS_KEY = 'proxies'

    # Proxy scores
    MAX_SCORE = 100
    MIN_SCORE = 0
    INITIAL_SCORE = 10

    VALID_STATUS_CODES = [200, 302]

    # Upper bound on the pool size
    POOL_UPPER_THRESHOLD = 50000

    # Tester cycle (seconds)
    TESTER_CYCLE = 20
    # Getter cycle (seconds)
    GETTER_CYCLE = 300

    # Test URL; ideally test against the site you intend to crawl
    TEST_URL = 'http://www.baidu.com'

    # API settings
    API_HOST = '0.0.0.0'
    API_PORT = 5555

    # Module switches
    TESTER_ENABLED = True
    GETTER_ENABLED = True
    API_ENABLED = True

    # Maximum number of proxies tested per batch
    BATCH_TEST_SIZE = 10
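
    The switches and cycle values above are meant for a small scheduler that runs the three modules side by side; the post does not show it. A minimal sketch using multiprocessing, assuming the Tester, Getter and Flask app objects defined earlier (schedule_tester mirrors the schedule_getter method shown in the Getter module):

    import time
    from multiprocessing import Process

    class Scheduler(object):
        def schedule_tester(self, cycle=TESTER_CYCLE):
            # re-test the whole pool every `cycle` seconds
            tester = Tester()
            while True:
                print('Tester is running')
                tester.run()
                time.sleep(cycle)

        def schedule_getter(self, cycle=GETTER_CYCLE):
            # top the pool up every `cycle` seconds (same as the method shown earlier)
            getter = Getter()
            while True:
                print('Start fetching proxies')
                getter.run()
                time.sleep(cycle)

        def schedule_api(self):
            # serve the Flask API on the configured host and port
            app.run(API_HOST, API_PORT)

        def run(self):
            print('Proxy pool is starting')
            if TESTER_ENABLED:
                Process(target=self.schedule_tester).start()
            if GETTER_ENABLED:
                Process(target=self.schedule_getter).start()
            if API_ENABLED:
                Process(target=self.schedule_api).start()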
    

    Test case

    import os
    import sys
    import requests
    from bs4 import BeautifulSoup

    dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sys.path.insert(0, dir)


    def get_proxy():
        # fetch a proxy from the /random endpoint defined in the API module
        # (port 5555 matches API_PORT when the API is started by the scheduler)
        r = requests.get('http://127.0.0.1:5555/random')
        # the endpoint returns plain text, so r.text alone would also do
        proxy = BeautifulSoup(r.text, "lxml").get_text()
        return proxy


    def crawl(url, proxy):
        proxies = {'http': proxy}
        r = requests.get(url, proxies=proxies)
        return r.text


    def main():
        proxy = get_proxy()
        html = crawl('http://docs.jinkan.org/docs/flask/', proxy)
        print(html)

    if __name__ == '__main__':
        main()

    Git repository
