$ conda install scrapy # install
$ scrapy startproject test # create a new project
$ scrapy crawl test # run the spider (from inside the project directory)
test/
- items.py            # item definitions (data types)
- pipelines.py        # database connection
- settings.py         # project settings
- spiders/test.py     # the spider
# test/items.py
import scrapy

class TestItem(scrapy.Item):  # data type for scraped records
    name = scrapy.Field()
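A Scrapy Item behaves like a dict, so fields declared with scrapy.Field() are read and written by key; a quick sanity check in plain Python (no crawl needed):

from test.items import TestItem

item = TestItem()
item['name'] = 'example'   # assign a declared field
print(item['name'])        # 'example'
print(dict(item))          # {'name': 'example'} -- exactly what the pipeline inserts
# item['other'] = 1        # would raise KeyError: only declared Fields are allowed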
# test/pipelines.py
import pymongo

class TestPipeline(object):
    def __init__(self):  # connect to MongoDB
        client = pymongo.MongoClient('mongodb://localhost:27017')
        self.db = client['testdb']
        self.col = self.db['test']

    def process_item(self, item, spider):
        self.col.insert_one(dict(item))  # insert the scraped item into the database
        return item  # return the item so later pipelines can still process it
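To confirm the pipeline is actually writing to MongoDB, the collection can be queried directly with pymongo (a minimal check, assuming a local mongod on the default port and the testdb/test names used above):

import pymongo

client = pymongo.MongoClient('mongodb://localhost:27017')
col = client['testdb']['test']
print(col.count_documents({}))    # number of scraped items stored so far
for doc in col.find().limit(3):   # peek at a few documents
    print(doc)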
# test/settings.py
ITEM_PIPELINES = {'test.pipelines.TestPipeline': 300}  # 0-1000, lower runs earlier
ROBOTSTXT_OBEY = False       # ignore robots.txt
CONCURRENT_REQUESTS = 1      # one request at a time
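If these options should only apply to this one spider rather than the whole project, Scrapy also accepts per-spider overrides via the custom_settings class attribute; a purely illustrative sketch mirroring the values above:

# alternative: per-spider overrides instead of project-wide settings.py entries
class Test(scrapy.Spider):
    name = 'test'
    custom_settings = {
        'ITEM_PIPELINES': {'test.pipelines.TestPipeline': 300},
        'CONCURRENT_REQUESTS': 1,
    }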
# test/spiders/test.py
import scrapy
from test.items import TestItem

class Test(scrapy.Spider):
    name = 'test'                    # spider name
    host = 'https://www.test.com'    # target site
    keyword = 'test'                 # search keyword
    page = 1

    def start_requests(self):  # build the first request
        start_url = 'https://www.test.com/search/{}/{}'.format(self.keyword, self.page)
        yield scrapy.Request(url=start_url, callback=self.parse)

    def parse(self, response):  # parse the result list
        links = response.css('.test ul li')  # list items on the search page
        for item in links:  # follow every <a> link to its detail page
            link = self.host + item.css('a::attr(href)').extract_first()
            yield scrapy.Request(link, callback=self.parse_page)
        if len(links) == 15:  # a full page suggests there is a next page
            self.page += 1
            next_link = 'https://www.test.com/search/{}/{}'.format(self.keyword, self.page)
            yield scrapy.Request(next_link, callback=self.parse)

    def parse_page(self, response):  # parse a detail page
        item = TestItem()
        item['name'] = response.css('div.name::text').extract_first()
        yield item  # yielded items are handed to the pipeline
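Besides the scrapy crawl command, the spider can also be launched from a plain Python script with CrawlerProcess, which loads the project settings (and therefore the MongoDB pipeline) automatically; run it from the project directory so scrapy.cfg can be found:

# run.py -- start the spider without the scrapy CLI
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())  # reads test/settings.py
process.crawl('test')   # spider name, as defined in Test.name
process.start()         # blocks until the crawl finishes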