场景
在线程中,举例了多线程操作全局变量时是不安全的,因为底层的字节码并不能一次全部执行完成
import threading
import time
num = 0
def test():
global num
for i in range(100000):
num += 1
print('t1结束')
def test1():
global num
for i in range(100000):
num += 1
print('t2结束')
if __name__ == "__main__":
t1 = threading.Thread(target=test)
t2 = threading.Thread(target=test1)
t1.start()
t2.start()
time.sleep(1)
print(num)
>>>t1结束
>>>t2结束
>>>176111
线程锁的使用
import threading
import time
num = 0
lock = threading.Lock()
def test():
global num
for i in range(100000):
lock.acquire()
num += 1
lock.release()
print('t1结束')
def test1():
global num
for i in range(100000):
lock.acquire()
num += 1
lock.release()
print('t2结束')
if __name__ == "__main__":
t1 = threading.Thread(target=test)
t2 = threading.Thread(target=test1)
t1.start()
t2.start()
time.sleep(1)
print(num)
>>>t1结束
>>>t2结束
>>>200000
假设t1先获取到锁,t2执行到acquire()
时会处于阻塞状态
通过lock
可以确保num+1
完整的执行,即底层的字节码会全部执行完成才切换到其他线程
锁的使用虽然解决了多线程共享变量的问题,但也带来了其他问题:
1、获取锁和释放锁需要开销,导致性能下降
2、容易造成死锁
死锁的情况
一、单个线程内再次获取锁
def test():
global num
for i in range(100000):
lock.acquire()
lock.acquire() #多获取了一次
num += 1
lock.release()
print('t1结束')
一个线程中重复获取锁,第二次获取时会等待锁的释放,但是又没有释放,会导致代码阻塞
二、多个线程互相等待
现在假设有两把锁,让线程t1先获取到lock_a然后等待t2获取到lock_b,再获取对方已获取的锁,导致了死锁
import threading
import time
num = 0
lock_a = threading.Lock()
lock_b = threading.Lock()
def test():
global num
for i in range(100000):
lock_a.acquire()
time.sleep(1)
lock_b.acquire()
num += 1
lock_b.release()
lock_a.release()
print('t1结束')
def test1():
global num
for i in range(100000):
lock_b.acquire()
lock_a.acquire()
num += 1
lock_a.release()
lock_b.release()
print('t2结束')
if __name__ == "__main__":
t1 = threading.Thread(target=test)
t2 = threading.Thread(target=test1)
t1.start()
t2.start()
time.sleep(1)
print(num)
>>>0
>>>一直等待...
可重入锁-RLock
同一个线程中,可以多次调用acquire()
,但是acquire()
和release()
的次数要一样
import threading
import time
num = 0
lock = threading.RLock() #使用可重入锁
def test():
global num
for i in range(100000):
lock.acquire()
lock.acquire()#重复获取不会导致死锁
num += 1
lock.release()#释放
lock.release()
print('t1结束')
...
条件锁condition的使用
通过条件锁,可以实现多线程的协同工作,本例实现轮流执行的效果
import threading
class XiaoAi(threading.Thread):
def __init__(self,cond):
super().__init__()
self.name = '小爱'
self.cond = cond
def run(self):
with self.cond:#获取Condition初始化时的RLock锁,后面简称RLock锁
print(1)
self.cond.wait()#创建一个Lock锁放到双端队列中,此时代码阻塞并一直尝试获取该锁。然后释放RLock锁
print('{} : 在'.format(self.name))
self.cond.notify()
self.cond.wait()
print('{} : 好啊'.format(self.name))
class TianMao(threading.Thread):
def __init__(self,cond):
super().__init__()
self.name='天猫精灵'
self.cond = cond
def run(self):
#with等价于self.cond.acquire() self.cond.release()
with self.cond:#tianmao线程等待获取RLock锁,等待xiaoai线程释放后获取到
print('{} : 小爱同学'.format(self.name))
self.cond.notify()#从双端队列取出Lock锁,并释放。由于xiaoai线程一直在wait(),这里一旦释放,xiaoai线程可以立刻获取到锁开始执行
self.cond.wait()
print('{} : 我们说说话吧'.format(self.name))
self.cond.notify()
if __name__ == "__main__":
cond = threading.Condition()
xiaoai = XiaoAi(cond)
tianmao = TianMao(cond)
xiaoai.start()#xiaoai线程先执行
tianmao.start()
>>>1
>>>天猫精灵 : 小爱同学
>>>小爱 : 在
>>>天猫精灵 : 我们说说话吧
>>>小爱 : 好啊
1、xiaoai.start()
要放在tianmao.start()
前执行,保证xiaoai线程先进入等待状态,tianmao线程调用notify()方法xiaoai才能继续执行,如果tianmao线程先执行发送了notify()后,xiaoai线程还没有调用wait(),代码的结果就只打印了天猫精灵 : 小爱同学
问题:经过多次测试,调换顺序后结果确实是只打印了天猫精灵 : 小爱同学
。即使先让tianmao线程start,我觉得xiaoai线程也是有可能先执行到的,因为当tianmao线程启动后,恰好cpu分配给了主线程让xiaoai线程也启动了起来,这时有三个线程,然后恰好cpu又分配给了xiaoai线程先执行了一会,不就出现了正确的结果了吗?
2、一开始是xiaoai先获取RLock,然后调用wait(),wait()方法里又创建了一把Lock,wait()源码如下,调用acquire()尝试获取Lock,然后释放了RLock,但是如果一开始就获取到了Lock xiaoai线程不就可以继续执行了吗,理论上应该是阻塞了,等待tianmao调用notify()去释放Lock,如果xiaoai线程阻塞的话,RLock又是怎么释放的?tianmao如何拿到RLock呢
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
控制线程并发执行数量
threading.Semaphore
类内部使用了threading.Condition
,可以控制每次并发的线程数量
不控制并发数量的情况,下面的代码产生20并发大概2秒左右全部完成
import threading
import time
class DealData(threading.Thread):
def run(self):
time.sleep(2)
print("get data ok")
if __name__ == "__main__":
for i in range(20):
t = DealData()
t.start()
使用Semaphore
控制并发数量,可以避免过多的并发数量,比如爬虫场景
import threading
import time
class DealData(threading.Thread):
def __init__(self,sem):
super().__init__()
self.sem = sem
def run(self):
time.sleep(2)
print("get data ok")
self.sem.release()
if __name__ == "__main__":
sem = threading.Semaphore(3)
for i in range(20):
sem.acquire()
t = DealData(sem)
t.start()
线程池概念
创建固定数量的线程(有任务才创建,会有个最大值),线程池循环利用里面的线程去执行多任务,避免频繁创建销毁的开销
上面的threading.Semaphore
有点线程池的意思,但只是做了并发数量的限制。
通过from concurrent.futures import ThreadPoolExecutor
除了可以设置并发数量,还能在主线程中获取子线程的状态和返回值。
from concurrent.futures import ThreadPoolExecutor
import time
def get_html(sleep_time):
time.sleep(sleep_time)
print('get {}'.format(sleep_time))
return sleep_time
#创建线程池,设置最多2个并发
executor = ThreadPoolExecutor(max_workers=2)
#通过submit()函数提交任务到线程池,submit()是非阻塞,立刻返回。返回的是from concurrent.futures import Future,Future类对象
task1 = executor.submit(get_html,3)
task2 = executor.submit(get_html,2)
task3 = executor.submit(get_html,4)
print(task3.cancel())#True,最大并发是2,没有开始的任务可以取消
print(task1.cancel())#False,执行中或执行完成的线程无法取消
print(task1.done())#False
print(task1.result())#result()使主线程阻塞,等待子线程执行完
print(task1.done())#True
获取已完成的任务
from concurrent.futures import as_completed
,as_completed()
是一个生成器,内部判断一旦有任务完成,yield返回该任务。
from concurrent.futures import ThreadPoolExecutor,as_completed
import time
def get_html(sleep_time):
time.sleep(sleep_time)
print('get {}'.format(sleep_time))
return sleep_time
#创建线程池,设置最多2个并发
executor = ThreadPoolExecutor(max_workers=2)
#通过submit()函数提交任务到线程池,submit()是非阻塞,立刻返回
task1 = executor.submit(get_html,3)
task2 = executor.submit(get_html,2)
task3 = executor.submit(get_html,4)
all_task = [task1,task2,task3]
for future in as_completed(all_task):
data = future.result()
print('get {} success'.format(data))
print('done')
>>>get 2
>>>get 2 success
>>>get 3
>>>get 3 success
>>>get 4
>>>get 4 success
>>>done
as_completed()
会阻塞,一直到获取到所有成功的任务,最后打印了done
主线程等待某些线程完成
使用wait()
函数可以指定主线程等待哪些线程完成后可以继续
from concurrent.futures import ThreadPoolExecutor,wait,ALL_COMPLETED,FIRST_COMPLETED
import time
def get_html(sleep_time):
time.sleep(sleep_time)
print('get {}'.format(sleep_time))
return sleep_time
#创建线程池,设置最多2个并发
executor = ThreadPoolExecutor(max_workers=2)
#通过submit()函数提交任务到线程池,submit()是非阻塞,立刻返回
task1 = executor.submit(get_html,3)
task2 = executor.submit(get_html,2)
task3 = executor.submit(get_html,4)
all_task = [task1,task2,task3]
wait(all_task,return_when=ALL_COMPLETED) #如果换成FIRST_COMPLETED,输出:get 2、done 、get 3、get 4
print('done')
>>>get 2
>>>get 3
>>>get 4
>>>done
网友评论