美文网首页
Python 经验 - 多线程与多进程

Python 经验 - 多线程与多进程

作者: 千反田爱瑠爱好者 | 来源:发表于2018-08-22 17:52 被阅读11次

    多线程

    GIL

    GIL(Global Interpreter Lock)即全局解释器锁。

    • 在Python中一个线程对应于C(Cpython)中的一个线程;
    • GIL使得同一个时刻只有一个线程在一个cpu上执行字节码,而且无法将多个线程映射到多个cpu上执行(无法利用多核优势),查看Python字节码:
    import dis


    def add(a):
        """Return ``a`` plus one; a tiny function whose bytecode is inspected below."""
        a = a + 1
        return a


    # dis.dis() writes the disassembly to stdout itself and returns None, so it
    # is called directly; wrapping it in print() printed a stray "None".
    dis.dis(add)
    
    • GIL的释放:线程并不会在整个运行过程中一直占有GIL,以下情况会释放:
      1. 根据执行的字节码行数以及时间片释放GIL;
      2. 在遇到io的操作时候主动释放。
    import threading

    total = 0


    def add():
        """Increment the shared ``total`` one million times, without a lock."""
        global total
        for _ in range(1000000):
            total += 1


    def desc():
        """Decrement the shared ``total`` one million times, without a lock."""
        global total
        for _ in range(1000000):
            # The lock may be released while this runs, letting the other
            # thread in.
            total -= 1


    thread1 = threading.Thread(target=add)
    thread2 = threading.Thread(target=desc)
    for worker in (thread1, thread2):
        worker.start()
    for worker in (thread1, thread2):
        worker.join()

    # The final value differs from run to run: the number of effective
    # increments and decrements is not fixed.
    print(total)
    

    多线程编程

    • 对于io操作来说,多线程和多进程性能差别不大(线程调度更轻量);
    • 可以通过Thread类实例化或继承Thread来实现多线程。
    # 模拟多线程爬虫(并发爬取列表页和详情页)
    
    import time
    import threading
    
    # Crawl a detail page.
    def get_detail_html(url):
        """Simulate downloading a detail page.

        ``url`` is accepted but not used; the download is faked by sleeping
        two seconds between the "started" and "end" messages.
        """
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
    
    # Crawl detail-page URLs from a list page.
    def get_detail_url(url):
        """Simulate extracting detail-page URLs from a list page.

        ``url`` is accepted but not used; the work is faked by sleeping four
        seconds between the "started" and "end" messages.
        """
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")
    
    
    class GetDetailHtml(threading.Thread):
        """Thread subclass that simulates crawling a detail page."""

        def __init__(self, name):
            # Forward ``name`` as the thread's name.
            super().__init__(name=name)

        def run(self):
            """Thread body: print a start message, sleep two seconds, print an end message."""
            print("get detail html started")
            time.sleep(2)
            print("get detail html end")
    
    class GetDetailUrl(threading.Thread):
        """Thread subclass that simulates crawling a list page for detail URLs."""

        def __init__(self, name):
            # Forward ``name`` as the thread's name.
            super().__init__(name=name)

        def run(self):
            """Thread body: print a start message, sleep four seconds, print an end message."""
            print("get detail url started")
            time.sleep(4)
            print("get detail url end")
    
    if  __name__ == "__main__":
        # Run both simulated crawlers concurrently and time the whole run.
        thread1 = GetDetailHtml("get_detail_html")
        thread2 = GetDetailUrl("get_detail_url")
        start_time = time.time()
        thread1.start()
        thread2.start()

        thread1.join()    # wait for this thread to finish before continuing
        thread2.join()

        # Both joins have returned, so both workers are finished by the time
        # the elapsed time is printed.
        print ("last time: {}".format(time.time() - start_time))
    

    线程间通信

    共享变量 + 锁

    import time
    import threading
    
    from threading import Condition
    
    # 生产者生产10个url以后就等待,保证detail_url_list中最多只有十个url
    # 当url_list为空的时候,消费者就暂停
    
    detail_url_list = []        # list非线程安全,需要加锁
    # global引用过多时可以创建一个模块专门存放共享变量
    # from chapter11 import variables
    # 不可以from chapter11.variables import detail_url_list
    
    def get_detail_html(lock):
        """Consume URLs from the shared ``detail_url_list`` and "crawl" them.

        Runs forever. Each round takes one URL from the list while holding
        ``lock`` and then simulates a two-second download. When the list is
        empty the thread sleeps for a second before checking again.

        Args:
            lock: lock object (usable as a context manager) guarding
                ``detail_url_list``.
        """
        while True:
            # ``with`` releases the lock on every path, including exceptions.
            with lock:
                url = detail_url_list.pop() if detail_url_list else None

            if url is None:
                # Back off instead of spinning: the earlier unlocked
                # ``if len(...)`` pre-check looped at full speed while the
                # list was empty.
                time.sleep(1)
                continue

            print("get detail html started")
            time.sleep(2)
            print("get detail html end")
    
    def get_detail_url(lock):
        """Produce detail-page URLs into the shared ``detail_url_list``.

        Runs forever. Each round simulates crawling a list page (four-second
        sleep) and then queues 20 URLs, never letting the list hold more than
        10 entries at a time.

        Args:
            lock: lock object (usable as a context manager) guarding
                ``detail_url_list``.
        """
        # List pages are crawled faster than detail pages, so several
        # detail-page threads can be run against this one producer.
        while True:
            print("get detail url started")
            time.sleep(4)
            for i in range(20):
                url = "http://projectsedu.com/{id}".format(id=i)
                # Wait for room rather than skipping: previously a full list
                # made the loop sleep and move on to the next id, silently
                # dropping this URL.
                while True:
                    with lock:
                        if len(detail_url_list) < 10:
                            detail_url_list.append(url)
                            break
                    time.sleep(1)
            print("get detail url end")
    
    if __name__ == "__main__":
        # Only ``Condition`` is imported by name in this snippet, so the lock
        # is reached through the ``threading`` module (a bare ``RLock()``
        # raised NameError).
        lock = threading.RLock()
        # Must be set before it is read below; it was previously undefined.
        start_time = time.time()

        # One producer thread, which is now actually started, plus ten
        # consumer threads sharing the same lock.
        thread_detail_url = threading.Thread(target=get_detail_url, args=(lock,))
        thread_detail_url.start()
        for i in range(10):
            html_thread = threading.Thread(target=get_detail_html, args=(lock,))
            html_thread.start()

        # None of the threads is joined, so this reports only the time spent
        # starting them.
        print("last time: {}".format(time.time() - start_time))
    

    队列

    import time
    import threading
    from queue import Queue
    
    def get_detail_html(queue):
        """Consume URLs from ``queue`` forever and simulate crawling each one.

        Args:
            queue: a ``queue.Queue`` of detail-page URLs.
        """
        while True:
            url = queue.get()    # blocks by default until an item is available
            try:
                print("get detail html started")
                time.sleep(2)
                print("get detail html end")
            finally:
                # Every get() must be matched by a task_done(); otherwise
                # Queue.join() can never return once an item has been queued.
                queue.task_done()
    
    def get_detail_url(queue):
        """Produce detail-page URLs into ``queue`` forever.

        Each round simulates crawling a list page with a four-second sleep and
        then puts 20 URLs on the queue.
        """
        url_template = "http://projectsedu.com/{id}"
        while True:
            print("get detail url started")
            time.sleep(4)
            for page_id in range(20):
                queue.put(url_template.format(id=page_id))
            print("get detail url end")
    
    if __name__ == "__main__":
        detail_url_queue = Queue(maxsize=1000)

        start_time = time.time()
        # One producer thread, which is now actually started, plus ten
        # consumer threads sharing the queue.
        thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
        thread_detail_url.start()
        for i in range(10):
            html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
            html_thread.start()

        # join() blocks until every item put on the queue has been matched by
        # a task_done() call. task_done() is only valid after a get(); calling
        # it here on an untouched queue raised ValueError, so that call is
        # removed.
        detail_url_queue.join()

        print("last time: {}".format(time.time() - start_time))
    

    锁:线程间同步

    • 使用锁可以实现线程同步,但会影响性能,也可能导致死锁;
    • 重入锁:在同一个线程里,可以连续调用多次acquire, 注意acquire的次数要和release的次数相等;
    from threading import Lock, RLock, Condition
    import threading
    
    total = 0
    # Re-entrant lock: the same thread may acquire it several times. A plain
    # Lock() would deadlock on the second acquisition inside add().
    lock = RLock()


    def add():
        """Increment ``total`` one million times, taking the lock twice per step."""
        global total
        for _ in range(1000000):
            # Nested acquisition; each level is released when its ``with``
            # block exits, so acquires and releases stay balanced.
            with lock:
                with lock:
                    total += 1


    def desc():
        """Decrement ``total`` one million times under the lock."""
        global total
        for _ in range(1000000):
            with lock:
                total -= 1


    thread1 = threading.Thread(target=add)
    thread2 = threading.Thread(target=desc)
    for worker in (thread1, thread2):
        worker.start()
    for worker in (thread1, thread2):
        worker.join()
    print(total)
    """
    死锁:
        互斥
        不可抢占
        请求且占有
        循环等待
    
    A(a, b)
    acquire (a)
    acquire (b)
    
    B(a, b)
    acquire (b)
    acquire (a)
    """
    

    条件变量

    • 只用锁无法确保两个线程交替运行(进度不一致),需要一个“通知-等待”的机制,在本线程工作完成后由下一个线程工作,并等待该线程的通知;
    • 使用通知-等待机制要注意线程启动的顺序(先启动需要被notify的,即被动方);
    • 在调用with...cond之后(在cond的作用域中)才能调用wait或者notify方法;
    • condition有两层锁, 一把底层锁会在线程调用了wait方法的时候释放,上层锁会在每次调用wait时分配一把并放入到cond等待队列中,等到notify方法的唤醒。
    import threading
    from concurrent import futures
    
    class XiaoAi(threading.Thread):
        """Responding side of the dialogue: waits for a notify, answers, notifies back."""

        def __init__(self, cond):
            super().__init__(name="小爱")
            self.cond = cond

        def run(self):
            """Answer six times, each time only after being notified on ``self.cond``."""
            replies = (
                "在",
                "好啊",
                "君住长江尾",
                "共饮长江水",
                "此恨何时已",
                "定不负相思意",
            )
            with self.cond:
                for reply in replies:
                    self.cond.wait()
                    print("{} : {} ".format(self.name, reply))
                    self.cond.notify()
    
    class TianMao(threading.Thread):
        """Initiating side of the dialogue: speaks, notifies, then waits for the reply."""

        def __init__(self, cond):
            super().__init__(name="天猫精灵")
            self.cond = cond

        def run(self):
            """Say six lines; after each one notify on ``self.cond`` and wait."""
            lines = (
                "小爱同学",
                "我们来对古诗吧",
                "我住长江头",
                "日日思君不见君",
                "此水几时休",
                "只愿君心似我心",
            )
            with self.cond:
                for line in lines:
                    print("{} : {} ".format(self.name, line))
                    self.cond.notify()
                    self.cond.wait()
    
    if __name__ == "__main__":
        # Both threads share one Condition object.
        cond = threading.Condition()
        xiaoai = XiaoAi(cond)
        tianmao = TianMao(cond)

        # XiaoAi, whose run() begins with wait(), is started before TianMao,
        # whose run() begins with notify().
        xiaoai.start()
        tianmao.start()
    

    信号量

    • 在读写分离的场景,一般只能被一个线程写,但允许多个线程读,通过Semaphore可以控制进入的数量;
    • 对于一个semaphore,调用acquire时数值 - 1,直到数值减到0则会加锁;调用release释放,数值 + 1;
    • 在需要控制并发程度的场景,信号量也能很好地发挥作用。
    import threading
    import time
    
    class HtmlSpider(threading.Thread):
        """Thread that simulates downloading one page, then releases a semaphore."""

        def __init__(self, url, sem):
            """Store the target ``url`` and the semaphore ``sem`` to release when done."""
            super().__init__()
            self.url = url
            self.sem = sem

        def run(self):
            """Simulate a two-second download and release ``self.sem`` afterwards."""
            try:
                time.sleep(2)
                print("got html text success")
            finally:
                # Release on every path: if the body raised without this, the
                # semaphore would never be released.
                self.sem.release()
    
    class UrlProducer(threading.Thread):
        """Thread that launches one HtmlSpider per URL, gated by a semaphore."""

        def __init__(self, sem):
            super().__init__()
            self.sem = sem

        def run(self):
            """Start 20 spiders, acquiring ``self.sem`` before each launch."""
            for page in range(20):
                # Acquired here before the spider starts; the spider releases
                # it from inside its own run().
                self.sem.acquire()
                spider = HtmlSpider("https://baidu.com/{}".format(page), self.sem)
                spider.start()
    
    if __name__ == "__main__":
        # The semaphore starts at 3. UrlProducer acquires it before starting
        # each HtmlSpider and the spider releases it when done.
        sem = threading.Semaphore(3)
        url_producer = UrlProducer(sem)
        url_producer.start()
    

    线程池

    • 使用线程池实现线程重用、状态与返回值管理(使用done方法当一个线程完成的时候主线程能立即知道)
    • futures包中多线程与多进程接口一致,能减少开发难度
    • task的返回容器:Future对象(当时未完成,但完成后可以通过对象获取结果)。
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def get_html(times):
        """Sleep for ``times`` seconds, report success, and return ``times``."""
        time.sleep(times)
        message = "get page {} success".format(times)
        print(message)
        return times
    
    executor = ThreadPoolExecutor(max_workers=2)
    # submit() hands the callable to the pool and returns a Future right away.
    # The argument is passed as a plain value; ``(3)`` is not a tuple.
    task1 = executor.submit(get_html, 3)
    task2 = executor.submit(get_html, 2)
    # The three results are printed so the demo shows them; they were
    # previously computed and thrown away.
    print(task1.done())      # whether task1 has finished yet
    print(task1.result())    # blocks until task1 finishes, then gives its return value
    # cancel() only succeeds for a task that has not started running. With two
    # workers both tasks start at once, so this reports False.
    print(task2.cancel())
    # Release the worker threads once both tasks are finished.
    executor.shutdown()
    

    批量提交线程,获取成功执行的线程

    from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
    
    executor = ThreadPoolExecutor(max_workers=2)

    # as_completed() is a generator: it yields each future as soon as that
    # task has finished.
    urls = [3, 2, 4]
    all_task = [executor.submit(get_html, url) for url in urls]
    # Block the main thread until the first task finishes, then carry on.
    wait(all_task, return_when=FIRST_COMPLETED)
    print('main')

    for future in as_completed(all_task):
        print("get {} page".format(future.result()))


    # executor.map() calls get_html once per entry of ``urls`` and yields the
    # return values directly, not futures.
    for data in executor.map(get_html, urls):
        print("get {} page".format(data))
    

    多进程

    • 由于Python中存在GIL,消耗CPU的操作使用多线程无法利用多核优势,也无法实现并行,此时应使用多进程;
    • 进程切换代价比较高,对于频繁IO操作使用多线程更好(开销更小、更稳定);
    • Windows下多进程编程必须把创建进程的代码放在if __name__ == '__main__'之下

    CPU操作:

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from concurrent.futures import ProcessPoolExecutor
    
    def fib(n):
        """Return the n-th Fibonacci number by naive recursion.

        Any ``n`` of 2 or less yields 1. The recursion is deliberately left
        unoptimised: it serves as the CPU-bound workload of the benchmark.
        """
        if n > 2:
            return fib(n - 1) + fib(n - 2)
        return 1
    
    # Multi-threaded version: fib(25) .. fib(39) on a pool of three threads.
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, num) for num in range(25, 40)]
        start_time = time.time()
        for future in as_completed(all_task):
            print("exe result: {}".format(future.result()))

        elapsed = time.time() - start_time
        print("last time is: {}".format(elapsed))
    
    # Multi-process version: fib(25) .. fib(39) on a pool of three processes.
    # The pool is created under the __main__ guard: worker processes may
    # re-import this module, and without the guard each of them would try to
    # build its own pool.
    if __name__ == "__main__":
        with ProcessPoolExecutor(3) as executor:
            all_task = [executor.submit(fib, num) for num in range(25, 40)]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                print("exe result: {}".format(data))

            print("last time is: {}".format(time.time() - start_time))
    

    IO操作:

    def random_sleep(n):
        """Sleep for ``n`` seconds and return ``n``; stands in for an IO-bound task."""
        time.sleep(n)
        return n
    
    # Multi-threaded version: thirty two-second sleeps on a pool of three threads.
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, num) for num in [2] * 30]
        start_time = time.time()
        for future in as_completed(all_task):
            print("exe result: {}".format(future.result()))
        elapsed = time.time() - start_time
        print("last time is: {}".format(elapsed))
    
    # Multi-process version of the IO benchmark. It submits the same
    # random_sleep workload as the threaded version above; the earlier code
    # re-ran the CPU benchmark (fib over range(25, 40)) by copy-paste mistake.
    # The pool is created under the __main__ guard because worker processes
    # may re-import this module.
    if __name__ == "__main__":
        with ProcessPoolExecutor(3) as executor:
            all_task = [executor.submit(random_sleep, num) for num in [2] * 30]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                print("exe result: {}".format(data))
            print("last time is: {}".format(time.time() - start_time))
    

    多进程编程

    • 执行fork会马上创建一个子进程,同时主进程继续向下执行;
    • 子进程会把主进程的数据复制一份独自执行,与主进程隔离;
    import os
    # fork is only available on Unix/Linux.
    pid = os.fork()     
    print("ywh")        # from this statement on, both the parent and the child run the code
    if pid == 0:
        # Branch taken when fork() returned 0; it reports itself as the child.
        print('子进程 {} ,父进程是: {}.' .format(os.getpid(), os.getppid()))
    else:
        # NOTE(review): in the parent, os.fork() returns the child's PID, so
        # the number printed here is presumably the child's PID rather than
        # the parent's own. Confirm the intended message.
        print('我是父进程:{}.'.format(pid))
    

    使用multiprocessing和concurrent.futures包

    def get_html(n):
        """Sleep for ``n`` seconds, print a success message, and return ``n``."""
        time.sleep(n)
        status = "sub_progress success"
        print(status)
        return n
    
    # This snippet never imported the module it uses.
    import multiprocessing

    # Processes are created under the __main__ guard because child processes
    # may re-import this module.
    if __name__ == "__main__":
        # Method 1: a single Process object.
        progress = multiprocessing.Process(target=get_html, args=(2,))
        print(progress.pid)     # printed before start()
        progress.start()
        print(progress.pid)     # printed after start()
        progress.join()
        print("main progress end")

        # Method 2: a pool with an asynchronous submit.
        pool = multiprocessing.Pool(multiprocessing.cpu_count())
        result = pool.apply_async(get_html, args=(3,))      # asynchronous submit
        pool.close()            # must be closed: no new tasks are accepted afterwards
        pool.join()             # wait for the submitted task to finish
        print(result.get())     # fetch the return value

        # Methods 3 and 4 need a pool that is still running. The pool above is
        # closed, and calling imap() on it raised ValueError, so a fresh pool
        # is used here.
        with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
            # Method 3: imap
            for result in pool.imap(get_html, [1, 5, 3]):
                print("{} sleep success".format(result))

            # Method 4: imap_unordered
            for result in pool.imap_unordered(get_html, [1, 5, 3]):
                print("{} sleep success".format(result))
    

    进程间通信

    • 注意多线程和多进程通信的包不一样,不能重用;
    • 多线程中共享全局变量的方法不能用于多进程(数据全部复制到子进程);
    • 进程池:multiprocessing中的Queue不能用于进程池,而应使用Manager.Queue;
    • 管道性能比Queue高,但只适用于两个进程之间的通信;
    • Python内置有很多内存共享的数据结构,在multiprocessing.Manager,需要注意数据同步。

    多进程通信

    import time
    from multiprocessing import Process, Queue, Pool
    
    def producer(queue):
        """Put the string "a" on ``queue``, then sleep two seconds."""
        queue.put("a")
        time.sleep(2)
    
    def consumer(queue):
        """Sleep two seconds, then take one item from ``queue`` and print it."""
        time.sleep(2)
        print(queue.get())
    
    # Processes are started under the __main__ guard, as the pipe example in
    # this file already does, because child processes may re-import this
    # module.
    if __name__ == "__main__":
        # A multiprocessing.Queue with capacity 10, shared by both processes.
        queue = Queue(10)
        my_producer = Process(target=producer, args=(queue,))
        my_consumer = Process(target=consumer, args=(queue,))
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()
    

    进程池

    from multiprocessing import Process, Manager
    
    def producer(queue):
        """Put the string "a" on ``queue``, then sleep two seconds."""
        queue.put("a")
        time.sleep(2)
    
    def consumer(queue):
        """Sleep two seconds, then take one item from ``queue`` and print it."""
        time.sleep(2)
        print(queue.get())
    
    # The manager and the pool both start processes, so they are created under
    # the __main__ guard, as the pipe example in this file already does.
    if __name__ == "__main__":
        # Pool workers get a Manager().Queue here, not a multiprocessing.Queue.
        queue = Manager().Queue(10)
        pool = Pool(2)

        pool.apply_async(producer, args=(queue,))
        pool.apply_async(consumer, args=(queue,))

        pool.close()    # no more tasks will be submitted
        pool.join()     # wait for both tasks to finish
    
    from queue import Queue                 # 多线程
    from multiprocessing import Queue       # 多进程
    from multiprocessing import Manager     # 进程池
    

    管道

    from multiprocessing import Process, Pipe
    
    def producer(pipe):
        """Send the string "bobby" through ``pipe``."""
        message = "bobby"
        pipe.send(message)
    
    def consumer(pipe):
        """Receive one object from ``pipe`` and print it."""
        received = pipe.recv()
        print(received)
    
    if __name__ == "__main__":
        # Pipe() returns two connection ends; one goes to each process.
        recevie_pipe, send_pipe = Pipe()
        my_producer = Process(target=producer, args=(send_pipe,))
        my_consumer = Process(target=consumer, args=(recevie_pipe,))

        workers = (my_producer, my_consumer)
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
    

    相关文章

      网友评论

          本文标题:Python 经验 - 多线程与多进程

          本文链接:https://www.haomeiwen.com/subject/xaiyiftx.html