美文网首页大数据,机器学习,人工智能
Python进阶 - 高性能计算之多线程

Python进阶 - 高性能计算之多线程

作者: ChaoesLuol | 来源:发表于2020-02-22 15:17 被阅读0次

    写在前面

    这个系列是笔者学习python一些进阶功能的笔记和思考,水平有限,错漏在所难免,还请方家不吝指教。

    什么是线程?

    首先应该了解操作系统如何支持多任务运行的,这里有“并行”和“并发”两个概念。

    • 并行: 在同一时刻有多条指令在多个处理器上同时执行,通俗点就是CPU数>=任务数的情况;
    • 并发:在同一时刻只能有一条指令执行,但是多个任务被快速轮换执行,使得宏观上让人感觉到有多个任务同时执行的效果。通俗点来说就是CPU数<任务数的情况。

    在操作系统中运行的一个程序就是一个进程(Process),它是代码+资源的组合。而在一个进程中,有一个或者多个线程(Thread)。线程是进程的组成部分,一个进程至少有一个主线程来完成进程从开始到结束的全部操作。

    单核CPU如何运行多个线程?

    • 时间片轮转:操作系统让每个程序依次运行极短的时间
    • 优先级调度:让高优先级的任务优先占用CPU

    python中多线程的实现

    threading模块

    t1 = threading.Thread(target = someFunction, args = ()),此时会创建一个线程对象,但是不会直接创建线程。用args关键词可以为函数传入参数,但是注意这里传入的参数因为数量不定,因此一定要是一个元组

    调用线程对象的start方法才会创建线程,并且让线程开始运行。

    如下面的例子:

    import threading
    
    
    def my_func(cnt):
        for i in range(cnt):
            print(i)
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=my_func, args=(10,))  # 创建一个线程
        t1.start() # 启动子线程
    

    主线程需要负责回收分配给子线程的资源,因此主线程一定会晚于子线程结束。

    继承thread类

    用继承了Thread等类也可以实现线程创建。但是这个类中一定需要定义run方法,这样在start启动线程后会自动调用run方法,例如以下程序:

    from time import sleep
    import threading
    
    class myThread(threading.Thread):
        def run(self):
            for i in range(10):
                sleep(1)
                msg = "I'm " + self.name + " @ " + str(i)
                print(msg)
    
    def main():
        testThread = myThread()
        testThread.start()
    
    if __name__ == "__main__":
        main()
    

    会得到以下结果:

    I'm Thread-1 @ 0
    I'm Thread-1 @ 1
    I'm Thread-1 @ 2
    I'm Thread-1 @ 3
    I'm Thread-1 @ 4
    I'm Thread-1 @ 5
    I'm Thread-1 @ 6
    I'm Thread-1 @ 7
    I'm Thread-1 @ 8
    I'm Thread-1 @ 9
    

    如果需要调用一系列的函数,那么可以将其封装在这个类中,然后在run方法里调用。

    不同线程中全局变量的共享

    在不同的线程中,全局变量是共享的,这使得不同线程之间协作处理数据非常方便。但是这种共享也会有负面影响 -- 造成资源竞争,例如如下代码:

    from time import sleep
    import threading
    
    g_num = 0
    
    def test1(num):
        global g_num
        for i in range(num):
            g_num += 1
        print("g_num in test1: %d" %g_num)
    
    
    def test2(num):
        global g_num
        for i in range(num):
            g_num += 1
        print("g_num in test2: %d" %g_num)
    
    
    def main():
        t1 = threading.Thread(target=test1, args=(1000000,))
        t2 = threading.Thread(target=test2, args=(1000000,))
    
        t1.start()
        t2.start()
        sleep(5)
    
        print("g_num in main thread: %d" % g_num)
    
    
    if __name__ == "__main__":
        main()
    

    执行后会得到如下结果:

    g_num in test1: 1221968
    g_num in test2: 1323741
    g_num in main thread: 1323741
    

    这里得到的g_num并不是我们想象的2000000,因为在执行python代码时,我们使得数据自加的一句python代码会被翻译为好几句机器码:

    • 读取数据
    • 数据+1
    • 存储数据

    在执行时,由于只使用了一个CPU,CPU会采用时间片轮转的方法来模拟多任务。因此实际上会在任意一步被cpu打断,例如在完成数据+1后,在存储数据时被打断,这样增加后的数据就没有存入内存,下一个线程从内存读取时,读到的就是没有自增前的数,这样可能使得两个线程中自增的数据相互覆盖,导致加到最后得到的值要比想象中的小。这种问题,也叫做“数据不同步”。

    线程锁

    当多个线程几乎同时修改一个数据时,需要进行同步控制。线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入<u>互斥锁</u>。

    互斥锁会给资源附加一个状态:锁定/非锁定。

    当某个线程需要修改资源时,先将其锁定,此时其他线程不可以修改该资源;到该线程修改结束后,释放资源,使其变为非锁定,其他的线程才能再次锁定该资源,进行修改。这样互斥锁就保证了每次只有一个线程进行写入操作,保证了多线程下全局变量的正确性。

    为了实现互斥锁,threading模块中提供了Lock和RLock两个类:

    • threading.Lock是一个基本锁对象,每次只能锁定一次,其余的锁请求需要等锁释放后才能获取;
    • threading.RLock是可重入锁(Reentrant Lock),在同一个线程中可以进行多次锁定和释放,但是锁定和释放的方法必须成堆出现,如果调用了n次锁定,那么只有n次释放才能解锁。

    Lock和RLock两个类都提供了以下方法来锁定和释放:

    • acquire(blocking = True, timeout = -1)进行锁定,blocking为True时会堵塞当前线程,直到其他线程释放该锁,当前线程获取到这个锁为止;而blocking为False的情况下则不会堵塞当前线程,而是往下执行
    • release()释放锁

    对于上面的问题,我们可以用互斥锁来解决多个线程之间的资源竞争问题,如夏例:

    from time import sleep
    import threading
    
    g_num = 0
    mutex = threading.Lock() # 建立互斥锁
    
    def test1(num):
        global g_num
        mutex.acquire(True) # 在更改数据之前获得互斥锁
        for i in range(num):
            g_num += 1
        mutex.release() # 更改完数据之后释放锁定
        print("g_num in test1: %d" %g_num)
    
    
    def test2(num):
        global g_num
        mutex.acquire(True) # 在更改数据之前获得互斥锁
        for i in range(num):
            g_num += 1
        mutex.release() # 更改完数据之后释放锁定
        print("g_num in test2: %d" %g_num)
    
    
    def main():
        t1 = threading.Thread(target=test1, args=(1000000,))
        t2 = threading.Thread(target=test2, args=(1000000,))
    
        t1.start()
        t2.start()
        sleep(5)
    
        print("g_num in main thread: %d" % g_num)
    
    
    if __name__ == "__main__":
        main()
    

    这样,我们在最后得到的就是我们预期的结果了:

    g_num in test1: 1000000
    g_num in test2: 2000000
    g_num in main thread: 2000000
    

    但是需要注意,用互斥锁会带来以下两个问题:

    • 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式运行,其他模式处于堵塞的状态,效率就大大下降了;
    • 由于可以存在多个锁,不同的线程持有不同锁,并试图获取对方持有的锁时,可能会造成死锁。

    死锁

    线程间共享多个资源时,如果两个线程分别占有一部分资源并且同时等待对方的资源,就可能造成死锁。当出现死锁时,会造成应用停止响应,如下例:

    import threading
    from time import sleep
    
    
    lockA = threading.Lock()
    lockB = threading.Lock()
    
    def testA():
        # 为进程A上锁
        lockA.acquire()
        print("----Lock A acquired in testA----")
        sleep(1)
        # 中间需要操作另一批数据,上锁B
        lockB.acquire()
        print("----Lock B acquired in testA----")
        sleep(2)
        # 数据操作完成,解开锁B
        lockB.release()
        print("----Lock B released in testA----")
        # 完成操作,释放锁A
        lockA.release()
        print("----Lock A released in testA----")
    
    
    def testB():
        # 开始操作时为进程B上锁
        lockB.acquire()
        print("----Lock B acquired in testB----")
        sleep(1)
        # 进行下一步操作前需要获取锁A
        lockA.acquire()
        print("----Lock A acquired in testB----")
        sleep(2)
        # 进行完数据操作释放锁A
        lockA.release()
        print("----Lock A released in testB----")
        # 释放锁B
        lockB.release()
        print("----Lock B released in testB----")
    
    
    def main():
        t1 = threading.Thread(target=testA)
        t2 = threading.Thread(target=testB)
    
        t1.start()
        t2.start()
    
    
    if __name__ == "__main__":
        main()
    

    在这个例子中,线程t1开始时,为lockA上锁,并进入睡眠(模拟一些耗时操作),而同时开始的线程t2为lockB上锁,等t1向下执行时,需要lockB,lockB处于上锁状态,因此线程t1堵塞,等待lockB被释放;而t2在执行时,acquire lockA失败,也进入了堵塞状态,等待lockA被释放。

    这样,两个线程互相需要对方释放互斥锁,两个线程都无法向下执行,进入了死锁状态。

    死锁的避免

    • 程序设计时尽量避免(例如银行家算法)
    • 添加超时时间限制

    ThreadLocal

    除了使用互斥锁以外,还有一种办法可以实现各线程间的数据隔离,那就是使用threading.local()。它会为各个变量创建完全属于他们自己的副本(也就是线程局部变量),这样各个线程操作的就是属于自己的私有资源,可以杜绝数据不同步的问题。

    import threading
    from time import sleep
    
    from time import sleep
    import threading
    
    g_num = 0
    local = threading.local()
    
    
    def test1(num):
        global g_num
        local.g_num = g_num  # 将g_num绑定一个线程局部变量
        for i in range(num):
            local.g_num += 1
        print("g_num in test1: %d" % local.g_num)
    
    
    def test2(num):
        global g_num
        local.g_num = g_num  # 将g_num绑定一个线程局部变量
        for i in range(num):
            local.g_num += 1
        print("g_num in test2: %d" % local.g_num)
    
    
    def main():
        t1 = threading.Thread(target=test1, args=(1000000,))
        t2 = threading.Thread(target=test2, args=(1000000,))
    
        t1.start()
        t2.start()
        t1.join()  # 等待线程t1执行完毕
        t2.join()  # 等待线程t2执行完毕
    
        print("g_num in main thread: %d" % g_num)
    
    
    if __name__ == "__main__":
        main()
    

    结果为:

    g_num in test1: 1000000
    g_num in test2: 1000000
    g_num in main thread: 0
    

    可以看到,尽管我们将全局变量g_num绑定在线程局部变量中,但是每个线程操作的实际上是自己的线程局部变量,并不会作用于我们绑定上去的全局变量。

    Python中多线程的问题

    在Python中(当前使用版本3.7,在3.8中据说会用局部解释器绕开GIL的问题),多线程并不能真正有效利用多核,例如如下代码:

    import threading, time, multiprocessing
    
    
    def my_counter(num):
        i = 0
        for _ in range(num):
            i += 1
        return True
    
    
    def main():
        # 单线程顺序执行
        thread_array = {}
        start_time = time.time()
        count_num = int(1e8)
        for tid in range(2):
            t = threading.Thread(target=my_counter, args=(count_num,))
            t.start()
            t.join()
        end_time = time.time()
        print("Total time for two sequential threads: ", round(end_time - start_time, 2), " s")
    
        # 双线程并行
        print("CPU num for current machine: ",multiprocessing.cpu_count())
        thread_array = {}
        start_time = time.time()
        for tid in range(2):
            t = threading.Thread(target=my_counter, args=(count_num,))
            t.start()
            thread_array[tid] = t
        for tid in thread_array.keys():
            thread_array[tid].join()
        end_time = time.time()
        print("Total time for multi-threads: ", round(end_time - start_time, 2), " s")
    
    if __name__ == '__main__':
        main()
    

    运行的结果如下:

    Total time for two sequential threads:  11.32  s
    CPU num for current machine:  4
    Total time for multi-threads:  11.69  s
    

    可以看到,在四核的电脑上,两个线程用多线程并行和单线程串行,速度并没有任何提高。

    因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

    GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

    解决方法

    • 用多进程替代多线程

    multiprocess库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。

    当然multiprocess也不是万能良药。它的引入会增加程序实现时线程间数据通讯和同步的困难。就拿计数器来举例子,如果我们要多个线程累加同一个变量,对于thread来说,申明一个global变量,用thread.Lock的context包裹住三行就搞定了。而multiprocess由于进程之间无法看到对方的数据,只能通过在主线程申明一个Queue,put再get或者用share memory的方法。这个额外的实现成本使得本来就非常痛苦的多线程程序编码,变得更加痛苦了。

    在将上面用threading实现的多任务改为multiprocessing:

    import threading, time, multiprocessing
    
    
    def my_counter(num):
        i = 0
        for _ in range(num):
            i += 1
        return True
    
    
    def main():
        # 单进程顺序执行
        process_array = {}
        start_time = time.time()
        count_num = int(1e8)
        for pid in range(2):
            p = multiprocessing.Process(target=my_counter, args=(count_num,))
            p.start()
            p.join()
        end_time = time.time()
        print("Total time for two sequential processes: ", round(end_time - start_time, 2), " s")
    
        # 多进程并行
        print("CPU num for current machine: ", multiprocessing.cpu_count())
        process_array = {}
        start_time = time.time()
        for tid in range(2):
            t = multiprocessing.Process(target=my_counter, args=(count_num,))
            t.start()
            process_array[tid] = t
        for tid in process_array.keys():
            process_array[tid].join()
        end_time = time.time()
        print("Total time for multi-processings: ", round(end_time - start_time, 2), " s")
    
    
    if __name__ == '__main__':
        main()
    

    结果如下,可以看到多进程相比单进程,速度有了明显提升:

    Total time for two sequential processes:  11.3  s
    CPU num for current machine:  4
    Total time for multi-processings:  7.91  s
    
    • 用其他解析器

    之前也提到了既然GIL只是CPython的产物,那么其他解析器是不是更好呢?没错,像JPython和IronPython这样的解析器由于实现语言的特性,他们不需要GIL的帮助。然而由于用了Java/C#用于解析器实现,他们也失去了利用社区众多C语言模块有用特性的机会。所以这些解析器也因此一直都比较小众。毕竟功能和性能大家在初期都会选择前者,Done is better than perfect。

    相关文章

      网友评论

        本文标题:Python进阶 - 高性能计算之多线程

        本文链接:https://www.haomeiwen.com/subject/bcylqhtx.html