背景
之前实现了一个版本的代理IP池:爬虫代理IP池的实现。整个过程和设计思路都是我自己想出来的,实际用下来效果还可以,但是有一些不是很如意的地方,比如说:
- 用起来很麻烦。如果你要在其他程序里使用这个代理代码,你需要不断复制粘贴proxy.py,同时Python环境要支持sqlite3
- 刚初始化的时候IP的质量还不错,但是如果隔一天在跑,原本在数据库里的IP很多都不能用了(但是仍会被标记为有效的),这样就会导致第二天用的时候前几个IP质量很差。
- IP质量的鉴别,我后来才知道有时候IP不能用可能只是临时的,应该再给它一次机会。
代理是爬虫必不可少的一部分,自然一个能够高效、稳定提供高质量的代理IP池就是爬虫工作的刚需了。我就开始想,一个“代理IP池”的理想状态究竟应该是什么样?它至少要满足两个条件:
- 用起来要方便,最好在代码的任何地方,只要调用一个接口就行,没有任何限制条件。
- 这个代理池要稳定,每时每刻都要有合理数量的有效IP在池里。
后来我就想不如把它做成一个类似于“服务”的程序:
- 正好手头有一个AWS的EC2,装了Ubuntu 16.04.2 LTS, Python 3.5, 可以当作服务器用用
- 程序维护一个大概20个左右的IP池, 起一个crontab,保证这个程序不会挂
- 程序采用多进程:
- 获取IP,如果要从各种网站抓,那这里可能要再分配几个线程;我这里直接用代理网站的接口,一条足以。
- 校验IP池里IP的有效性,这个要多几条线程,线程的数量取决于校验的速度(异步IO?)。
- 提供IP的接口,这里我本来想做个socket服务端,后来看网上说可以用Flask做成一个轻量的web服务器,之后用GET请求就可以了。
- 定时清理没用的IP
- Web端再做一个app,用来返回请求的IP地址,供判断代理是否生效。
代码细节
如何评估IP的质量
之前的版本对IP的评估过于草率,因为一个如果一个IP一次校验的时候失败了,这并不代表这个IP是无效的,它可能只是这次请求没成功,下次说不定就成功了。因此,最快能想到的评估IP质量的方法就是计分,每个IP都有一个基础分,如果校验成功则分数+1,失败则分数-1。
POOL_IP_QUALITY
配置项决定了IP的质量,它有HIGH
,MEDIUM
,LOW
几个级别,默认情况下IP质量等级是MEDIUM
。如果cleaner
发现一个IP的分数低于MEDIUM
的值,那么这个IP就会被认为是无效的,会被清除掉。IP质量级别越高,分数阈值也越高,理论上,质量级别越高,IP越难找。
webapi
会提供一个请求,返回当前进程池里分数最高的一个IP,同时这个IP的分数会-1。这样做的目的是避免每次都返回这个IP,给其他IP一个机会。
存储IP方面,我最开始是想用Redis数据库的集合Set做的,后来无意中瞄到了set下面的有序set。有序set不仅是有序的,而且还自带一个score属性,简直是为这个项目量身定做的数据类型。
其二,使用集合可以避免添加重复的IP。
其三,Redis数据库是一个独立运行的数据库,也许这就是数据持久化。
还有其四,按照一般的多进程思路,因为我使用的是一个全局Redis连接,那么多个进程竞争的话,必须要加上锁。但是我查了一下Redis的文档,上面说Redis的操作都是原子操作,所以锁就可以省去了。(我不确定这么理解对不对,我写完之后跑了大概一天,代码没出问题。)
proxy.py
主进程,会产生四条子进程。
采用Flask实现轻量级WebAPI
具体参考下文的webapi.py
补充IP到进程池 - retriever
retriever会按照配置项中RETRIEVER_INTERVAL
的大小定时检查进程池,如果发现进程池里的IP数量小于POOL_SIZE
就会从IP源请求足够的IP来填满进程池。IP源可以是自己写爬虫抓取的免费IP,也可以自己花钱买,方式不重要。
def fn_retriever():
global redis_clent
fn_name = sys._getframe().f_code.co_name
while True:
Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Routine Retriever Start.")
cnt = redis_client.zcard(Configs.REDIS_KEY)
ip_cnt_needed = Configs.POOL_SIZE - redis_client.zcard(Configs.REDIS_KEY)
if ip_cnt_needed <= 0:
Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Enough IP(s) in Redis.")
Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Entering Sleep: {0:d}s.".format(Configs.RETRIEVER_INTERVAL))
time.sleep(Configs.RETRIEVER_INTERVAL)
continue
tid = Configs.DAXIANG_RPOXY_ORDERID
url = "http://tvp.daxiangdaili.com/ip/?tid={0:s}&num={1:d}&delay=5&category=2&sortby=time&filter=on&format=json&protocol=https".format(tid, ip_cnt_needed)
try:
response = requests.get(url)
content = None
if response.status_code == requests.codes.ok:
content = response.text
except Exception as e:
print (e)
res_json = ast.literal_eval(content.strip())
if res_json:
for addr in res_json:
if addr.get('error'):
continue
redis_client.zadd(Configs.REDIS_KEY, Configs.MEDIUM, '{0:s}:{1:d}'.format(addr.get('host'), addr.get('port')))
Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Refill {0:d} IP(s) to Redis.".format(ip_cnt_needed))
校验进程池里的IP的有效性 - validator
validator会每隔VALIDATOR_INTERVAL
秒检查一下次当前进程池里分数低于POOL_IP_QUALITY
的IP,为什么是这个值?因为之后cleaner会把所有分数低于POOL_IP_QUALITY
的IP清理掉,如果IP的分数高于POOL_IP_QUALITY
,那么即使校验失败分数也不会因为低于阈值而被清理掉;同理,如果IP的分数低于POOL_IP_QUALITY
,即使校验成功仍然会因为分数低于阈值而被清理掉;只有分数等于POOL_IP_QUALITY
的IP,还有一次翻身的机会。
这里为什么不检查所有的IP?目的就是为了加速校验这个过程。当然这里也可以使用多线程或异步IO进行全局IP校验,这肯定是可以的。不过我的进程池很小,我就没这么做。当然如果进程池要很大的话,我个人推荐直接去购买相应的代理IP服务。
判断一个IP是否有效,只需要用它访问一个页面,检查返回码是否为200即可。另外,我在webapi里提供了一个/myIP
地址,访问的话,它会返回一个页面内容为请求头中Remote Address
字段的值。
def __validation(addr):
proxies = {
"http": "http://{0}".format(addr),
"https": "http://{0}".format(addr)
}
header = {}
header['user-agent'] = choice(Configs.FakeUserAgents)
try:
response = requests.get("http://52.206.77.228:5000/myIP", headers=header, proxies=proxies, timeout=5)
except Exception as e:
return False
else:
if response.status_code == requests.codes.ok:
#print (response.text)
return True
def fn_validator():
global redis_clent
fn_name = sys._getframe().f_code.co_name
while True:
# Test all ips whose score >= POOL_IP_QUALITY - 1
# False ==> score - 1
# True ==> score + 1
maxscore = redis_client.zrange(Configs.REDIS_KEY, 0, 0, desc=True, withscores=True)
if not maxscore:
Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Pool is empty. Entering Sleep: {0:d}s.".format(Configs.VALIDATOR_INTERVAL))
time.sleep(Configs.VALIDATOR_INTERVAL)
continue
maxscore = maxscore[0][1]
Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Max score in this round: {0:d}.".format(int(maxscore)))
res = redis_client.zrangebyscore(Configs.REDIS_KEY, Configs.POOL_IP_QUALITY - 1, maxscore)
Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Start to Validate {0:d} IP(s).".format(len(res)))
increment = []
i = 0
for ip in res:
n = 1 if __validation(ip.decode('utf-8')) else -1
increment.append([ip, n])
Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "[{0:d}]Validated[{1:s}], Result:[{2:d}].".format(i, ip.decode('utf-8'), n))
i += 1
for inc in increment:
redis_client.zincrby(Configs.REDIS_KEY, inc[0], inc[1])
Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Validation finished. Entering Sleep: {0:d}s.".format(Configs.VALIDATOR_INTERVAL))
time.sleep(Configs.VALIDATOR_INTERVAL)
清除进程池里无效的IP - cleaner
如上文所述,cleaner会每隔CLEANER_INTERVAL
秒清理一次进程池。
def fn_cleaner():
global redis_clent
fn_name = sys._getframe().f_code.co_name
while True:
# Remove all ips whose score < POOL_IP_QUALITY
res = redis_client.zremrangebyscore(Configs.REDIS_KEY, -1, Configs.POOL_IP_QUALITY - 1)
Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Remove {0:d} IP(s) from Redis.".format(res))
Logger.log(Configs.LOG_LEVEL_FINE, fn_name, "Entering Sleep: {0:d}s.".format(Configs.CLEANER_INTERVAL))
time.sleep(Configs.CLEANER_INTERVAL)
webapi.py - Flask
除了/myIP
, webapi还提供了一个简单的欢迎页面和/getIP
页面,后者会返回当前进程池里分数最高的IP,格式为IP:PORT
的字符串,可以直接使用,非常方便。
[图片上传失败...(image-c5b5f9-1532102907537)]
[图片上传失败...(image-e95048-1532102907537)]
from flask import Flask
from flask import request
import redis
import proxy as Proxy
import configure as Configs
app = Flask("__name__")
@app.route('/')
def welcome():
strr = '''
<h3>Welcome to IP proxy pooling</h3><br>
Access <strong>/myIP</strong> to retrieve 'remote address' header in GET request. For IP validation.<br>
Access <strong>/getIP</strong> to retrieve an Proxy IP, format: IP:PORT<br>
<p>Ethan Huang, <a href="https://journal.ethanshub.com/">https://journal.ethanshub.com/</a></p>
'''
return strr
@app.route("/myIP", methods=["GET"])
def myIP():
return request.remote_addr
@app.route("/getIP")
def getIP():
return Proxy.get_one_ip()
if __name__ == '__main__':
app.run('0.0.0.0',5000)
logger
除了核心的部分,我还顺便实现了一个logger模块,它可以接受其他模块传来的日志,并根据配置项LOG_TYPE
决定日志是作为标准输出至屏幕,还是写入文件。
日志分为三个级别LOG_LEVEL_FINE
, LOG_LEVEL_FINER
, LOG_LEVEL_FINEST
,由产生日志的模块自行选择,但是配置项LOG_LEVEL
决定输出哪个级别以该级别以上的日志,默认为LOG_LEVEL_FINE
。
import time
import configure as Configs
def log(level, module_name, content):
file = open(Configs.LOG_FILE, 'a', encoding='utf-8')
#global file
if Configs.LOG_TYPE == 0:
fn = print
elif Configs.LOG_TYPE == 1:
fn = file.write
else:
fn = print
strr = "[{0:d}][{1:s}] --- {2:12s} --- {3:s}".format(level, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), module_name, content)
if level >= Configs.LOG_LEVEL:
if fn == file.write:
strr = strr + '\n'
fn(strr)
file.close()
写完这个,我发现Python竟然有logging模块,惊了个呆。反正我这个也能用,就先用着。
configure.py
配置项
# Proxy related
DAXIANG_RPOXY_ORDERID = '大象代理的订单号'
POOL_SIZE = 10
HIGH, MEDIUM, LOW = 8, 5, 2
POOL_IP_QUALITY = MEDIUM
VALIDATOR_THREAD_NUM = 1
RETRIEVER_INTERVAL = 60
VALIDATOR_INTERVAL = 60
CLEANER_INTERVAL = 60
# Web API
API_ADDR = '0.0.0.0'
API_PORT = '5000'
# Redis
REDIS_HOST = '52.206.77.228' # 这是我的服务器的公网IP
REDIS_PORT = '6379'
REDIS_KEY = 'PROXYS'
# Logger
# 0 - Standard Output 1 - file
LOG_TYPE = 0
LOG_FILE = 'ohIPPool.log'
LOG_LEVEL_FINE = 0
LOG_LEVEL_FINER = 1
LOG_LEVEL_FINEST = 2
LOG_LEVEL = LOG_LEVEL_FINE
最后把代码部署到服务器上即可,像AWS的EC2需要额外配置安全组,其他就和本地一样了。还有一点就是Ubuntu一般会有两个甚至多个Python版本共存,执行的时候要注意使用相对应版本的python和pip。具体就不说了。
网友评论