目录:
一、线程的创建
二、多线程互斥锁
三、线程间通信
四、线程池
Python并发之多线程
一、线程的创建
单线程示例:
import time
def run(n):
print("task ", n)
time.sleep(2)
t0 = time.time()
run("t1")
run("t2")
ts = time.time()
print(ts - t0)
运行结果
1.启动多个线程(函数方式)
在Python3中,Python提供了一个内置模块 threading.Thread,可以很方便地让我们创建多线程。
threading.Thread() 一般接收两个参数:
- 线程函数名:要放置线程让其后台执行的函数,由我们自已定义,注意不要加()。
- 线程函数参数:线程函数名所需的参数,以元组的形式传入。若不需要参数,可以不指定。
示例:
import threading
import time
def run(n):
print("task", n)
time.sleep(2)
print("{} finished!".format(n))
ts = time.time()
print("线程{}耗时:{}".format(n, ts-t0))
t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t0 = time.time()
t1.start()
t2.start()
print("main finished!")
运行结果
t1与t2几乎同时完成。
2.启动多个线程(类方式)
相比较函数而言,使用类创建线程,会比较麻烦一点。 首先,我们要自定义一个类,对于这个类有两点要求:
- 必须继承 threading.Thread 这个父类;
- 必须覆写 run 方法。
这里的 run 方法,和我们上面线程函数的性质是一样的,可以写我们的业务逻辑程序。在 start() 后将会被自动调用。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, n, sleep_time):
super(MyThread, self).__init__()
self.n = n
self.sleep_time = sleep_time
def run(self):
print("running task {}".format(self.n))
time.sleep(self.sleep_time)
print("task {} done!".format(self.n))
ts = time.time()
print("线程{}耗时:{}".format(self.n, ts - t0))
t1 = MyThread("t1", 2)
t2 = MyThread("t2", 4)
t0 = time.time()
t1.start()
t2.start()
ts = time.time()
print("main finished!")
运行结果
- join()方法
.join():程序会等待该线程结束后,再执行后面的语句。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, n, sleep_time):
super(MyThread, self).__init__()
self.n = n
self.sleep_time = sleep_time
def run(self):
print("running task {}".format(self.n))
time.sleep(self.sleep_time)
print("task {} done!".format(self.n))
ts = time.time()
print("线程{}耗时:{}".format(self.n, ts - t0))
t1 = MyThread("t1", 2)
t2 = MyThread("t2", 4)
t0 = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
ts = time.time()
print("main finished!")
运行结果
二、多线程互斥锁
1.定义和使用锁
import threading
# 生成锁对象(全局唯一)
lock = threading.Lock()
# 获取锁(未获取到会阻塞程序,直到获取到锁才会往下执行)
lock.acquire()
# 释放锁
lock.release()
注意:lock.acquire() 和 lock.release()必须成对出现。否则就有可能造成死锁。
可以使用使用上下文管理器来加锁:
import threading
lock = threading.Lock()
with lock:
操作代码
- 不加锁示例:
import threading
import time
g_num = 0
def work1(num):
global g_num
for i in range(num):
g_num += 1
print("----in work1, g_num is %d----" % g_num)
def work2(num):
global g_num
for i in range(num):
g_num += 1
print("----in work2, g_num is %d----" % g_num)
print("---线程创建之前g_num is %d---" % g_num)
t1 = threading.Thread(target=work1, args=(1000000,))
t1.start()
t2 = threading.Thread(target=work2, args=(1000000,))
t2.start()
# 等待线程执行完毕
while len(threading.enumerate()) != 1:
time.sleep(1)
print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)
运行结果
如果多个线程同时对同一个全局变量操作,会出现资源竞争问题,从而数据结果会不正确。
- 加锁示例:
import threading
import time
g_num = 0
lock = threading.Lock()
def work1(num):
global g_num
with lock:
for i in range(num):
g_num += 1
print("----in work1, g_num is %d----" % g_num)
def work2(num):
global g_num
with lock:
for i in range(num):
g_num += 1
print("----in work2, g_num is %d----" % g_num)
print("---线程创建之前g_num is %d---" % g_num)
t1 = threading.Thread(target=work1, args=(1000000,))
t1.start()
t2 = threading.Thread(target=work2, args=(1000000,))
t2.start()
# 等待线程执行完毕
while len(threading.enumerate()) != 1:
time.sleep(1)
print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)
运行结果
2.死锁
示例:
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 对mutexA上锁
mutexA.acquire()
# mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
print(self.name+'----do1---up----')
time.sleep(1)
# 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
mutexB.acquire()
print(self.name+'----do1---down----')
mutexB.release()
# 对mutexA解锁
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 对mutexB上锁
mutexB.acquire()
# mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
print(self.name+'----do2---up----')
time.sleep(1)
# 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
mutexA.acquire()
print(self.name+'----do2---down----')
mutexA.release()
# 对mutexB解锁
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
运行结果
3.全局锁(GIL)
多线程和多进程是不一样的:
多进程是真正的并行,而多线程是伪并行,实际上只是线程交替执行。
GIL(Global Interpreter Lock,全局解释器锁)
任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。而Python解释器,并不是只有CPython,除它之外,还有PyPy,Psyco,JPython,IronPython等。在绝大多数情况下,我们通常都认为 Python == CPython,所以也就默许了Python具有GIL锁这个事。
如何避免性能受到GIL的影响:
- 使用多进程代替多线程。
- 更换Python解释器,不使用CPython
通常情况下:
- I/O密集型:适用多线程
- CPU密集型:适用多进程
三、线程间通信
1.Queue队列
import queue
# maxsize默认为0,不受限
# 一旦>0,而消息数又达到限制,q.put()也将阻塞
q = queue.Queue(maxsize=0)
# 阻塞程序,等待获取队列消息
q.get()
# 获取消息,设置超时时间
q.get(timeout=5.0)
# 发送消息
q.put()
# 等待所有的消息都被消费完
q.join()
# 以下三个方法代码中不要使用(由于具有瞬时性,所以没有参考价值):
# 查询当前队列的消息个数
q.qsize()
# 队列消息是否都被消费完,True/False
q.empty()
# 检测队列里消息是否已满
q.full()
-
生产者-消费者模式
示例:
import queue
q = queue.Queue(maxsize=0)
def producer(): # 生产者
for i in range(1000):
q.put(i)
def consumer(): # 消费者
for i in range(1000):
data = q.get()
print(data)
t1 = threading.Thread(target=producer,)
t2 = threading.Thread(target=consumer,)
t1.start()
t2.start()
四、线程池
在使用多线程处理任务时也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成cpu的大量开销。为解决这个问题,线程池的概念被提出来了。预先创建好一个较为优化的数量的线程,放到队列中,让过来的任务立刻能够使用,就形成了线程池。
在Python3中,创建线程池是通过concurrent.futures函数库中的ThreadPoolExecutor类来实现的。
future对象:在未来的某一时刻完成操作的对象。submit方法可以返回一个future对象。
示例:
import concurrent.futures as futures
# 线程执行的函数
def add(n1, n2):
v = n1 + n2
print('add:', v, ', tid:', threading.currentThread().ident)
time.sleep(n1)
return v
# 创建一个线程池
ex = futures.ThreadPoolExecutor(max_workers=3) # 指定最多运行3个线程
# 通过submit把需要执行的函数扔进线程池中
f1 = ex.submit(add, 2, 3) # submit返回一个future对象
f2 = ex.submit(add, 2, 2) # submit返回一个future对象
print('main thread running!')
print(f1.done()) # .done():看看任务结束了没
print(f1.result()) # 获取结果(阻塞方法)
print(f1.done()) # .done():看看任务结束了没
print(f2.done()) # .done():看看任务结束了没
print(f2.result()) # 获取结果(阻塞方法)
print(f2.done()) # .done():看看任务结束了没
运行结果
获取线程执行结果方法:
- 1.f.result()
- 2.map()
- 3.as_completed
- 4.wait
- 5.add_done_callback
网友评论