一、GIL锁相关
验证GIL锁的存在方式
from threading import Thread
from multiprocessing import Process
def task():
while True:
pass
if __name__ == '__main__':
for i in range(6): # 这里测试的是6核的cpu
# t = Thread(target=task) # 由于有GIL锁存在,同一时刻只有一条线程在运行,所有cpu不会满
t = Process(target=task) # 由于是多进程,进程中的线程会被cpu调度执行,6个cpu全在工作,就会跑满
t.start()
对于cpython解释器
- 在单核情况下:
- 开多线程还是开多进程?不管干什么都是开线程
- 在多核情况下:
- 如果是计算密集型,需要开进程,能被多个cpu调度执行
- 如果是io密集型,需要开线程,cpu遇到io会切换到其他线程执行
GIL锁和普通互斥锁的区别是:GIL锁不能保证数据的安全,而普通互斥锁可以保证数据的安全。
计算密集型:
from threading import Thread
from multiprocessing import Process
import time
# 计算密集型
def task():
count = 0
for i in range(100000000):
count += i
if __name__ == '__main__':
ctime = time.time()
ll = []
for i in range(10):
# t = Thread(target=task) # 开线程:125.71705794334412
t = Process(target=task) # 开进程:65.63824129104614
t.start()
ll.append(t)
for t in ll:
t.join()
print(time.time()-ctime)
io密集型:
from threading import Thread
from multiprocessing import Process
import time
def task():
time.sleep(2)
if __name__ == '__main__':
ctime = time.time()
ll = []
for i in range(400):
t = Thread(target=task) # 开线程:2.0559656620025635
# t = Process(target=task) # 开进程:9.506720781326294
t.start()
ll.append(t)
for t in ll:
t.join()
print(time.time()-ctime)
二、死锁(哲学家就餐问题)
死锁是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
from threading import Thread, Lock
import time
lockA = Lock()
lockB = Lock()
# 吃苹果需要先获取A锁再获取B锁
def eat_apple(name):
lockA.acquire()
print('%s 获取到了a锁' % name)
lockB.acquire()
print('%s 获取到了b锁' % name)
print('开始吃苹果,并且吃完了')
lockB.release()
print('%s 释放了b锁' % name)
lockA.release()
print('%s 释放了a锁' % name)
# 吃鸡蛋需要先获取B锁再获取A锁
def eat_egg(name):
lockB.acquire()
print('%s 获取到了b锁' % name)
time.sleep(2)
lockA.acquire()
print('%s 获取到了a锁' % name)
print('开始吃鸡蛋,并且吃完了')
lockA.release()
print('%s 释放了a锁' % name)
lockB.release()
print('%s 释放了b锁' % name)
if __name__ == '__main__':
l1 = ["小明", "小红", "小华"]
for name in l1:
t = Thread(target=eat_apple, args=(name,))
t1 = Thread(target=eat_egg, args=(name,))
t.start()
t1.start()
结果就是一个人获取到了A锁,在他获取A锁的延迟时间中另一个获取到了B锁,得到A锁的人需要等B锁释放,得到B锁的人需要等A锁释放,就造成了死锁现象。
死锁现象可以使用递归锁(RLock)解决
使用递归锁时,同一个人可以多次acquire,每acquire一次,内部计数器加一,每release一次,内部计数器减一。计数器只要不为0,其他人都无法获取这把锁。
三、Semaphore信号量
Semaphore:信号量可以理解为多把锁,同时允许多个线程来更改数据
from threading import Thread,Semaphore
import time
import random
sm=Semaphore(3) # 数字表示可以同时有多少个线程操作
def task(name):
sm.acquire()
print('%s 正在蹲坑'%name)
time.sleep(random.randint(1,5))
sm.release()
四、Event事件
一些线程需要等到其他线程执行完成之后才能执行,类似于发射信号
比如一个线程等待另一个线程执行结束再继续执行。
from threading import Thread, Event
import time
event = Event()
def girl(name):
print('%s 现在不单身,正在谈恋爱'%name)
time.sleep(10)
print('%s 分手了,给屌丝男发了信号'%name)
event.set()
def boy(name):
print('%s 在等着女孩分手'%name)
event.wait() # 只要没来信号,就卡在者
print('女孩分手了,机会来了,冲啊')
if __name__ == '__main__':
lyf = Thread(target=girl, args=('小红',))
lyf.start()
for i in range(10):
b = Thread(target=boy, args=('屌丝男%s号' % i,))
b.start()
起两个线程,第一个线程读文件的前半部分,读完发一个信号,另一个进程读后半部分,并打印:
from threading import Event, Thread
import time
import os
event = Event()
size = os.path.getsize("a.txt")
# 获取前面的一半内容
def head():
with open("a.txt", "r", encoding="utf-8") as f:
n = size // 2
data = f.read(n)
print(data)
print("前面一半已经读完")
event.set()
# 获取后面的一半内容
def behind():
event.wait()
with open("a.txt", "r", encoding="utf-8") as f:
n = size // 2
f.seek(n, 0)
data = f.read()
print(data)
if __name__ == '__main__':
t = Thread(target=head)
t1 = Thread(target=behind)
t.start()
t1.start()
五、线程的queue
进程的queue和线程的不是同一个。
线程的queue这样导入:
from queue import Queue,LifoQueue,PriorityQueue
三种线程Queue:
- Queue:队列,先进先出
- PriorityQueue:优先级队列,谁小谁先出
- LifoQueue:栈,后进先出
六、线程池和进程池
不论是开进程还是开线程,都不可能无限制的开,可以使用池来限制一次开启的线程或进程的数目。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import Thread
import time
import random
pool = ThreadPoolExecutor(3)
def task(name):
print(f"{name}任务开始")
time.sleep(random.randint(1, 3))
print("任务结束")
return f"{name}返回"
def call_back(f):
print(f.result())
if __name__ == '__main__':
for i in range(10):
pool.submit(task, "%s号" % i).add_done_callback(call_back)
pool.shutdown(wait=True) # 等待所有任务完成并且把池关闭
七、定时器
from threading import Timer
def task(name):
print(f"{name}真帅")
if __name__ == '__main__':
t = Timer(3, task, args=("yan",)) # 延迟3秒执行task函数,传参为"yan"
t.start()
网友评论