美文网首页
读轮子之免费代理池搭建

读轮子之免费代理池搭建

作者: hellodyp | 来源:发表于2018-09-17 16:53 被阅读0次

    造轮子很高大上,其实理解一个轮子也是一件麻烦事
    往往想理解一个库,打开源代码,我们就会在层层的函数/方法调用栈中迷失...

    大的项目比如tornado、flask的源码乍一下手难度太高>_<,不妨从一个小项目开始
    因为业务需要,公司的爬虫项目需要搭建一个代理池,确保池中的代理必须是可用状态
    面向博客编程后,发现很多代理池轮子
    综合评比下来,选用了这个方案:https://yukunweb.com/2018/4/build-free-asynchronous-proxy-pool/
    轮子不大,但涉及到的技术概念还蛮多的,该篇博客是代理池实现详情说明文档

    整套程序模块化设计,分为以下部分:

    • 获取ip部分
      从网上的免费代理网站抓取代理,设计成可扩展模式,如果有新的免费代理网站,可以添加新的抓取规则,在模块中添加方法即可
    • 数据库封装
      项目使用mongo数据库,在pymongo上又封装了一层,这样可以根据对应代理项目中的curd需求做更好的定制
    • 调度部分
      整个程序的核心调度设计,代理入库、定时检测代理的有效性、池子中代理的数量等等、
    • api
      设计对外的api,可以方便的调用代理,其他机器也可以通过网络调用池中代理

    程序结构:

    ProxyPool \
        Api \
            __init__.py
            api.py
        Spider \
            __init__.py
            get_proxy.py
        Db \
            __init__.py
            db.py
        Schedule \
            __init__.py
            adder.py
            tester.py
            schedule.py
        config.py
        run.py
    
    代理获取

    作者选择的免费代理:幻代理66代理快代理西刺代理
    然后分析这些代理网站的网页结构,设计规则抓下来并保存
    抓取部分代码:

    import requests
    from lxml import etree
    from requests.exceptions import ConnectionError
    
    def parse_url(url):
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/54.0'
        }
        try:
            resp = requests.get(url, headers=headers)
            if resp.status_code == 200:
                return resp.text
            return None
        except ConnectionError:
            print('Error.')
        return None
    
    def proxy_xici():
        url = 'http://www.xicidaili.com/'
        resp = parse_url(url)
        html = etree.HTML(resp)
        ips = html.xpath('//*[@id="ip_list"]/tr/td[2]/text()')
        ports = html.xpath('//*[@id="ip_list"]/tr/td[3]/text()')
        for ip, port in zip(ips, ports):
            proxy = ip + ':' + port
            yield proxy
    

    parse_url是一个网页下载函数
    proxy_xici是西刺代理的抓取规则,用xpath解析并使用yield返回组装好的ip+port格式的代理,注意此函数是一个生成器,需要用遍历的方式获取返回内容

    这些抓取ip的方法用类来实现,并且做成了可扩展模式
    具体就是所有的提取ip的方法统一以proxy_作方法名开头,并把这些方法保存在元类中(其实保存的是方法名的字符串格式数据),创建类的时候自动检查以proxy_开头的方法,然后做一些处理
    如果有新的免费代理网站,添加新的方法按照约定的规则命名即可
    具体实现:

    # Spider/get_proxy.py
    class ProxyMetaclass(type):
        """
        元类,在ProxyGetter类中加入
        __CrawlFunc__和__CrawlFuncCount__两个属性
        分别表示爬虫函数和爬虫函数的数量
        """
        def __new__(cls, name, bases, attrs):
            count = 0
            attrs['__CrawlFunc__'] = []
            for k in attrs.keys():
                if k.startswith('proxy_'):
                    attrs['__CrawlFunc__'].append(k)
                    count += 1
            attrs['__CrawlFuncCount__'] = count
            return type.__new__(cls, name, bases, attrs)
    
    
    class ProxyGetter(object, metaclass=ProxyMetaclass):
    
        def proxy_ip66(self):
            pass
    
        def proxy_xici(self):
            pass
    
        def proxy_kuai(self):
            pass
    
        def proxy_ihuan(self):
            pass
    

    上篇文章刚刚介绍了元类,如果你认真理解的话,这里的代码就不难懂了
    如果你理解的更透彻,甚至可以发现这个实现中有一个元类属性是不需要添加的,那就是后话了,下篇文章我们会对这个元类稍作修改使代码更简洁
    从上面的代码看,作者在attr中添加了两个属性,一个__CrawlFuncCount__是保存当前类中获取代理的方法数量
    另一个__CrawlFunc__是ip代理爬虫方法名的string列表,具体实现是约定如果类中属性以proxy_开头就表明是代理爬虫方法,保存到属性中
    注意new方法中使用的是type返回:
    return type.__new__(cls, name, bases, attrs)
    现学现卖,根据我的上篇文章,其实这样写会更好:
    return super(ProxyMetaclass, cls).__new__(cls, name, bases, attrs)

    同时类中还提供了一个方法,通过爬虫方法名来调用方法获取代理

    class ProxyGetter(object, metaclass=ProxyMetaclass):
    
        ...
    
        def get_raw_proxies(self, callback):
            proxies = []
            for proxy in eval("self.{}()".format(callback)):
                print('Getting', proxy, 'from', callback)
                proxies.append(proxy)
            return proxies
    

    关于eval的语法,作者也有解释:

    >>> m = 5
    >>> n = 3
    >>> eval('m') + eval('n')
    8
    
    数据库

    持久化用的是mongo数据库,封装了一套简洁api
    mongo是非关系型文档类型数据库,数据在库中的体现形式非常类似于json格式
    在linux中安装好mongo,终端shell中输入mongo启动客户端
    使用use db命令指定库,这一点和mysql一致
    但是查看库中某张表的命令是不同的:db.table.find()

    > use proxy
    switched to db proxy
    > db.proxy.find()
    { "_id" : ObjectId("5b9f43666cb73e293b904801"), "proxy" : "112.195.206.72:4226", "num" : 1024, "http/s" : true }
    { "_id" : ObjectId("5b9f45956cb73e293b90480e"), "proxy" : "119.114.123.98:8946", "num" : 1028, "http/s" : false }
    { "_id" : ObjectId("5b9f45966cb73e293b904811"), "proxy" : "116.7.187.99:4223", "num" : 1029, "http/s" : true }
    { "_id" : ObjectId("5b9f49c76cb73e293b904820"), "proxy" : "182.113.33.83:4221", "num" : 1033, "http/s" : true }
    { "_id" : ObjectId("5b9f502c6cb73e293b904822"), "proxy" : "116.149.202.69:6436", "num" : 1034, "http/s" : false }
    { "_id" : ObjectId("5b9f502e6cb73e293b904825"), "proxy" : "182.105.201.64:4162", "num" : 1035, "http/s" : true }
    { "_id" : ObjectId("5b9f50406cb73e293b90482c"), "proxy" : "106.46.136.83:4237", "num" : 1036, "http/s" : false }
    { "_id" : ObjectId("5b9f50426cb73e293b90482f"), "proxy" : "112.83.93.63:2589", "num" : 1037, "http/s" : true }
    >
    
    

    表中的数据类似json格式的键值对,其中必须要有_id字段,如果插入数据的时候未指定id,则mongo会自动插入id

    python中和mongo交互,使用的是pymongo库,封装了对数据库的操作
    作者在此之上又封装了一层:

    # db.py
    from pymongo import MongoClient, ASCENDING
    # 我们把配置文件放入 config.py 文件内
    from config import NAME, HOST, PORT
    
    
    class MongodbClient(object):
    
        def __init__(self, table=TABLE, host=HOST, port=PORT):
            self.table = table
            self.client = MongoClient(host, port)
            self.db = self.client.NAME
    
        @property
        def get_counts(self):
            """获取表里的数据总数"""
            return self.db[self.table].count()
    
        def change_table(self, table):
            """切换数据库表"""
            self.table = table
    
        def set_num(self):
            """给每条数据设置唯一自增num"""
            nums = []
            datas = self.get_all()
            if datas:
                for data in datas:
                    nums.append(data['num'])
                return max(nums)
            return 0
    
        def get_new(self):
            """获取最新一条数据"""
            data = self.get_all()[-1] if self.get_all() else None
            return data
    
        def get_data(self, num):
            """获取指定num的数据"""
            datas = self.get_all()
            if datas:
                data = [i for i in datas if i['num']==num][0]
                return data
            return None
    
        def get_all(self):
            """获取全部数据"""
            # 判断表里是否有数据
            if self.get_counts != 0:
                # 先排序
                self.sort()
                datas = [i for i in self.db[self.table].find()]
                return datas
            return None
    
        def put(self, proxy):
            """
            放置代理到数据库
            """
            num = self.proxy_num() + 1
            if self.db[self.table].find_one({'proxy': proxy}):
                self.delete(proxy)
                self.db[self.table].insert({'proxy': proxy, 'num': num})
            else:
                self.db[self.table].insert({'proxy': proxy, 'num': num})
    
        def delete(self, data):
            """删除数据"""
            self.db[self.table].remove(data)
    
        def clear(self):
            """清空表"""
            self.client.drop_database(self.table)
    
        def sort(self):
            """按num键排序"""
            self.db[self.table].find().sort('num', ASCENDING)
    

    在init中初始化数据库、host、端口
    put方法向库中插入数据
    delete方法根据数据内容删除对应的数据
    值得一提的是,插入的每条数据都有一个自增的字段num
    这个自己是设置的,sort方法就是根据num来排序

    调度器
    • 添加类:判断数据库中的代理数量是否达到最大阈值,进行启动爬虫和停止爬虫;

    • 测试类:对爬取到的代理进行检测,将有效的代理放入数据库;

    • 任务类:定时对数据库中的代理进行检测,用于启动整个调度器。

    代理测试:
    代理测试的网址可以选取一个页面很小网址,这样可以占用很小的流浪,响应的速度也会有略微提升
    作者提出如果用百度作为测试网址,会出现不管代理是否成功,都会有响应200的情况,我测了一下,的确是
    作者选择http://2017.ip138.com/ic.asp这个测试ip的站点作为检测站。是否好用未知,我用的http检测站点是:http://mini.eastday.com/assets/v1/js/search_word.js

    作者用的是aiohttp这个异步请求库,是一个基于协程异步实现的库,推荐在python3.5以上环境使用

    # Schedule/tester.py
    import asyncio
    import aiohttp
    
    from Db.db import MongodbClient
    # 将配置信息放入配置文件config.py
    from config import TEST_URL
    
    class ProxyTester(object):
        test_url = TEST_URL
    
        def __init__(self):
            self._raw_proxies = None
    
        def set_raw_proxies(self, proxies):
            # 供外部添加需要测试的代理
            self._raw_proxies = proxies
            self._conn = MongodbClient()
    
        async def test_single_proxy(self, proxy):
           """
           测试一个代理,如果有效,将他放入usable-proxies
           """
            try:
                async with aiohttp.ClientSession() as session:
                    try:
                        if isinstance(proxy, bytes):
                            proxy = proxy.decode('utf-8')
                        real_proxy = 'http://' + proxy
                        print('Testing', proxy)
                        async with session.get(self.test_url, proxy=real_proxy, timeout=10) as response:
                            if response.status == 200:
                                # 请求成功,放入数据库
                                self._conn.put(proxy)
                                print('Valid proxy', proxy)
                    except Exception as e:
                        print(e)
            except Exception as e:
                print(e)
    
        def test(self):
           """
           异步测试所有代理
           """
            print('Tester is working...')
            try:
                loop = asyncio.get_event_loop()
                tasks = [self.test_single_proxy(proxy) for proxy in self._raw_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
            except ValueError:
                print('Async Error')
    

    关于aiohttp的用法,大家可以参考这篇文章。也可以直接参考中文文档

    使用aiohttp的好处是,当爬虫发出请求等待网路io的时候不会一直阻塞等待,而可以继续其他的任务,比如说请求一百个网页,假设每个网页io耗时90毫秒,如果使用吧requets阻塞请求的话,耗时大概是:100*90毫秒
    如果使用aiohttp,总耗时大概是90毫秒多一点
    可以发现任务越多的话,耗时的差距也非常大

    代理测的入口是test方法:
    创建事件循环,用列表生成器产生任务tasks,异步任务注册到事件循环中

    test_single_proxy是测试单个代理的方法,注意方法前的async 关键字
    async 、await关键字只能在python3.5以上的环境中使用,让我们可以像写同步任务一样去写异步任务,有了async 关键字以后,方法/函数就不是普通方法/函数了,需要注册到事件循环loop = asyncio.get_event_loop()里执行

    添加代理

    在循环中实现,先检测库中代理数量,如果低于设定的阈值,就调用获取代理类中的方法获取代理检测并入库,如果达到阈值则break循环
    在获取代理类中通过遍历CrawlFunc列表拿到获取代理的方法,调用类中的get_raw_proxies拿到代理,然后通过测试类中的set_raw_proxies设置好要测试的代理,在事件循环注册测试入库

    # Schedule/adder.py
    from Db.db import MongodbClient
    from Spider.get_proxy import ProxyGetter
    from .tester import ProxyTester
    
    
    class PoolAdder(object):
        """
        启动爬虫,添加代理到数据库中
        """
    
        def __init__(self, threshold):
            self._threshold = threshold
            self._conn = MongodbClient()
            self._tester = ProxyTester()
            self._crawler = ProxyGetter()
    
        def is_over_threshold(self):
            """
            判断数据库中代理数量是否达到设定阈值
            """
            return True if self._conn.get_nums >= self._threshold else False
    
        def add_to_pool(self):
            """
            补充代理
            """
            print('PoolAdder is working...')
            proxy_count = 0
            while not self.is_over_threshold():
                # 迭代所有的爬虫,元类给ProxyGetter的两个方法
                # __CrawlFuncCount__是爬虫数量,__CrawlFunc__是爬虫方法
                for callback_label in range(self._crawler.__CrawlFuncCount__):
                    callback = self._crawler.__CrawlFunc__[callback_label]
                    # 调用ProxyGetter()方法进行抓取代理
                    raw_proxies = self._crawler.get_raw_proxies(callback)
                    # 调用方法测试爬取到的代理
                    self._tester.set_raw_proxies(raw_proxies)
                    self._tester.test()
                    proxy_count += len(raw_proxies)
                    if self.is_over_threshold():
                        print('Proxy is enough, waiting to be used...')
                        break
                if proxy_count == 0:
                    print('The proxy source is exhausted.')
    
    调度器定时实现

    调度器中两个定时任务,用进程来实现(协程配合进程比较简单些),一个是定时取部分代理调用测试类检测,一个是定时检测数据库中的代理是否低于最低阈值,调用添加类添加。

    # Schedulr/schedule.py
    import time
    from multiprocessing import Process
    
    from ProxyPool.db import MongodbClient
    from .tester import ProxyTester
    from .adder import PoolAdder
    from config import VALID_CHECK_CYCLE, POOL_LEN_CHECK_CYCLE \
            POOL_LOWER_THRESHOLD, POOL_UPPER_THRESHOLD
    
    class Schedule(object):
    
        @staticmethod
        def valid_proxy(cycle=VALID_CHECK_CYCLE):
            """
            从数据库中拿到一半代理进行检查
            """
            conn = MongodbClient()
            tester = ProxyTester()
            while True:
                print('Refreshing ip...')
                # 调用数据库,从左边开始拿到一半代理
                count = int(0.5 * conn.get_nums)
                if count == 0:
                    print('Waiting for adding...')
                    time.sleep(cycle)
                    continue
                raw_proxies = conn.get(count)
                tester.set_raw_proxies(raw_proxies)
                tester.test()
                time.sleep(cycle)
    
        @staticmethod
        def check_pool(lower_threshold=POOL_LOWER_THRESHOLD,
                    upper_threshold=POOL_UPPER_THRESHOLD,
                    cycle=POOL_LEN_CHECK_CYCLE):
            """
            如果代理数量少于最低阈值,添加代理
            """
            conn = MongodbClient()
            adder = PoolAdder(upper_threshold)
            while True:
                if conn.get_nums < lower_threshold:
                    adder.add_to_pool()
                time.sleep(cycle)
    
        def run(self):
            print('Ip Processing running...')
            valid_process = Process(target=Schedule.valid_proxy)
            check_process = Process(target=Schedule.check_pool)
            valid_process.start()
            check_process.start()
    

    阈值和两个调度周期在config中:

    # config.py
    # Pool 的低阈值和高阈值
    POOL_LOWER_THRESHOLD = 10
    POOL_UPPER_THRESHOLD = 40
    
    # 两个调度进程的周期
    VALID_CHECK_CYCLE = 600
    POOL_LEN_CHECK_CYCLE = 20
    

    valid_proxy周期的从数据库中左侧(为什么是左侧,详情查看db.py源码,在db中实现了)获取一半的代理然后检测
    check_pool方法周期的检测数据库中的代理数量,少于阈值则调用add_to_pool方法补充

    API

    接口部分用flask实现,比较简单,就不列举出来了
    以下是程序入口文件:

    # run.py
    from Api.api import app
    from Schedule.schedule import Schedule
    
    
    def main():
        # 任务类的两个周期进程就是整个调度器
        s = Schedule()
        s.run()
        app.run()
    
    
    if __name__ == '__main__':
        main()
    

    项目地址

    作者的项目github
    感兴趣的童鞋赶紧下载下来好好调试一下吧

    相关文章

      网友评论

          本文标题:读轮子之免费代理池搭建

          本文链接:https://www.haomeiwen.com/subject/nydwgftx.html