Python多线程

作者: Recalcitrant | 来源:发表于2019-07-06 12:00 被阅读0次

    目录:
    一、线程的创建
    二、多线程互斥锁
    三、线程间通信
    四、线程池

    Python并发之多线程

    一、线程的创建

    单线程示例:

    import time
    def run(n):
         print("task ", n)
         time.sleep(2)
    
    
    t0 = time.time()
    run("t1")
    run("t2")
    ts = time.time()
    print(ts - t0)
    
    运行结果

    1.启动多个线程(函数方式)

    在Python3中,Python提供了一个内置模块 threading.Thread,可以很方便地让我们创建多线程。
    threading.Thread() 一般接收两个参数:

    • 线程函数名:要放置线程让其后台执行的函数,由我们自已定义,注意不要加()。
    • 线程函数参数:线程函数名所需的参数,以元组的形式传入。若不需要参数,可以不指定。
      示例:
    import threading
    import time
    def run(n):
        print("task", n)
        time.sleep(2)
        print("{} finished!".format(n))
        ts = time.time()
        print("线程{}耗时:{}".format(n, ts-t0))
    
    
    t1 = threading.Thread(target=run, args=("t1",))
    t2 = threading.Thread(target=run, args=("t2",))
    t0 = time.time()
    t1.start()
    t2.start()
    print("main finished!")
    
    运行结果

    t1与t2几乎同时完成。

    2.启动多个线程(类方式)

    相比较函数而言,使用类创建线程,会比较麻烦一点。 首先,我们要自定义一个类,对于这个类有两点要求:

    • 必须继承 threading.Thread 这个父类;
    • 必须覆写 run 方法。

    这里的 run 方法,和我们上面线程函数的性质是一样的,可以写我们的业务逻辑程序。在 start() 后将会被自动调用。

    import threading
    import time
    class MyThread(threading.Thread):
        def __init__(self, n, sleep_time):
            super(MyThread, self).__init__()
            self.n = n
            self.sleep_time = sleep_time
    
        def run(self):
            print("running task {}".format(self.n))
            time.sleep(self.sleep_time)
            print("task {} done!".format(self.n))
            ts = time.time()
            print("线程{}耗时:{}".format(self.n, ts - t0))
    
    
    t1 = MyThread("t1", 2)
    t2 = MyThread("t2", 4)
    t0 = time.time()
    t1.start()
    t2.start()
    ts = time.time()
    print("main finished!")
    
    运行结果
    • join()方法

    .join():程序会等待该线程结束后,再执行后面的语句。

    import threading
    import time
    class MyThread(threading.Thread):
        def __init__(self, n, sleep_time):
            super(MyThread, self).__init__()
            self.n = n
            self.sleep_time = sleep_time
    
        def run(self):
            print("running task {}".format(self.n))
            time.sleep(self.sleep_time)
            print("task {} done!".format(self.n))
            ts = time.time()
            print("线程{}耗时:{}".format(self.n, ts - t0))
    
    
    t1 = MyThread("t1", 2)
    t2 = MyThread("t2", 4)
    t0 = time.time()
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    ts = time.time()
    print("main finished!")
    
    运行结果

    二、多线程互斥锁

    1.定义和使用锁

    import threading
    # 生成锁对象(全局唯一)
    lock = threading.Lock()
    
    # 获取锁(未获取到会阻塞程序,直到获取到锁才会往下执行)
    lock.acquire()
    
    # 释放锁
    lock.release()
    

    注意:lock.acquire() 和 lock.release()必须成对出现。否则就有可能造成死锁。
    可以使用使用上下文管理器来加锁:

    import threading
    lock = threading.Lock()
    with lock:
        操作代码
    
    • 不加锁示例:
    import threading
    import time
    g_num = 0
    
    
    def work1(num):
        global g_num
        for i in range(num):
            g_num += 1
        print("----in work1, g_num is %d----" % g_num)
    
    
    def work2(num):
        global g_num
        for i in range(num):
            g_num += 1
        print("----in work2, g_num is %d----" % g_num)
    
    
    print("---线程创建之前g_num is %d---" % g_num)
    t1 = threading.Thread(target=work1, args=(1000000,))
    t1.start()
    t2 = threading.Thread(target=work2, args=(1000000,))
    t2.start()
    
    # 等待线程执行完毕
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    
    print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)
    
    运行结果

    如果多个线程同时对同一个全局变量操作,会出现资源竞争问题,从而数据结果会不正确。

    • 加锁示例:
    import threading
    import time
    g_num = 0
    lock = threading.Lock()
    
    
    def work1(num):
        global g_num
        with lock:
            for i in range(num):
                g_num += 1
        print("----in work1, g_num is %d----" % g_num)
    
    
    def work2(num):
        global g_num
        with lock:
            for i in range(num):
                g_num += 1
        print("----in work2, g_num is %d----" % g_num)
    
    
    print("---线程创建之前g_num is %d---" % g_num)
    t1 = threading.Thread(target=work1, args=(1000000,))
    t1.start()
    t2 = threading.Thread(target=work2, args=(1000000,))
    t2.start()
    
    # 等待线程执行完毕
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    
    print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)
    
    运行结果

    2.死锁

    示例:

    import threading
    import time
    
    
    class MyThread1(threading.Thread):
        def run(self):
            # 对mutexA上锁
            mutexA.acquire()
    
            # mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
            print(self.name+'----do1---up----')
            time.sleep(1)
    
            # 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
            mutexB.acquire()
            print(self.name+'----do1---down----')
            mutexB.release()
    
            # 对mutexA解锁
            mutexA.release()
    
    
    class MyThread2(threading.Thread):
        def run(self):
            # 对mutexB上锁
            mutexB.acquire()
    
            # mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
            print(self.name+'----do2---up----')
            time.sleep(1)
    
            # 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
            mutexA.acquire()
            print(self.name+'----do2---down----')
            mutexA.release()
    
            # 对mutexB解锁
            mutexB.release()
    
    
    mutexA = threading.Lock()
    mutexB = threading.Lock()
    
    if __name__ == '__main__':
        t1 = MyThread1()
        t2 = MyThread2()
        t1.start()
        t2.start()
    
    运行结果

    3.全局锁(GIL)

    多线程和多进程是不一样的:
    多进程是真正的并行,而多线程是伪并行,实际上只是线程交替执行。

    GIL(Global Interpreter Lock,全局解释器锁)

    任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

    GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。而Python解释器,并不是只有CPython,除它之外,还有PyPy,Psyco,JPython,IronPython等。在绝大多数情况下,我们通常都认为 Python == CPython,所以也就默许了Python具有GIL锁这个事。

    如何避免性能受到GIL的影响:

    • 使用多进程代替多线程。
    • 更换Python解释器,不使用CPython

    通常情况下:

    • I/O密集型:适用多线程
    • CPU密集型:适用多进程

    三、线程间通信

    1.Queue队列

    import queue
    # maxsize默认为0,不受限
    # 一旦>0,而消息数又达到限制,q.put()也将阻塞
    q = queue.Queue(maxsize=0)
    
    # 阻塞程序,等待获取队列消息
    q.get()
    # 获取消息,设置超时时间
    q.get(timeout=5.0)
    
    # 发送消息
    q.put()
    
    # 等待所有的消息都被消费完
    q.join()
    
    # 以下三个方法代码中不要使用(由于具有瞬时性,所以没有参考价值):
    # 查询当前队列的消息个数
    q.qsize()
    
    # 队列消息是否都被消费完,True/False
    q.empty()
    
    # 检测队列里消息是否已满
    q.full()
    
    • 生产者-消费者模式



      示例:

    import queue
    q = queue.Queue(maxsize=0)
    
    
    def producer():     # 生产者
        for i in range(1000):
            q.put(i)
    
    
    def consumer():     # 消费者
        for i in range(1000):
            data = q.get()
            print(data)
    
    
    t1 = threading.Thread(target=producer,)
    t2 = threading.Thread(target=consumer,)
    t1.start()
    t2.start()
    

    四、线程池

    在使用多线程处理任务时也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成cpu的大量开销。为解决这个问题,线程池的概念被提出来了。预先创建好一个较为优化的数量的线程,放到队列中,让过来的任务立刻能够使用,就形成了线程池。
    在Python3中,创建线程池是通过concurrent.futures函数库中的ThreadPoolExecutor类来实现的。
    future对象:在未来的某一时刻完成操作的对象。submit方法可以返回一个future对象。
    示例:

    import concurrent.futures as futures
    
    
    # 线程执行的函数
    def add(n1, n2):
        v = n1 + n2
        print('add:', v, ', tid:', threading.currentThread().ident)
        time.sleep(n1)
        return v
    
    
    # 创建一个线程池
    ex = futures.ThreadPoolExecutor(max_workers=3)      # 指定最多运行3个线程
    # 通过submit把需要执行的函数扔进线程池中
    f1 = ex.submit(add, 2, 3)       # submit返回一个future对象
    f2 = ex.submit(add, 2, 2)       # submit返回一个future对象
    print('main thread running!')
    print(f1.done())        # .done():看看任务结束了没
    print(f1.result())      # 获取结果(阻塞方法)
    print(f1.done())        # .done():看看任务结束了没
    print(f2.done())        # .done():看看任务结束了没
    print(f2.result())      # 获取结果(阻塞方法)
    print(f2.done())        # .done():看看任务结束了没
    
    运行结果

    获取线程执行结果方法:

    • 1.f.result()
    • 2.map()
    • 3.as_completed
    • 4.wait
    • 5.add_done_callback

    相关文章

      网友评论

        本文标题:Python多线程

        本文链接:https://www.haomeiwen.com/subject/bzsbcctx.html