A Simple Crawler Framework (Part 1)

Author: 浅浅的笑意 | Published 2017-11-11 20:33

Hello, everyone. Last weekend was pretty busy, so I didn't have time to post. This week I'll write about something a little different.


It doesn't really count as a framework; it's just a simple wrapper around a few things (go easy on me, gurus <( ̄3 ̄)> ). I also added Redis and MongoDB, as the task queue and as storage for the request records respectively (I originally meant to use the latter for URL filtering, but I wasn't sure how to write that yet, so for now here's a version that at least runs).

Design

Nothing fancy, just an ordinary flowchart.


(Figure: rough design flowchart)

The Request here is a custom class I wrote (I picked the name rather carelessly O(∩_∩)O haha~). The idea is to first wrap everything into a Request object, which contains the following (a construction sketch follows the list):

  • url
  • headers
  • proxy
  • downloader
  • pipeline, etc.
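
For example, constructing one of these for an index page might look like the following (a minimal sketch; the url, name, and parse_index callback are made-up placeholders, not from the original code):

from request import Request
from utils import INDEX

def parse_index(res, spider):
    ...  # would parse the page and yield new Requests (callbacks are covered later)

# Hypothetical values, for illustration only
req = Request(url='http://example.com/page/1',  # placeholder URL
              name='demo',
              category=INDEX,
              callback=parse_index)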

A newly created Request object is handed to the Manager at initialization, serialized, and stored in a Redis queue keyed by the request's ID; then the crawler process starts in earnest. Once the manager is running, it enters a loop, continuously popping data off the queue and deserializing it back into Request objects.
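
Under the hood that's just pickle plus a Redis list; the round trip looks roughly like this (a sketch using the same rdb client and the Request methods shown below):

import pickle
from redis_util import rdb  # the shared redis client

# enqueue: serialize the Request's dict form onto the queue keyed by the start request's id
rdb.rpush(start_request.id, pickle.dumps(req.to_dict(), protocol=-1))

# dequeue: pop one item and rebuild the Request
req = Request.from_dict(pickle.loads(rdb.lpop(start_request.id)))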

After popping an object, the manager checks its category. If it carries its own downloader, that downloader is used to make the request. (These categories usually also come with a callback, so the response goes straight into the callback, which parses it and generates new Request objects.)
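
In pseudocode, the dispatch inside the manager boils down to this (a simplified view of the Manager.handle method shown later):

# simplified sketch of Manager.handle
if spider.category in (IMAGE, VIDEO):
    task_list.append(spider)  # images/videos are buffered and fetched concurrently in batches
elif spider.category in (INDEX, NEXT, DETAIL):
    res = spider.downloader.request(spider.downloader, spider=spider)
    if spider.callback:
        for new_req in spider.callback(res, spider):  # callback yields follow-up Requests
            append_spider(new_req)                    # back onto the Redis queue
elif spider.category == TEXT:
    res = spider.downloader.request(spider.downloader, spider=spider)
    spider.pipeline.store(spider.pipeline, data=res, spider=spider)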

That's the rough flow.


The custom Request class

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
 @File       : request.py
 @Time       : 2017/10/21 0021 17:25
 @Author     : Empty Chan
 @Contact    : chen19941018@gmail.com
 @Description: Custom Request class
"""
from hashlib import md5
import time
import datetime
from utils import ua
from downloader import HttpDownloader


class Request(object):
    """
    Request class: wraps everything needed to fetch and process one URL.
    """
    def __init__(self, url, name, title=None,
                 folder=None, headers=None, callback=None, pipeline=None,
                 downloader=None,
                 category=None, proxy=None):
        """
        Initialize.
        :param url: the request URL
        :param name: request name, used as the storage folder name and the mongo collection name
        :param title: info stored to mongo
        :param folder: sub-folder name for images/videos, or the file name for text
        :param headers: request headers
        :param callback: callback function
        :param pipeline: processing pipeline
        :param downloader: downloader
        :param category: category constant defined in utils; decides how the manager handles the request
        :param proxy: proxy
        """
        super(Request, self).__init__()
        self.name = name
        self.category = category
        self.url = url
        headers_temp = {"User-Agent": ua.random}
        if headers:
            headers_temp.update(headers)
        r = md5()
        __id = '{url}+{headers}'.format(url=url, headers=headers_temp)
        r.update(__id.encode('utf-8'))
        self.id = r.hexdigest()
        self.title = title
        self.folder = folder
        self.headers = headers_temp
        self.pipeline = pipeline
        if not downloader:
            downloader = HttpDownloader
        self.downloader = downloader
        self.callback = callback
        self.proxy = proxy

    def __call__(self, *args, **kwargs):
        """
        Build the document that gets stored in mongo.
        :param args: positional args
        :param kwargs: keyword args
        :return: a dict ready for insertion
        """
        return {"_id": self.id,
                "name": self.name,
                "url": self.url,
                "title": self.title,
                "folder": self.folder,
                "category": self.category,
                "date": datetime.datetime.utcnow(),
                "timestamp": time.time() * 1000}

    def to_dict(self):
        """
        Serialize to a plain dict (this is what gets pickled into redis).
        :return: the dict representation
        """
        return {"name": self.name,
                "url": self.url,
                "title": self.title,
                "headers": self.headers,
                "folder": self.folder,
                "pipeline": self.pipeline,
                "category": self.category,
                "downloader": self.downloader,
                "callback": self.callback,
                "proxy": self.proxy}

    @staticmethod
    def from_dict(di):
        """
        Deserialize.
        :param di: the serialized data popped from redis
        :return: a rebuilt Request
        """
        cb = di['callback']
        # import dictionary
        # callback = dictionary.TASK[cb]
        name = di['name']
        title = di['title'] if di['title'] else None
        headers = di['headers'] if di['headers'] else None
        folder = di['folder'] if di['folder'] else None
        proxy = di['proxy'] if di['proxy'] else None
        return Request(url=di['url'],
                       name=name,
                       title=title,
                       headers=headers,
                       folder=folder,
                       pipeline=di['pipeline'],
                       category=di['category'],
                       downloader=di['downloader'],
                       callback=cb,
                       proxy=proxy)
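
The utils and downloader modules imported above aren't shown in this post. For context, minimal versions consistent with how they are used might look like this (an assumption on my part: ua is presumably a fake_useragent instance, and the category names are plain string constants):

# utils.py -- hypothetical minimal version, inferred from usage
from fake_useragent import UserAgent

ua = UserAgent()  # ua.random returns a random User-Agent string

# category constants that Manager.handle dispatches on
TEXT, INDEX, IMAGE, VIDEO, NEXT, DETAIL = (
    'text', 'index', 'image', 'video', 'next', 'detail')

# downloader.py -- hypothetical sketch matching the call sites
import requests

class HttpDownloader(object):
    def request(self, spider):
        # invoked as spider.downloader.request(spider.downloader, spider=spider),
        # i.e. the class object itself is passed in as self
        if spider.proxy:
            return requests.get(spider.url, headers=spider.headers,
                                proxies=spider.proxy, timeout=10)
        return requests.get(spider.url, headers=spider.headers, timeout=10)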

The Manager class

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
 @File       : manager.py
 @Time       : 2017/11/5 0005 11:33
 @Author     : Empty Chan
 @Contact    : chen19941018@gmail.com
 @Description: The main manager class
"""
import time

import click
import grequests
from request import Request
from log_util import Log
from redis_util import rdb
from retrying import retry
from db_store import mongo_map
from utils import TEXT, INDEX, IMAGE, VIDEO, NEXT, DETAIL
import pickle as cPickle


def exception_handler(requests, exception):
    """
    Error handler for grequests.
    :param requests: the request that failed
    :param exception: the exception raised
    :return: None
    """
    click.echo(exception)


class Manager(object):
    """
    Manager: drives the crawl loop.
    """
    def __init__(self, start_request: Request):
        """
        Initialize.
        :param start_request: the initial custom request
        """
        super().__init__()
        self.rdb = rdb
        self.start_request = start_request
        self.append_spider(start_request)
        self.task_list = []  # buffer for batched concurrent requests
        self.logger = Log(name='Manager')
        self.req_count = 0
        self.count = 0

    def append_spider(self, req):
        """
        Push a request onto redis.
        :param req: the custom request
        :return: None
        """
        temp = cPickle.dumps(req.to_dict(), protocol=-1)
        self.rdb.rpush(self.start_request.id, temp)

    @retry(stop_max_attempt_number=3, wait_random_min=0, wait_random_max=200)
    def __request(self, spiders: list):
        """
        Concurrent batch request, used for images and videos.
        :param spiders: a list of custom requests
        :return: the list of responses
        """
        url_list = []
        self.logger.info('start batch request!')
        for url in spiders:
            if url.proxy:
                url_list.append(grequests.get(url.url, headers=url.headers, proxies=url.proxy, timeout=10))
            else:
                url_list.append(grequests.get(url.url, headers=url.headers, timeout=10))
        res_list = grequests.map(url_list, exception_handler=exception_handler)
        self.logger.info('all complete!')  # logged after the batch actually finishes
        return res_list

    def handle(self, spider: Request):
        """
        Handle a request popped from redis.
        :param spider: the custom request
        :return: None
        """
        gallery = mongo_map(spider.name)
        self.req_count += 1
        if self.req_count == 50:  # crude rate limiting: pause briefly every 50 requests
            time.sleep(3)
            self.req_count = 0
        if spider.category == IMAGE \
                or spider.category == VIDEO:
            retry_list = []
            if not gallery.find_one({"_id": spider.id}):
                gallery.insert_one(spider())
            if not spider.pipeline.exist(spider.pipeline, spider):
                self.task_list.append(spider)
            if 20 >= len(self.task_list) >= 10 or self.count < 10:  # flush the batch concurrently
                res_list = self.__request(self.task_list)
                for i, res in enumerate(res_list):
                    if not self.task_list[i].pipeline.store(self.task_list[i].pipeline, data=res, spider=self.task_list[i]):
                        retry_list.append(self.task_list[i])  # re-queue the task that failed to store
                self.task_list.clear()
                self.task_list.extend(retry_list)
                retry_list.clear()
        elif spider.category == INDEX \
                or spider.category == NEXT \
                or spider.category == DETAIL:
            # start = time.time()
            res = spider.downloader.request(spider.downloader, spider=spider)
            # end = time.time()
            # click.echo('request consume %s' % str(end - start))
            spider.pipeline.store(spider.pipeline, data=res, spider=spider)
            # start = time.time()
            # click.echo('store consume %s' % str(start - end))
            if spider.callback:
                result = spider.callback(res, spider)
                for sp in result:
                    if not gallery.find_one({"_id": sp.id}):
                        gallery.insert_one(sp())
                    self.append_spider(sp)
        elif spider.category == TEXT:
            res = spider.downloader.request(spider.downloader, spider=spider)
            spider.pipeline.store(spider.pipeline, data=res, spider=spider)

    def run(self):
        """
        Run the main loop until the queue drains.
        :return: None
        """
        self.count = self.rdb.llen(self.start_request.id)
        while self.count:
            spider = None
            try:
                click.echo('start spider.....')
                start = time.time()
                temp = self.rdb.lpop(self.start_request.id)
                by = cPickle.loads(temp)
                spider = Request.from_dict(by)
                end = time.time()
                click.echo('get spider consume %s' % str(end - start))
                self.handle(spider)
                click.echo('handle complete!')
            except Exception as e:
                # time.sleep(0.1)
                # if spider:
                #     self.append_spider(spider)
                raise e
            self.count = self.rdb.llen(self.start_request.id)
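
One quirk worth pointing out: pipeline and downloader are stored as classes rather than instances, which is why every call passes the class back in as self (spider.pipeline.store(spider.pipeline, ...)). A plausible reason, though the post doesn't state it, is that pickle serializes classes and functions by reference, so they survive the Redis round trip without any custom serialization:

import pickle

class FilePipeline(object):  # stand-in for the real class
    pass

blob = pickle.dumps({'pipeline': FilePipeline})  # pickled by reference (module + name)
assert pickle.loads(blob)['pipeline'] is FilePipeline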

The Pipeline classes

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
 @File       : pipeline.py
 @Time       : 2017/10/21 0021 16:31
 @Author     : Empty Chan
 @Contact    : chen19941018@gmail.com
 @Description: File-handling pipelines
"""
import click
import os
from requests import Response
from utils import TEXT, IMAGE, VIDEO
from request import Request
import abc


class BasePipeline(abc.ABC):
    def __init__(self):
        pass

    @abc.abstractmethod
    def store(self, data: Response, spider: Request) -> bool:
        pass

    @abc.abstractmethod
    def exist(self, spider: Request) -> bool:
        pass


class FilePipeline(BasePipeline):
    """
    File pipeline: writes responses out to disk.
    """
    def __init__(self):
        super().__init__()

    def store(self, data: Response, spider: Request) -> bool:
        """
        Store a file.
        :param data: the response returned by requests
        :param spider: the custom request being processed
        :return: whether the store succeeded
        """
        if not data:
            click.echo('data is None')
            return False
        main_folder = str(spider.name).lower()
        if not os.path.exists('../{0}/{1}'.format(spider.category, main_folder)):
            os.makedirs('../{0}/{1}'.format(spider.category, main_folder))  # makedirs in case the category dir is missing
        if spider.category == IMAGE:
            query = '../{0}/{1}/{2}/{3}.jpg'.format(spider.category, main_folder, spider.folder, spider.id)
            with open(query, mode='wb') as f:
                f.write(data.content)
                click.echo("save %s in %s" % (spider.category, query))
                click.echo("save %s===>>>%s" % (spider.category, spider.url))
        elif spider.category == TEXT:
            query = '../{0}/{1}/{2}.txt'.format(spider.category, main_folder, spider.folder)
            if spider.callback:
                result = spider.callback(data, spider)
                with open(query, mode='w', encoding='utf-8') as f:
                    f.writelines(result.get('title'))
                    f.writelines(result.get('content'))
                    click.echo("save %s in %s" % (spider.category, query))
                    click.echo("save %s===>>>%s" % (spider.category, spider.url))
        return True

    def exist(self, spider: Request) -> bool:
        """
        Check whether the file already exists.
        :param spider: the custom request being processed
        :return: whether the file exists
        """
        main_folder = str(spider.name).lower()
        query = None
        if spider.category == IMAGE:
            query = '../{0}/{1}/{2}/{3}.jpg'.format(spider.category, main_folder, spider.folder, spider.id)
        elif spider.category == TEXT:
            query = '../{0}/{1}/{2}.txt'.format(spider.category, main_folder, spider.folder)
        if not query:
            return False
        if os.path.exists(query):
            return True
        return False


class FolderPipeline(BasePipeline):
    """
    Folder pipeline: creates the directories a request will write into.
    """
    def __init__(self):
        super().__init__()

    def store(self, data: Response, spider: Request) -> bool:
        """
        Create folders.
        :param data: the response returned by requests
        :param spider: the custom request being processed
        :return: whether the folders were created successfully
        """
        click.echo("***************")
        click.echo(spider.id)
        click.echo(spider.name)
        click.echo(spider.url)
        main_folder = str(spider.name).lower()
        query = '.'
        if spider.category == TEXT or spider.category == IMAGE:
            query = '../{0}/{1}'.format(spider.category, main_folder)
        if not os.path.exists(query):
            os.makedirs(query)  # makedirs in case the parent category dir is missing
        if spider.category == IMAGE:
            if not os.path.exists('%s/%s' % (query, spider.folder)):
                os.mkdir('%s/%s' % (query, spider.folder))
                click.echo(' create folder=>>> %s/%s' % (query, spider.folder))
            click.echo("@@@@@@@@@@@@@@@")
        return True

    def exist(self, spider: Request) -> bool:
        return False


class ConsolePipeline(BasePipeline):
    """
    Console pipeline: just echoes request info to stdout.
    """
    def __init__(self):
        super().__init__()

    def store(self, data: Response, spider: Request) -> bool:
        click.echo("***************")
        click.echo(spider.id)
        click.echo(spider.name)
        click.echo(spider.url)
        click.echo("@@@@@@@@@@@@@@@")
        return True

    def exist(self, spider: Request) -> bool:
        return False
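
Putting the pieces together, kicking off a crawl would look roughly like this (a usage sketch; the url and name are placeholders I made up):

from request import Request
from manager import Manager
from pipeline import ConsolePipeline
from utils import INDEX

# hypothetical wiring, for illustration only
start = Request(url='http://example.com/',  # placeholder URL
                name='demo',
                category=INDEX,
                pipeline=ConsolePipeline)   # note: the class itself, not an instance
Manager(start).run()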

There's still one piece, the Callback, that I haven't covered; I'll explain it next time together with a concrete example, though its rough shape is sketched below. Also, some of this isn't written particularly well, and I hope to polish it later. The GitHub address is attached.
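
That said, the shape of a callback is already visible from Manager.handle: it receives the response and the current request, and returns an iterable of new Requests. A hypothetical minimal one (the parser choice and selector are my own, not from the original code):

from pyquery import PyQuery
from request import Request
from utils import DETAIL

def parse_index(res, spider):
    """Hypothetical callback: parse an index page and yield detail Requests."""
    doc = PyQuery(res.text)
    for a in doc('a.item').items():
        yield Request(url=a.attr('href'), name=spider.name,
                      title=a.text(), category=DETAIL,
                      pipeline=spider.pipeline)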
Happy Singles' Day, everyone!! As if a programmer could ever have a girlfriend... no way, no way. /grin
See you next week!!
