通过上一篇文章Python爬虫实战-使用Scrapy框架爬取土巴兔(二)我们创建了工程目录与完成了基本配置。接下来就要开始做中间件的编写。
该系列其它文章:
- Python爬虫实战-使用Scrapy框架爬取土巴兔(一)
- Python爬虫实战-使用Scrapy框架爬取土巴兔(二)
- Python爬虫实战-使用Scrapy框架爬取土巴兔(四)
- Python爬虫实战-使用Scrapy框架爬取土巴兔(五)
该篇文章主要讲Scrapy中自定义中间件(MIDDLEWARES)的作用与代码编写。
一.下载器中间件(Downloader Middleware)
1.下载器中间件的作用
下载器中间件是介于Scrapy的request/response处理的钩子框架。 是用于全局修改Scrapy request和response的一个轻量、底层的系统。
2.下载器中间件的配置
要使用自定义的下载器中间件,需要加入到settings.py文件的DOWNLOADER_MIDDLEWARES配置中:
DOWNLOADER_MIDDLEWARES = {
'msic.scrapy.middlewares.CustomUserAgentMiddleware': 2,
'tubatu.middlewares.RedirectionMiddleware': 998,
}
if USE_PROXY:
DOWNLOADER_MIDDLEWARES['msic.scrapy.middlewares.CustomHttpProxyMiddleware'] = 1
DOWNLOADER_MIDDLEWARES['msic.scrapy.middlewares.CatchExceptionMiddleware'] = 999
该配置是一个字典类型,键为中间件类的路径,值为其中间件的顺序,值越小越先被调用。
3.下载器中间件的编写
爬取土巴兔网站时,土巴兔对IP会有限制,如果一段时间内同一ip访问请求过多。服务端的http响应会直接返回503的error code。所以我们必须编写一些下载器中间件来应对这样的限制。同时我们也需要自己建立一个ip代理池来为爬虫中的http请求维护可用的代理ip。
- CustomUserAgentMiddleware:在每个http请求的请求头中添加User-Agent。
- CustomHttpProxyMiddleware:在每个http请求的请求头中添加代理ip,使得Scrapy的下载器在下载网页数据时都是通过代理IP来下载。
- CatchExceptionMiddleware:如果一个http请求出错或失败了,那么在ip代理池中添加一次该代理ip失败的记录。
import random
from msic.common import log, agents
from msic.proxy.proxy_pool import proxy_pool
class CatchExceptionMiddleware(object):
def process_response(self, request, response, spider):
if response.status < 200 or response.status >= 400:
try:
proxy_pool.add_failed_time(request.meta['proxy'].replace('http://', ''))
except KeyError:
pass
return response
def process_exception(self, request, exception, spider):
try:
proxy_pool.add_failed_time(request.meta['proxy'].replace('http://', ''))
except Exception:
pass
class CustomHttpProxyMiddleware(object):
def process_request(self, request, spider):
try:
request.meta['proxy'] = "http://%s" % proxy_pool.random_choice_proxy()
except Exception as e:
log.error(e)
class CustomUserAgentMiddleware(object):
def process_request(self, request, spider):
agent = random.choice(agents.AGENTS_ALL)
request.headers['User-Agent'] = agent
- RedirectionMiddleware:作用主要是添加一种容错机制,如果我们的代理全部都失效了,http请求数据频繁返回503错误,那么我们直接关闭scrapy,不再去做爬取的动作了。
from scrapy import Spider
class RedirectionMiddleware(object):
ERROR_COUNT = 0
def process_response(self, request, response, spider: Spider):
if response.status == 302 or response.status == 503:
self.ERROR_COUNT += 1
print('错误次数%s' % self.ERROR_COUNT)
if self.ERROR_COUNT > 100:
spider.close(spider, 'http status error')
return response
def process_exception(self, request, exception, spider):
pass
下载器中间件主要是覆写三个回调方法:
- process_request(request, spider):当每个request通过下载中间件时,该方法被调用。
- process_response(request, response, spider):当每个request返回response通过下载中间件时,该方法被调用。
- process_exception(request, exception, spider):当下载处理器或 process_request()抛出异常(包括 IgnoreRequest 异常)时,该方法被调用。
二.Spider中间件(Spider Middleware)
1.Spider中间件的作用
Spider中间件是介入到Scrapy的spider处理机制的钩子框架,通过它来处理发送给Spiders的response及spider产生的item和request。
2.Spider中间件的配置
要使用自定义的下载器中间件,需要加入到settings.py文件的SPIDER_MIDDLEWARES配置中:
SPIDER_MIDDLEWARES = {
'myproject.middlewares.CustomSpiderMiddleware': 543,
}
该配置同样是一个字典类型,键为中间件类的路径,值为其中间件的顺序,值越小越先被调用。
由于工程中没有使用到,所以不做过多说明,有疑问可以参考官方文档。
三.IP代理池
不光是土巴兔,很多网站都会都爬虫做自己的限制。限制在一定时间内访问请求过多的IP,所以我们不得不使用ip代理池来保证我们的爬虫能够长时间运作。
代理池的运行机制:
我们在爬虫启动时先去获取最新的ip代理。并将ip都存入到数据库中。所以我们在数据库中会存一份ip列表如图1。在scpray在爬取网站数据时http请求可以根据自定义的算法将数据库中可靠的ip做为代理,不能使用本机的ip直接访问目标网站,否则爬取了一段时间你的ip就被封了。
如果你的代理ip质量足够稳定,那么你可能不用在这上面多花心思。但如果我们用的代理ip不够稳定,那么我们就要强化我们的代理池。
- 首先,获取代理池获取ip是一个定时任务,每过一段时间检查数据库中ip数量是否小于预设的最少ip数,如果小于则自动抓取ip,保证代理池中ip充足。
- 其次,如果使用某个代理ip在http请求过程中请求失败了,那么在数据库中标记一次它的失败。当某个ip失败次数到预设的最大失败次数时,就把该ip从数据库中删除。每次删除ip都要检查,如果删除ip后数据库中ip数小于预设的最小ip数量就需要再次抓取ip来补充代理池。
- 最后,我们也要开启一个定时任务来运行代理池ip自检机制,这个自己机制就是通过代理ip来ping数据量很小的网站。如果ping失败了则直接删除该ip。
下面放上关键代码
代理IP实体类:
from msic.common import utils
class Proxy(object):
def __init__(self):
self.ip = ''
self.response_speed = -1
self.validity = False
self.origin = ''
self.create_time = ''
self.update_time = ''
self.failed_count = 0
@staticmethod
def create(ip, origin):
proxy = Proxy()
proxy.ip = ip
proxy.origin = origin
proxy.create_time = utils.get_utc_time() # 格式: 2017-03-18T06:16:26.887Z
proxy.update_time = proxy.create_time
proxy.failed_count = 0
proxy.response_speed = -1
proxy.validity = False
return proxy
启动爬虫任务
def start(self):
#抓取ip代理任务
self.crawl_proxy_task(False)
def task():
self.check_ip_availability_task()
schedule = Scheduler()
# 代理池IP自检,60分钟运行一次
schedule.every(60).minutes.do(self.check_ip_availability_task)
while True:
schedule.run_pending()
time.sleep(1)
thread = threading.Thread(target=task)
thread.start()
抓取IP代理
def crawl_proxy_task(self, check_num: bool = True):
if check_num:
count = self.collection.count()
#如果数据库中IP数大于最小ip数则不抓取
if count > MIN_PROXY_COUNT:
return
utils.log("开始抓取代理")
#具体抓取逻辑
proxy_list = proxy_strategy.crawl_proxy()
utils.log("开始保存")
for proxy in proxy_list:
if not self.collection.find_one({'ip': proxy.ip}):
self.collection.insert_one(proxy.__dict__)
utils.log('保存了:' + proxy.ip)
utils.log("保存结束")
代理池自检
def check_ip_availability_task(self):
#redis获取上次自检时间,如果未达到设定时间则不在检查
last_check_time = self.redis_client.get(REDIS_KEY_LAST_CHECK_IP_TIME)
now_time = datetime.utcnow().timestamp()
if last_check_time is not None and (now_time - float(last_check_time)) < (TASK_INTERVAL * 60):
return
self.redis_client.set(REDIS_KEY_LAST_CHECK_IP_TIME, now_time)
proxy_list = self.collection.find()
for proxy in proxy_list:
ip = proxy['ip']
start_time = time.time()
response = utils.http_request('http://lwons.com/wx', timeout=10)
is_success = response.status_code == 200
response.close()
if not is_success:
#如果请求失败,直接删除IP
try:
self.collection.delete_one({'ip': ip})
except:
pass
utils.log('Check ip %s FAILED' % ip)
else:
#如果请求成功,在数据库中记录该ip最后响应的时间,下次取ip时优先取出使用
elapsed = round(time.time() - start_time, 4)
try:
self.collection.update_one({'ip': ip},
{"$set": {'update_time': utils.get_utc_time(), 'response_speed': elapsed, 'validity': True}})
except:
pass
utils.log('Check ip %s SUCCESS' % ip)
http请求失败,在数据库中处理请求失败的IP
def add_failed_time(self, ip):
proxy = self.collection.find_one({'ip': ip})
if proxy is not None:
failed_count = proxy['failed_count'] + 1
utils.log("ip: %s 失败次数+1 已失败次数%s次" % (ip, failed_count))
if failed_count <= FAILED_COUNT_BORDER:
#如果未达到最大失败次数,则在数据库中添加一次失败
try:
self.collection.update_one({'ip': ip}, {"$set": {'update_time': utils.get_utc_time(), 'failed_count': failed_count}})
except:
pass
else:
#达到最大失败次数,则在数据库中删除
try:
self.collection.delete_one({'ip': ip})
except:
pass
#检查数据库中IP是否足够
self.crawl_proxy_task()
Scrapy的中间件在取出代理池中IP时,优先取出失败次数少,最近http响应成功的有效IP
def random_choice_proxy(self) -> str:
proxy = self.collection.find().sort(
[("failed_count", pymongo.ASCENDING), ("validity", pymongo.DESCENDING), ("response_speed", pymongo.ASCENDING),
("update_time", pymongo.DESCENDING)])
return proxy[0]['ip']
最后
爬虫的下载中间件和代理池都已创建完毕,接下来就要编写具体的爬取规则Python爬虫实战-使用Scrapy框架爬取土巴兔(四)。
附:
详细的项目工程在Github中,如果觉得还不错的话记得Star哦。
网友评论