python的thread模块是比较底层的模块(可能不同的操作系统不一样),python的threading模块是对thread做了一些包装的,可以更加方便的被使用(跨平台)
ps:看程序代码的时候,切忌从上往下看,而是主要看程序的框架(比如C语言中,主要看main函数)
Thread对象基础
threading模块
threading模块中的对象列表
- Thread:表示一执行线程的对象
- Lock:锁原语对象(和thread模块中的锁一样)
- RLock:可重入锁对象(可读不可写)
- Condition:条件变量对象,使得一个线程等待另一个线程满足特等的“条件”,比如改变状态或某个数据值
- Event:条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有线程将被激活
- Semaphore:为线程间共享的有限资源提供一个“计数器”,如果没有可用资源将会被阻塞
- BoundeSemaphore:与Semaphore相似,不过它不允许超过初始值
- Timer:与Thread相似,不过它要在运行前等待一段时间
- Barrier:创建一个“障碍”,必须达到指定数量的线程后才可以继续
ps:我们通过Python实现过线程编程,猪油用到的是threading.Thread对象
Thread对象常用的方法和属性
数据属性:
-
name:线程名
-
ident:线程的标识符
-
daemon:布尔标志,表示这个线程是否是守护线程
- 用法:线程对象.setDaemon(True)
- 如果把子线程设置为守护线程,表示该线程不重要,主线程结束,子线程结束
对象方法:
- __init__(group=None, target=None, name=None, args=(), kwargs={}, verbose=None,daemon=None):实例化一个线程对象,需要有一个可调用的target(函数),以及其参数args或kwargs。还可以传递name或group参数,不过后者还未实现。此外,verbose标志也是可接受的,而daemon的值将会设定thread.daemon标志/属性
- start():开始执行该线程
- run():定义线程功能的方法,通常在子类中被开发者重写
- join(timeout=None):直至启动的线程终止之前一直:除非给出了timeout(秒),否则会一直阻塞,相当于在此处等待调用者线程完成
线程详解
多线程执行
法一:函数方式
import threading #threading模块与Process很类似
import time
def saySorry():
print("亲爱的,我错了,我能吃饭了吗?")
time.sleep(1) #睡一秒
if __name__ == "__main__":
for _ in range(10):
t = threading.Thread(target=saySorry) #创建一个线程对象t
t.start()
#会发现其执行时间一共也就在1s左右(如果不用线程,应该在10s作用)
#主线程要等待所有子线程结束后才能结束
进程、程序与线程:
-
程序是死的,是代码的集合;
-
程序的运行被称之为进程——是拥有资源的最小单位;
-
线程是程序调度的最小单位
ps:如果多个线程执行的都是同一个函数的话,各自之间不会有影响,各是个的
方式二:类的方式
import threading,time
class MyThread(threading.Thread):
#与Process相似,重写run方法
def run(self):
for i in range(10):
time.sleep(1)
msg = "I'm {name} @ {count}".format(name = self.name, count = i)
print(msg)
if __name__ == "__main__":
t = MyThread(name = "test_thread") #创建线程并为线程指定名称。当函数结束的时候,t这个线程就结束了
t.start() #线程开始执行
print("当前线程:" + str(threading.enumerate())) #enumerate()能够获得当前时刻 程序中的所有线程(包括自己)
# ps:enumerate回忆:
# names = ["aa","bb","cc"]
# for temp in enumerate(names):
# print(temp) # 会输出序号几列表元素所组成的元组
#对于线程,主线程一般也要等待子线程(为了收回子线程占有的一点点资源)
线程的执行顺序
线程的执行顺序不确定,与进程一样,取决于操作系统的调度算法
多线程对全局变量的共享
from threading import Thread
import time
g_num = 100
def work1():
global g_num #注意:全局变量只要没有改指向,则在函数里面就不需要加global,如果可能改指向,就需要加global
for _ in range(3):
g_num += 1
print("In work1, g_num is {}".format(g_num))
def work2():
global g_num
print("In work2, g_num is {}".format(g_num))
print("线程创建之前,g_num is {}".format(g_num))
t1 = Thread(target=work1)
t1.start()
time.sleep(1) #睡眠1s,保证t1执行完毕
t2 = Thread(target=work2)
t2.start()
'''执行结果如下:
线程创建之前,g_num is 100
In work1, g_num is 103
In work2, g_num is 103
'''
对于进程,全局变量不共享,而对于多线程,全局变量是可以共享的,因为进程是拥有资源的最小单位,而线程是共享其所属进程的资源,线程自己字拥有执行所必不可少的一点点资源。也因此,线程之间的通信比进程之间的通信方便
但是线程对全局变量的共享也会出现问题,如下:
from threading import Thread
import time
g_num = 0
def work1():
global g_num
for _ in range(1000000):
g_num += 1
print("g_num is {}".format(g_num))
print("线程创建之前,g_num is {}".format(g_num))
t1 = Thread(target=work1)
t1.start()
# time.sleep(1) #睡眠1s,保证t1执行完毕
t2 = Thread(target=work1)
t2.start()
'''本次运行结果如下:(进行了2000000次加法,但是结果并不是2000000)
线程创建之前,g_num is 0
g_num is 1153259
g_num is 1247721
'''
ps:对于变量,线程除了可以以全局变量的形式共享,还可以以参数的形式共享(在Thread ()中以args = ()的形式传递
对于以上代码运行结果不是2000000的解析:
在线程执行g_num += 1的时候,实际上是g_num = g_num + 1,先取值加1,然后赋值,一共两步。而在多线程的时候,由于cpu的调度,一个线程中的这两步可能会被打断,所以运行结果不为2000000
原子性:一段代码,要么不执行,要么就直接执行完,不允许被打断
如何保证代码执行的原子性——互斥锁
threading模块中定义了Lock锁,可以方便的处理锁定:
import threading
mutex = threading.Lock() #创建锁
mutex.acquire([blockign]) #获得锁:锁定
mutext.release() #释放锁
互斥锁的应用:acquire/release
from threading import Thread,Lock
import time,threading
g_num = 0
def work1():
# print("%s"%threading.current_thread().name) #此句可以输出当前线程的名称
global g_num
mutex.acquire() #对g_num操作前上锁,如果一方获得了锁,另一方如果还要获得锁,就必须阻塞(一直等待),直到另一方释放这个锁
for _ in range(1000000):
g_num += 1
print("g_num is {}".format(g_num))
mutex.release() #对g_num的操作完毕后,释放锁,以让其他人可以获得锁从而对g_num进行操作
#ps:把锁的释放放在for外面,相当于把多线程硬生生弄成了单线程,如果只是要最后的结果是2000000,则可以把锁的释放法在for里面,紧跟g_num += 1。其实通常是能不加的代码就不加(即加锁的地方尽可能小)
print("线程创建之前,g_num is {}".format(g_num))
mutex = Lock()
t1 = Thread(target=work1)
t1.start()
# time.sleep(1) #睡眠1s,保证t1执行完毕
t2 = Thread(target=work1)
t2.start()
'''运行结果如下:
线程创建之前,g_num is 0
g_num is 1000000
g_num is 2000000
'''
`ps:等待解锁的方式:通知,而不是轮询
互斥锁的应用:with
import threading
import time
g_num = 0
lock = threading.Lock()
def work1(num):
global g_num
with lock:
for i in range(num):
time.sleep(0.01)
g_num += 1
print("work1, g_num is %d"%g_num)
def work2(num):
global g_num
try:
lock.acquire() #获得对数据的封锁
for i in range(num):
time.sleep(0.01)
g_num += 1
finally:
lock.release() #释放对数据的封锁。acquire和release与with语句效果相同
print("work2, g_num is %d"%g_num)
print("---线程创建之前,g_num is %d---"%g_num)
t1 = threading.Thread(target=work1,args=(100,))
t1.start()
t2 = threading.Thread(target=work2,args=(100,))
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(2)
print("---线程操作之后,g_num is %d---"%g_num)
#加锁之后输出结果为:
# ---线程创建之前,g_num is 0---
# work1, g_num is 100
# work2, g_num is 200
# ---线程操作之后,g_num is 200---
对于互斥锁,通常是对值进行修改时才加锁,不修改的话不用加锁
多线程使用非共享变量(函数里面的变量)
即当多个线程所用的代码相同时,其中变量的情况如何
from threading import Thread
import threading,time
def test():
name = threading.current_thread().name
print("Thread name is :{}".format(name))
num = 100
if name == "Thread-1":
num += 1
else:
time.sleep(2)
print("Thread is {}, num is {}".format(name, num))
t1 = Thread(target=test)
t1.start()
t2 = Thread(target=test)
t2.start()
'''运行结果如下:
Thread name is :Thread-1
Thread is Thread-1, num is 101
Thread name is :Thread-2
Thread is Thread-2, num is 100
'''
#说明虽然两个线程都是到同一个函数里面执行,但是他们函数里面的数据“各人是各人的”,互不影响,所以不需要加锁。而全局变量是公用的
线程间使用Queue通信
from queue import Queue
import queue
import threading
import time
q = Queue(maxsize=10) #队列的最大容量为10
def producer():
for i in range(10):
q.put(i)
def customer():
for i in range(10):
data = q.get()
print(data, end=" ")
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=customer)
t1.start()
t2.start()
#输出结果如下:
# 0 1 2 3 4 5 6 7 8 9
# ps:stack = queue.LifoQueue() #栈
# ps:队列的其他属性
# q.empty()
# q.full()
# q.maxsize
# q.qsize()
线程池
线程池中线程的创建、执行、销毁都由线程池自己执行
线程池的基类是 concurrent.futures 模块中d Executor,Executor 提供了两个子类,即ThreadPoolExecutor 和ProcessPoolExecutor,其中ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。
Exectuor 提供了如下常用方法:
- submit(fn, *args, kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,kwargs 代表以关键字参数的形式为 fn 函数传入参数。
- map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
- shutdown(wait=True):关闭线程池。
在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
- 使用线程池的步骤
- 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
- 定义一个普通函数作为线程任务。
- 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
- 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
import threading
import time
def add(n1, n2):
v = n1 + n2
time.sleep(n1)
return v
import concurrent.futures as futures
ex = futures.ThreadPoolExecutor(max_workers = 3) #创建线程池并设置线程池容量
#ps:futures的意义:结果要未来才能获得
f1 = ex.submit(add, 2, 3) #创建线程并提交到线程池(ps:提交后线程即开始执行),返回值为一个Future对象
f2 = ex.submit(add, 2, 2)
print(f1.done()) #判断线程是否执行结束。输出False,因为线程还没有执行完
print(f1.result()) #获得线程的执行结果。输出5,(通常是程序执行完才有返回值,故这里可以用于阻塞线程,但是也可以对result指定timeout参数。
ex.shutdown()
ps:关于Future的简单理解:由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
Future对象
Future 提供了如下方法:
- cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
- cancelled():返回 Future 代表的线程任务是否被成功取消。
- running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
- done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
- result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
- exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
- add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。
获取执行结果
- 用Future的result()方法:但是该方法会阻塞当前主线程,只有等到当前任务完成后,result()方法的阻塞才会被解除
- 通过Future的add_done_callback()方法来添加回调函数,该回调函数形如fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的Future对象作为参数传给该回调函数
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
def get_result(future):
print(future.result())
# 为future1添加线程完成的回调函数
future1.add_done_callback(get_result)
# 为future2添加线程完成的回调函数
future2.add_done_callback(get_result)
print('--------------')
多线程的应用:下载图片
import os
import random
import time
import requests
import concurrent.futures as futures
def download_img(url):
resp = requests.get(url) #获得链接指向文件的内容
filename = os.path.split(url)[1] # 把路径分为目录名和文件名,[1]为文件名
with open(filename, "wb+") as f:
f.write(resp.content) #写入文件
num = random.randint(0,5)
time.sleep(num)
print(num)
return filename
# 链接的获得:网络上随便点击一张图片,右击:查看图片
urls = ["http://pic118.huitu.com/res/20190420/1480621_20190420132348580020_1.jpg",
"https://img.ivsky.com/img/tupian/pre/201811/19/jiguang-008.jpg"]
ex = futures.ThreadPoolExecutor(max_workers=2) #创建线程池
res_iter = ex.map(download_img, urls) #以第二个参数为第一个参数(函数)的参数
"""
print(type(res_iter)) #发现其是一个生成器
for res in res_iter:
print(type(res))
help(res)
"""
fu_tasks = [ex.submit(download_img, url) for url in urls]
"""
print(type(fu_tasks[0]))
for future in futures.as_completed(fu_tasks): #as_completed没有顺序,谁先完成就先返回谁
print(future.result())
"""
ThreadLocal对象在线程中的应用
传参问题
前奏-其一
- 函数
- 传参
- 全局变量
- 返回值
前奏-其二
-使用全局字典
用一个全局字典dict存放所有的对象,然后以thread自身作为key获得线程对应的对象(以Studetn为例)
global_dict = {}
def std_thread(name):
std = Student(name) #当多个线程来执行std_thread的时候,各自得到各自的Student对象
#把std放到全局变量global_dict中;键值也可以使用
global_dict[threading.current_thread()] = std #当多个线程来执行这句话时,由于键值不同,能够取出不同的值
do_task1()
do_task2()
def do_task1():
#不传入std,而是根据当前线程查找,故也可以取出自己想要的值。在没有传参的情况下保证了多个线程使用同一个全局变量来时候并没有出错
std = global_dict[threading.current_thread()] #键值也可以使用threading.current_thread().name
...
def do_task2():
#不传入std,而是根据当前线程查找:
std = global_dict[threading.current_thread()]
...
这种方式理论上是可行的,它最大的有点是消除了std对象在每层函数中的传递问题,但是,每个函数获得std的代码有点low
其三:使用ThreadLocal
import threading
local_school = threading.local() #创建一个ThreadLocal对象
def process_student():
#获取当前进程相关的student
std = local_school.student #取出在process_thread中赋予的属性
print("Hlello, {} in {}".format(std, threading.current_thread().name))
def process_thread(name):
#绑定ThreadLocal的studetn
local_school.student = name #给对象添加属性student,对于ThreadLocal对象的同一个属性,在一个线程中设置的是哪个值,到时候取出的就是哪个值,不会因为线程不一样导致同一属性的值不一样(即:对于同一个属性,在不同线程中的操作是互不影响的)
process_student()
t1 = threading.Thread(target=process_thread, args=("biubiu~",), name = "Thread_A")
t2 = threading.Thread(target=process_thread, args=("老王",), name = "Thread-B")
t1.start()
t2.start()
t1.join()
t2.join()
'''运行结果如下:
Hlello, biubiu~ in Thread_A
Hlello, 老王 in Thread-B
'''
网友评论