- 使用Redis多节点(集群)来存储代理池
- 使用哈希存储:键为代理,值为写入时的时间戳,用于判断代理是否过期
- 通过测试URL检测代理的可用性;请求超过5秒超时的代理不会加入代理池。
- 定期更新代理池,如果代理池中的代理数量小于设定的数量,会添加新的代理,如果代理池中的代理数量大于设定的数量,会删除一些代理(需完善)
- 在主程序中使用一个循环来打印代理池的大小。
import logging
import threading
import time

import redis
import requests
class ProxyPool:
    """Proxy pool backed by a single hash in a Redis Cluster.

    Each hash field is a proxy string and its value is the time (seconds
    since the epoch) at which the proxy was stored. Entries at least
    ``proxy_ttl`` seconds old are treated as expired. A daemon thread started
    by the constructor periodically purges expired entries, tops the pool up
    to ``pool_size`` live entries, or trims it back down.
    """

    def __init__(self, redis_nodes, test_url, test_timeout=5, pool_size=100,
                 update_interval=60, proxy_ttl=120):
        """Connect to the cluster and start the background refresher thread.

        Args:
            redis_nodes: Startup nodes, given either as
                ``{'host': ..., 'port': ...}`` dicts or as ready-made
                ``redis.cluster.ClusterNode`` objects.
            test_url: URL fetched by :meth:`get_proxy`.
            test_timeout: Timeout in seconds for that request.
            pool_size: Target number of live proxies.
            update_interval: Seconds to sleep between refresh cycles.
            proxy_ttl: Age in seconds at which an entry counts as expired.
        """
        self.redis_nodes = redis_nodes
        self.test_url = test_url
        self.test_timeout = test_timeout
        self.pool_size = pool_size
        self.update_interval = update_interval
        self.proxy_ttl = proxy_ttl
        self.pool_key = 'proxy_pool'
        self.redis_conn = redis.RedisCluster(
            startup_nodes=self._to_cluster_nodes(redis_nodes)
        )
        # Every attribute the worker reads is assigned before the thread starts.
        self.update_thread = threading.Thread(target=self.update_pool)
        self.update_thread.daemon = True
        self.update_thread.start()

    @staticmethod
    def _to_cluster_nodes(nodes):
        """Convert host/port dicts to ``ClusterNode``; pass other items through."""
        # redis-py's cluster client expects ClusterNode objects, not dicts.
        return [
            redis.cluster.ClusterNode(node['host'], node['port'])
            if isinstance(node, dict) else node
            for node in nodes
        ]

    def update_pool(self):
        """Run the refresh cycle forever; this is the daemon thread's target."""
        log = logging.getLogger(__name__)
        while True:
            try:
                self._refresh_once()
            except redis.RedisError:
                # A transient Redis failure must not kill the only refresher.
                log.exception('Proxy pool refresh failed; retrying next cycle')
            time.sleep(self.update_interval)

    def _refresh_once(self):
        """Purge expired entries, then grow or shrink the pool to ``pool_size``."""
        live, expired = self._scan_pool()
        if expired:
            self.redis_conn.hdel(self.pool_key, *expired)
        if len(live) < self.pool_size:
            self.add_proxies(self.pool_size - len(live))
        elif len(live) > self.pool_size:
            self.remove_proxies(len(live) - self.pool_size)

    def _scan_pool(self):
        """Read the hash once and split it into live entries and expired fields.

        Returns:
            A ``(live, expired)`` tuple: ``live`` maps proxy string to its
            stored timestamp as a float; ``expired`` lists the raw hash
            fields whose age has reached ``proxy_ttl``.
        """
        now = time.time()
        live, expired = {}, []
        for field, raw_ts in self.redis_conn.hgetall(self.pool_key).items():
            stored_at = float(raw_ts)
            if now - stored_at < self.proxy_ttl:
                # Fields arrive as bytes unless the client decodes responses.
                name = field.decode() if isinstance(field, bytes) else field
                live[name] = stored_at
            else:
                expired.append(field)
        return live, expired

    def get_proxies(self):
        """Return the non-expired proxies as ``{proxy: stored_timestamp}``."""
        live, _ = self._scan_pool()
        return live

    def add_proxies(self, count):
        """Store up to ``count`` proxies obtained from :meth:`get_proxy`.

        Stops early as soon as :meth:`get_proxy` yields nothing. Storing a
        proxy that is already present only refreshes its timestamp.
        """
        for _ in range(count):
            proxy = self.get_proxy()
            if not proxy:
                break
            self.redis_conn.hset(self.pool_key, proxy, time.time())

    def remove_proxies(self, count):
        """Delete the ``count`` oldest live proxies from the pool.

        A non-positive ``count`` is a no-op.
        """
        if count <= 0:
            return
        proxies = self.get_proxies()
        # Ascending timestamp order puts the entries closest to expiry first.
        oldest = sorted(proxies, key=proxies.get)[:count]
        if oldest:
            self.redis_conn.hdel(self.pool_key, *oldest)

    def get_proxy(self):
        """Fetch ``test_url`` and return the stripped response body.

        Returns:
            The response text when the request succeeds with status 200,
            otherwise ``None`` (including on timeout or connection errors).
        """
        # NOTE(review): the body of test_url is stored as the proxy itself;
        # confirm that the configured URL really serves proxy addresses.
        try:
            response = requests.get(self.test_url, timeout=self.test_timeout)
        except requests.RequestException:
            return None
        if response.status_code == 200:
            return response.text.strip()
        return None
if __name__ == '__main__':
    # Demo entry point: build a pool against a local node, then report the
    # number of live proxies every 10 seconds until the process is stopped.
    pool_size = 100
    pool = ProxyPool(
        redis_nodes=[{'host': 'localhost', 'port': 6379}],
        test_url='http://httpbin.org/ip',
        pool_size=pool_size,
    )
    while True:
        proxies = pool.get_proxies()
        print(f'Proxy pool size: {len(proxies)}/{pool_size}')
        time.sleep(10)
网友评论