python通过标准库threading实现多线程的运行。
程序的运行总要考虑并发,并行数。在多线程程序中为了确保程序在运行中出现争抢资源的现象,使用线程锁或者线程池来规避资源的争抢。
线程的实现
Python通过两个标准库_thread 和threading,提供对线程的支持 , threading对_thread进行了封装。threading模块中提供了Thread , Lock ,RLock , Condition等组件
因此在实际的使用中我们一般都是使用threading
Thread类的说明:
参数说明:
参数 | 描述 |
---|---|
target | 表示调用对象,即子线程要执行的任务 |
name | 子线程的名称 |
args | 传入target函数的位置参数,是一个元组,参数后面必须加 逗号 |
常用实例方法:
实例方法 | 描述 |
---|---|
Thread.run(self) | 线程启动时运行的方法,由该方法调用target参数所指定的函数 |
Thread.start(self) | 启动线程,start就是帮你去调用run方法 |
Thread.terminate(self) | 强制终止线程 |
Thread.join(self,timeout=None) | 阻塞调用,主线程进行等待 |
Thread.setDeamon(self,deamonic) | 将子线程设置为守护线程 |
Thread.getname(self,name) | 获取线程名称 |
Thread.setname(self,name) | 设置线程名称 |
创建线程:
在python中由两种方法创建线程,实例Thread类和重写Thread类
1、实例Thread类
import threading
import time
#定义线程要运行的函数
def thread_print(startname,endname):
print('我是: {}'.format(startname))
time.sleep(2) #为了便于观察,使程序睡2秒
print('线程结束')
#建立线程实例,args是一个元组,必须加逗号
t1 = threading.Thread(target=thread_print,args=('一','二'))
t2 = threading.Thread(target=thread_print,args=('开始',0))
t1.setDaemon(True)
#t2.setDaemon(True)
t1.start()
t2.start()
输出:
我是: 一
我是: 开始 #睡眠2秒
线程结束
线程结束
Process finished with exit code 0
2、继承Thread类
import threading
import time
#继承threading类中的Thread类
class MyThread(threading.Thread):
# 线程中需要的参数
def __init__(self,name):
super().__init__()
self.name = name
#重构run方法
def run(self):
print('I am {}'.format(self.name))
time.sleep(3)
#创建实例化线程
t1 = MyThread('apple')
t2 = MyThread('banana')
#启动线程,调用类中的run方法
t1.start()
t2.start()
#获取线程名称
print(t1.getName())
print(t2.getName())
输出:
I am apple
I am banana
apple
banana
Process finished with exit code 0
在知道了这两种方法后,我们来看子线程和主线程以及一些线程定义
主线程:当一个程序启动时 , 就有一个线程开始运行 , 该线程通常叫做程序的主线程
子线程:因为程序是开始时就执行的 , 如果你需要再创建线程 , 那么创建的线程就是这个主线程的子线程
主线程的重要性体现在两方面 :
1、是产生其他子线程的线程
2、通常它必须最后完成执行比如执行各种关闭操作
join:阻塞调用程序 , 直到调用join () 方法的线程执行结束, 才会继续往下执行
import threading
import time
#继承threading类中的Thread类
class MyThread(threading.Thread):
# 线程中需要的参数
def __init__(self,name):
super().__init__()
self.name = name
#重构run方法
def run(self):
print('I am {}'.format(self.name))
time.sleep(3)
print('子线程结束!!!')
#创建实例化线程
t1 = MyThread('apple')
#启动线程,调用类中的run方法
t1.start()
t1.join() #只有等待子线程结束,主线程才能结束
print('主线程结束!!!')
输出:
I am apple
子线程结束!!!
主线程结束!!!
Process finished with exit code 0
setDeamon():setDaemon() 与 join() 基本上是相对的 , join会等子线程执行完毕 ; 而setDaemon则不会等
import threading
import time
#继承threading类中的Thread类
class MyThread(threading.Thread):
# 线程中需要的参数
def __init__(self,name):
super().__init__()
self.name = name
#重构run方法
def run(self):
print('I am {}'.format(self.name))
time.sleep(3)
print('子线程结束!!!')
#创建实例化线程
t1 = MyThread('apple')
#启动线程,调用类中的run方法
t1.setDaemon(True) #放在子线程启动之前,否则会报错
t1.start()
print('主线程结束!!!')
输出:
I am apple #子线程没有结束,主线程就已经结束
主线程结束!!!
Process finished with exit code 0
线程间的通信
互斥锁:在多线程中 , 所有变量对于所有线程都是共享的 , 因此 , 线程之间共享数据最大的危险在于多个线程同时修改一个变量 , 那就乱套了 , 所以我们需要互斥锁 , 来锁住数据。
提示!
因为线程属于同一个进程,因此它们之间共享内存区域。因此全局变量是公共的。
from threading import Thread
a = 1
def func():
global a
a = 2
t = Thread(target=func())
t.start()
t.join()
print(a)
输出:
2
Process finished with exit code 0
上面是一个共享参数,也就是共享内存的问题,多线程下,共享内存会出现互相竞争的问题
from threading import Thread
a = 0
def add_func():
global a
for i in range(1000000):
a += 1
def sub_func():
global a
for i in range(1000000):
a -= 1
t_add = Thread(target=add_func)
t_sub = Thread(target=sub_func)
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()
print(a)
输出说明:
当取值小于10000时,结果为0 是正常的
当取值大于10000时,结果就会别的数,有时为正,有时为负数,这就说明这两个线程在互相抢占资源造成结果的不正确。
使用锁来控制共享资源的访问
from threading import Thread,Lock
a = 0
lock = Lock()
def add_func():
global a
for i in range(1000000):
lock.acquire()
a += 1
lock.release()
def sub_func():
global a
for i in range(1000000):
lock.acquire()
a -= 1
lock.release()
t_add = Thread(target=add_func)
t_sub = Thread(target=sub_func)
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()
print(a)
输出:
0
Process finished with exit code 0
或者
from threading import Thread,Lock
a = 0
lock = Lock()
def add_func():
global a
for i in range(1000000):
with lock: #上下文管理器,会自动关闭锁
a += 1
def sub_func():
global a
for i in range(1000000):
with lock:
a -= 1
t_add = Thread(target=add_func)
t_sub = Thread(target=sub_func)
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()
print(a)
输出:
0
Process finished with exit code 0
但是,加锁的缺点是程序会运行非常缓慢。
操作命令 | 描述 |
---|---|
put(item) | 入队 |
get() | 出队 |
empty() | 测试空 #近似 |
full() | 测试满 #近似 |
qsize() | 队列长度 |
task_done() | 任务完成 |
join() | 等待完成 (此处的join和线程的阻塞不是一回事) |
from threading import Thread
from queue import Queue
from random import randint
#创建队列,指定长度
my_q = Queue(10)
def func_put(q):
'''生产数据'''
for i in range(10):
num = randint(0,100)
q.put(num)
def func_get(qq):
'''取出数据'''
for j in range(5): #每次取出5个数值
num = qq.get()
print(num)
t1 = Thread(target=func_put, args=(my_q,))
t2 = Thread(target=func_get, args=(my_q,))
t1.start()
t2.start()
t1.join()
t2.join()
输出:
48
10
78
41
27
Process finished with exit code 0
队列保证了数据的安全性,没有争抢资源的现象。是按照顺序依次取出的。
from queue import Queue
my_q = Queue(4)
my_q.put(1,)
print(my_q.qsize())
my_q.get()
print(my_q.qsize())
print(my_q.empty())
my_q.put(1,)
my_q.put(1,)
my_q.put(1,)
my_q.put(1,)
print(my_q.full())
my_q.task_done()
my_q.task_done()
my_q.task_done()
my_q.task_done()
my_q.task_done()#每put一次,就需要任务完成一次
my_q.join() #检测put和task_dane()是否相同,不相同就会阻塞,相同就会完成
print('ok')
输出:
1
0
True
True
ok
Process finished with exit code 0
线程池
线程池:主线程: 相当于生产者,只管向线程池提交任务。并不关心线程池是如何执行任务的。因此,并不关心是哪一个线程执行的这个任务
线程池: 相当于消费者,负责接收任务,并将任务分配到一个空闲的线程中去执行
线程的简单实现:
from threading import Thread
from queue import Queue
import time
class ThreadPool:
def __init__(self,n):
self.queue = Queue()
for i in range(n):
Thread(target=self.worker).start()
def worker(self):
while True:
func,args,kwargs = self.queue.get()
func(*args,**kwargs)
self.queue.task_done()
def apply_sy(self,func,args = (),kwargs = {}):
self.queue.put((func,args,kwargs))
def join(self):
self.queue.join()
def t1():
print('任务1')
time.sleep(2)
print('任务1完成')
def t2 (*args,**kwargs):
print('任务2',args,kwargs)
time.sleep(2)
print('任务2完成',args,kwargs)
pool = ThreadPool(4)
pool.apply_sy(t1)
pool.apply_sy(t2,args = (1,2),kwargs = {'a':3,'b':4})
print('任务提交')
pool.join()
print('任务完成')
输出:
任务提交
任务1任务2
(1, 2) {'a': 3, 'b': 4}
任务2完成 任务1完成
(1, 2) {'a': 3, 'b': 4}
任务完成
Process finished with exit code -1
python内置线程池:
from multiprocessing.pool import ThreadPool
import time
def t1():
print('任务1')
time.sleep(2)
print('任务1完成')
def t2 (*args,**kwargs):
print('任务2',args,kwargs)
time.sleep(2)
print('任务2完成',args,kwargs)
pool = ThreadPool(4)
pool.apply_async(t1)
pool.apply_async(t2,args = (1,2),kwds = {'a':3,'b':4})
print('任务提交')
pool.close() #在join前必须要有close,这样就不允许提交任务了
pool.join()
print('任务完成')
输出:
任务提交任务1
任务2 (1, 2) {'a': 3, 'b': 4}
任务2完成任务1完成
(1, 2) {'a': 3, 'b': 4}
任务完成
Process finished with exit code 0
池的其他操作:
1、关闭操作:close -关闭提交通道,不允许在提交任务
2、终止操作:terminate -终止进程池,终止所有任务。
from multiprocessing.pool import ThreadPool
import time
def t1():
print('任务1')
time.sleep(2)
print('任务1完成')
def t2 (*args,**kwargs):
print('任务2',args,kwargs)
time.sleep(2)
print('任务2完成',args,kwargs)
pool = ThreadPool(4)
pool.apply_async(t1)
pool.apply_async(t2,args = (1,2),kwds = {'a':3,'b':4})
print('任务提交')
pool.terminate() #终止进程池,终止所有任务
pool.join()
print('任务完成')
输出:
任务提交
任务完成
Process finished with exit code 0
网友评论