记录一个免费代理池的维护,主要包含四个模块:
获取模块:主要负责从各个免费代理网站提取出最新发布的免费代理,获取到本地并解析
存储模块:负责将获取模块获取到的proxy存储至redis数据库
检测模块:负责检测redis数据库中的proxy是否可用,并赋以权重
调度模块:负责将获取模块、存储模块和检测模块关联,并封装
主要涉及知识点:
- 元类
- python操作redis数据库,redis库的使用
- requests库的使用
- pyquery的使用
- aiohttp异步http框架的简单使用
- 多线程和多进程
检测模块
# -*- coding: utf-8 -*-
"""
__author__ = 'bingo'
__date__ = '2019/9/7'
# code is far away from bugs with the god animal protecting
I love animals. They taste delicious.
┏┓ ┏┓
┏┛┻━━━┛┻┓
┃ ☃ ┃
┃ ┳┛ ┗┳ ┃
┃ ┻ ┃
┗━┓ ┏━┛
┃ ┗━━━┓
┃ 神兽保 ┣┓
┃ 永无BUG┏┛
┗ ┓┏━┳┓┏┛
┃┫┫ ┃┫┫
┗┻┛ ┗┻┛
"""
import random
import asyncio
import requests
import time
import redis
import aiohttp
from pyquery import PyQuery as pq
from redis import ResponseError
from requests import ConnectTimeout
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process
from flask import Flask
# 检测模块
class CrawlTester(object):
def __init__(self):
self.test_url = "https://www.baidu.com"
self.buffer_test_size = 100
self.db_client = ProxyRedisClient()
async def test_single_proxy(self, proxy):
"""
异步方式检测单个proxy,通过访问百度,看proxy是否可用
:param proxy:
:return:
"""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36'}
async with aiohttp.ClientSession(headers=headers) as session:
for i in range(2):
try:
real_proxy = 'http://' + proxy
print("正在测试proxy【{}】".format(proxy))
async with session.get(self.test_url, proxy=real_proxy, verify_ssl=False, timeout=10) as response:
response_status = response.status
if response_status in [200,]:
print("proxy有效【{}】,设置为最大分数".format(proxy))
self.db_client.max(proxy)
break
else:
if i == 1:
print("proxy无效【{}】".format(proxy))
self.db_client.decrease(proxy)
else:
print("proxy重新测试第{}次【{}】".format(i+1, proxy))
continue
except:
if i==1:
self.db_client.decrease(proxy)
else:
print("proxy重新测试第{}次【{}】".format(i + 1, proxy))
continue
def run(self):
"""
开启检测器,每次通过异步请求方式同时检测100个proxy
:return:
"""
print("测试器开启运行")
loop = asyncio.get_event_loop()
proxies_list = self.db_client.all()
try:
for i in range(0, len(proxies_list), self.buffer_test_size):
test_proxies = proxies_list[i: i+self.buffer_test_size]
tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
loop.run_until_complete(asyncio.wait(tasks))
time.sleep(2)
except Exception as e:
print("测试器发生错误,ERROR: {}".format(e))
网友评论