爬虫分析之数据存储——基于MySQL,Scrapy

作者: FesonX | 来源:发表于2017-10-07 11:06 被阅读232次

    上一篇->爬虫练习之数据整理——基于Pandas
    上上篇->爬虫练习之数据清洗——基于Pandas

    配置MySql

    关于MySQL在Ubuntu的Pycharm上的配置,可以参考这篇文章中的第三部分

    Mac安装mysql及终端操作mysql与pycharm的数据库可视化

    如果上面的步骤处理完毕后找不到你新建的数据库, 可以参照下图配置

    勾选要显示的Schemas(数据库集合)

    数据存储需要用到pymysql模块, 在File->Settings中找到如图的设置页面,点击加号搜索pymysql并安装

    如何存储

    在开始考虑如何存储之前, 我们需要考虑一个问题, 数据存储应该是什么时候要做的事.
    假设你已经了解过Scrapy框架, 下面是来自官网对item pipeline的典型应用

    • 清理数据
    • 验证爬取的数据(检查item包含某些字段)
    • 查重(并丢弃)
    • 将爬取结果保存到数据库中

    另请参阅官方文档>Item Pipeline

    我们要实现的数据存储, 先来试一试能否成功吧

    # 你可以参考以下代码编写自己的pipeline
    import pymysql
    
    class jobCrawlerPipeline(object):
        def process_item(self, item, spider):
            '''
            将爬取的信息保存到mysql
            :param item:
            :param spider:
            :return: item
            '''
            # Get data from item
            job_name = item['job_name']
            company = item['company']
            address = item['address']
            salary = item['salary']
            time = item['time']
    
            # Connecting with local database, change the value if not the same
            db = pymysql.connect(
                host='localhost',
                user='root',
                passwd='1320',
                db='scrapyDB',
                charset='utf8',
                cursorclass=pymysql.cursors.DictCursor)
            try:
                # open the cursor
                cursor = db.cursor()
                sql = 'INSERT INTO tb_job(job_name,company,address,salary,time)' \
                      'VALUES ("%s", "%s", "%s", "%s", "%s")' % (job_name,company,address,salary,time)
                # execute the sql
                cursor.execute(sql)
                db.commit()
            finally:
                # close the connection
                db.close()
            return item
    

    爬虫尚未结束, 但是通过终端, 我们知道该停下爬虫了.


    爬取中... 存储在MySQL的信息

    重新回到爬虫项目的思路

    思考整个爬虫项目的流程, 应该是这样

    抓取信息->清理信息->整理信息->存储信息->分析信息

    数据整理

    而上面的存储信息虽然已经成功了一部分,但是薪资信息仍需要整理,更重要的是爬取的信息中没有明确的id, 如何在后续中加入topSalary, bottomSalary 等整理后才有的信息与之对应呢?

    重新审视Item Pipeline的典型应用, 我们能不能在Pipeline上实现整理,清理, 验证或是丢弃呢?

    分析item中的项目, 整理和验证可能是最容易实现的部分
    我们先把整理功能实现并验证是否成功, 在class jobCrawlerPipeline(object):中添加下面这个方法.用于把爬取下来的工资数据进行整理,关于这个方法的实现,请参考前一篇爬虫练习之数据整理——基于Pandas

        class jobCrawlerPipeline(object):
    
        def cut_word(self, word, method):
            if method == 'bottom':
                length = len(word)
                if (word.find('万') == -1):
                    if (word.find('以下') != -1):
                        # XX千以下
                        postion = word.find('以下')
                        bottomSalary = str(word[:(postion - 5)])
                    elif (word.find('以上') != -1):
                        postion = word.find('以上')
                        bottomSalary = str(float(word[:postion - 5]))
                    else:
                        # XX千/月
                        postion = word.find('-')
                        bottomSalary = str(float(word[:(postion)]))
                else:
                    if (word.find('年') == -1):
                        if (word.find('以下') != -1):
                            # XX万以下
                            postion = word.find('以下')
                            bottomSalary = str(float(word[:(postion - 5)]) * 10)
                        elif (word.find('以上') != -1):
                            # XX万以上
                            postion = word.find('以上')
                            bottomSalary = str(float(word[:postion - 5]) * 10)
                        elif (word.find('+') != -1):
                            # XX万+
                            postion = word.find('+')
                            bottomSalary = str(float(word[:(postion)]) * 10)
                        else:
                            # XX万/月
                            postion = word.find('-')
                            bottomSalary = str(float(word[:(postion)]) * 10)
    
                    else:
                        if (word.find('以下') != -1):
                            # XX万以下/年
                            postion = word.find('以下')
                            bottomSalary = str(float(word[:(postion - 5)]) / 1.2)
                        elif (word.find('以上') != -1):
                            postion = word.find('以上')
                            bottomSalary = str(float(word[:postion - 5]) / 1.2)
                        elif (word.find('+') != -1):
                            # XX万+
                            postion = word.find('+')
                            bottomSalary = str(float(word[:(postion)]) / 1.2)
                        else:
                            # XX万/年
                            postion = word.find('-')
                            bottomSalary = word[:(postion)]
                            bottomSalary = str(float(bottomSalary) / 1.2)
                return bottomSalary
    
            if method == 'top':
                length = len(word)
                if (word.find('万') == -1):
                    if (word.find('以下') != -1):
                        # XX千以下
                        postion = word.find('以下')
                        topSalary = str(float(word[:(postion - 5)]))
                    elif (word.find('以上') != -1):
                        postion = word.find('以上')
                        topSalary = str(float(word[:postion - 5]))
                    else:
                        # XX千/月
                        postion = word.find('-')
                        topSalary = str(float(word[(postion + 1):(length - 11)]))
                else:
                    if (word.find('年') == -1):
                        if (word.find('以下') != -1):
                            # XX万以下
                            postion = word.find('以下')
                            topSalary = str(float(word[:(postion - 5)]) * 10)
                        elif (word.find('以上') != -1):
                            # XX万以上
                            postion = word.find('以上')
                            topSalary = str(float(word[:postion - 5]) * 10)
                        else:
                            # XX万/月
                            postion = word.find('-')
                            topSalary = str(float(word[(postion + 1):(length - 11)]) * 10)
    
                    else:
                        if (word.find('以下') != -1):
                            # XX万以下/年
                            postion = word.find('以下')
                            topSalary = str(float(word[:(postion - 5)]) / 1.2)
                        elif (word.find('以上') != -1):
                            # XX万以上一年
                            postion = word.find('以上')
                            topSalary = str(float(word[:postion - 5]) / 1.2)
                        elif (word.find('+') != -1):
                            # XX万+
                            postion = word.find('+')
                            topSalary = str(float(word[:(postion)]) / 1.2)
                        else:
                            # XX万/年
                            postion = word.find('-')
                            topSalary = word[(postion + 1):(length - 11)]
                            topSalary = str(int(topSalary) / 1.2)
                return topSalary
    

    如果你看了上面的代码, 你可能发现与前一篇有些许不同, 最主要的差别就是字符串数组切片的位置发生了改变.
    为什么要改呢?

    因为这是Python的编码坑啊

    通过观察终端的输出,可以看到爬下来尚未存储的数据是以unicode的形式存在,这个时候是5个字节一个中文
    因此看到下面截图中的salary,可以判断要得到薪资的底薪和顶薪,需要剔除掉11个字节

    爬取数据中

    数据清洗

    至此,数据的基本处理已经合并到Pipeline中,鉴于可能还有脏数据在item中,我们在Pipeline的process_item方法中加入相应的代码
    这段代码应当加在处理数据之前,减少一些系统开销

    # Get data from item
            job_name = item['job_name']
            salary = item['salary']
    
            dirty_job_name = re.compile(r'(\*|在家|试用|体验|无需|无须|试玩|红包)+')
            dirty_salary = re.compile(r'(小时|天)+')
    
            # clean dirty data
            if(dirty_job_name.search(str(job_name))):
                raise DropItem("Dirty data %s" % item)
            if(dirty_salary.search(str(salary))):
                raise DropItem("Dirty data %s" % item)
            if(salary == None):
                raise DropItem("Dirty data %s" % item)
    

    数据存储

    把清洗并整理完毕的数据进行数据存储

    建立数据库的相关MySql语句是

    CREATE DATABASE IF NOT EXISTS scrapyDB DEFAULT CHARACTER SET utf8;
    
    CREATE TABLE IF NOT EXISTS `tb_job`(
      `job_id` bigint NOT NULL AUTO_INCREMENT,
      `job_name` varchar(50) NOT NULL,
      `company` varchar(50) NOT NULL,
      `address` varchar(50) NOT NULL,
      `bottom_salary` varchar(10) NOT NULL,
      `top_salary` varchar(10) NOT NULL,
      `salary` varchar(15) NOT NULL,
      `time` varchar(10) NOT NULL,
      PRIMARY KEY (`job_id`),
      UNIQUE KEY `unique_info`(`job_name`, `company`, `address`)
      );
    

    这里实现的思路不止一种

    Solution 1 在process_item中直接将处理完的item保存到数据库中

    实际测试的时候发现保存下来的数据除了job_name字段外, 其他中文字段全部变成Unicode码, 原因不明. 大家如果成功用这种方法实现了, 不妨在留言区告知一下, 毕竟第二种方法多了文件IO的开销, 耗时会比较大

    Solution 2 在爬取结束之后再进行数据库写入操作

    爬取结束后, 用pandas模块的csv读取函数打开爬取完毕的csv文件, 写入数据库

    Attention!

    以上两种方法的commit()建议在全部插入后一次commit完成
    必须在close_spider方法中关闭数据库
    若使用第一种方法, 建议在open_spider中实现数据库初始化工作, 而不是每执行一次process_item进行一次打开关闭数据库

    写入数据库

    参考代码

    # Function1
    def open_spider(self, spider):
            self.conn = pymysql.connect(
                host='localhost',
                user='root',
                passwd='mysql',
                db='scrapyDB',
                charset='utf8',
                cursorclass=pymysql.cursors.DictCursor)
    
    def close_spider(self, spider):
            try:
                # open the cursor
                self.cursor = self.conn.cursor()
    
                # get data from csv file
                # reload data
                f = open(r'job.csv', 'r')
                f.close()
                job_info = pandas.read_csv(r'job.csv', iterator=True,chunksize=1,
                                           header=None,names=
                                           ['job_name','company','address','bottom_salary','top_salary','salary','time'])
    
                # store data
                for i, job in enumerate (job_info):
                    # use -1 or ' ' to fill NAN
                    job = job.fillna({'job_name':'','company':'','address':'','time':''})
                    job = job.fillna(-1)
                    # transform series to list type
                    job = job.values[0]
    
                    sql = 'INSERT INTO tb_job(job_name,company,address,bottom_salary,top_salary,salary,time)' \
                          'VALUES ("%s", "%s", "%s", "%s", "%s", "%s", "%s")' % (
                          job[6], job[2], job[3], job[1], job[5], job[0], job[6])
                    self.cursor.execute(sql)
                self.conn.commit()
    
            finally:
                # close the connection
                self.conn.close()
    
    # Function2 
    # 未将打开关闭数据库拆分出来, 请自行修改
    db = pymysql.connect(
                host='localhost',
                user='root',
                passwd='mysql',
                db='scrapyDB',
                charset='utf8',
                cursorclass=pymysql.cursors.DictCursor)
            try:
                # open the cursor
                cursor = db.cursor()
                sql = 'INSERT INTO tb_job(job_name,company,address,bottom_salary,top_salary,salary,time)' \
                      'VALUES ("%s", "%s", "%s", "%s", "%s", "%s", "%s")' % (job_name,item['company'],item['address'],item['bottomSalary'],item['topSalary'],item['salary'],item['time'])
                # execute the sql
                cursor.execute(sql)
                db.commit()
            finally:
                # close the connection
                db.close()
    

    最终的数据库代码中, 暂时删除了unique_info索引, 原因是当前只需要尚不需要进行增量爬取. 使用unique_info索引后, 如果遇到重复的数据将直接RollBack, 而我们是在最后才一次性commit的, 这样肯定不行
    就需要增加开销去每插入一条数据提交一次
    后续将对这个问题进行处理, 敬请期待

    相关文章

      网友评论

        本文标题:爬虫分析之数据存储——基于MySQL,Scrapy

        本文链接:https://www.haomeiwen.com/subject/efkwjxtx.html