一、什么是线程
每个进程有一个地址空间,而且默认就有一个控制线程。
线程就流水线工作的过程,一条流水线必须属于一个车间,车间的工作过程是一个进程,一个车间内至少有一条流水线。
所以,进程只是用来把资源集中在一起,只是一个资源单位,或者说资源集合,而线程才是cpu上的执行单位。
多线程的概念是,在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。
线程与进程的区别
-
Threads share the address space of the process that created it; processes have their own address space.
线程共享创建它进程的地址空间;进程拥有自己的地址空间。 -
Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
线程可以直接访问其进程的数据段;进程具有父进程的数据段的副本。 -
Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
线程可以直接与其进程的其他线程通信;进程必须使用进程间通信来与兄弟进程通信。 -
New threads are easily created; new processes require duplication of the parent process.
新线程容易创建;新进程需要复制父进程。 -
Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
线程可以对同一进程的线程执行相当大的控制;进程只能对子进程执行控制。 -
Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
对主线程的更改(取消、优先级更改等)可能会影响进程的其他线程的行为;对父进程的更改不影响子进程。
总结:
- 同一个进程内的多个线程共享该进程内的地址资源
- 创建线程的开销要远小于创建进程的开销
三 多线程应用举例
举例:
开启word,该进程需要办很多事,如监听键盘输入,处理文字,定时自动将文字保存到硬盘,这三个任务操作的都是同一块数据,因而不能用多进程。只能在一个进程里并发地开启三个线程,如果是单线程,那就只能是,键盘输入时,不能处理文字和自动保存,自动保存时又不能输入和处理文字。
开启线程的两种方式
from threading import Thread
import time
def test_T1():
time.sleep(2)
print('T1子线程')
if __name__ == '__main__':
t=Thread(target=test_T1)
t.start()
print('T1主线程')
class TestT2(Thread):
def __init__(self):
super().__init__()
def run(self):
time.sleep(2)
print('T2子线程')
if __name__ == '__main__':
t = TestT2()
t.start()
print('T2主线程')
#执行结果:
# T1主线程
# T2主线程
# T1子线程
# T2子线程
theading里的Thread对象
Thread实例对象的方法
- isAlive(): 返回线程是否活动的。
- getName(): 返回线程名。
- setName(): 设置线程名。
threading模块方法:
- threading.currentThread(): 返回当前的线程变量。
- threading.enumerate(): 返回一个包含正在运行的线程的list。正在-运行指线程启动后、结束前,不包括启动前和终止后的线程。
- threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
import threading
def work():
import time
time.sleep(3)
print(threading.current_thread().getName())
if __name__ == '__main__':
t=threading.Thread(target=work)
t.start()
print(threading.current_thread().getName())
print(threading.current_thread()) #主线程
print(threading.enumerate()) #连同主线程在内有两个运行的线程
print(threading.active_count())
print('主线程/主进程')
'''
MainThread
<_MainThread(MainThread, started 12876)>
[<_MainThread(MainThread, started 12876)>, <Thread(Thread-1, started 4576)>]
2
主线程/主进程
Thread-1
'''
import time
def test(name):
time.sleep(2)
print('%s' %name)
if __name__ == '__main__':
t=threading.Thread(target=test,args=('zhy',))
t.start()
t.join()
print('主线程')
print(t.is_alive())
'''
zhy
主线程
False
'''
守护线程
守护线程会等待主线程运行完毕后被销毁。(守护进程也一样遵循此原则。)
需要强调:运行完毕并非终止运行
-
对主进程来说,运行完毕指的是主进程代码运行完毕
-
对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
1、主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源才会结束,否则会产生僵尸进程。
2、主线程在其他非守护线程运行完毕后才算运行完毕。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread
import time
def test(name):
time.sleep(2)
print('%s' %name)
if __name__ == '__main__':
t=Thread(target=test,args=('zhy',))
t.setDaemon(True) #必须在t.start()之前设置
t.start()
print('主线程')
print(t.is_alive())#True
def test1():
print('test1')
time.sleep(1)
print("end1")
def test2():
print('test2')
time.sleep(3)
print("end2")
if __name__ == '__main__':
t1=Thread(target=test1)
t2=Thread(target=test2)
t1.daemon=True
t1.start()
t2.start()
print("main-------")
'''
test1
test2
main - ------
end1
end2
'''
二、GIL全局解释器锁
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
翻译:在CPython中,全局解释器锁或GIL是防止多重的互斥体。立即执行Python字节码的本地线程。这个锁是必性的,因为CPython的内存管理线程不安全。(然而,自从GIL存在,其他特征发展开始依赖与它。)
GIL并不是Python的特性,它是实现Python解析器CPython所引入的概念,因为CPython是大部分环境下默认执行环境,很多人认为CPython就是Python,把GIL归结为Python语言的缺陷。
GIL并不是Python的特性,Python完全可以不依赖于GIL
GIL是什么
GIL本质就是一把互斥锁,将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,保证数据安全。每次执行python程序,都会产生一个独立的进程。
- 所有数据都是共享的,代码作为一种数据也是被所有线程共享,在进程内所有线程都能访问到代码,于是我们可以开启三个线程然后target都指向该代码,能访问到意味着就是可以执行。
- 所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行。
多个线程执行流程是
多个线程先访问到解释器的代码,本质就是访问了开启进程的代码,该代码再target所执行的代码,并将所执行的代码作为参数交给解释器去执行。
解释器的代码是所有线程共享的,垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据10,可能线程1执行x=10的同时,垃圾回收执行的是回收10的操作。解决这种问题的方法就是加锁处理,保证python解释器同一时间只能执行一个任务的代码。
GIL与Lock
Python已经有GIL来保证同一时间只能有一个线程,为什么还需要lock?
因为锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据。保护不同的数据就应该加不同的锁,GIL 与Lock是两把锁,保护的数据不一样。前者是解释器级别的,保护的是解释器级别的数据,如垃圾回收的数据;后者是保护开发应用程序的数据,只能用户自定义加锁处理。
代码执行过程
- 100个线程去抢GIL锁,即抢执行权限。
- 线程1先抢到GIL,开始执行,一旦执行就会拿到lock.acquire()。
- 线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL。
- 直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,其他的线程再重复2 3 4的过程。
GIL与多线程
有了GIL的存在,同一时刻同一进程中只有一个线程被执行。
问题:进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势,python怎么用?
回答:该问题忽略了现实程序中的I/O阻塞。
1.多cpu可以有多个核并行完成计算,所以多核提升的是计算性能。
2.cpu遇到I/O阻塞,仍需要等待,多核对I/O操作没有用处
工人相当于cpu,工人干活的过程中如果没有原材料,则需要等待原材料的到来。如果工厂干的大多数任务都要有准备原材料的过程,即I/O密集型,那再多的工人意义也不大。反过来讲,如果工厂原材料都齐全,则工人越多,效率越高。
结论:
- 对于纯计算,cpu越多越好,但是对于I/O来说,多cpu没用。
- 计算型和I/O型是相对的概念。对运行一个程序来说,cpu的增多执行效率会有所提高,这是因为程序不会是纯计算或者纯I/O。
- 大多数网络编程,都是I/O密集型,python对于IO密集型的任务效率显著。
- 多线程用于IO密集型:如socket,爬虫,web;多进程用于计算密集型,如金融分析
三、死锁现象
死锁: 是指两个或两个以上的进、线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法执行下去。
from threading import Thread,Lock
import time
l1 = l2 = Lock()
class TestLock():
def run(self):
self.func()
def func(self):
l1.acquire()
print('fuc1上11锁保护')
l2.acquire()
print('fuc1上12锁保护')
l1.release()
l2.release()
if __name__ =='__main__':
t = TestLock()
t.run()
加锁意味着同一时间只能执行一个线程,锁死是在执行过程中又加了把锁。
锁2:你先执行完我在执行。
锁1:把你锁2执行完我就执行完了。
解决方法:递归锁
在Python中为了支持在同一线程中多次请求同一资源,python提供了可递归锁RLock。
RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。递归锁可以连续acquire多次,而互斥锁只能acquire一次。上面的例子用RLock,问题得到解决。
信号量 Semaphore
信号量也是一把锁,可以拿锁的线程数量,这个数量便是信号量的大小。假如指定信号量为5,互斥锁同一时间只能有一个任务抢锁去执行,信号量同时则可以有5个任务拿锁执行。
from threading import Thread,Semaphore,current_thread
import time
def func():
sm.acquire()#加信号量的锁
print('%s' %current_thread().getName())
time.sleep(1)
sm.release()
if __name__ == '__main__':
sm = Semaphore(5)
for i in range(30):
t = Thread(target=func)
t.start()
原理:Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1,调用release() 时内置计数器+1,计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
三、线程的相关方法
控制线程状态 Event
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题很麻烦。为了解决这个问题,需要使用threading库中的Event对象,它允许线程等待某些事件的发生。
在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
- event.isSet():返回event的状态值;
- event.wait():如果 event.isSet()==False将阻塞线程;
- event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
- event.clear():恢复event的状态值为False。
from threading import Thread,Event,current_thread
import time
def test():
n=1
while not event.is_set():
print('%s waiting.....'%n)
time.sleep(1)
n+=1
def test2():
event.set()
print('连接成功')
if __name__ == '__main__':
event = Event()
t = Thread(target=test)
t2 = Thread(target=test2)
t.start()
time.sleep(4)
t2.start()
定时器 Timer
指定n秒后执行某操作
from threading import Timer
def test(name):
print("%s hello"%name)
t = Timer(1,test,['zhy'])
t.start()
一秒后输出:zhy hello
线程queue
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
翻译:队列在线程编程中特别有用,尤其是在信息必须在多个线程之间交换时。
三种不同的用法
- queue.Queue() 队列,先进先出。
- queue.LifoQueue()堆栈,后进先出。
- queue.PriorityQueue() 优先级队列,自己设置优取出的级别。
import queue
q=queue.Queue()
'''
队列,先进先出。
输出结果1 2 3
'''
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
q=queue.LifoQueue()
'''堆栈,后进先出。
输出结果3 2 1
'''
q.put(1)
q.put(2)
q.put(3)
q=queue.PriorityQueue()
'''
优先队列,自己设计优先级,输出结果b c a
数字越小越优先输出,拿出数据类型是元组。
'''
q.put((3,'a'))
q.put((1,'b'))
q.put((2,'c'))
print(q.get())
print(q.get())
print(q.get())
四、进程池与线程池
初学多进程或多线程时,我们简单实现并发的套接字通信,单这种实现方式的致命缺陷是:服务的开启的进程数或线程数会随着并发的客户端数目地增多而增多,这会对服务端主机带来巨大压力,于是要考虑对服务端开启的进程数或线程数加以控制,这就是进程池或线程池的用途。
介绍
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
基本方法
1、submit(fn, *args, **kwargs)
异步提交任务
2、map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作
3、shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
4、result(timeout=None)
取得结果
5、add_done_callback(fn)
回调函数
进程池
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously.
ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
翻译:ProcessPoolExecutor类是一个执行程序子类,它使用进程池异步执行调用。ProcessPoolExecutor使用多处理模块,允许它跨步执行全局解释器锁,但也意味着只能执行可返回对象并返回。
一个执行器子类,它使用max_workers执行异步调用,如果未指定,则会将默认为机器上的处理器数量作为默认值,等于0,则将引发ValueError错误。
用法
from concurrent.futures import ProcessPoolExecutor as PPE
import os,time,random
def test(n):
print(n+1,'%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n*n
if __name__ == '__main__':
executor=PPE(max_workers=7)
res=[]
for i in range(20):
f=executor.submit(test,i)#异步提交数据
res.append(f)
executor.shutdown(True)#等所有进程执行完毕后才回收资源
for i in res:
print(i.result()) #取得函数执行结果
线程池
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
翻译:ProcessPoolExecutor是一个执行器子类,它使用线程池异步执行调用。使用最多Max工线程的池异步执行调用。版本3.5:如果 max_workers没有给出,它将默认为机器上的处理器数乘以5,假设ProcessPoolExecutor经常被用来I/O操作,那么max_work的数量应该高于这个值。
新版本3.6:The thread_name_prefix argument 增加了允许用户控制创建的辅助线程,以便于调试。
线程池用法
线程池的用法和进程池完全一样,知识将ProcessPoolExecutor换成ThreadPoolExecutor。
map方法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os,time,random
def test(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
executor=ThreadPoolExecutor(max_workers=3)
executor.map(test,range(1,12))
'''
#map取代了for+submit,等同于:
for i in range(11):
future=executor.submit(task,i)
'''
回调函数
可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数。
from concurrent.futures import ProcessPoolExecutor
import requests
import os
def get_page(url):
print('<进程%s> get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}
def parse_page(res):
res=res.result()
print('<进程%s> parse %s' %(os.getpid(),res['url']))
parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)
if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.python.org',
'http://www.sina.com.cn/'
]
p=ProcessPoolExecutor(3)
for url in urls:
p.submit(get_page,url).add_done_callback(parse_page)
#parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
网友评论