美文网首页python大法攻略
Scrapy+redis分布式爬虫(二、basespider类爬

Scrapy+redis分布式爬虫(二、basespider类爬

作者: 眼君 | 来源:发表于2020-09-14 08:56 被阅读0次

编写一个basespider爬虫

在spider脚本中编写页面跳转逻辑
    打开spiders文件夹下的脚本文件,在代码头部添加一行语句, 引入一个request类,这个类的构造函数可以将类中的参数url交给scrapy进行下载, 然后通过另一个参数callback设置回调函数, 将下载后返回的响应体进行处理:
import scrapy
from urllib.parse import urljoin,urlparse
from scrapy.http import Request
from BKSpider.items import BkItemLoader,BkdoneItem

class BeikeSpider(scrapy.Spider):
    name = 'beike'
    allowed_domains = ['hz.ke.com']
    start_urls = ['http://hz.ke.com/chengjiao//']

    # 第一层获取所有区域地址
    def parse(self, response):
        parts = response.css('dl > dd > div > div > a')
        for part in parts:
            part_url = urljoin(response.url, part.css('a::attr(href)').extract_first(""))
            part_name = part.css('a::text').extract_first("")
            yield Request(url=part_url,meta={'part_name': part_name},callback=self.parse_segment,dont_filter=True)

    # 第二层获取所有街道地址
    def parse_segment(self, response):
        segments = response.css('dl:nth-child(2)  div > div:nth-child(2) > a')
        for segment in segments:
            segment_url = urljoin(response.url, segment.css('a::attr(href)').extract_first(""))
            segment_name = segment.css('a::text').extract_first("")
            yield Request(url=segment_url,meta={'segment': segment_url,'segment_name': segment_name,'part_name':response.meta.get("part_name", "")}, callback=self.parse_unity,dont_filter=True)
    我们可以考虑将URL交给Request,  Request会先将URL交给scrapy下载,响应体则通过回调函数交给其他函数处理。

目录页URL通过Request回调给parse函数解析出单个文章页面的URL, 然后再一次使用Request回调给自定义函数做进一步处理。

有时候, 我们希望往Request返回的响应体中添加其它的信息, 这种情况下我们可以使用Request的另一个参数meta={},

这些信息就会以字典的形式存储到response响应体的meta这一项中。

spider的去重机制

Requset这个函数里有个参数dont_filter需要我们特别注意一下, 这里需要了解一下scrapy的去重机制。

首先, scrapy的Request下载一个页面时,会分析URL的host是否存在于allowed_domains中,如果不存在则会直接被剔除掉;

然后,scrapy进程会维护一个指纹集合,Request下载URL对应页面前,会对URL执行“RFPDupeFilter”去重筛选(将URL处理成指纹,然后与集合匹配,如果存在,也会被剔除。)具体的去重逻辑编写在scrapy.dupefilter.py 文件中。

  scrapy用于去重的指纹集合有内存存储和磁盘存储两种方式,默认是内存存储,当爬虫程序结束时集合内容会释放;如果希望保存指纹集合,可以选择用磁盘存储方式,具体操作是,在settings.py中定义一个“JOBDIR”变量定义指纹集合存储成文本的路径,那么在进程启动时会在该路径下生成一个requests.seen文件用于存储指纹集合,当进程结束时,该文件不会被删除,而是会保留下来。

 如果某一个Request执行时不希望对该URL去重,可以设置参数dont_filter=True(默认设置为False)。

  另外,如果不定义allowed_domains或者将其置为空,进程也不会进行去重。
配置items.py文件

scrapy提供了一种类似于map的数据结构item, 用于将scrapy下载页面返回的响应体数据进行结构化。

这样便于数据在pipline阶段时,给不同的存储数据库复用。

我们可以在items.py这个脚本中定义item的具体信息:

class BkdoneItem(scrapy.Item):
    # 主键链家编号
    data_id = scrapy.Field()
    # 数据源来源
    data_source = scrapy.Field()
    # 成交日期
    done_date = scrapy.Field()

    # 数据来源链接
    detail_url = scrapy.Field()
    # 交易总价
    total_price = scrapy.Field()
    # 交易均价
    per_price = scrapy.Field()
    ## 基本信息
    # 所在街道
    segment = scrapy.Field()
    # 所在区县
    part = scrapy.Field()

    def get_sql(self):
        rows_list = ('data_id','data_source','done_date','detail_url', 'total_price', 'per_price','segment','part')
        insert_sql = "INSERT INTO src_hz_done_house%s VALUES%s;" % (str(rows_list).replace("'", ""), ('%s',) * len(rows_list))
        return insert_sql,rows_list
    ......

回到脚本文件中,将定义的items引入文件,并创建一个items实例,将对应的数据填充进去, 填充完以后可以通过yield将这个item传给pipeline这一层:

from BKSpider.items import BkItemLoader,BkdoneItem

......

def parse_detail(self, response):
    Bk_item = BkdoneItem()
    ......
    Bk_item['title'] = title
    ......
    yield Bk_item 

考虑到url一般是不定长的, scrapy为了实现不重复读取页面, 需要维护一下已下载页面的列表, 如果这个列表里保存的是原始url的话, 会占用很大的内存, 所以我们可以将原url用散列函数进行处理, 我们可以写一个common模块用于编写常用的全局函数, 第一个函数就用于URL的压缩好了:

import hashlib

def get_md5(url):
    if isinstance(url,str):
        url = url.encode("utf-8")
    m = hashlib.md5()
    m.update(url)
    return m.hexdigest()
配置pipelines.py文件

pipelines这一层用于接收item中的数据, 并最终定义数据的存储方式, 使用pipeline时需要先打开settings.py文件, 该文件中有一段默认被注释的代码需要激活。

ITEM_PIPELINES = {
    'BKSpider.pipelines.BkspiderPipeline': 300,
}
1. 图片下载配置

我们之前存储的图片url, 也可以通过设置实现图片的自动下载, 我们需要在pipelines中添加一个scrapy自带的图片下载pipeline:

import os
......
project_dir = os_path_abspath(os_path.dirname(__file__)) 
......
ITEM_PIPELINES = {
    'BKSpider.pipelines.BkspiderPipeline': 300,
    'scrapy.pipelines.images.ImagesPipeline':1
}
IMAGES_URLS_FILED = "front_image_url" //这个字段名需要和item中的字段名一致,且对应item的值需要改成列表形式
IMAGES_STORE = os_path_join(project_dir,"images")
IMAGES_MIN_HEIGHT = 100
IMAGES_MIN_WIDTH = 100

item_piplines字典中的value对应的是pipeline的执行顺序, 序号越小的管道越早执行。

images_urls_filed这个变量中存储的是item中需要自动下载的图片的URL, pipeline会从item中选择这个字段名进行自动下载。

images_store这个变量存储的是自动下载的图片存放的目录路径。

settings.py中设置的IMAGES_MIN_HEIGHT和IMAGES_MIN_WIDTH这两个变量用于过滤小图片。我们下载了图片, 但是还需要想个办法获得图片在载到本地的存储路径, 我们可以重写一个pipeline对象来实现这个功能, 在pipeline.py中重写一个继承ImagesPipeline的类:

from scrapy.pipelines.images import ImagesPipeline


class ArticleImagePipeline(ImagesPipeline):
    def item_completed(self, results, item, info):
        for ok,value in results:
            image_file_path = value["path"]
            item["front_image_path"] = image_file_path
        return item

在这个重写的ImagePipelines中, 我们可以从item_completed这个方法中拿到图片存储的路径。为了使这个类生效, 我们需要将重写的ImagePipeline在settings.py中替换掉原来的ImagePipeline:

ITEM_PIPELINES = {
    'BKSpider.pipelines.BkspiderPipeline': 300,
    'ArticleSpider.pipelines.ArticleImagePipeline':1
}
2. 将数据保存到json本地文件

scrapy提供多种格式文件存储数据, 这里我们主要介绍用json文件的存储, 这里我们主要用到scrapy.exporters里的JsonItemExporter实现:

from scrapy.exporters import JsonItemExporter


class JsonExporterPipleline(object):
    #调用scrapy提供的json export导出
    def __init__(self):
        self.file = open('articleexport.json','wb')
        self.exporter = JsonItemExporter(self.file,encoding='utf-8',ensure_ascii=False)
        self.exporter.start_exporting()
    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item
    def close_spider(self,spider):
        self.exporter.finish_exporting()
        self.file.close()

当然, 也需要在settings.py中进行相应的配置:

ITEM_PIPELINES = {
    'ArticleSpider.pipelines.JsonExporterPipleline': 2,
    'ArticleSpider.pipelines.ArticleImagePipeline':1
}
3. 将数据保存到MySQL

用数据库存储数据是比较常用的方法, 这里我们先在MySQL中根据items的数据结构建立相应的数据表, 然后我们也要确保我们的语言环境中已经安装了MySQL的驱动:

pip install mysqlclient

linux下面安装可能会报错, 这时需要先安装一些其它的包:

sudo apt-get install libmysqlclient-dev    #(Ubuntu)
sudo yum install python-devel mysql-devel   #(centOS)

这里我们介绍另一个兼容性更好的驱动:

pip install pymysql

在pipelines.py中代码如下:

import pymysql
class MysqlPipeline(object):
    def __init__(self):
        self.conn = pymysql.connect(host='',port=3306,user='',password="",db="",charset='utf8')
        self.cursor = self.conn.cursor()

    def process_item(self,item,spider):
        insert_sql = "INSERT INTO tablename(columname) VALUES(%s);" % item['title']
        self.cursor.execute(insert_sql)
        self.conn.commit()

之后也需要在settings.py中配置好。

4. MySQL优化连接池

由于爬虫爬取速度会大于数据库中插入数据的速度, 所以我们需要提高数据库的使用性能, scrapy的解决思路是使用连接池, 让爬取数据和MySQL的插入数据两个任务变成异步。我们先在settings.py中配置好MySQL的连接信息:

MYSQL_HOST = ""
MYSQL_DBNAME = ""
MYSQL_USER = ""
MYSQL_PASSWORD = ""

之后就可以在pipelines.py中添加一个新的pipeline:

import pymysql
from twisted.enterprise import adbapi

class MysqlTwistedPipeline(object):
    def __init__(self,dbpool):
        self.dbpool = dbpool
    @classmethod
    def from_settings(cls,settings):
        dbparams = dict(
            host = settings["MYSQL_HOST"],
            db = settings["MYSQL_DB"],
            user = settings["MYSQL_USER"],
            password = settings["MYSQL_PASSEORD"],
            charset = "utf8",
            cursorclass = pymysql.cursors.DictCursor,
            use_unicode = True
        )
        dbpool = adbapi.ConnectionPool(dbapiName="pymysql",**dbparams)
        return cls(dbpool)

    def process_item(self,item,spider):
        #异步处理获取和插入
        query = self.dbpool.runInteraction(self.do_insert,item)
        query.addErrback(self.handle_error,item,spider)

    def do_insert(self,cursor,item):
        #这里cursor参数直接从类中获得
        insert_sql, rows_list = item.get_sql()
        cursor.execute(insert_sql % tuple(map(item.get, rows_list)))

    def handle_error(self, failure,item,spider):
        # 处理异常
        print(failure)

最后,settings也要做配置:

ITEM_PIPELINES = {
    'BKSpider.pipelines.MysqlTwistedPipeline': 300,
}
Item loader机制

item loader的使用能提高代码的解析效率, 节约内存。我们先在spider脚本中引入item loader:

from scrapy.loader import ItemLoader

然后我们使用Item loader改写我们的parse_detail函数:

from BKSpider.items import BkItemLoader,BkdoneItem

# 第四层获取单个房屋交易明细数据
def parse_detail(self, response):
    item_loader = BkItemLoader(item=BkdoneItem(), response=response)
    # 主键-链家编号
    item_loader.add_css("data_id", "div.transaction > div.content > ul > li:nth-child(1)::text")
    # 数据源
    item_loader.add_value("data_source", 'lianjia')
    # 成交日期
    done_date = response.meta.get("unity_done_date", "").replace(".", "-")
    item_loader.add_value("done_date", done_date)

    # 数据来源链接
    item_loader.add_value("detail_url", response.url)
    # 交易总价
    item_loader.add_css("total_price", "div.price > span > i::text")
    # 交易均价
    item_loader.add_css("per_price", "div.price > b::text")
    # 基本信息
    # 所在街道
    item_loader.add_value("segment", response.meta.get("segment_name", ""))
    # 所在区县
    item_loader.add_value("part", response.meta.get("part_name", ""))
    yield item_loader.load_item()

还有两个问题:

一、上述代码实际上item中各字段对应的值都是list.

二、假如用css解析出来的数据, 我们希望先预处理以后再传入item_loader, 该怎样实现呢?

我们可以在items.py中处理:

from scrapy.loader.processors import MapCompose,TakeFirst,Join

func1(value):
    ....

func2(value):
    ....

class ZirudoneItem(scrapy.Item)::
    done_date = scrapy.Field(
        input_processor = MapCompose(func1,func2),
        output_processor = TakeFirst()
    )
    front_image_url = scrapy.Field(
        input_processor = MapCompose(func1,func2),
        output_processor = Join(',')
    )
    ......

其实每一个scrapy.Field中都有两个参数:

第一个是input_processor, 这个参数可以定义一个预处理函数, 对传入item的字段进行预处理, 我们也可以使用MapCompose, 实现对传入的数据连续使用多个函数进行预处理;

第二个是output_processor, 这个参数和TakeFirst()搭配使用, 可以使输出的值只取list中的第一个值。

另外, 还有有一个Join函数, 可以将list拼接成str, 例如tag传入的值是['a','b','c'], 则输出的值是"a,b,c"。

如果我们有很多字段, 每个都要定义这两个参数, 就会显得很冗余, 这里我们有一个技巧:

from scrapy.loader import ItemLoader
......

class ZiruItemLoader(ItemLoader):
    default_input_processor = MapCompose(pre_str)
    default_output_processor = TakeFirst()

我们可以在items中重写ItemLoader这个类, 设置output_processor的默认值, 然后在spider脚本中用重写后的ItemLoader替换之前使用的ItemLoader。

相关文章

网友评论

    本文标题:Scrapy+redis分布式爬虫(二、basespider类爬

    本文链接:https://www.haomeiwen.com/subject/cedjektx.html