代理池

作者: 乐小Pi孩_VoV | 来源:发表于2016-08-04 16:52 被阅读32次

1.首先创建一个获取代理ip的类,这里取名为ProxyPool。

class ProxyPool:
    def get_soup(self, url):
        pass
    def get_youdaili(self):
        pass

这个ProxyPool类中有两个方法:

  • get_soup(self,url)
    这个方法除了接受本身参数之外,还接受一个url参数(网址),返回一个美味的soup对象。
def get_soup(self, url):
        resp = requests.get(url)
        if resp.status_code == 200:
            resp.encoding = "utf-8"
            soup = BeautifulSoup(resp.text, "lxml")
            return soup
  • get_youdaili(self)
    这个方法不用额外的参数,它使用方法本身里的数据,在这里是优代理网站的地址。经过对该方法的调用,网站中提供的ip会被逐个地添加到数据库中。
    def get_youdaili(self):
        soup = self.get_soup('http://www.youdaili.net/Daili/')
        a_tag = soup.select('div.newslist_body > ul > li > a')
        for i in a_tag:
            url = i.get('href')
            ip_re = re.compile(r'((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{2,5})@([a-zA-Z0-9]{4,7}))')
            soup = self.get_soup(url)
            ips = ip_re.findall(soup.text)
            page_tag = soup.select('ul.pagelist > li > a')
            if page_tag:
                page = re.search(r'\d', page_tag[0].get_text()).group()
                page = int(page)
            else:
                page = 1
            if page >= 2:  # 如果有第二页就继续爬取
                for i in range(2, page + 1):
                    soup_sub = self.get_soup(url[:-5] + "_" + str(i) + ".html")
                    ips += ip_re.findall(soup_sub.text)
            if ips:
                for i in ips:
                    try:
                        proxy_pool.insert_one({
                            'ip_port': i[1],
                            'protocol': i[2].lower(),
                            'update_time': int(time.time())
                        })
                    except pymongo.errors.DuplicateKeyError as ex:
                        pass
  • 数据库 proxy 以及数据库表单 proxy_pool 的建立
client = pymongo.MongoClient("localhost", 27017)
proxy = client['proxy']
proxy_pool = proxy['proxy_pool']
proxy_pool.ensure_index('ip_port', unique=True)  # 如果有重复的ip 写进去 会报错

2.接着创建一个检测代理质量的类,这里取名为ProxyCheck。

class ProxyCheck:
    ip_port_all = [(i['ip_port'], i['protocol']) for i in proxy_pool.find()]  # 查询,获取所有ip

    def remove_ip(self, ip_port):
        pass

    def get_status(self, ip_port, protocol):
        pass

    def check(self):
        pass

这个类中有三个方法:

  • remove_ip(self, ip_port)
    这个方法需要一个ip_port参数(如1.255.53.81:80)),查询数据库后,若找到含有该参数的pymongo对象,他会给该对象添加一个为None的speed属性,随后判断该对象的更新时间是否大于一周,并删除大于一周的对象。
def remove_ip(self, ip_port): # 如果没能成功响应,将执行次方法,将其响应速度设置为空并且判断存在时间是否超过一周
        ip_data = proxy_pool.find({'ip_port': ip_port}) # <pymongo.cursor.Cursor object at 0x042D8FF0>
        proxy_pool.update_one({'ip_port': ip_port}, {'$set': {'speed': None}})
        if int(time.time()) - ip_data[0]['update_time'] > 604800:  # time.time()是指1970纪元后经过的浮点秒数
            proxy_pool.remove({'ip_port': ip_port})
  • get_status(self, ip_port, protocol)
    该函数需要接受2个参数,如('1.255.53.81:80', 'http'),并试图用该代理参数去访问一个正常的页面,若成功便将反应时间更新到speed属性,同时更新update属性,否者就调用remove_ip()方法来删除该对象。

    def get_status(self, ip_port, protocol):
        url = "http://fz.58.com/"
        proxies = {"http": protocol + "://" + ip_port}
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36",
        }
        time1 = time.clock()  # 以浮点数计算的秒数返回当前的CPU时间,用来衡量不同程序的耗时
        try: # 使用代理常常容易出错
            resp = requests.get(url, headers=headers, proxies=proxies, timeout=6)
        except Exception as ex:
            print(ex)
            return self.remove_ip(ip_port)
        time2 = time.clock()
        time_result = time2 - time1 # 计算响应时间
        if resp.status_code == 200:
            print(ip_port)
            proxy_pool.update_one({"ip_port": ip_port},
                                  {'$set': {'speed': time_result, 'update_time': int(time.time())}})
        else:
            self.remove_ip(ip_port)
  • check()
    开启多线程进行检测
    def check(self):
        pool = Pool(20)
        for i in self.ip_port_all:
            if i[1] == 'http':
                pool.apply_async(self.get_status, args=i)
        pool.close()
        pool.join()

3.if name=='main'启动部分:

if __name__ == "__main__":
    if len(sys.argv) > 1:  # 接收第一个参数,第一个参数为脚本运行的间隔时间
        time_sleep = int(sys.argv[1])
    else:
        time_sleep = 60 * 60
    while (True):
        pp = ProxyPool()
        pp.get_youdaili()
        pc = ProxyCheck()
        pc.check()
        time.sleep(time_sleep)

全篇代码如下:

# coding:utf-8
# 因为网络上的代理毕竟是有限的,所以希望大家不要滥用

import re
import requests
import time
import pymongo
import sys
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool

client = pymongo.MongoClient("localhost", 27017)
proxy = client['proxy']
proxy_pool = proxy['proxy_pool']
proxy_pool.ensure_index('ip_port', unique=True)  # 如果有重复的ip 写进去 会报错


class ProxyPool:  # 获取代理ip的类
    def get_soup(self, url):
        resp = requests.get(url)
        if resp.status_code == 200:
            resp.encoding = "utf-8"
            soup = BeautifulSoup(resp.text, "lxml")
            return soup

    def get_youdaili(self):
        soup = self.get_soup("http://www.youdaili.net/Daili/")
        a_tag = soup.select("div.newslist_body > ul > li > a")
        for i in a_tag:
            url = i.get('href')
            ip_re = re.compile(r'((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{2,5})@([a-zA-Z0-9]{4,7}))')
            soup = self.get_soup(url)
            ips = ip_re.findall(soup.text)
            page_tag = soup.select("ul.pagelist > li > a")  # 是否还有第二页
            if page_tag:
                page = re.search(r"\d", page_tag[0].get_text()).group()
                page = int(page)
            else:
                page = 1
            if page >= 2:  # 如果有第二页就继续爬取
                for i in range(2, page + 1):
                    soup_sub = self.get_soup(url[:-5] + "_" + str(i) + ".html")
                    ips += ip_re.findall(soup_sub.text)
            if ips:
                for i in ips:
                    try: # 数据库不允许插入相同的ip,如果有相同的,这里将会报错,所以加个try
                        proxy_pool.insert_one({
                            'ip_port': i[1],
                            'protocol': i[2].lower(), # 协议
                            'update_time': int(time.time()) # 抓取时的时间
                        })
                    except pymongo.errors.DuplicateKeyError as ex:
                        pass
            print(url)


class ProxyCheck:
    ip_port_all = [(i['ip_port'], i['protocol']) for i in proxy_pool.find()] # 查询,获取所有ip

    def remove_ip(self, ip_port): # 如果没能成功响应,将执行次方法,将其响应速度设置为空并且判断存在时间是否超过一周
        ip_data = proxy_pool.find({'ip_port': ip_port})
        proxy_pool.update_one({'ip_port': ip_port}, {'$set': {'speed': None}})
        if int(time.time()) - ip_data[0]['update_time'] > 604800:
            proxy_pool.remove({'ip_port': ip_port})

    def get_status(self, ip_port, protocol):
        url = "http://fz.58.com/"
        proxies = {"http": protocol + "://" + ip_port}
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36",
        }
        time1 = time.clock()
        try: # 使用代理常常容易出错
            resp = requests.get(url, headers=headers, proxies=proxies, timeout=6)
        except Exception as ex:
            print(ex)
            return self.remove_ip(ip_port)
        time2 = time.clock()
        time_result = time2 - time1 # 计算响应时间
        if resp.status_code == 200:
            print(ip_port)
            proxy_pool.update_one({"ip_port": ip_port},
                                  {'$set': {'speed': time_result, 'update_time': int(time.time())}})
        else:
            self.remove_ip(ip_port)

    def check(self): # 开启多线程进行检测
        pool = Pool(20)
        for i in self.ip_port_all:
            if i[1] == 'http':
                pool.apply_async(self.get_status, args=i)
        pool.close()
        pool.join()


if __name__ == "__main__":
    if len(sys.argv) > 1:  # 接收第一个参数,第一个参数为脚本运行的间隔时间
        time_sleep = int(sys.argv[1])
    else:
        time_sleep = 60 * 60
    while (True):
        pp = ProxyPool()
        pp.get_youdaili()
        pc = ProxyCheck()
        pc.check()
        time.sleep(time_sleep)

相关文章

  • day72 - 代理

    免费代理代理池 付费代理

  • 对代理池搭建的一些理解

    搭建代理池的用处有许多,爬虫是使用代理池较多的一种。 尝试过搭建简单的代理池,记录一下,搭建过程。 代理池需要如下...

  • 爬虫(2)--- 构建简单代理IP池

    目录 1. 何为代理IP池?2. 代理IP池构建2.1 浏览器伪装2.2 代理IP爬取2.3 代理IP验证2.4 ...

  • 代理池

    代理池的作用 解决短时间内频繁爬取统一网站导致IP封锁的情况。具体工作机制:从各大代理网站抓取免费IP —— 去重...

  • 代理池

    1.首先创建一个获取代理ip的类,这里取名为ProxyPool。 这个ProxyPool类中有两个方法: get_...

  • 线程池的使用

    1.创建线程池代理者,由于创建线程池所需参数过多,因此封装在代理者中创建,需要拿到线程池对象时,找其代理者即可 2...

  • asyncio实现代理池

    从一个代理池讲起? 搞爬虫的一般都有自己的代理池,代理池的结构一般分为抓取模块,存储模块,检测模块,api模块。抓...

  • 如何给自己搭建一个爬虫代理IP池?

    本文关键词:爬虫代理IP池,稳定的爬虫代理ip,搭建代理ip池 在这篇文章之前, 应该不少人都看过很多搭建代理ip...

  • 维护动态代理池

    代理池要求 多站抓取,异步检测 定时筛选,持续更新 提供接口,易于提取 代理池架构 获取器: 从各大网站上获取代理...

  • Redis中zadd方法参数变化

    搭建代理池的时候遇到的坑。 redis-py 3.0之前写法: redis-py 3.0之后写法: 代理池搭建参考...

网友评论

    本文标题:代理池

    本文链接:https://www.haomeiwen.com/subject/fuocsttx.html