美文网首页Python初见
关于python多进程使用(Queue、生产者和消费者)

关于python多进程使用(Queue、生产者和消费者)

作者: 超神雷鸣 | 来源:发表于2020-05-14 16:50 被阅读0次
关于python多进程使用(Queue、生产者和消费者)

关于\color{#D86683}{多进程}的生产者和消费者的实现,刚好最近有用到,简单总结记录下:

多进程

\color{#D86683}{进程}是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆。
python提供了多种方法实现了多进程中间的\color{#D86683}{通信和数据共享}(可以修改同一份数据)。

关于GIL

GIL的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定。
某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个,这就导致了多线程抢占GIL耗时。这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因。
所以有必要学习下多进程的使用。

多进程使用示例

\color{#68A6D3}{multiprocessing\_queue.py}

#!/usr/bin/env Python
# -- coding: utf-8 --

"""
@version: v1.0
@author: narutohyc
@file: multiprocessing_queue.py
@Description: 多进程队列使用示例
@time: 2020/5/14 15:53
"""

from multiprocessing import Process, Queue, Manager
from multiprocessing import cpu_count
import os
import time

class MultiProcessingQueue:
    def __init__(self):
        # 进程数
        self.num_of_worker = cpu_count()
        # 进程队列大小,根据不同的任务需求
        self.size_of_queue = 10

    def start_work(self):
        print("start_work 开始")

        # 进程队列
        process_list = []

        # 新建一个大小为10的队列
        work_queue = Queue(self.size_of_queue)

        # 进程间共享列表, 其他的还有共享字典等,都是进程安全的
        dealed_sample_lst = Manager().list()

        # 多个生产者
        # 这里要注意任务的拆分,数据的产生来源是不是可以被多个生产者共享
        num_os_producer = 4
        for _ in range(num_os_producer):
            sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,))
            sent.start()
            process_list.append(sent)

        # 多个消费者
        for _ in range(self.num_of_worker - num_os_producer):
            process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,))
            process.start()
            process_list.append(process)
        
        [process.join() for process in process_list[:num_os_producer]]
        # 这里需要加入结束标识,还有就是JoinableQueue的方式
        for _ in range(self.num_of_worker - 1):
            work_queue.put(None)
        [process.join() for process in process_list[num_os_producer:]]
        print("start_work 结束")
        return dealed_sample_lst

    def productor(self, work_queue: Queue, dealed_sample_lst):
        print("生产者开始工作")
        for ii in range(100):
            work_queue.put(Task(task_name=f'{str(os.getpid())}-{str(ii)}', data=[ii for _ in range(2)]))
            if ii % 30 == 0:
                time.sleep(1)
                print("生产者休息ing")

        '''
        JoinableQueue 比Queue多了task_done() 与join()两个函数,多用于生产者消费者问题。
        task_done()是用在get()后,发送通知说我get完了
        join()是说Queue里所有的task都已处理。
        '''
        print("生产者工作结束")

    def consumer(self, work_queue: Queue, dealed_sample_lst):
        while True:
            task: Task = work_queue.get()
            if task is None:
                break

            # 处理数据
            task.data = [ii * 2 for ii in task.data]
            dealed_sample_lst.append(task)
            print(task)

        print(f'进程{os.getpid()} 处理结束')

\color{#68A6D3}{任务对象}

# 注意这里不能放太多的数据,不然会导致多进程效率变很低
class Task:
    def __init__(self, task_name: str, data: list, **kwargs):
        self.task_name = task_name
        self.data = data

    def __repr__(self):
        return f'task_name:{self.task_name} data:{self.data}'

\color{#68A6D3}{调用示例}

def multiprocessing_queue_test():
    multiprocessing_queue = MultiProcessingQueue()
    dealed_sample_lst = multiprocessing_queue.start_work()
    # for sample in dealed_sample_lst:
    #     print(sample)
    print("测试结束")

if __name__ == '__main__':
    multiprocessing_queue_test()

\color{#68A6D3}{结束输出}

# 删了一些多余的输出
start_work 开始
生产者开始工作
task_name:28868-0 data:[0, 0]
生产者休息ing
task_name:28868-1 data:[2, 2]
...
task_name:28868-6 data:[12, 12]
生产者休息ing
task_name:28868-31 data:[62, 62]
...
task_name:28868-58 data:[116, 116]
生产者休息ing
task_name:28868-61 data:[122, 122]
...
生产者休息ing
task_name:28868-91 data:[182, 182]
...
task_name:28868-96 data:[192, 192]
生产者工作结束
进程29208 处理结束
进程28496 处理结束
task_name:28868-98 data:[196, 196]
进程30200 处理结束
task_name:28868-99 data:[198, 198]
进程30072 处理结束
start_work 结束
测试结束

相关文章

  • 关于python多进程使用(Queue、生产者和消费者)

    关于的生产者和消费者的实现,刚好最近有用到,简单总结记录下: 多进程 是系统独立调度核分配系统资源(CPU、内存)...

  • ActiveMQ(三)整合spring

    配置 生产者 topic 生产者 queue 接受应答 消费者Queue应答 消费者topic

  • 2018-07-27

    17.7.queue- 同步队列类源代码:Lib / queue.py 该queue模块实现了多生产者,多消费者队...

  • Python: Queue实现生产者及消费者模型

    Python Queue模块是一个线程安全的队列。基于Queue实现生产者及消费者模型如下。参考Effective...

  • 生产者和消费者问题

    生产者和消费者问题 问题简述 组成 系统中有一组生产者和消费者进程 生产者生产者将产品放入缓冲区 消费者消费者进程...

  • 生产者-消费者问题与python Queue模块

    生产者-消费者问题与python Queue模块生产者-消费者模型是一个典型的场景,在这个场景下,商品或服务的生产...

  • 生产者消费者问题、死锁问题

    生产者消费者问题 系统中有一组生产者进程和一组消费者进程,生产者进程每次生产一个产品放入缓冲区,消费者进程每次从缓...

  • ActiveMQ(二)原生API

    Maven配置 生产者 消费者 queue 消费者 topic

  • 说说Python中的queue

    背景 最近在看Python基础,刚好看到生产者与消费者这快内容,视频使用了queue模块来处理,这里记录一下学习的...

  • worker-thread模式

    相比生产者-消费者模式:生产者-消费者模式的queue更像是一个缓冲件,因为生产者的生产速度和消费者的消费速度很可...

网友评论

    本文标题:关于python多进程使用(Queue、生产者和消费者)

    本文链接:https://www.haomeiwen.com/subject/dyztohtx.html