前几天开始的IP代理池维护今天终于要见成果了。
我们一共写了4个模块:
获取模块----------->存储模块<--------------->检测模块
|
|
|
|
V
接口模块
一共四个模块:获取模块(crawler.py)、存储模块(saver.py)、检测模块(tester.py)、接口模块(api.py),其中获取模块我用了一个getter.py来调度爬虫,将数据存到数据库。
getter.py
from saver import RedisClient
from crawler import Crawler
POOL_UPPER_THRESHOLD = 10000
class Getter():
def __init__(self):
self.redis = RedisClient()
self.crawler = Crawler()
def is_over_threshold(self):
"""判断是否达到了代理池限制"""
if self.redis.count() > POOL_UPPER_THRESHOLD:
return True
else:
return False
def run(self):
"""调度获取器"""
print('获取器开始执行')
if not self.is_over_threshold():
for callback_lable in range(self.crawler.__CrawlFuncCount__):
callback = self.crawler.__CrawlFunc__[callback_lable]
proxies = self.crawler.get_proxies(callback)
for proxy in proxies:
self.redis.add(proxy)
有了这个getter之后,我们就可以写run.py来串联所有的模块,让程序运行起来。
import time
from api import app
from getter import Getter
from tester import Tester
from multiprocessing import Process
from multiprocessing import Pool
TESTER_CYCLE = 30
GETTER_CYCLE = 30
TESTER_ENABLE = True
GETTER_ENABLE = True
API_ENABLE = True
class Scheduler():
def schedule_tester(self, cycle=TESTER_CYCLE):
"""定时检测代理"""
tester = Tester()
while True:
print('测试器开始运行')
tester.run()
time.sleep(cycle)
def schedule_getter(self, cycle=GETTER_CYCLE):
"""定时获取代理"""
getter = Getter()
while True:
print('获取器开始运行')
getter.run()
time.sleep(cycle)
def schedule_api(self):
"""开启api"""
app.run()
def run(self):
"""运行代理池"""
# 开放api
if API_ENABLE:
api_process = Process(target=self.schedule_api)
api_process.start()
# 开始循环测试
if TESTER_ENABLE:
tester_process = Process(target=self.schedule_tester)
tester_process.start()
# 开始循环获取新的代理
if GETTER_ENABLE:
getter_process = Process(target=self.schedule_getter)
getter_process.start()
if __name__ == '__main__':
p = Scheduler()
p.run()
这里我们用了多进程,让getter、tester、和api一直运行下去,getter定时调用爬虫爬取各大免费ip代理网站,tester定时检测代理的可用性。getter将的得到代理存到数据库,tester从数据库取代理检测可用性,判定分数。api一直开启,在127.0.0.1:5000端口提供随机代理。
import time
from api import app
from getter import Getter
from tester import Tester
from multiprocessing import Process
TESTER_CYCLE = 30
GETTER_CYCLE = 30
TESTER_ENABLE = True
GETTER_ENABLE = True
API_ENABLE = True
class Scheduler():
def schedule_tester(self, cycle=TESTER_CYCLE):
"""定时检测代理"""
tester = Tester()
while True:
print('测试器开始运行')
tester.run()
time.sleep(cycle)
def schedule_getter(self, cycle=GETTER_CYCLE):
"""定时获取代理"""
getter = Getter()
while True:
print('获取器开始运行')
getter.run()
time.sleep(cycle)
def schedule_api(self):
"""开启api"""
app.run()
def run(self):
"""运行代理池"""
# 开放api
if API_ENABLE:
api_process = Process(target=self.schedule_api)
api_process.start()
# 开始循环测试
if TESTER_ENABLE:
tester_process = Process(target=self.schedule_tester)
tester_process.start()
# 开始循环获取新的代理
if GETTER_ENABLE:
getter_process = Process(target=self.schedule_getter)
getter_process.start()
if __name__ == '__main__':
p = Scheduler()
p.run()
注意!运行前一定要开启数据库。
这个程序调度三个模块,让他们同时运行,我运行一段时间后,效果如下:
程序会一直运行下去,直到手动停止,或者代理池为空。
我们打开redis可视化工具:
代理一共有2500条数据,可用代理有500左右的代理分数为100,:
结果效果挺满意,我们打开浏览器,访问127.0.0.1:5000
image.png image.png每次刷新这个页面都可以得到一个新的代理。
今天刚又运行了下,又多了不少个,真好。这下不愁没代理了。不过我是拿的百度首页做测试的,用的时候还要将测试对象改成爬取的网站。
刚才拿到了华章电子书vip卡,可以用一个月,但是只能在微信里看,我看看能不能给整下来,慢慢看。
真好。
网友评论