美文网首页
实现增强版的代理IP池

实现增强版的代理IP池

作者: 小温侯 | 来源:发表于2018-07-21 00:08 被阅读216次

背景

之前实现了一个版本的代理IP池:爬虫代理IP池的实现。整个过程和设计思路都是我自己想出来的,实际用下来效果还可以,但是有一些不是很如意的地方,比如说:

  • 用起来很麻烦。如果你要在其他程序里使用这个代理代码,你需要不断复制粘贴proxy.py,同时Python环境要支持sqlite3
  • 刚初始化的时候IP的质量还不错,但是如果隔一天在跑,原本在数据库里的IP很多都不能用了(但是仍会被标记为有效的),这样就会导致第二天用的时候前几个IP质量很差。
  • IP质量的鉴别,我后来才知道有时候IP不能用可能只是临时的,应该再给它一次机会。

代理是爬虫必不可少的一部分,自然一个能够高效、稳定提供高质量的代理IP池就是爬虫工作的刚需了。我就开始想,一个“代理IP池”的理想状态究竟应该是什么样?它至少要满足两个条件:

  • 用起来要方便,最好在代码的任何地方,只要调用一个接口就行,没有任何限制条件。
  • 这个代理池要稳定,每时每刻都要有合理数量有效IP在池里。

后来我就想不如把它做成一个类似于“服务”的程序:

  • 正好手头有一个AWS的EC2,装了Ubuntu 16.04.2 LTS, Python 3.5, 可以当作服务器用用
  • 程序维护一个大概20个左右的IP池, 起一个crontab,保证这个程序不会挂
  • 程序采用多进程:
  • 获取IP,如果要从各种网站抓,那这里可能要再分配几个线程;我这里直接用代理网站的接口,一条足以。
  • 校验IP池里IP的有效性,这个要多几条线程,线程的数量取决于校验的速度(异步IO?)。
  • 提供IP的接口,这里我本来想做个socket服务端,后来看网上说可以用Flask做成一个轻量的web服务器,之后用GET请求就可以了。
  • 定时清理没用的IP
  • Web端再做一个app,用来返回请求的IP地址,供判断代理是否生效。

代码细节

如何评估IP的质量

之前的版本对IP的评估过于草率,因为一个如果一个IP一次校验的时候失败了,这并不代表这个IP是无效的,它可能只是这次请求没成功,下次说不定就成功了。因此,最快能想到的评估IP质量的方法就是计分,每个IP都有一个基础分,如果校验成功则分数+1,失败则分数-1。

POOL_IP_QUALITY配置项决定了IP的质量,它有HIGH,MEDIUM,LOW几个级别,默认情况下IP质量等级是MEDIUM。如果cleaner发现一个IP的分数低于MEDIUM的值,那么这个IP就会被认为是无效的,会被清除掉。IP质量级别越高,分数阈值也越高,理论上,质量级别越高,IP越难找。

webapi会提供一个请求,返回当前进程池里分数最高的一个IP,同时这个IP的分数会-1。这样做的目的是避免每次都返回这个IP,给其他IP一个机会。

存储IP方面,我最开始是想用Redis数据库的集合Set做的,后来无意中瞄到了set下面的有序set。有序set不仅是有序的,而且还自带一个score属性,简直是为这个项目量身定做的数据类型。

其二,使用集合可以避免添加重复的IP。

其三,Redis数据库是一个独立运行的数据库,也许这就是数据持久化

还有其四,按照一般的多进程思路,因为我使用的是一个全局Redis连接,那么多个进程竞争的话,必须要加上锁。但是我查了一下Redis的文档,上面说Redis的操作都是原子操作,所以锁就可以省去了。(我不确定这么理解对不对,我写完之后跑了大概一天,代码没出问题。)

proxy.py

主进程,会产生四条子进程。

采用Flask实现轻量级WebAPI

具体参考下文的webapi.py

补充IP到进程池 - retriever

retriever会按照配置项中RETRIEVER_INTERVAL的大小定时检查进程池,如果发现进程池里的IP数量小于POOL_SIZE就会从IP源请求足够的IP来填满进程池。IP源可以是自己写爬虫抓取的免费IP,也可以自己花钱买,方式不重要。

def fn_retriever():
    global redis_clent
    fn_name = sys._getframe().f_code.co_name
    while True:
        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Routine Retriever Start.")
        cnt = redis_client.zcard(Configs.REDIS_KEY)
        ip_cnt_needed = Configs.POOL_SIZE - redis_client.zcard(Configs.REDIS_KEY)

        if ip_cnt_needed <= 0:
            Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Enough IP(s) in Redis.")
            Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Entering Sleep: {0:d}s.".format(Configs.RETRIEVER_INTERVAL))
            time.sleep(Configs.RETRIEVER_INTERVAL)
            continue

        tid = Configs.DAXIANG_RPOXY_ORDERID
        url = "http://tvp.daxiangdaili.com/ip/?tid={0:s}&num={1:d}&delay=5&category=2&sortby=time&filter=on&format=json&protocol=https".format(tid, ip_cnt_needed)

        try:
            response = requests.get(url)
            content = None

            if response.status_code == requests.codes.ok:
                content = response.text
        except Exception as e:
            print (e)

        res_json = ast.literal_eval(content.strip())

        if res_json:
            for addr in res_json:
                if addr.get('error'):
                    continue
                redis_client.zadd(Configs.REDIS_KEY, Configs.MEDIUM, '{0:s}:{1:d}'.format(addr.get('host'), addr.get('port')))

        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Refill {0:d} IP(s) to Redis.".format(ip_cnt_needed))

校验进程池里的IP的有效性 - validator

validator会每隔VALIDATOR_INTERVAL秒检查一下次当前进程池里分数低于POOL_IP_QUALITY的IP,为什么是这个值?因为之后cleaner会把所有分数低于POOL_IP_QUALITY 的IP清理掉,如果IP的分数高于POOL_IP_QUALITY,那么即使校验失败分数也不会因为低于阈值而被清理掉;同理,如果IP的分数低于POOL_IP_QUALITY,即使校验成功仍然会因为分数低于阈值而被清理掉;只有分数等于POOL_IP_QUALITY的IP,还有一次翻身的机会。

这里为什么不检查所有的IP?目的就是为了加速校验这个过程。当然这里也可以使用多线程或异步IO进行全局IP校验,这肯定是可以的。不过我的进程池很小,我就没这么做。当然如果进程池要很大的话,我个人推荐直接去购买相应的代理IP服务。

判断一个IP是否有效,只需要用它访问一个页面,检查返回码是否为200即可。另外,我在webapi里提供了一个/myIP地址,访问的话,它会返回一个页面内容为请求头中Remote Address字段的值。

def __validation(addr):
    proxies = {
        "http": "http://{0}".format(addr),
        "https": "http://{0}".format(addr)
    }

    header = {}
    header['user-agent'] = choice(Configs.FakeUserAgents)

    try:
        response = requests.get("http://52.206.77.228:5000/myIP", headers=header, proxies=proxies, timeout=5)
    except Exception as e:
        return False
    else:
        if response.status_code == requests.codes.ok:
            #print (response.text)
            return True

def fn_validator():
    global redis_clent
    fn_name = sys._getframe().f_code.co_name
    while True:
        # Test all ips whose score >= POOL_IP_QUALITY - 1
        # False ==> score - 1
        # True  ==> score + 1 
        maxscore = redis_client.zrange(Configs.REDIS_KEY, 0, 0, desc=True, withscores=True)
        if not maxscore:
            Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Pool is empty. Entering Sleep: {0:d}s.".format(Configs.VALIDATOR_INTERVAL))
            time.sleep(Configs.VALIDATOR_INTERVAL)
            continue

        maxscore = maxscore[0][1]
        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Max score in this round: {0:d}.".format(int(maxscore)))
        res = redis_client.zrangebyscore(Configs.REDIS_KEY, Configs.POOL_IP_QUALITY - 1, maxscore)
        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Start to Validate {0:d} IP(s).".format(len(res)))

        increment = []

        i = 0
        for ip in res:
            n = 1 if __validation(ip.decode('utf-8')) else -1
            increment.append([ip, n])
            Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "[{0:d}]Validated[{1:s}], Result:[{2:d}].".format(i, ip.decode('utf-8'), n))
            i += 1
            
        for inc in increment:
            redis_client.zincrby(Configs.REDIS_KEY, inc[0], inc[1])

        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Validation finished. Entering Sleep: {0:d}s.".format(Configs.VALIDATOR_INTERVAL))
        time.sleep(Configs.VALIDATOR_INTERVAL)

清除进程池里无效的IP - cleaner

如上文所述,cleaner会每隔CLEANER_INTERVAL秒清理一次进程池。

def fn_cleaner():
    global redis_clent
    fn_name = sys._getframe().f_code.co_name
    while True:
        # Remove all ips whose score < POOL_IP_QUALITY
        res = redis_client.zremrangebyscore(Configs.REDIS_KEY, -1, Configs.POOL_IP_QUALITY - 1)
        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Remove {0:d} IP(s) from Redis.".format(res))
        Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Entering Sleep: {0:d}s.".format(Configs.CLEANER_INTERVAL))
        time.sleep(Configs.CLEANER_INTERVAL)

webapi.py - Flask

除了/myIP, webapi还提供了一个简单的欢迎页面和/getIP页面,后者会返回当前进程池里分数最高的IP,格式为IP:PORT的字符串,可以直接使用,非常方便。

[图片上传失败...(image-c5b5f9-1532102907537)]

[图片上传失败...(image-e95048-1532102907537)]

from flask import Flask
from flask import request
import redis

import proxy as Proxy
import configure as Configs

app = Flask("__name__")

@app.route('/')
def welcome():
    strr = '''
        <h3>Welcome to IP proxy pooling</h3><br>
        Access <strong>/myIP</strong> to retrieve 'remote address' header in GET request. For IP validation.<br>
        Access <strong>/getIP</strong> to retrieve an Proxy IP, format: IP:PORT<br>
        <p>Ethan Huang, <a href="https://journal.ethanshub.com/">https://journal.ethanshub.com/</a></p>
    '''
    return strr

@app.route("/myIP", methods=["GET"])
def myIP():
    return request.remote_addr

@app.route("/getIP")
def getIP():
    return Proxy.get_one_ip()

if __name__ == '__main__':
    app.run('0.0.0.0',5000)

logger

除了核心的部分,我还顺便实现了一个logger模块,它可以接受其他模块传来的日志,并根据配置项LOG_TYPE决定日志是作为标准输出至屏幕,还是写入文件。

日志分为三个级别LOG_LEVEL_FINE, LOG_LEVEL_FINER, LOG_LEVEL_FINEST,由产生日志的模块自行选择,但是配置项LOG_LEVEL决定输出哪个级别以该级别以上的日志,默认为LOG_LEVEL_FINE

import time
import configure as Configs

def log(level, module_name, content):
    file = open(Configs.LOG_FILE, 'a', encoding='utf-8')
    #global file
    if Configs.LOG_TYPE == 0:
        fn = print
    elif Configs.LOG_TYPE == 1:
        fn = file.write
    else:
        fn = print

    strr = "[{0:d}][{1:s}] --- {2:12s} --- {3:s}".format(level, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), module_name, content)
    
    if level >= Configs.LOG_LEVEL:
        if fn == file.write:
            strr = strr + '\n'

        fn(strr)

    file.close()

写完这个,我发现Python竟然有logging模块,惊了个呆。反正我这个也能用,就先用着。

configure.py

配置项

# Proxy related
DAXIANG_RPOXY_ORDERID = '大象代理的订单号'
POOL_SIZE = 10
HIGH, MEDIUM, LOW = 8, 5, 2
POOL_IP_QUALITY = MEDIUM
VALIDATOR_THREAD_NUM = 1
RETRIEVER_INTERVAL = 60
VALIDATOR_INTERVAL = 60
CLEANER_INTERVAL = 60

# Web API
API_ADDR = '0.0.0.0'
API_PORT = '5000'

# Redis
REDIS_HOST = '52.206.77.228' # 这是我的服务器的公网IP
REDIS_PORT = '6379'
REDIS_KEY  = 'PROXYS'

# Logger
# 0 - Standard Output 1 - file
LOG_TYPE = 0
LOG_FILE = 'ohIPPool.log'

LOG_LEVEL_FINE = 0
LOG_LEVEL_FINER = 1
LOG_LEVEL_FINEST = 2
LOG_LEVEL = LOG_LEVEL_FINE

最后把代码部署到服务器上即可,像AWS的EC2需要额外配置安全组,其他就和本地一样了。还有一点就是Ubuntu一般会有两个甚至多个Python版本共存,执行的时候要注意使用相对应版本的python和pip。具体就不说了。

相关文章

网友评论

      本文标题:实现增强版的代理IP池

      本文链接:https://www.haomeiwen.com/subject/xlycmftx.html