Python 有关线程的模块主要有两个,一个是 thread
,另一个是 Threading
。thread
更趋向于底层,官方建议能力高的人去操作这个模块,对于我等初学者来说,threading
模块已经够我们折腾一阵子了。threading
模块与 Process
模块差不多,只是从结构上来说,用法有点类似。
其实 Python 的多线程有点鸡肋,准确地说,是 CPython 的多线程有点鸡肋。在 Python 诞生的那个年代,哪有什么多线程技术,一个 CPU 就够搞了,用不到什么特别高深的技术,所以这也算是个历史遗留问题吧。Python 有一个 全局解释器锁机制,就是俗称的 GIL。每个线程在执行的过程中都需要先获取 GIL。简单来说,就是 Python 将所有线程都加了一把锁,保证在同一时刻只能有一条线程去操纵数据。释放 GIL 情况:在 IO 操作等可能会引起阻塞的系统调用之前,可以释放 GIL。所以,Python 在调用一些 C 代码写的程序时,解释器都会释放 GIL。
from threading import Thread
def func(n):
pass
t = Thread(target=func, args=(,))
t.start()
t.join()
# 还有类实现的
class MyThread(Thread):
def __init__(slef, )
super().__init__()
pass
def run(self):
pass
实现一个 socket
服务端:
import socket
from threading import Thread
def chat(conn):
con.send(b'hellow')
msg = conn.recv(1024).decode('utf-8')
print(msg)
conn.close()
sk = socket.socket()
sk.bind(('127.0.0.1', 8000))
sk.listen()
while True:
conn, addr = sk.accept()
Thread(target=chat, args=(conn, )).start()
sk.close()
- 守护线程
守护进程随着主进程代码执行结束而结束
而守护线程会在主线程结束之后等待其他子线程的结束才结束
在t.start()
之前添加t.daemon=True
- 线程锁
from threading import Thread, Lock
def func(lock):
global n
lock.acquire()
n -= 1
lock.release()
n = 10
f_lst = []
lock = Lock()
for i in range(10):
t = Thread(target=func, args=(lock, ))
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
print(n)
注意,以上代码并不能确保最后一定能输出 0,因为假设有一个线程取到了数据,在准备向回赋值的时候,突然时间片轮转到了,导致这个线程暂停,另一个线程取到了数据。但是第一个线程还没有写数据,所以数据会污染。解决的办法是加锁即可。
# 再来看一个哲学家吃饭问题
from threading import Thread, Lock
first_lock = Lock()
second_lock = Lock()
def eat1(name):
first_lock.acquire()
print('%s拿到第一个筷子啦'%name)
second_lock .acquire()
print('%s拿到第二个筷子了'%name)
print('%s吃面'%name)
second_lock .release()
first_lock.release()
def eat2(name):
second_lock.acquire()
print('%s拿到第二个筷子啦'%name)
time.sleep(1)
first_lock.acquire()
print('%s拿到第一个筷子了'%name)
print('%s吃面'%name)
first_lock.release()
second_lock .release()
Thread(target=eat1,args=('a',)).start()
Thread(target=eat2,args=('b',)).start()
Thread(target=eat1,args=('c',)).start()
Thread(target=eat2,args=('d',)).start()
以上代码会出现死锁问题。为了解决这个,可以使用递归锁:
from threading import Thread, RLock
first_lock = second_lock = RLock()
def eat1(name):
first_lock.acquire()
print('%s拿到第一个筷子啦'%name)
second_lock .acquire()
print('%s拿到第二个筷子了'%name)
print('%s吃面'%name)
second_lock .release()
first_lock.release()
def eat2(name):
second_lock.acquire()
print('%s拿到第二个筷子啦'%name)
time.sleep(1)
first_lock.acquire()
print('%s拿到第一个筷子了'%name)
print('%s吃面'%name)
first_lock.release()
second_lock .release()
Thread(target=eat1,args=('a',)).start()
Thread(target=eat2,args=('b',)).start()
Thread(target=eat1,args=('c',)).start()
Thread(target=eat2,args=('d',)).start()
这样就不会发生死锁问题了。因为 RLock 内部维护着一个 Lock 和一个 counter 变量,counter 记录了 acquire 的次数,从而使得资源可以被多次 require 。直到一个线程所有的 acquire 都被 release,其他的线程才能获得资源。在同一个线程中使用两把以上的锁时会出现死锁问题,可以使用递归。
- 事件和信号量
import time
from threading import Semaphore, Thread
def func(sem, a, b):
sem.acquire()
time.sleep(1)
print(a+b)
sem.release()
sem = Semaphore(4)
for i in range(10):
t = Thread(target=func, args=(sem, i, 2*i))
t.start()
同一时间只能有 n 个线程访问那段代码。
- 事件
e = Event()
,e.is_set()
判断事件是否为真,e.set()
设置事件为真,e.clear()
清除事件状态,保留为真
# 第一个线程 : 连接数据库
# 等待一个信号 告诉我我们之间的网络是通的
# 连接数据库
# 第二个线程 : 检测与数据库之间的网络是否连通
# time.sleep(0,2) 2
# 将事件的状态设置为True
import time
import random
from threading import Thread,Event
def connect_db(e):
count = 0
while count < 3:
e.wait(0.5) # 状态为False的时候,只等待 0.5s 就结束
if e.is_set() == True:
print('连接数据库')
break
else:
count += 1
print('第%s次连接失败'%count)
else:
raise TimeoutError('数据库连接超时')
def check_web(e):
time.sleep(random.randint(0,3))
e.set()
e = Event()
t1 = Thread(target=connect_db,args=(e,))
t2 = Thread(target=check_web,args=(e,))
t1.start()
t2.start()
- 条件锁
一个条件被创建之初,默认有一个 False 状态。False 状态会影响 wait 一直处于等待状态 notify(int n) 制造 n 个锁。但是线程消耗完锁之后并不归还锁。
con = Condition()
conn.acquire()
conn.notify(n)
conn.release()
# 在线程中
conn.acquire()
conn.wait()
conn.release()
# 要进去的时候需要先获得钥匙,进去之后判断钥匙时候得到
- 定时器
from threading import Timer
def func():
print('定时器')
Timer(2, func).start()
# 过两秒过后会打印 定时器
- 队列
主要是为了线程间的数据安全问题。内置了锁,保证线程安全
# Python 3 中的
import queue
# Python 2 中的
import Queue
q = queue.Queue() # 先进先出
q.get()
q.put()
q.put_nowait() # 没有阻塞
q.get_nowait() # 没有阻塞
q = queue.LifoQueue() # 栈 先进后出
q = queue.PriorityQueue() # 优先级队列
q.put((10, 'a'))
q.put((20, 'b')) # 越小优先级越高,负数也可以
# 优先级一样,看第二项
线程池
concurrent.futures
这是 Python 3 提供的。里面提供了线程池和进程池,接口完全一样,只是本质上不同。
import time
from concurrent.futures import ThreadPoolExecutor
def func(n)
time.sleep(2)
print(n) --> 打印的时候不是固定的,因为线程开启的事件不同
return n*n
tpool = ThreadPoolExecutor(max_workers=n) #默认不要超过 n = CPU 个数 * 5
tlst = []
for i in range(20):
t = tpool.submit(func, 1) --> 异步的
tlist.append(t)
# tpool .shutdown() --> 完成 close + join 的工作
print('over')
for i in tlist:
print(t.result()) --> 获取返回值 一定按照顺序打印,因为 tlist 里面顺序是固定的
# 如果使用 map
tpool.map(func, range(20)) # 拿不到返回值
另外它也有 回调函数的用法
tpool.submit(func, 1).add_done_callback(call_back)
线程池的缺点:
任务队列是无界的,所以需要控制好。如果队列的生产者任务生产的太快,而线程池消费太慢处理不过来,任务就会堆积。如果堆积一直持续下去,内存就会持续增长直到OOM,任务队列里堆积的所有任务全部彻底丢失。
另外如果要查看成功的任务:
from concurrent.futures import ThreadPoolExecutor, as_completed
executor = ThreadPoolExecutor(max_workers=2)
urls = []
all_task = [executor.submit(function, (url)) for url in urls]
for future in as_completed(all_task):
data = future.result()
print('success')
# 顺序不固定,谁先完成就处理谁
# 还有一种方法,只不过这种方法是严格按照同步顺序来的
for data in executor.map(function, urls):
print(data)
顺序严格按照 urls 的顺序来的
另外还有一个 wait
:
from concurrent.futures import wait
wait(all_task, return_when=)
这里 return_when 有第一个完成后,所有完成后等。
阻塞某一个 task 或者一些 task 完成
网友评论