并发:指的是任务数多余cpu核数,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行(实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起执行而已)
并行:指的是任务数小于等于cpu核数,即任务真的是一起执行的
线程
多线程执行
#coding=utf-8
import threading
import time
def saySorry():
for i in range(5):
print("亲爱的,我错了,我能吃饭了吗?")
time.sleep(1)
def do():
for i in range(5):
print("亲爱的,我错了,我给你按摩")
time.sleep(1)
if __name__ == "__main__":
td1 = threading.Thread(target=saySorry)
td1.start() #启动线程,即让线程开始执行
td2 = threading.Thread(target=saySorry)
td2.start() #启动线程,即让线程开始执行
- threading.Thread参数介绍
target:线程执行的函数
name:线程名称
args:执行函数中需要传递的参数,元组类型 另外:注意daemon参数
如果某个子线程的daemon属性为False,主线程结束时会检测该子线程是否结束,如果该子线程还在运行,则主线程会等待它完成后再退出;
如果某个子线程的daemon属性为True,主线程运行结束时不对这个子线程进行检查而直接退出,同时所有daemon值为True的子线程将随主线程一起结束,而不论是否运行完成。
属性daemon的值默认为False,如果需要修改,必须在调用start()方法启动线程之前进行设置
多线程-共享全局变量
互斥锁
上锁解锁过程当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。
每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“阻塞”,直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。
线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。
- 锁的好处:
确保了某段关键代码只能由一个线程从头到尾完整地执行
- 锁的坏处:
阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了
由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁
多线程爬虫
import queue
import requests
import threading
from lxml import etree
import json
'''
#maxsize=0:队列中能够存储的最大的数据量
data_que = queue.Queue(maxsize=40)
for i in range(0,50):
if not data_que.full():
data_que.put(i)
#判断队列是否为空
isempty = data_que.empty()
print(isempty)
#判断队列是否存满
isfull = data_que.full()
print(isfull)
#返回队列的大小
size = data_que.qsize()
print(size)
#FIFO先进先出
print(data_que.get())
print(data_que.get())
'''
#注意:队列是线程之间常用的数据交换形式,因为队列在线程间,是线程安全的
'''
1.创建一个任务队列:存放的是待爬取的url地址
2.创建爬取线程,执行任务下载
3.创建数据队列:存放爬取线程获取到的页面源码
4.创建解析线程:提取解析html源码,提取目标数据,存储本地,进行数据持久化
'''
#获取jobbole的文章列表
# http://blog.jobbole.com/all-posts/page/1/
# http://blog.jobbole.com/all-posts/page/2/
# http://blog.jobbole.com/all-posts/page/3/
def download_page_data(taskQueue,dataQueue):
"""
执行下载任务
:param taskQueue:从任务队列里面取出任务
:param dataQueue: 将获取到的页面源码存到dataQueue队列中
:return:
"""
while not taskQueue.empty():
page = taskQueue.get()
print('正在下载'+str(page)+'页',threading.currentThread().name)
full_url = 'http://blog.jobbole.com/all-posts/page/%s/'%str(page)
req_header = {
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36'
}
response = requests.get(full_url,headers=req_header)
if response.status_code == 200:
#将获取到的页面源码存到dataQueue队列中
dataQueue.put(response.text)
else:
taskQueue.put(page)
def parse_data(dataQueue,lock):
"""
解析数据,从dataQueue里取出数据进行解析
:param dataQueue:
:return:
"""
while not dataQueue.empty():
print('正在解析',threading.currentThread().name)
html = dataQueue.get()
html_element = etree.HTML(html)
articles = html_element.xpath('.//div[@class="post floated-thumb"]')
for article in articles:
articleInfo = {}
#标题
articleInfo['title'] = article.xpath('.//a[@class="archive-title"]/text()')[0]
#封面
img_element = article.xpath('.//div[@class="post-thumb"]/a/img')
if len(img_element) > 0:
articleInfo['coverImage'] = img_element[0].xpath('./@src')[0]
else:
articleInfo['coverImage'] = '暂无图片'
p_as = article.xpath('.//div[@class="post-meta"]/p[1]//a')
if len(p_as) > 2:
#tag类型
articleInfo['tag'] = p_as[1].xpath('./text()')[0]
#评论量
articleInfo['commentNum'] = p_as[2].xpath('./text()')[0]
else:
# tag类型
articleInfo['tag'] = p_as[1].xpath('./text()')[0]
# 评论量
articleInfo['commentNum'] = '0'
#简介
articleInfo['content'] = article.xpath('.//span[@class="excerpt"]/p/text()')[0]
#时间
articleInfo['publishTime'] = ''.join(article.xpath('.//div[@class="post-meta"]/p[1]/text()')).replace('\n','').replace(' ','').replace('\r','').replace('·','')
lock.acquire() #加锁
with open('jobbole.json','a+',encoding='utf-8') as file:
json_str = json.dumps(articleInfo,ensure_ascii=False) + '\n'
file.write(json_str)
lock.release() #解锁
if __name__ == '__main__':
#1.创建任务队列
taskQueue = queue.Queue()
for i in range(1,201):
taskQueue.put(i)
# 创建数据队列
dataQueue = queue.Queue()
#创建线程执行下载任务
threadingname = ['下载线程1号','下载线程2号','下载线程3号','下载线程4号']
crawl_thread = []
for name in threadingname:
#创建线程
thread_crawl = threading.Thread(
target=download_page_data,
name=name,
args=(taskQueue,dataQueue)
)
crawl_thread.append(thread_crawl)
#开始线程
thread_crawl.start()
#让所有的爬取线程执行完毕,在回到主线程中继续执行
for thread in crawl_thread:
thread.join()
#####################################################################
#加线程锁
lock = threading.Lock()
#创建解析线程,从dataQueue队列中取出页面源码,进行解析
threadingname = ['解析线程1号', '解析线程2号', '解析线程3号', '解析线程4号']
parse_thread = []
for name in threadingname:
# 创建线程
thread_parse = threading.Thread(
target=parse_data,
name=name,
args=(dataQueue,lock)
)
parse_thread.append(thread_parse)
# 开始线程
thread_parse.start()
# 让所有的爬取线程执行完毕,在回到主线程中继续执行
for thread in crawl_thread:
thread.join()
print('over')
多进程
- 进程的创建-multiprocessing
from multiprocessing import Process
import time
def run_proc():
"""子进程要执行的代码"""
while True:
print("----2----")
time.sleep(1)
if __name__=='__main__':
p = Process(target=run_proc)
p.start()
while True:
print("----1----")
time.sleep(1)
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动
网友评论