美文网首页
开源代码阅读——proxy_pool★

开源代码阅读——proxy_pool★

作者: 我只要喝点果粒橙 | 来源:发表于2022-04-04 14:38 被阅读0次

    项目链接: https://github.com/jhao104/proxy_pool

    爬虫代理IP池项目,主要功能为定时采集网上发布的免费代理验证入库,定时验证入库的代理保证代理的可用性,提供API和CLI两种使用方式。同时你也可以扩展代理源以增加代理池IP的质量和数量。

    模块组成:获取功能、存储功能、校验功能、接口管理

    :star:程序主要是启动了startServer的API接口服务、startScheduler定时服务

    • startServer->runFlask:是向外提供了通过proxyHandler来获得Redis中的proxy数据
    • startScheduler->sche.add_task(__runProxyFetch)、sche.add_task(__runProxyCheck)
      • __runProxyFetch:proxy_fetcher.run()->proxy_queue->Checker("raw", proxy_queue)获得各个代理网站的代理信息后,进行校验,校验成功则入库
      • __runProxyCheck:proxy in proxy_handler.getAll()->proxy_queue->Checker("use", proxy_queue):通过proxy_handler拿到库里所有现存的数据后,进行有效性校验,无效的则删除,有效的则更新信息

    作为存储功能的接口proxyHandler,也是两个API服务与定时服务的中介。程序也是通过存储功能,将核心的两个功能:<u>定时抓取的proxy数据</u>与<u>提供proxy数据给用户使用</u>成功联系在了一起

    • 通过元类实现单例模式:ConfigHandler,其可以在任意模块中以c = ConfigHandler()的形式获得,而不是ConfigHandler.getInstance()

    • @LazyProperty懒加载属性的装饰器: 只有用到时才会加载并将值注入到__dict__、加载一次后值就不再变化、;讲解可见:https://www.jianshu.com/p/708dc26f9b92——描述符or修饰符实现

      class LazyProperty(object):
          # 在被注解类方法被解释器运行的时候就会创建LazyProperty实例并返回
          def __init__(self, func):
              self.func = func
          """通过python描述符来实现"""
          def __get__(self, instance, owner):
              if instance is None:
                  return self
              else:
                  # 会将结果值通过setattr方法存入到instance对象实例的__dict__中
                  value = self.func(instance)
                  setattr(instance, self.func.__name__, value)
                  return value
      class ConfigHandler(withMetaclass(Singleton)):
          # 返回一个LazyProperty实例 
          @LazyProperty
          def serverHost(self):
              return os.environ.get("HOST", setting.HOST)
      c = ConfigHandler()
      # 会触发ConfigHandler.__dict__["serverHost"], 然后接而触发LazyProperty的__get__,value = self.func(instance)会得到真正serverHost函数的值后将其设置在ConfigHandler instance对象的__dict__中,由于对象的__dict__["serverHost"]=value优先级高于类的__dict__["serverHost"]=LazyProperty()对象,因此之后调用得到的是value结果
      print(c.serverHost)
      

      __get__只有访问类属性的时候才会生效,这边是通过setattr将serverHost设置成了ConfigHandler的类属性

    • 封装了一个请求工具类WebRequest:

      • 增加了异常处理的功能
      • 增加了日志功能
      • 请求头会得到随机UA
      • 设置重试
    • 使用click创建子命令:

      1. 得到一个click_group
      CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
      
      @click.group(context_settings=CONTEXT_SETTINGS)
      @click.version_option(version=VERSION)
      def cli():
          """ProxyPool cli工具"""
      
      1. 指定group下的子命令
      @cli.command(name="server")
      # 还可以设置参数: @click.option('--count', default=1, help='Number of greetings.') --> def server(count)
      def server():
          """ 启动api服务 """
          click.echo(BANNER)
          startServer()
      
      if __name__ == '__main__':
          cli()
      

      然后通过bash脚本同时开启两个进程

      #!/usr/bin/env bash
      python proxyPool.py server &
      python proxyPool.py schedule
      
    • DbClient DB工厂类

      class DbClient(withMetaclass(Singleton)):
         def __init__(self, db_conn):
              self.parseDbConn(db_conn)
              self.__initDbClient()
      
          @classmethod
          def parseDbConn(cls, db_conn):
              db_conf = urlparse(db_conn)
              cls.db_type = db_conf.scheme.upper().strip()
              cls.db_host = db_conf.hostname
              cls.db_port = db_conf.port
              cls.db_user = db_conf.username
              cls.db_pwd = db_conf.password
              cls.db_name = db_conf.path[1:]
              return cls
      
          def __initDbClient(self):
              """
              init DB Client
              :return:
              """
              __type = None
              if "SSDB" == self.db_type:
                  __type = "ssdbClient"
              elif "REDIS" == self.db_type:
                  __type = "redisClient"
              else:
                  pass
              assert __type, 'type error, Not support DB type: {}'.format(self.db_type)
              self.client = getattr(__import__(__type), "%sClient" % self.db_type.title())(host=self.db_host,
      port=self.db_port,
      username=self.db_user,
      password=self.db_pwd,
      db=self.db_name)
      
    • 继承重写logging.logger,可选参数为name, level=DEBUG, stream=True, file=True,让每个功能函数都能生成单独的日志文件,并进行了可选控制。

      相比单例,日志精度更细,但也使用起来也更麻烦,需要考虑什么地方需要。

    • 提供"扩展代理"接口

      1. ProxyFetcher类中添加自定义的获取代理的静态方法, 该方法需要以生成器(yield)形式返回host:ip格式的代理

      2. 添加好方法后,修改setting.py文件中的PROXY_FETCHER项下添加自定义方法的名字:

        PROXY_FETCHER = [
            "freeProxy01",    
            "freeProxy02",
            # ....
            "freeProxyCustom1"  #  # 确保名字和你添加方法名字一致
        ]
        

        schedule 进程会每隔一段时间抓取一次代理,下次抓取时会自动识别调用你定义的方法。

      实现方式

    self.log.info("ProxyFetch : start")
    
    # 从配置中拿执行函数
    for fetch_source in self.conf.fetchers:
        # 判断ProxyFetcher中是否有定义、是否可调用
        fetcher = getattr(ProxyFetcher, fetch_source, None)
        if not fetcher:
            self.log.error("ProxyFetch - {func}: class method not exists!".format(func=fetch_source))
            continue
        if not callable(fetcher):
            self.log.error("ProxyFetch - {func}: must be class method".format(func=fetch_source))
            continue
        thread_list.append(_ThreadFetcher(fetch_source, proxy_dict))
        
    for thread in thread_list:
        thread.setDaemon(True)
        thread.start()
    for thread in thread_list:
        thread.join()
    self.log.info("ProxyFetch - all complete!")
        
    
    • Cpython(默认安装的都是Cpython)中Dict和list、tuple都是线程安全

      • 以装饰器的形式将过滤器将入到容器中

        
        class ProxyValidator(withMetaclass(Singleton)):
            pre_validator = []
            http_validator = []
            https_validator = []
        
            @classmethod
            def addPreValidator(cls, func):
                cls.pre_validator.append(func)
                return func
        
        # 实际上执行了 formatValidator=ProxyValidator.addPreValidator(formatValidator)
        # 由于addPreValidator返回了func, 所以formatValidator还是原来的addPreValidator, 但在类定义的时候ProxyValidator.pre_validator添加了formatValidator方法
        @ProxyValidator.addPreValidator
        def formatValidator(proxy):
            """检查代理格式"""
            verify_regex = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}"
            _proxy = findall(verify_regex, proxy)
            return True if len(_proxy) == 1 and _proxy[0] == proxy else False
        
        
        class DoValidator(object):
            """ 校验执行器 """
            @classmethod
            def validator(cls, proxy):
                """
                校验入口
                Args:
                    proxy: Proxy Object
                Returns:
                    Proxy Object
                """
                http_r = cls.httpValidator(proxy)
                https_r = False if not http_r else cls.httpsValidator(proxy)
        
                proxy.check_count += 1
                proxy.last_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                proxy.last_status = True if http_r else False
                if http_r:
                    if proxy.fail_count > 0:
                        proxy.fail_count -= 1
                    proxy.https = True if https_r else False
                else:
                    proxy.fail_count += 1
                return proxy
            
            @classmethod
            def preValidator(cls, proxy):
                for func in ProxyValidator.pre_validator:
                    if not func(proxy):
                        return False
                return True
        
    • starter-banner:启动横幅

      • reflection: No、adjustment: cewnter、Stretch: Yes、width: 80
      • 还不错的font:
        • 5lineoblique——好看
        • banner3——清楚
        • bell——抽象
        • big——清晰
        • bigchief——等高线版本、艺术
        • block——块状
        • bulbhead——可爱
        • larry3d——立体3d
        • ogre——清晰
        • puffy——清晰+一点可爱
        • slant——清晰+斜体
    • 定时器框架apschedule配置:

      scheduler = BlockingScheduler(logger=scheduler_log, timezone=timezone)
      
      scheduler.add_job(__runProxyCheck, 'interval', minutes=2, id="proxy_check", name="proxy检查")
      
      executors = {
          # job_defaults中的max_instances也受限于max_workers, 所以要大于max_instances;此外max_workers也决定了同时能处理几个同时发生的task
          'default': {'type': 'threadpool', 'max_workers': 20},
          'processpool': ProcessPoolExecutor(max_workers=5)
      }
      job_defaults = {
          # 合并将所有这些错过的执行合并为一个, 默认为True。 如果是定时的存储任务的话,参数肯定不同,不能合并所以得手动设置False
          # 像本项目每隔一段时间抓取到的数据也不太一样,所以无法直接当作一次错误任务合并
          'coalesce': False,
          # 默认情况下,每个作业只允许同时运行一个实例。这意味着,如果作业即将运行,但前一次运行尚未完成,则认为最近一次运行失败。通过在添加作业时使用关键字参数,可以设置调度程序允许同时运行的特定作业的最大实例数。默认为1
          'max_instances': 10,
          # 框架会检查每个错过的执行时间,如果当前还在misfire_grace_time时间内,则会重新尝试执行任务,设高点就可以避免任务被漏掉执行。默认为1
          # "misfire_grace_time": 5  该项目未使用,而是采用了多任务实例来规避任务错过执行==>即官方给出两种方案中的另一种。任务错过信息:Run time of job "say (trigger: interval[0:00:02])" was missed by 0:00:03.010383
      }
      
      scheduler.configure(executors=executors, job_defaults=job_defaults, timezone=timezone)
      scheduler.start()
      
      

      job_defaults参数含义见官方文档

      注: 经过测试,在add_task中的func如果起了多个线程,其执行不受限于sche的配置

    • Python中如果只是使用全局变量则不需要用global声明(因为变量搜寻会由内往外),但是如果需要修改则需要用global声明,否则无法找到相应变量

    • 生成器:使用了yield关键字的函数就是生成器,生成器是一类特殊的迭代器。

      作用:

      • 处理大量数据:生成器一次返回一个结果,而不是一次返回所有结果。比如sum([i for i in range(10000000000000)])会卡机;sum(i for i in range(10000000000000))则不会
      • 代码更加简洁:可以减少变量、空间
      • 迭代器本身的作用

      yield关键字有两点作用:

      保存当前运行状态(断点),然后暂停执行,即将生成器(函数)挂起;可以使用next()函数让生成器从断点处继续执行,即唤醒生成器(函数)
      将yield关键字后面表达式的值作为返回值返回,此时可以理解为起到了return的作用

      def __runProxyFetch():     
          for proxy in proxy_fetcher.run():
              proxy_queue.put(proxy)
              
      class Fetcher(object):
          name = "fetcher"
          def run(self):
              # ...
              # 相比使用生成推导式 return [p for p in proxy_dict.values() if DoValidator.preValidator(p.proxy)], 使用yield生成器可以节省空间
              for _ in proxy_dict.values():
                      if DoValidator.preValidator(_.proxy):
                          yield _
      
    • 应用部署:

      ①对apk换源;②设置时区

      FROM python:3.6-alpine
      # ..
      # apk repository
      RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories
      # timezone
      RUN apk add -U tzdata && cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && apk del tzdata
      # ...
      ENTRYPOINT [ "sh", "start.sh" ]
      
      

      docker-compose.yml: 镜像还没编译好的情况。(如果自己改了功能并启用的话,需要用这种;或者自己发布镜像后用后一种)

      version: '2'
      services:
        proxy_pool:
          build: .
          container_name: proxy_pool
          ports:
            - "5010:5010"
          links:
            - proxy_redis
          environment:
            DB_CONN: "redis://@proxy_redis:6379/0"
        proxy_redis:
          image: "redis"
          container_name: proxy_redis
      

      docker-compose.yml:别人镜像已经编译好并上传

      version: "3"
      services:
        redis:
          image: redis
          expose:
            - 6379
        web:
          restart: always
          image: jhao104/proxy_pool
          environment:
            - DB_CONN=redis://redis:6379/0
          ports:
            - "5010:5010"
          depends_on:
            - redis
      

    scheduler的逻辑

    proxy_pool时序图.png

    项目目录结构默写:

    • settings: 配置文件
    • main:启动文件
    • api:提供获取proxy数据接口
    • handler:
      • loggerHandler:日志类
      • configHandler:单例的配置接口类
      • ProxyHandler: Proxy CRUD操作类
    • fetcher: 代理数据获取类
    • db:
      • dbClinet: 存储功能接口类
      • redisClient:存储功能实现类
    • helper
      • scheduler: 定时任务的定义与启动类
      • validator: proxy有效性校验类
      • check: 具体执行校验逻辑类
      • proxy: 获取的proxy数据封装类
    • utils:
      • lazyProperty: 懒加载描述器
      • singleton: 单例管理器类
      • six: python2与python3兼容类
      • webRequest: 网络请求封装类

    相关文章

      网友评论

          本文标题:开源代码阅读——proxy_pool★

          本文链接:https://www.haomeiwen.com/subject/xfucsrtx.html