
Storing data asynchronously with Django models in Scrapy

Author: mutang | Published 2021-11-22 10:35
    # Define your item pipelines here
    #
    # Don't forget to add your pipeline to the ITEM_PIPELINES setting
    # See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
    
    
    # useful for handling different item types with a single interface
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    from product.items import CategoryItem, SubCategoryItem, SKUItem
    from control.models import Category, SubCategory, SKU
    
    
    class ProductPipeline:
        # With this async approach, sqlite3 fails with "database is locked":
        # sqlite3 allows only one writer at a time, so the concurrent writes
        # coming from our thread pool get rejected. Point Django's DATABASES
        # setting at MySQL/PostgreSQL instead for this version.
        # Create the event loop used to schedule the executor tasks
        loop = asyncio.get_event_loop()
        # Create the thread pool that runs the blocking ORM calls
        executor = ThreadPoolExecutor()
        # Task queue of pending futures, awaited in close_spider
        tasks = []
        counter = {'cate': 0, 'sub_cate': 0, 'sku': 0}
    
        async def process_item(self, item, spider):
            # if the row already exists, reuse it (get_or_create below)
            print(item)
            if isinstance(item, CategoryItem):
                return self.process_category_item(item, spider)
            elif isinstance(item, SubCategoryItem):
                return self.process_sub_category_item(item, spider)
            else:
                return self.process_sku_item(item, spider)
    
        def process_category_item(self, item, spider):
            '''Queue the save operation for a category item.'''
            self.counter['cate'] += 1
            task = self.loop.run_in_executor(self.executor, self.executor_func(Category, item))
            self.tasks.append(task)
            return item
    
        def process_sub_category_item(self, item, spider):
            '''Queue the save operation for a sub-category item.'''
            self.counter['sub_cate'] += 1
            task = self.loop.run_in_executor(self.executor, self.executor_func(SubCategory, item))
            self.tasks.append(task)
            return item
    
        def process_sku_item(self, item, spider):
            '''Queue the save operation for a SKU item.'''
            self.counter['sku'] += 1
            task = self.loop.run_in_executor(self.executor, self.executor_func(SKU, item))
            self.tasks.append(task)
            return item
    
        @staticmethod
        def executor_func(model, item):
            '''Turn the parameterised save logic into a zero-argument function so
            run_in_executor can call it: run_in_executor only forwards positional
            arguments, not keyword arguments (functools.partial would also work).'''
    
            def func():
                if isinstance(item, CategoryItem):
                    # look the row up by every field; usually this is a create
                    return model.objects.get_or_create(**item)
                elif isinstance(item, SubCategoryItem):
                    d = dict(item)
                    try:
                        category = Category.objects.get(familyID=d.pop('category'))
                    except Category.DoesNotExist:
                        print('parent category not found, skipped -----', d)
                    else:
                        return model.objects.get_or_create(category=category, **d)
                else:
                    d = dict(item)
                    try:
                        sub_cate = SubCategory.objects.get(sub_cate=d.pop('sub_cate'))
                    except SubCategory.DoesNotExist:
                        print('parent sub-category not found, skipped -----', d)
                    else:
                        return model.objects.get_or_create(sub_cate=sub_cate, **d)
    
            return func
    
        def close_spider(self, spider):
            '''Called when the spider closes; block until every queued save finishes.'''
            print(self.counter)
            if self.tasks:
                self.loop.run_until_complete(asyncio.wait(self.tasks))
    
    

The code above is flawed: the saves run concurrently in the thread pool as soon as they are queued, so a child model often cannot find its parent (the parent row may not have been saved yet, hence the "not found" branch). I am posting it here for reference anyway.
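
Separately, neither version can even import control.models unless Django is initialised before the crawl starts. A minimal sketch, assuming the Django settings module is called product.settings (a hypothetical name; substitute your own), placed at the top of Scrapy's settings.py or of pipelines.py itself:

    import os

    import django

    # hypothetical settings path -- point it at your own Django project
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'product.settings')
    django.setup()  # makes the Django ORM (control.models) importable inside Scrapy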

The version below is correct. It is not elegant, but it works and actually saves the data. It does not need Scrapy Items at all; just yield plain dicts from spider.py.

    # Define your item pipelines here
    #
    # Don't forget to add your pipeline to the ITEM_PIPELINES setting
    # See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
    
    
    # useful for handling different item types with a single interface
    from asgiref.sync import sync_to_async  # wrap a sync function so it can be awaited
    
    from control.models import Category, SubCategory, SKU
    
    
    class ProductPipeline:
        counter = {
            'cate': 0,
            'sub_cate': 0,
            'sku': 0
        }

        @sync_to_async
        def process_item(self, item, spider):
            print(item)
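            # Hand-rolled "get or create": fetch the row first, and only build
            # and save a new one when the lookup raises DoesNotExist.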
            try:
                cate = Category.objects.get(familyID=item['familyID'])
            except Category.DoesNotExist:
                cate = Category()
                cate.familyID = item['familyID']
                cate.familyName = item['familyName']
                cate.metricsFamilyName = item['metricsFamilyName']
                cate.url = item['url']
                cate.level = item['level']
                cate.isLeaf = item['isLeaf']
                cate.count = item['count']
                cate.save()
                self.counter['cate'] += 1
            try:
                sub_category = SubCategory.objects.get(sub_cate_id=item['sub_cate_id'])
            except SubCategory.DoesNotExist:
                sub_category = SubCategory()
                sub_category.category = cate
                sub_category.sub_cate_id = item['sub_cate_id']
                sub_category.sub_cate_name = item['sub_cate_name']
                sub_category.save()
                self.counter['sub_cate'] += 1
            # save: always create a fresh SKU row under this sub-category
            sku = SKU(sub_cate=sub_category, sku=item['sku'])
            sku.save()
            self.counter['sku'] += 1
            print(self.counter)
            return item
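
Both pipelines hand Scrapy a coroutine from process_item (the second one via sync_to_async), and Scrapy can await those when the asyncio reactor is enabled. For completeness, a sketch of the settings and of the spider side; the dict keys match what the pipeline reads, while the spider class, its name, and the URL are hypothetical placeholders:

    # settings.py (excerpt)
    TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'
    ITEM_PIPELINES = {
        'product.pipelines.ProductPipeline': 300,
    }

    # spider.py -- yield plain dicts instead of Scrapy Items
    import scrapy


    class ProductSpider(scrapy.Spider):
        name = 'product'  # hypothetical
        start_urls = ['https://example.com/categories']  # placeholder

        def parse(self, response):
            # every key the pipeline reads must be present in the dict
            yield {
                'familyID': '...',
                'familyName': '...',
                'metricsFamilyName': '...',
                'url': response.url,
                'level': 1,
                'isLeaf': True,
                'count': 0,
                'sub_cate_id': '...',
                'sub_cate_name': '...',
                'sku': '...',
            }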
    
    
