多进程
多任务:生活中来看,就是多个任务同时进行,喝酒聊天,开车,手脚并用,唱歌跳舞
电脑中:录屏、sublime、vnc服务端、浏览器打开等
代码中:实现多任务,多进程、多线程
进程:电脑中,每一个软件启动起来都是一个进程,
代码中:没有运行的时候称之为程序,运行起来之后就是一个进程
多进程:进程只有一个,称之为主进程,子进程,要实现两个函数同时执行,就要通过主进程来创建子进程
操作系统实现,只是在进程之间来回切换,切换的非常快,看着像同时执行一样
如何实现?
面向过程:(process)
p = Process(target=xxx, name=xxx, args=(xxx,))
target: 进程启动之后要执行的函数
name: 给进程起个名字
args: 给子进程传递的参数,是一个元组
p.start() 启动进程
p.join() 让主进程等待子进程结束
os.getpid() 获取当前进程id号
os.getppid() 获取父进程的id号
from multiprocessing import Process
import time
import os
# 想让子进程1执行sing函数
def sing(a):
print('接受的参数为%s' % a)
# 进程id号
print('进程-%s-开始运行' % os.getpid())
print('父进程为%s' % os.getppid())
for x in range(1, 5):
print('我在唱小情歌')
time.sleep(1)
# 想让子进程2执行dance函数
def dance(a):
print('进程-%s-开始运行' % os.getpid())
print('父进程为%s' % os.getppid())
for x in range(1, 5):
print('我在跳钢管舞')
time.sleep(1)
def main():
# 主进程
print('主进程id号为%s' % os.getpid())
a = '青花瓷'
# 创建进程
p_sing = Process(target=sing, name='唱歌', args=(a,))
p_dance = Process(target=dance, name='跳舞', args=(a,))
# 启动进程
p_sing.start()
p_dance.start()
# 获取进程名字
print(p_sing.name)
print(p_dance.name)
# 因为主进程中有子进程的信息,所以主进程必须等子进程结束之后再结束
p_sing.join()
p_dance.join()
print('主进程结束')
if __name__ == '__main__':
main()
面向对象:
from multiprocessing import Process
import time
class SingProcess(Process):
def __init__(self, a):
# 如果要重写构造方法,一定得手动调用父类的构造方法
super().__init__()
self.a = a
def run(self):
print('传递的参数为%s' % self.a)
for x in range(1, 5):
print('我在唱小情歌')
time.sleep(1)
class DanceProcess(Process):
def run(self):
for x in range(1, 5):
print('我在跳钢管舞')
time.sleep(1)
def main():
a = '现在很多老歌手为什么不唱歌了'
p_sing = SingProcess(a)
# 启动进程,进程启动之后默认执行类里面的run方法
p_sing.start()
# p_dance = DanceProcess()
# p_dance.start()
p_sing.join()
# p_dance.join()
print('主进程结束')
if __name__ == '__main__':
main()
多进程拷贝,拷贝文件夹,假如文件夹里面只有文件。假如有100个文件,那么拷贝的时候先拷贝第一个,然后第二个,然后第三个====
拷贝一个文件就是一个任务,那一共100文件,那难道要开辟100个进程吗?
进程并不是越多越好,切换占用的时间越大
练习:多进程拷贝
拷贝之前记录时间戳,拷贝之后记录时间戳,计算拷贝的时间
多进程拷贝也是一样,哪个时间短
引入进程池,规定能创建几个进程,来了任务,5个进程
进程之间是否共享全局变量
全局变量
进程之间是否共享全局变量,不共享
每一个进程都是单独的代码
from multiprocessing import Process
import os
import time
count = 100
# 该进程用来修改全局变量的值
def change():
global count
count += 100
print('进程%s修改后的值为%s' % (os.getpid(), count))
# 该进程用来读取全局变量的值
def read():
print('进程%s读取的值为%s' % (os.getpid(), count))
def test(c):
a = 100
if c == 'hello':
a += 100
time.sleep(2)
print('进程%s修改a的值为%s' % (os.getpid(), a))
else:
time.sleep(5)
print('进程%s读取a的值为%s' % (os.getpid(), a))
def main():
'''
p1 = Process(target=change)
p1.start()
time.sleep(2)
p2 = Process(target=read)
p2.start()
'''
a = 'hello'
b = 'world'
p1 = Process(target=test, args=(a, ))
p2 = Process(target=test, args=(b, ))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
main()
进程池
from multiprocessing import Process
from multiprocessing import Pool
import os
import time
def test(name):
print('任务%s正在运行,进程id号为%s' % (name, os.getpid()))
time.sleep(2)
def main():
# 创建一个进程池对象
po = Pool(3)
# 给进程池添加任务
lt = ['关羽', '赵云', '张飞', '马超', '黄忠', '吕布', '孙策', '大乔']
for name in lt:
po.apply_async(test, args=(name, ))
# 进程池使用完毕之后,关闭
po.close()
# 让主进程等待结束
po.join()
print('主进程、进程池全部结束')
if __name__ == '__main__':
main()
多线程
线程:比如qq。比如暴风影音,比如word
可以同时语音、同时视频、同时聊天,多线程
暴风影音,视频播放、音频播放,多线程
word,打字,拼写检查,等,多线程
多任务的实现:多进程、多线程
主进程-子进程1-子进程2
特点:进程之间没有关系,如果一个进程挂了,不影响其它子进程
进程-主线程-子线程1-子线程2
特点:线程之间有关系,如果一个线程挂了,整个进程就挂了
实现方式:(thread)
面向过程
t = Thread(target=xxx, name=xxx, args=(xxx,))
target: 线程启动之后要执行的函数
name: 线程的名字
args: 给线程传递的参数
t.start(): 启动线程
t.join(): 让主线程等待子线程结束
threading.current_thread().name : 获取线程名字
import threading
import time
def sing(song):
print('传递过来的参数为%s' % song)
print('获取线程的名字为%s' % threading.current_thread().name)
for x in range(1, 5):
print('我在唱老情歌')
time.sleep(1)
def dance():
for x in range(1, 5):
print('我在跳广场舞')
time.sleep(1)
def main():
a = '广岛之恋'
# 这是一个进程,进程里面有一个主线程,然后主线程创建子线程1(唱歌),子线程2(跳舞)
t_sing = threading.Thread(target=sing, name='唱歌', args=(a, ))
t_dance = threading.Thread(target=dance, name='跳舞')
# 启动线程
t_sing.start()
t_dance.start()
t_sing.join()
t_dance.join()
print('主线程、子线程同时结束')
if __name__ == '__main__':
main()
面向对象
import threading
import time
# 滕王阁序 王勃
# 命硬
# 沁园春-雪
# 出师表
class MyThread(threading.Thread):
def __init__(self, a):
super().__init__()
self.a = a
def run(self):
print('传递过来的参数为%s' % self.a)
for x in range(1, 5):
print('凤凰台上凤凰游,凤去台空江自流')
time.sleep(1)
def main():
a = '落霞与孤鹜齐飞,秋水共长天一色'
t = MyThread(a)
t.start()
t.join()
print('主线程、子线程全部结束')
if __name__ == '__main__':
main()
是否共享
全局变量
共享全局变量
局部变量
不共享局部变量
线程安全问题
线程之间可以共享全局变量
import threading
import os
import time
count = 100
# 该线程用来修改全局变量的值
def change():
global count
count += 100
print('线程%s修改后的值为%s' % (threading.current_thread().name, count))
# 该线程用来读取全局变量的值
def read():
print('线程%s读取的值为%s' % (threading.current_thread().name, count))
def test():
a = 100
name = threading.current_thread().name
if name == 'lala':
a += 100
time.sleep(2)
print('线程%s修改a的值为%s' % (name, a))
else:
time.sleep(5)
print('线程读取a的值为%s' % a)
def main():
'''
t1 = threading.Thread(target=change, name='修改thread')
t1.start()
time.sleep(2)
t2 = threading.Thread(target=read, name='读取thread')
t2.start()
'''
t1 = threading.Thread(target=test, name='lala')
t2 = threading.Thread(target=test)
t1.start()
t2.start()
t1.join()
t2.join()
if __name__ == '__main__':
main()
加锁
加锁
lock.acquire()
释放锁
lock.release()
import threading
import time
count = 100
# 创建一把锁
lock = threading.Lock()
def test(number):
start = time.time()
# 在这里面修改count的值
global count
for x in range(1, 10000000):
# 加锁
lock.acquire()
count += number
count -= number
# 释放锁
lock.release()
end = time.time()
# 上厕所,大号,如何解决,加锁
# 用之前,加锁,用完之后,释放锁
# 牺牲了效率了
print('循环计算的时间为%s' % (start - end))
def main():
t1 = threading.Thread(target=test, args=(3, ))
t2 = threading.Thread(target=test, args=(5, ))
t2.start()
t1.start()
t1.join()
t2.join()
print('主线程中打印的count的值为%s' % count)
if __name__ == '__main__':
main()
队列
队列:买火车票,电动三轮,特点:先进先出
用在哪?
线程之间使用队列进行交互,让线程解耦合,生产者-消费者模型
线程1-生产数据
交互渠道:队列
线程2-消费数据
while 1:
生产数据
消费数据
队列使用:
from queue import Queue
q = Queue(5)
q.put() 添加元素
q.put(False) 如果队列已满,添加元素立即抛出异常
q.put(True, 5) 如果队列已满,添加元素5s之后抛出异常
q.get() 获取元素
q.get(False) 如果队列为空,获取元素立即抛出异常
q.get(True, 5) 如果队列为空,获取元素5s之后抛出异常
q.empty() 队列是否为空
q.full() 队列是否已满
q.qsize() 队列长度
from queue import Queue
# 创建一个队列
# 如果写,代表队列的长度,如果不写,队列长度无限
q = Queue(5)
print(q.empty())
print(q.full())
print(q.qsize())
# 向队列中添加数据
q.put('吴彦祖')
q.put('岳云鹏')
q.put('王宝强')
q.put('黄渤')
q.put('刘德华')
print(q.empty())
print(q.full())
print(q.qsize())
# q.put('古天乐', True, 5)
# 从队列中获取数据
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# print(q.get(True, 5))
多线程爬虫
分析:
爬虫里面如何分多线程,
循环:
拼接url,发送请求,获取响应
解析响应,保存到文件
涉及到:
采集线程,3个线程同时采集
解析线程,3个线程同时解析
页码队列:里面是要爬取的页码数据
数据队列:采集线程向队列中添加数据
解析线程从队列中获取数据
保存到同一个文件中,锁机制
import threading
from queue import Queue
import time
from lxml import etree
import requests
import json
class CrawlThread(threading.Thread):
def __init__(self, name, page_queue, data_queue):
super().__init__()
self.name = name
# 保存页码队列
self.page_queue = page_queue
self.data_queue = data_queue
# url
self.url = 'http://www.fanjian.net/duanzi-{}'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
}
def run(self):
print('%s线程开始启动' % self.name)
# 这里面的思路是什么?
while 1:
if self.page_queue.empty():
break
# 1、从页码队列中获取页码
page = self.page_queue.get()
# 2、将url和页码进行拼接
url = self.url.format(page)
# 3、发送请求,获取响应
r = requests.get(url=url, headers=self.headers)
time.sleep(1)
# 4、将响应内容放入到数据队列中
self.data_queue.put(r.text)
print('%s线程结束' % self.name)
class ParseThread(threading.Thread):
def __init__(self, name, data_queue, lock, fp):
super().__init__()
self.name = name
# 保存数据队列
self.data_queue = data_queue
self.lock = lock
self.fp = fp
def run(self):
print('%s线程开始启动' % self.name)
# 解析线程解析步骤
while 1:
if self.data_queue.empty():
break
# 1、从数据队列中取出一个数据
content = self.data_queue.get()
# 2、解析这个数据
items = self.parse_content(content)
# 3、写入到文件中
string = json.dumps(items, ensure_ascii=False)
# 加锁
self.lock.acquire()
self.fp.write(string)
# 释放锁
self.lock.release()
print('%s线程结束' % self.name)
# 解析数据函数
def parse_content(content):
# 生成tree对象
tree = etree.HTML(content)
# 先找到所有的li标签
li_list = tree.xpath('//li[@class="cont-item"]')
items = []
for oli in li_list:
# 获取头像
face = oli.xpath('.//div[@class="cont-list-reward"]//img/@data-src')[0]
# 获取名字
name = oli.xpath('.//div[@class="cont-list-head"]/a/text()')[0]
# 获取内容
text = oli.xpath('.//div[@class="cont-list-main"]/p/text()')[0]
# 获取时间
shijian = oli.xpath('.//div[@class="cont-list-info fc-gray"]/text()')[-1]
item = {
'头像': face,
'名字': name,
'内容': text,
'时间': shijian,
}
# 将字典添加到列表中
items.append(item)
return items
def create_queue():
page_queue = Queue()
data_queue = Queue()
# 向页码队列中添加页码
for page in range(1, 11):
page_queue.put(page)
return page_queue, data_queue
def main():
# 做什么?
# 创建锁
lock = threading.Lock()
# 打开文件
fp = open('duanzi.txt', 'w', encoding='utf8')
# 创建两个队列
page_queue, data_queue = create_queue()
# 创建采集、解析线程
crawlname_list = ['采集线程1', '采集线程2', '采集线程3']
parsename_list = ['解析线程1', '解析线程2', '解析线程3']
# 列表,用来保存所有的采集线程和解析线程
t_crawl_list = []
t_parse_list = []
for crawlname in crawlname_list:
t_crawl = CrawlThread(crawlname, page_queue, data_queue)
t_crawl.start()
# 将对应的采集线程保存起来
t_crawl_list.append(t_crawl)
for parsename in parsename_list:
t_parse = ParseThread(parsename, data_queue, lock, fp)
# 将对应的解析线程保存起来
t_parse_list.append(t_parse)
t_parse.start()
# 让主线程等待子线程结束之后再结束
for t_crawl in t_crawl_list:
t_crawl.join()
for t_parse in t_parse_list:
t_parse.join()
fp.close()
print('主线程、子线程全部结束')
if __name__ == '__main__':
main()
# 留给大家了,为什么里面没有写数据呢?
网友评论