线程是一个程序级别的概念,可以暂时理解为一个程序中的某个函数,当我们同时执行很多个函数的时候就是多线程。比如说一个程序有需要从网上下载东西的函数,还有要从磁盘中加载数据的函数,这两个函数都是比较耗时的。如果是单进程的话需要一个一个函数运行,下载完数据然后才能从磁盘加载,这样程序就会很卡顿。如果使用多进程的话,我们可以在下载数据的同时加载数据,这样就快的多了。
一、启动多个线程
在说线程的创建之前,我们先来看一个单线程的例子,作为对比再来看之后的多线程,一般来说我们之前写的程序都是单线程,也就是逻辑流与指令流一致的程序。
import time
def run(n):
print("task", n)
time.sleep(2)
print('task done', n)
start_time = time.time()
run('t1')
run('t2')
print(time.time() - start_time)
task t1
task done t1
task t2
task done t2
4.001030206680298
从输出上就可以看出来两个函数调用是依次运行的,一共执行了 4 秒多一点。
1. 函数方式启动线程
我们先来看第一种启动线程的方式,使用 threading.Thread() 方法。这个方法需要传入两个参数,
- 线程函数名:要设置线程让其后台执行的函数,这个函数由我们自己定义注意不要加 ( )。
- 线程函数的参数:线程函数名函数所需要的参数,以元组的形式传入。如果不需要传参,可以不指定。
来看一个例子:
import threading
import time
def run(n):
print("task ",n )
time.sleep(2)
print('task done', n)
print(time.time() - start_time)
start_time = time.time()
t1 = threading.Thread(target=run,args=("t1",))
t2 = threading.Thread(target=run,args=("t2",))
t1.start()
t2.start()
task t1
task t2
task done t1
2.001432418823242
task done t2
2.002263307571411
从结果上来看确实两个函数是依次执行的,运行时间也确实从 4s 多缩短到了 2s 多。
2. 通过类方式启动线程
首先,我们需要自己定义一个类,对这个类有两点要求:
- 必须继承 threading.Thread 这个父类
- 必须重写 run 方法
上面的 run 方法,和我们上面的线程函数的性质是一样的,当我们运行 start() 方法的时候就会被自动调用。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, n, sleep_time):
super(MyThread, self).__init__()
self.n = n
self.sleep_time = sleep_time
def run(self):
print("runnint task ", self.n)
time.sleep(self.sleep_time)
print("task done ", self.n)
print(time.time() - start_time)
start_time = time.time()
t1 = MyThread("t1", 2)
t2 = MyThread("t2", 2)
t1.start()
t2.start()
runnint task t1
runnint task t2
task done t1
2.0018975734710693
task done t2
2.0018975734710693
结果和函数的差不多。
三、join 方法
可能看了上面的几个例子你有一个疑问,就是在计算时间的时候为什么不将时间放到代码的最后呢,这样不是更加准确吗,我们先来看一个这样写的例子,我们可以夸张一些,一下创建 50 个线程。
import threading, time
def run(n):
print("task", n)
time.sleep(2)
print('task done ', n)
start_time = time.time()
for i in range(50):
t = threading.Thread(target=run, args=("t-{}".format(i + 1),))
t.start()
print("-"*50, 'all thread has finished', "-"*50)
print(time.time() - start_time)
因为运行结果很长,我只截取了部分,因为很多个函数同时向屏幕输出就会造成这种很乱的感觉。
task t-1
task t-2
task t-3
task t-4
task t-5
task t-6
...
task t-50--------------------------------------------------
all thread has finished --------------------------------------------------
0.012969255447387695
task done t-2
task done t-1
task done task done t-5t-3
...
t-49
task done t-50
task done t-44
首先主线程启动,然后依次启动 50 个线程,同时运行,因为主线程执行时间比较短,所以主线程在子线程结束前就结束了,所以计算的时间只是主线程的运行时间。
![](https://img.haomeiwen.com/i17584033/48eaf6f2442ab9ba.png)
我故意把主线程画的很短,有一种执行时间很短的感觉。从上图也可以看出来主线程的结束对其他的线程是没有影响的。
所以我们就需要一种方法,使得主线程可以在某处等待其他线程的执行,执行完后,说不定还会返回个结果,再继续执行主线程,比如下图这种感觉。
![](https://img.haomeiwen.com/i17584033/7a52687aed3cac42.png)
值得注意的是进程 1 到 进程 50 之间仍然是并列的关系。
下面给出一个实例:
import threading, time
def run(n):
print("task", n)
time.sleep(2)
print('task done ', n)
start_time = time.time()
t_objs = [] # 储存线程实例
for i in range(50):
t = threading.Thread(target=run, args=("t-{}".format(i + 1),))
t.start()
t_objs.append(t) # 防止阻塞后面线程的启动,不在这里 join
# 这段代码可以理解为子进程未结束,主进程也不能结束
for t in t_objs:
t.join()
print("-"*50, 'all thread has finished', "-"*50)
print(time.time() - start_time)
task t-1
task t-2
task t-3
task t-4
task t-5
...
t-46
task done t-50task done t-49task done
t-48
-------------------------------------------------- all thread has finished --------------------------------------------------
2.0129146575927734
这样就可以计算所有线程的运行时间了,可以看到只运行了 2s 多一点,如果要是单进程运行的话恐怕要超过 100s 呢。
在上面的代码中还有一个地方值得注意,就是在线程创建完后我们没有直接 join ,而是先存到一个列表中,等到全部线程创建完后再全部 join。如果我们在线程 1 创建完后直接 join ,那么主线程就会等待线程 1 运行完,然后再创建线程 2 ,这样实际上还是单线程。
根据当前线程的数量查看线程的生命周期
import threading, time
def sing():
for i in range(3):
print('正在唱歌... ', i+1)
time.sleep(1)
def dance():
for i in range(3):
print('正在跳舞... ', i+1)
time.sleep(1)
if __name__ == "__main__":
print('晚会开始:', time.ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
while True:
length = len(threading.enumerate()) # 枚举,返回一个列表
print('当前运行的线程数量为:', length)
time.sleep(0.7)
if length <= 1:
break
晚会开始: Tue Aug 20 17:28:32 2019
正在唱歌... 1
正在跳舞... 当前运行的线程数量为: 3
1
当前运行的线程数量为: 3
正在跳舞... 2
正在唱歌... 2
当前运行的线程数量为: 3
正在唱歌... 3
正在跳舞... 3
当前运行的线程数量为: 3
当前运行的线程数量为: 3
当前运行的线程数量为: 1
这个程序中没有使用 join 函数,因为这个程序的主线程运行时间很长。实际上在本文的前几个例子中,我们并没有使用 join 但是还是计算出了程序运行时间,就是因为我们通过分析代码判断出来哪个线程最后结束,将停止计时的代码放到这个线程最后就可以了。
四、多线程互斥锁
在不同的线程之间,变量是共享的,有时候多个线程同时维护一个变量,这样就会造成混乱,为了防止混乱,可以使用锁,也就是说当一个变量被访问的时候,其他线程就无法在访问。
1. 如何使用锁
import threading
# 生成锁对象,全局唯一
lock = threading.Lock()
# 获取锁,未获取到会阻塞程序,直到获取到了锁才会往下执行
lock.acquire()
# 释放锁,其他线程可以访问
lock.release()
值得注意的是,lock.acquire() 和 lock.release() 必须要成对出现,但是有时候还是会忘记,所以我们可以使用上下文管理器来实现。
import threading
lock = threading.Lock()
with lock:
# 这里写代码
pass
2. 尝试在程序中使用锁
我们先来看一个不使用锁的例子:
import threading, time
g_num = 0
def work1(num):
global g_num
for i in range(num):
g_num += 1
print('in work1, g_num is ', g_num)
time.sleep(1) # 代表执行其他操作
def work2(num):
global g_num
for i in range(num):
g_num += 1
print('in work2, g_num is ', g_num)
time.sleep(1) # 代表执行其他操作
print('线程创建之前:', g_num)
t1 = threading.Thread(target=work1, args=(10000000,))
t2 = threading.Thread(target=work2, args=(10000000,))
t1.start()
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
print('两个线程对同一个全局变量操作之后的最终结果是:', g_num)
线程创建之前: 0
in work2, g_num is 11500115
in work1, g_num is 11593232
两个线程对同一个全局变量操作之后的最终结果是: 11593232
当我们决定完成一个任务,通常情况下,在计算机中,看似很简单的任务也是有多个不同的步骤共同完成。该步骤是由 cpu 的 一些指令完成的。比如我们常见的 i ++ ;这是一个非原子性操作,因为它先从内存取出 i 的值,然后再增 1,最后再写入内存中,经过三个步骤完成,如果在中间一个步骤被其他线程影响了,那么就可能出现错误。
那么我们就将这些步骤加锁,作为一个原子步骤,比如我们将每次循环中的
import threading, time
g_num = 0
lock = threading.Lock()
def work1(num):
global g_num
for i in range(num):
with lock:
g_num += 1
print('in work1, g_num is ', g_num)
time.sleep(1) # 代表执行其他操作
def work2(num):
global g_num
for i in range(num):
with lock:
g_num += 1
print('in work2, g_num is ', g_num)
time.sleep(1) # 代表执行其他操作
print('线程创建之前:', g_num)
t1 = threading.Thread(target=work1, args=(100000,))
t2 = threading.Thread(target=work2, args=(100000,))
t1.start()
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
print('两个线程对同一个全局变量操作之后的最终结果是:', g_num)
线程创建之前: 0
in work1, g_num is 181967
in work2, g_num is 200000
两个线程对同一个全局变量操作之后的最终结果是: 200000
当然如果你想将整个循环作为一个原子性操作也是可以的,注意这两个地方的不同。这次我不使用上下文管理器来写:
import threading, time
g_num = 0
lock = threading.Lock()
def work1(num):
global g_num
lock.acquire()
for i in range(num):
g_num += 1
lock.release()
print('in work1, g_num is ', g_num)
time.sleep(1) # 代表执行其他操作
def work2(num):
global g_num
lock.acquire()
for i in range(num):
g_num += 1
lock.release()
print('in work2, g_num is ', g_num)
time.sleep(1) # 代表执行其他操作
print('线程创建之前:', g_num)
t1 = threading.Thread(target=work1, args=(100000,))
t2 = threading.Thread(target=work2, args=(100000,))
t1.start()
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
print('两个线程对同一个全局变量操作之后的最终结果是:', g_num)
线程创建之前: 0
in work1, g_num is 100000
in work2, g_num is 200000
两个线程对同一个全局变量操作之后的最终结果是: 200000
3.死锁
在线程中同共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源们就会造成死锁。
尽管死锁很少发生,但是一旦发生就会造成应用的停止响应,下面是一个死锁的例子:
import threading, time
mutexA = threading.Lock()
mutexB = threading.Lock()
class MyThread1(threading.Thread):
def run(self):
# 对 mutexA 上锁
mutexA.acquire()
# mutexA 上锁后,等待一秒,等待另外一个线程释放 mutexB
print(self.name + '---do1---up---')
time.sleep(2)
mutexB.acquire()
print(self.name + '---do1---down---')
mutexB.release()
mutexB.release()
class MyThread2(threading.Thread):
def run(self):
# 对 mutexA 上锁
mutexB.acquire()
# mutexA 上锁后,等待一秒,等待另外一个线程释放 mutexB
print(self.name + '---do2---up---')
time.sleep(2)
mutexA.acquire()
print(self.name + '---do2---down---')
mutexA.release()
mutexB.release()
if __name__ == "__main__":
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
length = len(threading.enumerate()) # 枚举,返回一个列表
print('当前运行的线程数量为:', length)
Thread-1---do1---up---
Thread-2---do2---up---
当前运行的线程数量为: 3
当前运行的线程数量为: 3
当前运行的线程数量为: 3
...
程序就一直运行下去了。
五、线程间的通信
我们来研究一下线程间的消息通讯机制,目前我们遇到的问题是这样的:很多个线程发出消息,然后有很多个线程接收消息。其实这件事情很容易出问题,目前最安全的解决办法是使用 queue 中的队列。队列已经实现了锁的功能,使用其他数据结构需要自己来实现。
创建一个被多个线程共享的 Queue 对象,这些线程通过 put() 和 get() 操作来向队列中添加或者删除元素。下面是一些队列常见的函数。
from queue import Queue
# maxsize 默认为 0,不受限制
# 一旦设置大于零,当消息数达到限制,q.put() 也会被阻塞
q = Queue(maxsize=0)
# 阻塞程序,等待队列消息
q.get()
# 获取消息,设置超时时间
q.get(timeout=5.0)
# 发送消息
q.put()
# 所有任务完成时程序才继续执行后面的代码,否则处于阻塞状态。
q.join()
# 查询当前队列的消息个数
q.qsize()
# 队列消息是否都被消费完,True/False
q.empty()
# 队列消息是否已满
q.full()
生产者消费者模型:
线程间通信的问题其实可以抽象成一个生产者、消费者模型,发出信息的进程(们)作为生产者,接收信息的进程(们)作为消费者。
我们希望:在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。
我们先来看一个例子:
import random
import threading
import time
import queue
q = queue.Queue(maxsize=10)
def producter(name):
count = 1
while True:
q.put("红包{}".format(count))
print('生成了红包', count)
count += 1
time.sleep(random.randrange(3))
def consumer(name):
while True:
print('[{}]抢到了红包[{}]!!'.format(name, q.get()))
time.sleep(random.randrange(5))
p = threading.Thread(target=producter, args=("tencent",))
c1 = threading.Thread(target=consumer, args=('Tim',))
c2 = threading.Thread(target=consumer, args=('King',))
c3 = threading.Thread(target=consumer, args=('Wang',))
p.start()
c1.start()
c2.start()
生成了红包 1
[Tim]抢到了红包[红包1]!!
生成了红包 2
[King]抢到了红包[红包2]!!
生成了红包 3
[Tim]抢到了红包[红包3]!!
生成了红包 4
生成了红包 5
在这个例子中,线程 p 作为发出消息的生产者,发出了一个 str 对象,但是它的生产速度很快,所以保存在队列中,c1, c2, c3 作为接收消息的消费者,依次从队列中获取消息,这样就实现了线程间消息的安全传递。
为了不固化思维,我们再来看一个例子:
这个例子来自知乎@咪咪怪
设想有这样一个情况:下面有 6 个美少女,她们准备去量身高,有三个称重处可以服务。
![](https://img.haomeiwen.com/i17584033/ff4e6c9fa6ac9b56.png)
import queue
import threading
def worker():
while True:
item = q.get()
if item is None:
break
print("妹纸名字{},年龄{}".format(item['name'], item['age']))
q.task_done() # 这个语句与下面的 q.join() 配合使用
q = queue.Queue()
num_worker_threads = 3 # 三个人在同时给妹子测量身高
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
beauty_girls = [{"name": "小H", "age": 23},
{"name": "小E", "age": 22},
{"name": "小D", "age": 21},
{"name": "小C", "age": 20},
{"name": "小B", "age": 19},
{"name": "小A", "age": 18},]
for girl in beauty_girls:
q.put(girl)
# 所有任务完成时才继续执行后面的代码,否则处于阻塞状态
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
妹纸名字小H,年龄23
妹纸名字小E,年龄22
妹纸名字小D,年龄21
妹纸名字小C,年龄20
妹纸名字小B,年龄19
妹纸名字小A,年龄18
这个例子实际上是主进程作为消息的发出者,它每次发出的信息是一个装着美女的字典,三个称重处进程是信息的接收者(也就是接收一个美女啦)。
六、线程池
线程池可以看做是线程的集合。在没有任务时线程处于空闲状态,当请求到来:线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务**(而不是销毁)。这样就实现了线程的重用。
为每个请求都开一个新的线程虽然理论上是可以的,但是会有缺点:
- 线程生命周期的开销非常高。每个线程都有自己的生命周期,创建和销毁线程所花费的时间和资源可能比处理客户端的任务花费的时间和资源更多,并且还会有某些空闲线程也会占用资源。
- 程序的稳定性和健壮性会下降,每个请求开一个线程。如果受到了恶意攻击或者请求过多(内存不足),程序很容易就奔溃掉了。
所以说:我们的线程最好是交由线程池来管理,这样可以减少对线程生命周期的管理,一定程度上提高性能。
在 Python3 中,创建线程池是通过 concurrent.futures 函数库中的 ThreadPoolExecutor 类来实现的。
future 对象(期货对象):在未来的某一时刻完成操作的对象. submit 方法可以返回一个 future 对象.
import threading, time
from concurrent.futures import ThreadPoolExecutor
# 线程执行的函数
def add(n1, n2):
v = n1 + n2
print('add: ', v, ',tid: ', threading.currentThread().ident)
time.sleep(n1)
return v
# 通过submit把需要执行的函数扔进线程池中.
# submit 直接返回一个future对象
ex = ThreadPoolExecutor(max_workers=3) # 制定最多运行N个线程
future1 = ex.submit(add,2,3)
future2 = ex.submit(add,2,2)
print('main thread running')
print(future1.done()) # done 看看任务结束了没
print(future1.result()) # 获取结果 ,阻塞方法
add: 5 ,tid: 5564
add: 4 ,tid: 31484
main thread running
False
5
map 方法,返回是跟你提交序列是一致的. 是有序的
import threading, requests
from concurrent.futures import ThreadPoolExecutor
# 下面是map 方法的简单使用. 注意:map 返回是一个生成器 ,并且是*有序的*
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
print('thread id:',threading.currentThread().ident,' 访问了:',url)
return requests.get(url) # 这里使用了requests 模块
ex = ThreadPoolExecutor(max_workers=3)
res_iter = ex.map(get_html,URLS) # 内部迭代中, 每个url 开启一个线程
for res in res_iter: # 此时将阻塞 , 直到线程完成或异常
print('url:%s ,len: %d'%(res.url,len(res.text)))
thread id: 32500 访问了: http://www.baidu.com
thread id: 35188 访问了: http://www.qq.com
thread id: 34976 访问了: http://www.sina.com.cn
url:http://www.baidu.com/ ,len: 2381
url:https://www.qq.com/ ,len: 231703
url:https://www.sina.com.cn/ ,len: 570395
接下来,使用as_completed . 这个函数为submit 而生。
你总想通过一种办法来解决submit后啥时候完成的吧 , 而不是一次次调用future.done 或者 使用 future.result 吧。
concurrent.futures.as_completed(fs, timeout=None) 返回一个生成器,在迭代过程中会阻塞。
直到线程完成或者异常时,产生一个Future对象。
同时注意, map方法返回是有序的, as_completed 是那个线程先完成/失败 就返回。
import threading, requests, time
from concurrent.futures import ThreadPoolExecutor, as_completed
# as_completed 完整的例子
# as_completed 返回一个生成器,用于迭代, 一旦一个线程完成(或失败) 就返回
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
time.sleep(1)
print('thread id:',threading.currentThread().ident,' 访问了:',url)
return requests.get(url)
ex = ThreadPoolExecutor(max_workers=3) # 最多3个线程
future_tasks = [ex.submit(get_html,url) for url in URLS] # 创建3个future对象
for future in as_completed(future_tasks): # 迭代生成器, 参数是一个列表
try:
resp = future.result()
except Exception as e:
print('%s'%e)
else:
print('%s has %d bytes!'%(resp.url, len(resp.text)))
thread id: 5160 访问了: http://www.baidu.com
thread id: 7752 访问了: http://www.sina.com.cn
thread id: 5928 访问了: http://www.qq.com
http://www.qq.com/ has 240668 bytes!
http://www.baidu.com/ has 2381 bytes!
https://www.sina.com.cn/ has 577244 bytes!
就先写这么多吧!
网友评论