# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
import asyncio
from product.items import CategoryItem, SubCategoryItem, SKUItem
from control.models import Category, SubCategory, SKU
from concurrent.futures import ThreadPoolExecutor
class ProductPipeline:
# 使用异步存储的时候,使用sqlite3会报错,因为sqlite3是单线程的,我们是一个线程池对象,并发存储会被sqlite3拒绝(database was locked)
# 创建事件循环对象
loop = asyncio.get_event_loop()
# 创建线程池
executor = ThreadPoolExecutor()
# 任务队列
tasks = []
counter = {'cate': 0, 'sub_cate': 0, 'sku': 0}
async def process_item(self, item, spider):
# 存在则更新 get_update
print(item)
if isinstance(item, CategoryItem):
return self.process_category_item(item, spider)
elif isinstance(item, SubCategoryItem):
return self.process_sub_category_item(item, spider)
else:
return self.process_sku_item(item, spider)
def process_category_item(self, item, spider):
'''将保存数据的处理方法加入到任务队列'''
self.counter['cate'] += 1
task = self.loop.run_in_executor(self.executor, self.executor_func(Category, item), )
self.tasks.append(task)
return item
def process_sub_category_item(self, item, spider):
'''将保存数据的处理方法加入到任务队列'''
self.counter['sub_cate'] += 1
task = self.loop.run_in_executor(self.executor, self.executor_func(SubCategory, item), )
self.tasks.append(task)
return item
def process_sku_item(self, item, spider):
'''将保存数据的处理方法加入到任务队列'''
self.counter['sku'] += 1
task = self.loop.run_in_executor(self.executor, self.executor_func(SKU, item), )
self.tasks.append(task)
return item
@staticmethod
def executor_func(model, item):
'''主要作用是将有参数的函数转换为无参数的函数返回,方便run_in_executor方法调用,这个方法它只接受位置传参,不接受关键字传参'''
def func():
if isinstance(item, CategoryItem):
return model.objects.get_or_create(defaults=item['familyID'], **item) # 一般是create
elif isinstance(item, SubCategoryItem):
d = dict(item)
try:
category = Category.objects.get(familyID=d.pop('category'))
except Category.DoesNotExist:
print('未添加进去 -----', d)
else:
return model.objects.get_or_create(defaults=d['sub_cate_id'], category=category, **d)
else:
d = dict(item)
try:
sub_cate = SubCategory.objects.get(sub_cate=d.pop('sub_cate'))
except SubCategory.DoesNotExist:
print('未添加进去 -----', d)
else:
return model.objects.get_or_create(defaults=d['sku'], sub_cate=sub_cate, **d)
return func
def close_spider(self, spider):
'''当爬虫关闭的时候调用这个方法保存数据'''
print(self.counter)
self.loop.run_until_complete(asyncio.wait(self.tasks))
以上代码有问题,下级模型往往找不到上级,暂时贴出来
正确的,虽然不够优雅,但能实现,能保存。不需要scrapy中item,在spider.py中yield 字典
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
from asgiref.sync import sync_to_async #同步变异步
from control.models import Category, SubCategory, SKU
class ProductPipeline:
counter = {
'cate': 0,
'sub_cate': 0,
'sku': 0
}
@sync_to_async
def process_item(self, item, spider):
print(item)
try:
cate = Category.objects.get(familyID=item['familyID'])
except Category.DoesNotExist:
cate = Category()
cate.familyID = item['familyID']
cate.familyName = item['familyName']
cate.metricsFamilyName = item['metricsFamilyName']
cate.url = item['url']
cate.level = item['level']
cate.isLeaf = item['isLeaf']
cate.count = item['count']
cate.save()
self.counter['cate'] += 1
try:
sub_category = SubCategory.objects.get(sub_cate_id=item['sub_cate_id'])
except SubCategory.DoesNotExist:
sub_category = SubCategory()
sub_category.category = cate
sub_category.sub_cate_id = item['sub_cate_id']
sub_category.sub_cate_name = item['sub_cate_name']
sub_category.save()
self.counter['sub_cate'] += 1
# 保存
sku = SKU(sub_cate=sub_category, sku=item['sku'])
sku.save()
self.counter['sku'] += 1
print(self.counter)
return item
网友评论