保持多线程执行顺序的可行方案有以下几种:
锁,Condition,信号量,Event,Barrier,队列
示例代码
- 目标:按顺序打印BAC
import threading
import asyncio
import queue
# 需要重写来实现多线程调用
class Foo:
def __init__(self):
self.l = threading.Lock()
self.l1 = threading.Lock()
self.l.acquire()
self.l1.acquire()
pass
def printA(self):
self.l.acquire()
print("A", threading.currentThread().getName())
self.l1.release()
def printB(self):
print("B",threading.currentThread().getName())
self.l.release()
def printC(self):
self.l1.acquire()
print("C", threading.currentThread().getName())
if __name__ == '__main__':
foo = Foo()
async def test1():
threading.Thread(target=foo.printA).start()
async def test2():
threading.Thread(target=foo.printB).start()
async def test3():
threading.Thread(target=foo.printC).start()
# 异步测试打印
asyncio.run(test1())
asyncio.run(test2())
asyncio.run(test3())
锁
- 初始化一个锁
self.l = threading.Lock()
- 获取一个锁,获取后除非释放,否则代码会在下次获取之前阻塞
self.l.acquire() # 如果不release,如果再次调用acquire就会被阻塞,直至调用release
exec...
- 执行以下代码后锁可以被其他线程获取
self.l.release()
注意:
一个锁,只能保证一组数据按顺序,如果有3个资源需要保证顺序就需要2把锁,4个就需要3把锁
Condition
- 初始化:
self.c = threading.Condition()
- 使用with语法,使用这个条件
with self.c:
- 设置等待条件
# 一个bool表达式,如果条件达到会继续执行,否则
self.c.wait_for(lambda: a == 10)
exec... # 执行被阻塞的代码
- 通知其他线程,我执行完了,谁需要执行就执行
self.c.notify_all() # 通知所有线程开始抢占资源
# 或
self.c.notify(n) # 激活几个线程
注意:
一个条件可以多处使用,用来保证顺序可以一个条件做到
示例
class Foo:
def __init__(self):
self.c = threading.Condition()
self.t = 0;
pass
def printA(self):
with self.c:
self.c.wait_for(lambda : self.t == 1)
print("A", threading.currentThread().getName())
self.t += 1
self.c.notify_all()
def printB(self):
with self.c:
print("B",threading.currentThread().getName())
self.t += 1
self.c.notify_all()
def printC(self):
with self.c:
self.c.wait_for(lambda : self.t == 2)
print("C", threading.currentThread().getName())
信号量
- 初始化一个信号量
self.s1 = threading.Semaphore(0)
# 使用说明,如果信号量当前为0,当需要获取信号量时就会出现阻塞,
# 直到有其他线程释放信号量
- 获取信号量开始执行
self.ls.acquire()
exec... # 执行代码
- 释放信号量,执行上面被阻塞的代码
self.s1.release()
注意:使用0的信号量,效果和锁是一样的。
区别:
* 锁先调用了一次acquire,让下一次调用acquire处于阻塞状态。
* 信号量为0表示当前需要调用acquire的线程已经会处于阻塞状态。
示例
class Foo:
def __init__(self):
self.s = threading.Semaphore(0)
self.s1 = threading.Semaphore(0)
pass
def printA(self):
self.s.acquire()
print("A", threading.currentThread().getName())
self.s1.release()
def printB(self):
print("B",threading.currentThread().getName())
self.s.release()
def printC(self):
self.s1.acquire()
print("C", threading.currentThread().getName())
Event对象
wait 方法作为阻塞;set方法来释放线程【默认类赋值就是阻塞】
- 创建一个Event对象
self.e = threading.Event()
- 在需要等待的时候调用
self.e.wait()
exec ... # 执行被阻塞的代码
- 执行完前期条件后释放阻塞
self.e.set()
示例
class Foo:
def __init__(self):
self.e = threading.Event()
self.e1 = threading.Event()
pass
def printA(self):
self.e.wait()
print("A", threading.currentThread().getName())
self.e1.set()
def printB(self):
print("B",threading.currentThread().getName())
self.e.set()
def printC(self):
self.e1.wait()
print("C", threading.currentThread().getName())
Barrier对象
- 创建一个barrier对象
self.b = threading.Barrier(parties)
# 如果parties为2 ,则说明,需要调用两次wait方法,
# 才可以让被阻塞的线程执行
- 让线程开始执行
self.b.wait() # 执行次数与parties保持一致
self.b.wait()
exec ...
示例
class Foo:
def __init__(self):
self.b = threading.Barrier(2)
self.b1 = threading.Barrier(2)
pass
def printA(self):
self.b.wait()
print("A", threading.currentThread().getName())
self.b1.wait()
def printB(self):
print("B",threading.currentThread().getName())
self.b.wait()
def printC(self):
self.b1.wait()
print("C", threading.currentThread().getName())
空Queue
阻塞队列使用;对于队列为空时,get方法就会自动阻塞,直到put使之非空才会释放进程
- 定义一个空队列
self.q = queue.Queue(0)
- 如果想阻塞列
self.q.get() # 直接调用会因为队列中没有数据而阻塞,需要等到向队列put后,才会让get
exec ...
- 解除阻塞队列
self.q.put(0)
示例
class Foo:
def __init__(self):
self.q = queue.Queue(0)
self.q1 = queue.Queue(0)
pass
def printA(self):
self.q.get()
print("A", threading.currentThread().getName())
self.q1.put(0)
def printB(self):
print("B",threading.currentThread().getName())
self.q.put(0)
def printC(self):
self.q1.get()
print("C", threading.currentThread().getName())
满Queue
定容队列方式;如果队列满了,put方法会被阻塞
- 创建一个队列,并将其填充满
self.q = queue.Queue(1) # 队列中最多只能放1个元素
self.q.put(0)
- 此时由于存储的队列已经满了,此时以下操作会被阻塞
self.q.put(0)
exec....
- 执行以下代码可以解除上面的阻塞
self.q.get()
示例代码
class Foo:
def __init__(self):
self.q = queue.Queue(1)
self.q1 = queue.Queue(1)
self.q.put(0)
self.q1.put(0)
pass
def printA(self):
self.q.put(0)
print("A", threading.currentThread().getName())
self.q1.get()
def printB(self):
print("B",threading.currentThread().getName())
self.q.get()
def printC(self):
self.q1.put(0)
print("C", threading.currentThread().getName())
网友评论