美文网首页大数据,机器学习,人工智能
Python进阶 - 高性能计算之多线程

Python进阶 - 高性能计算之多线程

作者: ChaoesLuol | 来源:发表于2020-02-22 15:17 被阅读0次

写在前面

这个系列是笔者学习python一些进阶功能的笔记和思考,水平有限,错漏在所难免,还请方家不吝指教。

什么是线程?

首先应该了解操作系统如何支持多任务运行的,这里有“并行”和“并发”两个概念。

  • 并行: 在同一时刻有多条指令在多个处理器上同时执行,通俗点就是CPU数>=任务数的情况;
  • 并发:在同一时刻只能有一条指令执行,但是多个任务被快速轮换执行,使得宏观上让人感觉到有多个任务同时执行的效果。通俗点来说就是CPU数<任务数的情况。

在操作系统中运行的一个程序就是一个进程(Process),它是代码+资源的组合。而在一个进程中,有一个或者多个线程(Thread)。线程是进程的组成部分,一个进程至少有一个主线程来完成进程从开始到结束的全部操作。

单核CPU如何运行多个线程?

  • 时间片轮转:操作系统让每个程序依次运行极短的时间
  • 优先级调度:让高优先级的任务优先占用CPU

python中多线程的实现

threading模块

t1 = threading.Thread(target = someFunction, args = ()),此时会创建一个线程对象,但是不会直接创建线程。用args关键词可以为函数传入参数,但是注意这里传入的参数因为数量不定,因此一定要是一个元组

调用线程对象的start方法才会创建线程,并且让线程开始运行。

如下面的例子:

import threading


def my_func(cnt):
    for i in range(cnt):
        print(i)


if __name__ == '__main__':
    t1 = threading.Thread(target=my_func, args=(10,))  # 创建一个线程
    t1.start() # 启动子线程

主线程需要负责回收分配给子线程的资源,因此主线程一定会晚于子线程结束。

继承thread类

用继承了Thread等类也可以实现线程创建。但是这个类中一定需要定义run方法,这样在start启动线程后会自动调用run方法,例如以下程序:

from time import sleep
import threading

class myThread(threading.Thread):
    def run(self):
        for i in range(10):
            sleep(1)
            msg = "I'm " + self.name + " @ " + str(i)
            print(msg)

def main():
    testThread = myThread()
    testThread.start()

if __name__ == "__main__":
    main()

会得到以下结果:

I'm Thread-1 @ 0
I'm Thread-1 @ 1
I'm Thread-1 @ 2
I'm Thread-1 @ 3
I'm Thread-1 @ 4
I'm Thread-1 @ 5
I'm Thread-1 @ 6
I'm Thread-1 @ 7
I'm Thread-1 @ 8
I'm Thread-1 @ 9

如果需要调用一系列的函数,那么可以将其封装在这个类中,然后在run方法里调用。

不同线程中全局变量的共享

在不同的线程中,全局变量是共享的,这使得不同线程之间协作处理数据非常方便。但是这种共享也会有负面影响 -- 造成资源竞争,例如如下代码:

from time import sleep
import threading

g_num = 0

def test1(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("g_num in test1: %d" %g_num)


def test2(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("g_num in test2: %d" %g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    sleep(5)

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

执行后会得到如下结果:

g_num in test1: 1221968
g_num in test2: 1323741
g_num in main thread: 1323741

这里得到的g_num并不是我们想象的2000000,因为在执行python代码时,我们使得数据自加的一句python代码会被翻译为好几句机器码:

  • 读取数据
  • 数据+1
  • 存储数据

在执行时,由于只使用了一个CPU,CPU会采用时间片轮转的方法来模拟多任务。因此实际上会在任意一步被cpu打断,例如在完成数据+1后,在存储数据时被打断,这样增加后的数据就没有存入内存,下一个线程从内存读取时,读到的就是没有自增前的数,这样可能使得两个线程中自增的数据相互覆盖,导致加到最后得到的值要比想象中的小。这种问题,也叫做“数据不同步”。

线程锁

当多个线程几乎同时修改一个数据时,需要进行同步控制。线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入<u>互斥锁</u>。

互斥锁会给资源附加一个状态:锁定/非锁定。

当某个线程需要修改资源时,先将其锁定,此时其他线程不可以修改该资源;到该线程修改结束后,释放资源,使其变为非锁定,其他的线程才能再次锁定该资源,进行修改。这样互斥锁就保证了每次只有一个线程进行写入操作,保证了多线程下全局变量的正确性。

为了实现互斥锁,threading模块中提供了Lock和RLock两个类:

  • threading.Lock是一个基本锁对象,每次只能锁定一次,其余的锁请求需要等锁释放后才能获取;
  • threading.RLock是可重入锁(Reentrant Lock),在同一个线程中可以进行多次锁定和释放,但是锁定和释放的方法必须成堆出现,如果调用了n次锁定,那么只有n次释放才能解锁。

Lock和RLock两个类都提供了以下方法来锁定和释放:

  • acquire(blocking = True, timeout = -1)进行锁定,blocking为True时会堵塞当前线程,直到其他线程释放该锁,当前线程获取到这个锁为止;而blocking为False的情况下则不会堵塞当前线程,而是往下执行
  • release()释放锁

对于上面的问题,我们可以用互斥锁来解决多个线程之间的资源竞争问题,如夏例:

from time import sleep
import threading

g_num = 0
mutex = threading.Lock() # 建立互斥锁

def test1(num):
    global g_num
    mutex.acquire(True) # 在更改数据之前获得互斥锁
    for i in range(num):
        g_num += 1
    mutex.release() # 更改完数据之后释放锁定
    print("g_num in test1: %d" %g_num)


def test2(num):
    global g_num
    mutex.acquire(True) # 在更改数据之前获得互斥锁
    for i in range(num):
        g_num += 1
    mutex.release() # 更改完数据之后释放锁定
    print("g_num in test2: %d" %g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    sleep(5)

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

这样,我们在最后得到的就是我们预期的结果了:

g_num in test1: 1000000
g_num in test2: 2000000
g_num in main thread: 2000000

但是需要注意,用互斥锁会带来以下两个问题:

  • 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式运行,其他模式处于堵塞的状态,效率就大大下降了;
  • 由于可以存在多个锁,不同的线程持有不同锁,并试图获取对方持有的锁时,可能会造成死锁。

死锁

线程间共享多个资源时,如果两个线程分别占有一部分资源并且同时等待对方的资源,就可能造成死锁。当出现死锁时,会造成应用停止响应,如下例:

import threading
from time import sleep


lockA = threading.Lock()
lockB = threading.Lock()

def testA():
    # 为进程A上锁
    lockA.acquire()
    print("----Lock A acquired in testA----")
    sleep(1)
    # 中间需要操作另一批数据,上锁B
    lockB.acquire()
    print("----Lock B acquired in testA----")
    sleep(2)
    # 数据操作完成,解开锁B
    lockB.release()
    print("----Lock B released in testA----")
    # 完成操作,释放锁A
    lockA.release()
    print("----Lock A released in testA----")


def testB():
    # 开始操作时为进程B上锁
    lockB.acquire()
    print("----Lock B acquired in testB----")
    sleep(1)
    # 进行下一步操作前需要获取锁A
    lockA.acquire()
    print("----Lock A acquired in testB----")
    sleep(2)
    # 进行完数据操作释放锁A
    lockA.release()
    print("----Lock A released in testB----")
    # 释放锁B
    lockB.release()
    print("----Lock B released in testB----")


def main():
    t1 = threading.Thread(target=testA)
    t2 = threading.Thread(target=testB)

    t1.start()
    t2.start()


if __name__ == "__main__":
    main()

在这个例子中,线程t1开始时,为lockA上锁,并进入睡眠(模拟一些耗时操作),而同时开始的线程t2为lockB上锁,等t1向下执行时,需要lockB,lockB处于上锁状态,因此线程t1堵塞,等待lockB被释放;而t2在执行时,acquire lockA失败,也进入了堵塞状态,等待lockA被释放。

这样,两个线程互相需要对方释放互斥锁,两个线程都无法向下执行,进入了死锁状态。

死锁的避免

  • 程序设计时尽量避免(例如银行家算法)
  • 添加超时时间限制

ThreadLocal

除了使用互斥锁以外,还有一种办法可以实现各线程间的数据隔离,那就是使用threading.local()。它会为各个变量创建完全属于他们自己的副本(也就是线程局部变量),这样各个线程操作的就是属于自己的私有资源,可以杜绝数据不同步的问题。

import threading
from time import sleep

from time import sleep
import threading

g_num = 0
local = threading.local()


def test1(num):
    global g_num
    local.g_num = g_num  # 将g_num绑定一个线程局部变量
    for i in range(num):
        local.g_num += 1
    print("g_num in test1: %d" % local.g_num)


def test2(num):
    global g_num
    local.g_num = g_num  # 将g_num绑定一个线程局部变量
    for i in range(num):
        local.g_num += 1
    print("g_num in test2: %d" % local.g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    t1.join()  # 等待线程t1执行完毕
    t2.join()  # 等待线程t2执行完毕

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

结果为:

g_num in test1: 1000000
g_num in test2: 1000000
g_num in main thread: 0

可以看到,尽管我们将全局变量g_num绑定在线程局部变量中,但是每个线程操作的实际上是自己的线程局部变量,并不会作用于我们绑定上去的全局变量。

Python中多线程的问题

在Python中(当前使用版本3.7,在3.8中据说会用局部解释器绕开GIL的问题),多线程并不能真正有效利用多核,例如如下代码:

import threading, time, multiprocessing


def my_counter(num):
    i = 0
    for _ in range(num):
        i += 1
    return True


def main():
    # 单线程顺序执行
    thread_array = {}
    start_time = time.time()
    count_num = int(1e8)
    for tid in range(2):
        t = threading.Thread(target=my_counter, args=(count_num,))
        t.start()
        t.join()
    end_time = time.time()
    print("Total time for two sequential threads: ", round(end_time - start_time, 2), " s")

    # 双线程并行
    print("CPU num for current machine: ",multiprocessing.cpu_count())
    thread_array = {}
    start_time = time.time()
    for tid in range(2):
        t = threading.Thread(target=my_counter, args=(count_num,))
        t.start()
        thread_array[tid] = t
    for tid in thread_array.keys():
        thread_array[tid].join()
    end_time = time.time()
    print("Total time for multi-threads: ", round(end_time - start_time, 2), " s")

if __name__ == '__main__':
    main()

运行的结果如下:

Total time for two sequential threads:  11.32  s
CPU num for current machine:  4
Total time for multi-threads:  11.69  s

可以看到,在四核的电脑上,两个线程用多线程并行和单线程串行,速度并没有任何提高。

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

解决方法

  • 用多进程替代多线程

multiprocess库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。

当然multiprocess也不是万能良药。它的引入会增加程序实现时线程间数据通讯和同步的困难。就拿计数器来举例子,如果我们要多个线程累加同一个变量,对于thread来说,申明一个global变量,用thread.Lock的context包裹住三行就搞定了。而multiprocess由于进程之间无法看到对方的数据,只能通过在主线程申明一个Queue,put再get或者用share memory的方法。这个额外的实现成本使得本来就非常痛苦的多线程程序编码,变得更加痛苦了。

在将上面用threading实现的多任务改为multiprocessing:

import threading, time, multiprocessing


def my_counter(num):
    i = 0
    for _ in range(num):
        i += 1
    return True


def main():
    # 单进程顺序执行
    process_array = {}
    start_time = time.time()
    count_num = int(1e8)
    for pid in range(2):
        p = multiprocessing.Process(target=my_counter, args=(count_num,))
        p.start()
        p.join()
    end_time = time.time()
    print("Total time for two sequential processes: ", round(end_time - start_time, 2), " s")

    # 多进程并行
    print("CPU num for current machine: ", multiprocessing.cpu_count())
    process_array = {}
    start_time = time.time()
    for tid in range(2):
        t = multiprocessing.Process(target=my_counter, args=(count_num,))
        t.start()
        process_array[tid] = t
    for tid in process_array.keys():
        process_array[tid].join()
    end_time = time.time()
    print("Total time for multi-processings: ", round(end_time - start_time, 2), " s")


if __name__ == '__main__':
    main()

结果如下,可以看到多进程相比单进程,速度有了明显提升:

Total time for two sequential processes:  11.3  s
CPU num for current machine:  4
Total time for multi-processings:  7.91  s
  • 用其他解析器

之前也提到了既然GIL只是CPython的产物,那么其他解析器是不是更好呢?没错,像JPython和IronPython这样的解析器由于实现语言的特性,他们不需要GIL的帮助。然而由于用了Java/C#用于解析器实现,他们也失去了利用社区众多C语言模块有用特性的机会。所以这些解析器也因此一直都比较小众。毕竟功能和性能大家在初期都会选择前者,Done is better than perfect。

相关文章

  • Python进阶 - 高性能计算之多线程

    写在前面 这个系列是笔者学习python一些进阶功能的笔记和思考,水平有限,错漏在所难免,还请方家不吝指教。 什么...

  • Python进阶 - 高性能计算之多进程

    什么是进程? 进程是程序运行时,代码+代码运行时用到的资源(如网络带宽、声卡、显卡等),它是操作系统分配资源的基本...

  • Python-02进阶-04多进程多线程

    Python 进阶-04 进程线程协程并发等.md tags: Python 多进程 并发 进阶 必备知识 201...

  • Python多线程

    目录:一、线程的创建二、多线程互斥锁三、线程间通信四、线程池 Python并发之多线程 一、线程的创建 单线程示例...

  • Python进阶 - 高性能计算之协程

    迭代器 可迭代对象 什么是可迭代对象 可迭代对象就是对象的类中实现了__iter__方法的对象。对于可迭代对象,可...

  • iOS进阶之多线程

    写在前面 在平时的公司项目开发过程中,虽然几乎用不到多线程,但是对于了解某些三方实现原理、加深对多线程概念的理解 ...

  • iOS进阶之多线程

    进程 一个进程有一个或多个线程组成。进程只负责资源的调度和分配,线程才是程序正在的执行单元,负责代码的执行。 单线...

  • python之多线程

    注:本文是廖大的教程文章,本人也在学习,因为老是记不住,自己手打一边,代码也是亲自测试。廖大传送门 多进程 多个任...

  • Python之多线程

    什么是线程? 要弄清楚线程的定义,往往就要和进程相互比较,从比较中才能更准确地明白一个东西的定义。首先有个数量关系...

  • python之多线程

    进程的概念:以一个整体的形式暴露给操作系统管理,里面包含各种资源的调用。 对各种资源管理的集合就可以称为进程。线程...

网友评论

    本文标题:Python进阶 - 高性能计算之多线程

    本文链接:https://www.haomeiwen.com/subject/bcylqhtx.html