关于的生产者和消费者的实现,刚好最近有用到,简单总结记录下:
多进程
是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆。
python提供了多种方法实现了多进程中间的(可以修改同一份数据)。
关于GIL
GIL的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定。
某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个,这就导致了多线程抢占GIL耗时。这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因。
所以有必要学习下多进程的使用。
多进程使用示例
#!/usr/bin/env Python
# -- coding: utf-8 --
"""
@version: v1.0
@author: narutohyc
@file: multiprocessing_queue.py
@Description: 多进程队列使用示例
@time: 2020/5/14 15:53
"""
from multiprocessing import Process, Queue, Manager
from multiprocessing import cpu_count
import os
import time
class MultiProcessingQueue:
def __init__(self):
# 进程数
self.num_of_worker = cpu_count()
# 进程队列大小,根据不同的任务需求
self.size_of_queue = 10
def start_work(self):
print("start_work 开始")
# 进程队列
process_list = []
# 新建一个大小为10的队列
work_queue = Queue(self.size_of_queue)
# 进程间共享列表, 其他的还有共享字典等,都是进程安全的
dealed_sample_lst = Manager().list()
# 多个生产者
# 这里要注意任务的拆分,数据的产生来源是不是可以被多个生产者共享
num_os_producer = 4
for _ in range(num_os_producer):
sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,))
sent.start()
process_list.append(sent)
# 多个消费者
for _ in range(self.num_of_worker - num_os_producer):
process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,))
process.start()
process_list.append(process)
[process.join() for process in process_list[:num_os_producer]]
# 这里需要加入结束标识,还有就是JoinableQueue的方式
for _ in range(self.num_of_worker - 1):
work_queue.put(None)
[process.join() for process in process_list[num_os_producer:]]
print("start_work 结束")
return dealed_sample_lst
def productor(self, work_queue: Queue, dealed_sample_lst):
print("生产者开始工作")
for ii in range(100):
work_queue.put(Task(task_name=f'{str(os.getpid())}-{str(ii)}', data=[ii for _ in range(2)]))
if ii % 30 == 0:
time.sleep(1)
print("生产者休息ing")
'''
JoinableQueue 比Queue多了task_done() 与join()两个函数,多用于生产者消费者问题。
task_done()是用在get()后,发送通知说我get完了
join()是说Queue里所有的task都已处理。
'''
print("生产者工作结束")
def consumer(self, work_queue: Queue, dealed_sample_lst):
while True:
task: Task = work_queue.get()
if task is None:
break
# 处理数据
task.data = [ii * 2 for ii in task.data]
dealed_sample_lst.append(task)
print(task)
print(f'进程{os.getpid()} 处理结束')
# 注意这里不能放太多的数据,不然会导致多进程效率变很低
class Task:
def __init__(self, task_name: str, data: list, **kwargs):
self.task_name = task_name
self.data = data
def __repr__(self):
return f'task_name:{self.task_name} data:{self.data}'
def multiprocessing_queue_test():
multiprocessing_queue = MultiProcessingQueue()
dealed_sample_lst = multiprocessing_queue.start_work()
# for sample in dealed_sample_lst:
# print(sample)
print("测试结束")
if __name__ == '__main__':
multiprocessing_queue_test()
# 删了一些多余的输出
start_work 开始
生产者开始工作
task_name:28868-0 data:[0, 0]
生产者休息ing
task_name:28868-1 data:[2, 2]
...
task_name:28868-6 data:[12, 12]
生产者休息ing
task_name:28868-31 data:[62, 62]
...
task_name:28868-58 data:[116, 116]
生产者休息ing
task_name:28868-61 data:[122, 122]
...
生产者休息ing
task_name:28868-91 data:[182, 182]
...
task_name:28868-96 data:[192, 192]
生产者工作结束
进程29208 处理结束
进程28496 处理结束
task_name:28868-98 data:[196, 196]
进程30200 处理结束
task_name:28868-99 data:[198, 198]
进程30072 处理结束
start_work 结束
测试结束
网友评论