美文网首页
python多线程与多进程

python多线程与多进程

作者: Byte猫 | 来源:发表于2019-05-21 12:07 被阅读0次

    一、基本概念

    1、进程process

    什么是进程。最直观的就是一个个pid,官方的说法就:进程是程序在计算机上的一次执行活动。
    从内核的观点看,进程的目的就是担当分配系统资源(CPU时间、内存等)的基本单位。
    进程有独立的地址空间,一个进程崩溃后不会对其它进程产生影响。

    2、线程thead

    线程是进程的一个执行流,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
    一个进程由几个线程组成,线程与同属一个进程的其他的线程共享进程所拥有的全部资源。
    线程没有独立的地址空间,一个线程死掉就等于整个进程死掉。

    3、多线程和多进程

    在 Python 中,对于计算密集型任务,多进程占优势,对于 I/O 密集型任务,多线程占优势。
    当然对运行一个程序来说,随着 CPU 的增多执行效率肯定会有所提高,这是因为一个程序基本上不会是纯计算或者纯 I/O,所以我们只能相对的去看一个程序到底是计算密集型还是 I/O 密集型。

    二、Python多进程技术

    python中的多进程主要使用到 multiprocessing 这个库

    1、不使用进程池

    # -*- coding:utf-8 -*-
    from multiprocessing import Process
    import time,os
    
    def worker():
        print("子进程{}执行中, 父进程{}".format(os.getpid(),os.getppid()))
        time.sleep(2)
        print("子进程{}终止".format(os.getpid()))
    
    if __name__ == "__main__":
        print("本机为",os.cpu_count(),"核 CPU")
        print("主进程{}执行中, 开始时间={}".format(os.getpid(), time.strftime('%Y-%m-%d %H:%M:%S')))
        start = time.time()
    
        l=[]
        # 创建子进程实例
        for i in range(10):
            p=Process(target=worker,name="worker"+str(i),args=())
            l.append(p)
    
        # 开启进程
        for i in range(10):
            l[i].start()
     
        # 阻塞进程
        for i in range(10):
            l[i].join()
        
        stop = time.time()
        print("主进程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
        print("总耗时 %s 秒" % (stop - start))
    

    2、使用进程池

    # -*- coding:utf-8 -*-
    from multiprocessing import Pool
    import time,os
    
    def worker(arg):
        print("子进程{}执行中, 父进程{}".format(os.getpid(),os.getppid()))
        time.sleep(2)
        print("子进程{}终止".format(os.getpid()))
    
    if __name__ == "__main__":
        print("本机为",os.cpu_count(),"核 CPU")
        print("主进程{}执行中, 开始时间={}".format(os.getpid(), time.strftime('%Y-%m-%d %H:%M:%S')))
        start = time.time()
    
        l = Pool(processes=5)
        # 创建子进程实例
        for i in range(10):
            # l.apply(worker,args=(i,))      # 同步执行(Python官方建议废弃)
            l.apply_async(worker,args=(i,))  # 异步执行
    
        # 关闭进程池,停止接受其它进程
        l.close()
        # 阻塞进程
        l.join()
        
        stop = time.time()
        print("主进程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
        print("总耗时 %s 秒" % (stop - start))
    

    三、Python多线程技术

    python中的多进程主要使用到 threading 这个库

    1、不使用线程池

    # -*- coding:utf-8 -*-
    from threading import Thread
    import time,os
    
    def worker(arg):
        print("子线程执行中>>> 编号={}".format(arg))
        time.sleep(2)
        print("子线程终止>>> 编号={}".format(arg))
    
    if __name__ == "__main__":
        print("本机为",os.cpu_count(),"核 CPU")  # 本机为4核
        
        l = []
        # 创建子线程实例
        for i in range(10):
            t = Thread(target=worker, name='one', args=(i,))
            t.start()
            l.append(t)
        for p in l:
            p.join()
    

    关于线程互斥、同步等

    2、使用线程池(multiprocessing库的线程池)

    "from multiprocessing import Pool "这样导入的 Pool 表示的是进程池,"from multiprocessing.dummy import Pool"这样导入的 Pool表示的是线程池。

    # -*- coding:utf-8 -*-
    from multiprocessing.dummy import Pool as ThreadPool
    import time,os
    
    def worker(arg):
        print("子线程{}执行中".format(arg))
        time.sleep(2)
        print("子线程{}终止".format(arg))
    
    if __name__ == "__main__":
        print("本机为",os.cpu_count(),"核 CPU")
        print("主线程执行中, 开始时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
        start = time.time()
    
        pool = ThreadPool(5)
        results = pool.map(worker, range(10))
    
        pool.close()
        pool.join()
        
        stop = time.time()
        print("主线程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
        print("总耗时 %s 秒" % (stop - start))
    

    3、使用线程池(自定义线程池)

    """
    思路
    1,将任务放在队列
        1)创建队列:(初始化)
        2)设置大小,线程池的最大容量
        3)真实创建的线程 列表
        4)空闲的线程数量
    
    2,着手开始处理任务
        1)创建线程
            2)空闲线程数量大于0,则不再创建线程
            3)创建线程池的数量 不能高于线程池的限制
            4)根据任务个数判断  创建线程的数量
        2)线程去队列中取任务
            1)取任务包(任务包是一个元祖)
            2)任务为空时,不再取(终止)
    """
    
    import time
    import threading
    import queue
    
    stopEvent = object()  # 停止任务的标志
    
    class ThreadPool(object):
        def __init__(self, max_thread):
            # 创建任务队列,可以放无限个任务
            self.queue = queue.Queue()
            # 指定最大线程数
            self.max_thread = max_thread
            # 停止标志
            self.terminal = False
            # 创建真实线程数
            self.generate_list = []
            # 空闲线程数
            self.free_thread = []
    
        def run(self, action, args, callback=None):
            """
            线程池执行一个任务
            INPUT ->
                action:任务函数
                args:任务参数
                callback:执行完任务的回调函数,成功或者失败的返回值。
            """
            # 线程池运行的条件:1)
            if len(self.free_thread) == 0 and len(self.generate_list) < self.max_thread:
                self.generate_thread()
            task = (action, args, callback)
            self.queue.put(task)
    
        def callback(self):
            """
            回调函数:循环取获取任务,并执行任务函数
            """
            # 获取当前线程
            current_thread = threading.current_thread()
            self.generate_list.append(current_thread)
            # 取任务并执行
            event = self.queue.get()
            # 事件类型是任务
            while event != stopEvent:  # 重点是这个判断  使任务终止
                # 解开任务包 ,(任务是一个元祖)
                # 执行任务
                # 标记:执行任务前的状态,执行任务后的状态
                action, args, callback = event
                try:
                    ret = action(*args)
                    success = True
                except Exception as x:
                    success = False
                    ret = x
                if callback is not None:
                    try:
                        callback(success, ret)
                    except Exception as e:
                        print(e)
                else:
                    pass
                if not self.terminal:
                    self.free_thread.append(current_thread)
                    event = self.queue.get()
                    self.free_thread.remove(current_thread)
                else:
                    # 停止进行取任务
                    event = stopEvent
            else:
                # 不是元祖,不是任务,则清空当前线程,不在去取任务
                self.generate_list.remove(current_thread)
    
        def generate_thread(self):
            """
            创建一个线程
            """
            t = threading.Thread(target=self.callback)
            t.start()
    
        # 终止取任务
        def terminals(self):
            """
            无论是否还有任务,终止线程
            """
            self.terminal = True
    
        def close(self):
            """
            执行完所有的任务后,所有线程停止
            """
            num = len(self.generate_list)
            self.queue.empty()
            while num:
                self.queue.put(stopEvent)
                num -= 1
    
    
    def test(pi):
        time.sleep(0.5)
        print(pi)
    
    
    pool = ThreadPool(10)
    
    for i in range(100):
        pool.run(action=test, args=(i,))
    
    pool.terminals()
    pool.close()
    

    相关文章

      网友评论

          本文标题:python多线程与多进程

          本文链接:https://www.haomeiwen.com/subject/hvtnmqtx.html