写在前面
这个系列是笔者学习python一些进阶功能的笔记和思考,水平有限,错漏在所难免,还请方家不吝指教。
什么是线程?
首先应该了解操作系统如何支持多任务运行的,这里有“并行”和“并发”两个概念。
- 并行: 在同一时刻有多条指令在多个处理器上同时执行,通俗点就是CPU数>=任务数的情况;
- 并发:在同一时刻只能有一条指令执行,但是多个任务被快速轮换执行,使得宏观上让人感觉到有多个任务同时执行的效果。通俗点来说就是CPU数<任务数的情况。
在操作系统中运行的一个程序就是一个进程(Process),它是代码+资源的组合。而在一个进程中,有一个或者多个线程(Thread)。线程是进程的组成部分,一个进程至少有一个主线程来完成进程从开始到结束的全部操作。
单核CPU如何运行多个线程?
- 时间片轮转:操作系统让每个程序依次运行极短的时间
- 优先级调度:让高优先级的任务优先占用CPU
python中多线程的实现
threading模块
t1 = threading.Thread(target = someFunction, args = ())
,此时会创建一个线程对象,但是不会直接创建线程。用args
关键词可以为函数传入参数,但是注意这里传入的参数因为数量不定,因此一定要是一个元组
调用线程对象的start
方法才会创建线程,并且让线程开始运行。
如下面的例子:
import threading
def my_func(cnt):
for i in range(cnt):
print(i)
if __name__ == '__main__':
t1 = threading.Thread(target=my_func, args=(10,)) # 创建一个线程
t1.start() # 启动子线程
主线程需要负责回收分配给子线程的资源,因此主线程一定会晚于子线程结束。
继承thread类
用继承了Thread
等类也可以实现线程创建。但是这个类中一定需要定义run
方法,这样在start
启动线程后会自动调用run
方法,例如以下程序:
from time import sleep
import threading
class myThread(threading.Thread):
def run(self):
for i in range(10):
sleep(1)
msg = "I'm " + self.name + " @ " + str(i)
print(msg)
def main():
testThread = myThread()
testThread.start()
if __name__ == "__main__":
main()
会得到以下结果:
I'm Thread-1 @ 0
I'm Thread-1 @ 1
I'm Thread-1 @ 2
I'm Thread-1 @ 3
I'm Thread-1 @ 4
I'm Thread-1 @ 5
I'm Thread-1 @ 6
I'm Thread-1 @ 7
I'm Thread-1 @ 8
I'm Thread-1 @ 9
如果需要调用一系列的函数,那么可以将其封装在这个类中,然后在run
方法里调用。
不同线程中全局变量的共享
在不同的线程中,全局变量是共享的,这使得不同线程之间协作处理数据非常方便。但是这种共享也会有负面影响 -- 造成资源竞争,例如如下代码:
from time import sleep
import threading
g_num = 0
def test1(num):
global g_num
for i in range(num):
g_num += 1
print("g_num in test1: %d" %g_num)
def test2(num):
global g_num
for i in range(num):
g_num += 1
print("g_num in test2: %d" %g_num)
def main():
t1 = threading.Thread(target=test1, args=(1000000,))
t2 = threading.Thread(target=test2, args=(1000000,))
t1.start()
t2.start()
sleep(5)
print("g_num in main thread: %d" % g_num)
if __name__ == "__main__":
main()
执行后会得到如下结果:
g_num in test1: 1221968
g_num in test2: 1323741
g_num in main thread: 1323741
这里得到的g_num
并不是我们想象的2000000,因为在执行python代码时,我们使得数据自加的一句python代码会被翻译为好几句机器码:
- 读取数据
- 数据+1
- 存储数据
在执行时,由于只使用了一个CPU,CPU会采用时间片轮转的方法来模拟多任务。因此实际上会在任意一步被cpu打断,例如在完成数据+1后,在存储数据时被打断,这样增加后的数据就没有存入内存,下一个线程从内存读取时,读到的就是没有自增前的数,这样可能使得两个线程中自增的数据相互覆盖,导致加到最后得到的值要比想象中的小。这种问题,也叫做“数据不同步”。
线程锁
当多个线程几乎同时修改一个数据时,需要进行同步控制。线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入<u>互斥锁</u>。
互斥锁会给资源附加一个状态:锁定/非锁定。
当某个线程需要修改资源时,先将其锁定,此时其他线程不可以修改该资源;到该线程修改结束后,释放资源,使其变为非锁定,其他的线程才能再次锁定该资源,进行修改。这样互斥锁就保证了每次只有一个线程进行写入操作,保证了多线程下全局变量的正确性。
为了实现互斥锁,threading
模块中提供了Lock和RLock两个类:
-
threading.Lock
是一个基本锁对象,每次只能锁定一次,其余的锁请求需要等锁释放后才能获取; -
threading.RLock
是可重入锁(Reentrant Lock),在同一个线程中可以进行多次锁定和释放,但是锁定和释放的方法必须成堆出现,如果调用了n次锁定,那么只有n次释放才能解锁。
Lock和RLock两个类都提供了以下方法来锁定和释放:
-
acquire(blocking = True, timeout = -1)
进行锁定,blocking为True时会堵塞当前线程,直到其他线程释放该锁,当前线程获取到这个锁为止;而blocking为False的情况下则不会堵塞当前线程,而是往下执行 -
release()
释放锁
对于上面的问题,我们可以用互斥锁来解决多个线程之间的资源竞争问题,如夏例:
from time import sleep
import threading
g_num = 0
mutex = threading.Lock() # 建立互斥锁
def test1(num):
global g_num
mutex.acquire(True) # 在更改数据之前获得互斥锁
for i in range(num):
g_num += 1
mutex.release() # 更改完数据之后释放锁定
print("g_num in test1: %d" %g_num)
def test2(num):
global g_num
mutex.acquire(True) # 在更改数据之前获得互斥锁
for i in range(num):
g_num += 1
mutex.release() # 更改完数据之后释放锁定
print("g_num in test2: %d" %g_num)
def main():
t1 = threading.Thread(target=test1, args=(1000000,))
t2 = threading.Thread(target=test2, args=(1000000,))
t1.start()
t2.start()
sleep(5)
print("g_num in main thread: %d" % g_num)
if __name__ == "__main__":
main()
这样,我们在最后得到的就是我们预期的结果了:
g_num in test1: 1000000
g_num in test2: 2000000
g_num in main thread: 2000000
但是需要注意,用互斥锁会带来以下两个问题:
- 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式运行,其他模式处于堵塞的状态,效率就大大下降了;
- 由于可以存在多个锁,不同的线程持有不同锁,并试图获取对方持有的锁时,可能会造成死锁。
死锁
线程间共享多个资源时,如果两个线程分别占有一部分资源并且同时等待对方的资源,就可能造成死锁。当出现死锁时,会造成应用停止响应,如下例:
import threading
from time import sleep
lockA = threading.Lock()
lockB = threading.Lock()
def testA():
# 为进程A上锁
lockA.acquire()
print("----Lock A acquired in testA----")
sleep(1)
# 中间需要操作另一批数据,上锁B
lockB.acquire()
print("----Lock B acquired in testA----")
sleep(2)
# 数据操作完成,解开锁B
lockB.release()
print("----Lock B released in testA----")
# 完成操作,释放锁A
lockA.release()
print("----Lock A released in testA----")
def testB():
# 开始操作时为进程B上锁
lockB.acquire()
print("----Lock B acquired in testB----")
sleep(1)
# 进行下一步操作前需要获取锁A
lockA.acquire()
print("----Lock A acquired in testB----")
sleep(2)
# 进行完数据操作释放锁A
lockA.release()
print("----Lock A released in testB----")
# 释放锁B
lockB.release()
print("----Lock B released in testB----")
def main():
t1 = threading.Thread(target=testA)
t2 = threading.Thread(target=testB)
t1.start()
t2.start()
if __name__ == "__main__":
main()
在这个例子中,线程t1开始时,为lockA上锁,并进入睡眠(模拟一些耗时操作),而同时开始的线程t2为lockB上锁,等t1向下执行时,需要lockB,lockB处于上锁状态,因此线程t1堵塞,等待lockB被释放;而t2在执行时,acquire lockA失败,也进入了堵塞状态,等待lockA被释放。
这样,两个线程互相需要对方释放互斥锁,两个线程都无法向下执行,进入了死锁状态。
死锁的避免
- 程序设计时尽量避免(例如银行家算法)
- 添加超时时间限制
ThreadLocal
除了使用互斥锁以外,还有一种办法可以实现各线程间的数据隔离,那就是使用threading.local()
。它会为各个变量创建完全属于他们自己的副本(也就是线程局部变量),这样各个线程操作的就是属于自己的私有资源,可以杜绝数据不同步的问题。
import threading
from time import sleep
from time import sleep
import threading
g_num = 0
local = threading.local()
def test1(num):
global g_num
local.g_num = g_num # 将g_num绑定一个线程局部变量
for i in range(num):
local.g_num += 1
print("g_num in test1: %d" % local.g_num)
def test2(num):
global g_num
local.g_num = g_num # 将g_num绑定一个线程局部变量
for i in range(num):
local.g_num += 1
print("g_num in test2: %d" % local.g_num)
def main():
t1 = threading.Thread(target=test1, args=(1000000,))
t2 = threading.Thread(target=test2, args=(1000000,))
t1.start()
t2.start()
t1.join() # 等待线程t1执行完毕
t2.join() # 等待线程t2执行完毕
print("g_num in main thread: %d" % g_num)
if __name__ == "__main__":
main()
结果为:
g_num in test1: 1000000
g_num in test2: 1000000
g_num in main thread: 0
可以看到,尽管我们将全局变量g_num
绑定在线程局部变量中,但是每个线程操作的实际上是自己的线程局部变量,并不会作用于我们绑定上去的全局变量。
Python中多线程的问题
在Python中(当前使用版本3.7,在3.8中据说会用局部解释器绕开GIL的问题),多线程并不能真正有效利用多核,例如如下代码:
import threading, time, multiprocessing
def my_counter(num):
i = 0
for _ in range(num):
i += 1
return True
def main():
# 单线程顺序执行
thread_array = {}
start_time = time.time()
count_num = int(1e8)
for tid in range(2):
t = threading.Thread(target=my_counter, args=(count_num,))
t.start()
t.join()
end_time = time.time()
print("Total time for two sequential threads: ", round(end_time - start_time, 2), " s")
# 双线程并行
print("CPU num for current machine: ",multiprocessing.cpu_count())
thread_array = {}
start_time = time.time()
for tid in range(2):
t = threading.Thread(target=my_counter, args=(count_num,))
t.start()
thread_array[tid] = t
for tid in thread_array.keys():
thread_array[tid].join()
end_time = time.time()
print("Total time for multi-threads: ", round(end_time - start_time, 2), " s")
if __name__ == '__main__':
main()
运行的结果如下:
Total time for two sequential threads: 11.32 s
CPU num for current machine: 4
Total time for multi-threads: 11.69 s
可以看到,在四核的电脑上,两个线程用多线程并行和单线程串行,速度并没有任何提高。
因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。
解决方法
- 用多进程替代多线程
multiprocess库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。
当然multiprocess也不是万能良药。它的引入会增加程序实现时线程间数据通讯和同步的困难。就拿计数器来举例子,如果我们要多个线程累加同一个变量,对于thread来说,申明一个global变量,用thread.Lock的context包裹住三行就搞定了。而multiprocess由于进程之间无法看到对方的数据,只能通过在主线程申明一个Queue,put再get或者用share memory的方法。这个额外的实现成本使得本来就非常痛苦的多线程程序编码,变得更加痛苦了。
在将上面用threading实现的多任务改为multiprocessing:
import threading, time, multiprocessing
def my_counter(num):
i = 0
for _ in range(num):
i += 1
return True
def main():
# 单进程顺序执行
process_array = {}
start_time = time.time()
count_num = int(1e8)
for pid in range(2):
p = multiprocessing.Process(target=my_counter, args=(count_num,))
p.start()
p.join()
end_time = time.time()
print("Total time for two sequential processes: ", round(end_time - start_time, 2), " s")
# 多进程并行
print("CPU num for current machine: ", multiprocessing.cpu_count())
process_array = {}
start_time = time.time()
for tid in range(2):
t = multiprocessing.Process(target=my_counter, args=(count_num,))
t.start()
process_array[tid] = t
for tid in process_array.keys():
process_array[tid].join()
end_time = time.time()
print("Total time for multi-processings: ", round(end_time - start_time, 2), " s")
if __name__ == '__main__':
main()
结果如下,可以看到多进程相比单进程,速度有了明显提升:
Total time for two sequential processes: 11.3 s
CPU num for current machine: 4
Total time for multi-processings: 7.91 s
- 用其他解析器
之前也提到了既然GIL只是CPython的产物,那么其他解析器是不是更好呢?没错,像JPython和IronPython这样的解析器由于实现语言的特性,他们不需要GIL的帮助。然而由于用了Java/C#用于解析器实现,他们也失去了利用社区众多C语言模块有用特性的机会。所以这些解析器也因此一直都比较小众。毕竟功能和性能大家在初期都会选择前者,Done is better than perfect。
网友评论