爬虫:IO密集型程序<=涉及很多网络IO以及本地磁盘IO操作=>消耗时间,降低效率
多线程,一定程度上提升执行效率(IO密集型)
- scrapy-redis
Scrapy-redis
是为了更方便地实现Scrapy
分布式爬取,而提供了一些以redis
为基础的组件(pip install scrapy-redis
) - 分布式策略
Master
端(核心服务器) :搭建一个Redis
数据库,不负责爬取,只负责url
指纹判重、Request
的分配,以及数据的存储。
-
_threading
:偏底层,较threading
功能有限 -
threading
=>推荐*;=_thread
+其他方法
多线程使用流程
-
threading.currentThread()
# 返回当前的线程变量 -
threading.enumerate()
# 返回一个所有正在运行的线程的列表 -
threading.activeCount()
# 返回正在运行的线程数量
具体方法:
from threading import Thread
# 线程创建、启动、回收
t = Thread(target="函数名") # 创建线程对象
t.start() # 创建并启动线程
t.join() # 阻塞等待回收线程
创建多线程:
def function_name():
# print(time.strptime()time.time())
time.sleep(1)
print(time.strftime("%Y/%m/%d %H:%M:%S"))
from threading import Thread
# 线程创建、启动、回收
t_list = []
for _ in range(5):
t = Thread(target=function_name) # 创建线程对象
t_list.append(t)
t.start() # 创建并启动线程
for t in t_list:
t.join() # 阻塞等待回收线程
输出结果:

注意:
多线程不能用于操作同一数据=>数据不确定=>通过threading模块的Lock对象保证数据正确性
即执行完操作时主动释放,继续让其他线程获取。
没有搞懂
# 上锁操作
from threading import Lock
lock = Lock()
lock.acquire() # 获取锁
---
lock.release()
Queue队列模型
GIL全局解释器=>同一时刻仅一个线程占据解释器
线程遇到 IO 操作时就会主动让出解释器,让其他处于等待状态的线程去获取解释器来执行程序,而该线程则回到等待状态,这主要是通过线程的调度机制实现的。
queue
:队列先进先出,提供创建共享数据的队列模型
from queue import Queue # 导入模块
q = Queue() # 创建队列对象
q.put(url) # 向队列中添加爬取的url
q.get() # 获取一个url;当队列为空时,阻塞
q.empty() # 判断是否为空,True/false
实例:
#-*-coding:utf-8-*-
"""
@author:百草
@file:test_xiaomi.py
@time:2022/09/08
"""
# https://app.mi.com/
"""
标签
<h5>
<a href="/details?id=com.tencent.tmgp.jx3m">剑网3:指尖江湖</a>
</h5>
跳转链接 https://app.mi.com/details?id=com.tencent.tmgp.jx3m
xpath //ul[@class="applist"]//h5/a
- 实例:queue的使用
"""
from urllib.request import urlopen, Request
from faker import Faker
from pyquery import PyQuery
from queue import Queue
class XiaoMi:
def __init__(self):
self.url = "https://app.mi.com/"
self.q = Queue()
def get_html(self, url):
# 获取相应内容
req = Request(url, headers={"User-Agent": Faker(local="zh_CN").user_agent()})
resp = urlopen(req)
return resp.read()
def para_html(self, html):
# 解析html
# tag = PyQuery(html)(".applist")
tags = PyQuery((PyQuery(html)(".applist")))("h5")
for tag in tags:
url_tail = PyQuery(tag)("a").attr("href")
self.q.put(self.url + url_tail)
# print(url_tail)
def run(self):
"""主函数"""
html = self.get_html(self.url) # 获取主页面
self.para_html(html)
# print(self.q) # <queue.Queue object at 0x000001B60E2B83D0>
while not (self.q.empty()): # 非空时执行
detail_url = self.q.get() # 先出
# detail_url = "https://app.mi.com/details?id=com.kimoo.shizi"
detail_html = self.get_html(detail_url)
res = PyQuery(detail_html)(".intro-titles")
name = PyQuery(res)("h3").text() # 获取h3标签对应的文本信息
clas = PyQuery(res)(".special-font").text()
print(name, clas, detail_url) # name、clas存在获取失败情况
if __name__ == "__main__":
a = XiaoMi()
a.run()
- Lock+queue+threading的使用
import time
from queue import Queue
import csv
from threading import Lock
from faker import Faker
from pyquery import PyQuery
from urllib.request import urlopen, Request
from threading import Thread
class Spider:
def __init__(self):
"""
爬虫:https://app.mi.com/catTopList/0?page=3
"""
self.url = "https://app.mi.com"
self.q = Queue() # 存放所有URL的队列
self.i = 0
self.id_list = [] # 存放所有类型id的空列表
self.f = open("Xiaomi.csv", "a", encoding="utf-8", newline="") # 打开文件
self.writer = csv.writer(self.f)
self.lock = Lock() # 创建锁
self.fake = Faker(local="zh_CN") # 获取ua
# 获取包链接
def get_cateid(self):
"""获取类名id;=>获取包名+app分类+下载详情页"""
resp = Request(self.url, headers={"User-Agent": self.fake.user_agent()})
html = urlopen(resp).read()
res = PyQuery(html)(".applist")
for i in res:
tags = PyQuery(i)("h5")
for tag in tags:
url_tail = PyQuery(tag)("a").attr("href")
self.id_list.append(self.url + url_tail)
# print(self.id_list)
# url加入队函数,拼接url并将url加入队列
def url_in(self):
self.get_cateid()
for url in self.id_list:
self.q.put(url)
# 请求:多线程处理
def get_req(self):
while not self.q.empty():
url = self.q.get()
resp = Request(url, headers={"User-Agent": self.fake.user_agent()})
html = urlopen(resp).read()
self.parse_html(html)
# 解析函数
def parse_html(self, html):
res = PyQuery(html)(".intro-titles")
name = PyQuery(res)("h3").text() # 获取h3标签对应的文本信息
clas = PyQuery(res)(".special-font").text()
# clas1, clas2 = clas.split("|")
# print(name, clas) # name、clas存在获取失败情况
# 写入文件
self.lock.acquire()
self.writer.writerow([name, clas])
self.lock.release()
# 主函数
def main(self):
self.url_in() # url加入队列
t_list = []
# 创建多线程
for i in range(1):
t = Thread(target=self.get_req)
t_list.append(t)
t.start() # 启动线程
# 回收线程
for t in t_list:
t.join()
# 文件保存
self.f.close()
if __name__ == "__main__":
start = time.time()
s = Spider()
s.main()
end = time.time()
print("执行时间:", end - start)
# 执行时间: 10.322103023529053

网友评论