美文网首页Python初见
关于python多进程使用(Queue、生产者和消费者)

关于python多进程使用(Queue、生产者和消费者)

作者: 超神雷鸣 | 来源:发表于2020-05-14 16:50 被阅读0次
    关于python多进程使用(Queue、生产者和消费者)

    关于\color{#D86683}{多进程}的生产者和消费者的实现,刚好最近有用到,简单总结记录下:

    多进程

    \color{#D86683}{进程}是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆。
    python提供了多种方法实现了多进程中间的\color{#D86683}{通信和数据共享}(可以修改同一份数据)。

    关于GIL

    GIL的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定。
    某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个,这就导致了多线程抢占GIL耗时。这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因。
    所以有必要学习下多进程的使用。

    多进程使用示例

    \color{#68A6D3}{multiprocessing\_queue.py}

    #!/usr/bin/env Python
    # -- coding: utf-8 --
    
    """
    @version: v1.0
    @author: narutohyc
    @file: multiprocessing_queue.py
    @Description: 多进程队列使用示例
    @time: 2020/5/14 15:53
    """
    
    from multiprocessing import Process, Queue, Manager
    from multiprocessing import cpu_count
    import os
    import time
    
    class MultiProcessingQueue:
        def __init__(self):
            # 进程数
            self.num_of_worker = cpu_count()
            # 进程队列大小,根据不同的任务需求
            self.size_of_queue = 10
    
        def start_work(self):
            print("start_work 开始")
    
            # 进程队列
            process_list = []
    
            # 新建一个大小为10的队列
            work_queue = Queue(self.size_of_queue)
    
            # 进程间共享列表, 其他的还有共享字典等,都是进程安全的
            dealed_sample_lst = Manager().list()
    
            # 多个生产者
            # 这里要注意任务的拆分,数据的产生来源是不是可以被多个生产者共享
            num_os_producer = 4
            for _ in range(num_os_producer):
                sent = Process(target=self.productor, args=(work_queue, dealed_sample_lst,))
                sent.start()
                process_list.append(sent)
    
            # 多个消费者
            for _ in range(self.num_of_worker - num_os_producer):
                process = Process(target=self.consumer, args=(work_queue, dealed_sample_lst,))
                process.start()
                process_list.append(process)
            
            [process.join() for process in process_list[:num_os_producer]]
            # 这里需要加入结束标识,还有就是JoinableQueue的方式
            for _ in range(self.num_of_worker - 1):
                work_queue.put(None)
            [process.join() for process in process_list[num_os_producer:]]
            print("start_work 结束")
            return dealed_sample_lst
    
        def productor(self, work_queue: Queue, dealed_sample_lst):
            print("生产者开始工作")
            for ii in range(100):
                work_queue.put(Task(task_name=f'{str(os.getpid())}-{str(ii)}', data=[ii for _ in range(2)]))
                if ii % 30 == 0:
                    time.sleep(1)
                    print("生产者休息ing")
    
            '''
            JoinableQueue 比Queue多了task_done() 与join()两个函数,多用于生产者消费者问题。
            task_done()是用在get()后,发送通知说我get完了
            join()是说Queue里所有的task都已处理。
            '''
            print("生产者工作结束")
    
        def consumer(self, work_queue: Queue, dealed_sample_lst):
            while True:
                task: Task = work_queue.get()
                if task is None:
                    break
    
                # 处理数据
                task.data = [ii * 2 for ii in task.data]
                dealed_sample_lst.append(task)
                print(task)
    
            print(f'进程{os.getpid()} 处理结束')
    

    \color{#68A6D3}{任务对象}

    # 注意这里不能放太多的数据,不然会导致多进程效率变很低
    class Task:
        def __init__(self, task_name: str, data: list, **kwargs):
            self.task_name = task_name
            self.data = data
    
        def __repr__(self):
            return f'task_name:{self.task_name} data:{self.data}'
    

    \color{#68A6D3}{调用示例}

    def multiprocessing_queue_test():
        multiprocessing_queue = MultiProcessingQueue()
        dealed_sample_lst = multiprocessing_queue.start_work()
        # for sample in dealed_sample_lst:
        #     print(sample)
        print("测试结束")
    
    if __name__ == '__main__':
        multiprocessing_queue_test()
    

    \color{#68A6D3}{结束输出}

    # 删了一些多余的输出
    start_work 开始
    生产者开始工作
    task_name:28868-0 data:[0, 0]
    生产者休息ing
    task_name:28868-1 data:[2, 2]
    ...
    task_name:28868-6 data:[12, 12]
    生产者休息ing
    task_name:28868-31 data:[62, 62]
    ...
    task_name:28868-58 data:[116, 116]
    生产者休息ing
    task_name:28868-61 data:[122, 122]
    ...
    生产者休息ing
    task_name:28868-91 data:[182, 182]
    ...
    task_name:28868-96 data:[192, 192]
    生产者工作结束
    进程29208 处理结束
    进程28496 处理结束
    task_name:28868-98 data:[196, 196]
    进程30200 处理结束
    task_name:28868-99 data:[198, 198]
    进程30072 处理结束
    start_work 结束
    测试结束
    

    相关文章

      网友评论

        本文标题:关于python多进程使用(Queue、生产者和消费者)

        本文链接:https://www.haomeiwen.com/subject/dyztohtx.html