python threading模块
import threading
import time
def thread_demo():
print("===线程,来自方法==={}".format(threading.current_thread()))
sum = 0
for i in range(1000):
time.sleep(0.01) # 测试子线程
sum += i
print(sum)
print("===线程,创建子线程前==={}".format(threading.current_thread()))
# 创建线程
t1 = threading.Thread(target=thread_demo) # 参数target 指定要执行的任务;参数args 指定方法传递的参数
t2 = threading.Thread(target=thread_demo)
# 启动创建的线程,让线程去执行任务
t1.start()
t2.start()
print("====线程数===={}".format(threading.enumerate()))
print("====线程数===={}".format(threading.active_count()))
# 阻塞当前线程,默认等待指定的线程执行结束
t1.join() # 等t1回来再继续往下执行
t2.join() # 等t2回来再继续往下执行
print("===线程==={}".format(threading.current_thread()))
# 主线程就是代码从上往下运行
print("主线程结束")
重写threading.Thread来实现:
import threading
import time
def thread_demo():
print("===线程,来自方法==={}".format(threading.current_thread()))
sum = 0
for i in range(1000):
time.sleep(0.01) # 测试子线程
sum += i
print(sum)
# 重写threading,继承threading.Thread
class MyThread(threading.Thread):
# 重写run方法,待执行的任务
def run(self):
thread_demo() #
print("===线程,创建子线程前==={}".format(threading.current_thread()))
# 创建线程
m1=MyThread()
m2=MyThread()
# 运行线程
m1.start()
m2.start()
print("====线程数===={}".format(threading.enumerate()))
print("====线程数===={}".format(threading.active_count()))
# 阻塞当前线程,默认等待指定的线程执行结束
m1.join()
m2.join()
print("主线程结束")
守护线程:
# 守护线程
def thread_demo():
print("===线程,来自方法==={}".format(threading.current_thread()))
sum = 0
for i in range(1000):
time.sleep(0.01) # 测试子线程
sum += i
print(sum)
t1=threading.Thread(target=thread_demo)
t1.daemon=True # 表示当前线程就是t1的守护线程 ,通过注释更好理解,主线程结束,守护线程也会结束
t1.start()
time.sleep(2)
print("主线程结束")
线程的生命周期:
- 实例化threading.Thread时,创建了线程 --->新建
- start() 准备干活-->就绪 ,等待CPU的调度
- cpu的切换分为2种:
- 就绪 <---> 运行 反复切换 (高频率计算的)
- 就绪<--->运行--->阻塞--->就绪--->运行。对于一些需要等待结果的,比如等待数据库返回、比如time.sleep(),cpu就会把状态由运行改为阻塞,不会再去调度你。
- 直到你等待的结果回来,比如数据库返回了、sleep()结束。你的状态由阻塞变为就绪,等待cpu重新调度。
- 运行-->死亡 计算结束、报异常等,线程死亡
同步锁和互斥锁
a = 0
def add_one():
global a
for i in range(10000000):
b = a + 1
a = b
# 创建线程
thread_list = [threading.Thread(target=add_one) for i in range(5)]
for i in thread_list:
i.start()
for i in thread_list:
i.join()
print(a) # 远小于500000000,
为什么a远小于500000000
- e.g. 线程1 b=1000 线程2 b=2000 线程3 b=3000 线程4 b=4000 线程5 b=5000 ,此时cpu再开始到线程1 a=b=1000,那么相当于后面线程的都是无用功
- 怎么办呢?加锁
互斥锁
a = 0
lock=threading.Lock() # 创建一把锁 互斥锁
def add_one():
# 加锁
lock.acquire() # 已锁定,其他线程就无法加锁成功
global a
for i in range(10000000):
b = a + 1
a = b
lock.release() # 结束锁定
# 创建线程
thread_list = [threading.Thread(target=add_one) for i in range(5)]
for i in thread_list:
i.start()
for i in thread_list:
i.join()
print(a) # 500000000,
死锁
a = 0
lock=threading.Lock() # 创建一把锁 互斥锁
def add_one():
# 加锁
lock.acquire() # 已锁定,其他线程就无法加锁成功
global a
for i in range(10000000):
b = a + 1
raise NameError("线程挂掉了") # 假设在加锁后、未释放锁之前挂掉了
a = b
lock.release() # 结束锁定
# 创建线程
thread_list = [threading.Thread(target=add_one) for i in range(5)]
for i in thread_list:
i.start()
for i in thread_list:
i.join()
print(a) # 500000000,
加了代码raise NameError("线程挂掉了")
模拟线程在加锁后、未释放锁之前挂掉了,那么程序会一直卡在那里,该线程无法继续,其他线程要等待加锁的线程释放,导致也无法运行。这种情况就是死锁。
通过try... finally... ,无论结果如何,都要释放锁,这样程序不会卡在那里
lock=threading.Lock() # 创建一把锁 互斥锁
def add_one():
# 加锁
lock.acquire() # 已锁定,其他线程就无法加锁成功
try:
global a
for i in range(10000000):
b = a + 1
raise NameError("线程挂掉了")
a = b
finally:
lock.release() # 结束锁定
逻辑死锁
a = 0
lock_1=threading.Lock() # 互斥锁
lock_2=threading.Lock() # 互斥锁
def add_one():
print("线程{}进入方法add_one".format(threading.current_thread()))
lock_1.acquire() # 线程2 lock_1 加锁
lock_2.acquire() #
print("线程{}开始计算".format(threading.current_thread()))
lock_1.release() #
time.sleep(0.1) # 线程1 阻塞 lock_2锁定,休眠时间过了之后,lock_1加锁,但lock_1已被占用,
lock_1.acquire()
global a
for i in range(10000000):
b = a + 1
raise NameError("线程挂掉了")
a = b
lock_2.acquire()
lock_1.release()
print("线程{}退出方法add_one".format(threading.current_thread()))
# 创建线程
thread_list = [threading.Thread(target=add_one) for i in range(5)]
for i in thread_list:
i.start()
for i in thread_list:
i.join()
print(a)
锁的滥用容易出现逻辑死锁,故应避免使用多把锁。
线程间的通信
线程间的通信:
Condition线程通信
消费者-生产者设计模式
con=threading.Condition() # Condition默认自带互斥锁
ball=[]
def set_data():
for i in range(10):
con.acquire()
for j in range(20):
ball.append("丸子")
print("第{}个丸子放进去了,吃货们快来吃".format(len(ball)))
con.notify_all() # 激活其他所有线程进入就绪状态
con.wait() # wait()释放当前锁,并进入阻塞状态
def get_data():
while True:
con.acquire() #加锁
time.sleep(0.5)
if(len(ball)==0):
print("锅里没丸子了")
con.notify_all()
con.wait()
else:
ball.pop()
print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),len(ball)))
con.release()
product=threading.Thread(target=set_data)
customers=[threading.Thread(target=get_data) for i in range(3)]
product.start()
for i in customers:
i.start()
product.join()
Queue队列实现线程通信
con=threading.Condition() # Condition默认自带互斥锁
# 用队列储存丸子
q=queue.Queue(maxsize=10) # 创建队列
def set_data():
for i in range(10):
con.acquire()
for j in range(20):
if q.full(): # 判断队列是否已满
print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
con.notify_all() # 激活其他所有线程进入就绪状态
con.wait() # wait()释放当前锁,并进入阻塞状态
else:
q.put("丸子") # 往队列里放东西
def get_data():
while True:
con.acquire() #加锁
time.sleep(0.5)
if q.empty(): #判断队列是否为空
print("锅里没丸子了")
con.notify_all()
con.wait()
else:
q.get() # 获取队列中的数据
print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))
con.release()
product=threading.Thread(target=set_data)
customers=[threading.Thread(target=get_data) for i in range(3)]
product.start()
for i in customers:
i.start()
product.join()
通过Event实现通信 Event可以代替Condition
product_env=threading.Event() # 生产者和消费者不能共用一个evn
customer_env=threading.Event()
# 用队列储存丸子
q=queue.Queue(maxsize=10) # 创建队列
def set_data():
for i in range(10):
for j in range(20):
if q.full(): # 判断队列是否已满
print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
customer_env.set() # 通知消费者
product_env.clear() # 把生产者线程里的flag设置为 False
product_env.wait() # 阻塞当前线程
else:
q.put("丸子") # 往队列里放东西
def get_data():
while True:
time.sleep(0.5)
if q.empty(): #判断队列是否为空
print("锅里没丸子了")
product_env.set()
customer_env.clear()
customer_env.wait()
else:
q.get() # 获取队列中的数据
print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))
product=threading.Thread(target=set_data)
customers=[threading.Thread(target=get_data) for i in range(3)]
product.start()
for i in customers:
i.start()
product.join()
线程池
线程池
product_env=threading.Event() # 生产者和消费者不能共用一个evn
customer_env=threading.Event()
q=queue.Queue(maxsize=10) # 创建队列
def set_data():
for i in range(10):
for j in range(20):
if q.full(): # 判断队列是否已满
print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
customer_env.set() # 通知消费者
product_env.clear() # 把生产者线程里的flag设置为 False
product_env.wait() # 阻塞当前线程
else:
q.put("丸子") # 往队列里放东西
return "没丸子,别吃了"
def get_data():
while True:
time.sleep(0.5)
if q.empty(): #判断队列是否为空
print("锅里没丸子了")
product_env.set()
customer_env.clear()
customer_env.wait()
else:
q.get() # 获取队列中的数据
print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))
def get_result(feature):
print(feature.result())
pool=ThreadPoolExecutor(max_workers=4) # 创建线程池,max_workers 指定最大线程数
product_future=pool.submit(set_data)# 往线程池中添加任务,并激活线程中的线程,即改为就绪状态
product_future.add_done_callback() # 等同于result()
customer_future1=pool.submit(get_data) #
customer_future2=pool.submit(get_data)
customer_future3=pool.submit(get_data)
# res=product_future.result() #result方法,获取程序的返回值
# print(res)
pool.shutdown() # 关闭线程池
map方法:
# 使用map方法,map使用于有参的方法
product_env=threading.Event() # 生产者和消费者不能共用一个evn
customer_env=threading.Event()
q=queue.Queue(maxsize=10) # 创建队列
def set_data(d=None):
for i in range(10):
for j in range(20):
if q.full(): # 判断队列是否已满
print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
customer_env.set() # 通知消费者
product_env.clear() # 把生产者线程里的flag设置为 False
product_env.wait() # 阻塞当前线程
else:
q.put("丸子") # 往队列里放东西
return "没丸子,别吃了"
def get_data(d=None):
while True:
time.sleep(0.5)
if q.empty(): #判断队列是否为空
print("锅里没丸子了")
product_env.set()
customer_env.clear()
customer_env.wait()
else:
q.get() # 获取队列中的数据
print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))
def get_result(feature):
print(feature.result())
pool=ThreadPoolExecutor(max_workers=4) # 创建线程池,max_workers 指定最大线程数
product=pool.map(set_data,(None,))
customer=pool.map(get_data,(None,None,None)) #启动三次
# 获取返回值
print(next(product))
pool.shutdown() # 关闭线程池
网友评论