进程
进程
- 进程是程序执行的过程,包括了动态创建、调度和消亡的整个过程,进程是程序资源管理的最小单位。
- 进程管理的资源包括:CPU(寄存器),IO, 内存,网络资源等
进程地址空间
- 当创建一个进程时,操作系统会为该进程分配一个 4GB 大小的虚拟进程地址空间。
- 操作系统采用虚拟内存技术,把进程虚拟地址空间划分成用户空间和内核空间。每个进程的用户地址空间都是独立的,一般而言是不能互相访问的,但内核空间是每个进程都共享的,所以进程之间要通信必须通过内核。
- 用户空间按照访问属性一致的地址空间存放在一起的原则,划分成 5个不同的内存区域,总计 3G 的容量:
内存区域 | 存储内容 |
---|---|
栈 | 局部变量,函数参数,函数的返回值 |
堆 | 动态分配的内存 |
BSS段 | 未初始化或初值为0的全局变量和静态局部变量 |
数据段 | 已初始化且初值非0的全局变量和静态局部变量 |
代码段 | 可执行文件的操作指令,可执行程序在内存中的镜像(只读) |
- 在 x86 32 位系统里,Linux 内核地址空间是指虚拟地址从 0xC0000000 开始到 0xFFFFFFFF 为止的高端内存地址空间,总计 1G 的容量, 包括了内核镜像、物理页面表、驱动程序等运行在内核空间 。
进程间通信
- 当一个进程创建时可以共享程序代码的页,但它们各自有独立的数据拷贝(堆和栈),因此子进程对内存单元的修改对父进程是不可见的。
- 进程间的通信方式包括管道、消息队列、共享内存、信号量、信号和Socket。
管道
-
所谓的管道,就是内核里面的一串缓存。从管道的一端写入的数据,实际上是缓存在内核中的,另一端读取,也就是从内核中读取这段数据。另外,管道传输的数据是无格式的流且大小受限。匿名管道的通信的方式是单向的,如果要双向通信,需要创建两个管道。管道分为匿名管道和命名管道
-
匿名管道:没有名字标识,是只存在于内存的特殊文件,不存在于文件系统中。匿名管道只能用于存在父子关系的进程间通信。(创建的子进程会复制父进程的文件描述符,这样就做到了两个进程各有两个「 fd[0] 与 fd[1]」,两个进程就可以通过各自的 fd 写入和读取同一个管道文件实现跨进程通信了)它的生命周期随着进程创建而建立,随着进程终止而消失。shell 命令中的 “|”
-
命名管道:需要在文件系统创建一个类型为 p 的设备文件,不相干的进程从而可以通过这个设备文件进行通信。
-
管道这种通信方式效率低,不适合进程间频繁地交换数据
消息队列
- 消息队列是保存在内核中的消息链表,在发送数据时,会分成一个一个独立的数据单元,也就是消息体(数据块),消息体是用户自定义的数据类型,消息的发送方和接收方要约定好消息体的数据类型,所以每个消息体都是固定大小的存储块,不像管道是无格式的字节流数据。如果进程从消息队列中读取了消息体,内核就会把这个消息体删除。
- 消息队列不适合比较大数据的传输,因为在内核中每个消息体都有一个最大长度的限制,同时所有队列所包含的全部消息体的总长度也是有上限。
- 消息队列通信过程中,存在用户态与内核态之间的数据拷贝开销。因为进程写入数据到内核中的消息队列时,会发生从用户态拷贝数据到内核态的过程,同理另一进程读取内核中的消息数据时,会发生从内核态拷贝数据到用户态的过程。
共享内存
-
现代操作系统,对于内存管理,采用的是虚拟内存技术,也就是每个进程都有自己独立的虚拟内存空间,不同进程的虚拟内存映射到不同的物理内存中。所以,即使进程 A 和 进程 B 的虚拟地址是一样的,其实访问的是不同的物理内存地址,对于数据的增删查改互不影响。
-
共享内存的机制,就是拿出一块虚拟地址空间来,映射到相同的物理内存中。这样这个进程写入的东西,另外一个进程马上就能看到了,都不需要拷贝来拷贝去,传来传去,大大提高了进程间通信的速度。
信号量
- 用了共享内存通信方式,带来新的问题,那就是如果多个进程同时修改同一个共享内存,很有可能就冲突了。为了防止多进程竞争共享资源,而造成的数据错乱,信号量使得共享的资源,在任意时刻只能被一个进程访问。
- 信号量是一个整型的计数器,表示的是资源个数,其值可以通过两个原子操作来控制,分别是 P 操作和 V 操作。主要用于实现进程间的互斥(初始化信号量为1)与同步(初始化信号量为0),而不是用于缓存进程间通信的数据。
信号
- 在 Linux 操作系统中, 为了响应各种各样的事件,提供了几十种信号,分别代表不同的意义。我们可以通过 kill -l 命令,查看所有的信号。
- 运行在 shell 终端的进程,我们可以通过键盘输入某些组合键的时候,给进程发送信号。例如:
Ctrl+C 产生 SIGINT 信号,表示终止该进程;
Ctrl+Z 产生 SIGTSTP 信号,表示停止该进程,但还未结束;
kill -9 1050 ,表示给 PID 为 1050 的进程发送 SIGKILL 信号,用来立即结束该进程
Socket
- 前面提到的管道、消息队列、共享内存、信号量和信号都是在同一台主机上进行进程间通信,那要想跨网络与不同主机上的进程之间通信,就需要 Socket 通信了。。
Python实现
- multiprocessing
from multiprocessing import Process
import os, time
# 子进程要执行的代码
def run_proc(name):
time.sleep(2)
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
- join所完成的工作就是阻塞当前进程,直到调用join方法的那个进程执行完,再继续执行当前进程
- multiprocessing.Pool 进程池
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(2)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(2)
for i in range(3):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
Parent process 5584.
Waiting for all subprocesses done...
Run task 0 (12836)...
Run task 1 (5916)...
Task 0 runs 2.00 seconds.
Run task 2 (12836)...
Task 1 runs 2.00 seconds.
Task 2 runs 2.00 seconds.
All subprocesses done.
- Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。
- 进程间通信
-
基于共享内存:进程之间默认是不能共享全局变量的(子进程不能改变主进程中全局变量的值)。如果要共享全局变量需要用(multiprocessing.Value("d",10.0),数值)(multiprocessing.Array("i",[1,2,3,4,5]),数组)(multiprocessing.Manager().dict(),字典)(multiprocessing.Manager().list(),列表)。
-
基于管道:包括 multiprocessing.Pipe(),multiprocessing.Queue()。
from multiprocessing import Process, Queue
import os, time
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue.' % value)
q.put(value)
time.sleep(2)
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
Process to read: 9504
Process to write: 10672
Put A to queue.
Get A from queue.
Put B to queue.
Get B from queue.
Put C to queue.
Get C from queue.
- 一个子进程向Queue中写数据,另外一个进程从Queue中取数据,当一个Queue为空的用get取数据会进程会被阻塞。
线程
线程
- 线程是操作操作系统能够进行运算调度的最小单位。线程被包含在进程之中,是进程中的实际运作单位,一个进程内可以包含多个线程,线程是资源调度的最小单位。
线程的内存模型
- 每个线程独立的线程上下文:一个唯一的整数线程ID,栈和栈指针,程序计数器,通用目的寄存器和条件码。
- 和其他线程共享的进程上下文的剩余部分:整个用户虚拟地址空间,包括只读代码段,读/写数据段,堆以及所有的共享库代码和数据区域,也共享所有打开文件的集合。
线程的状态
- 线程的状态分为:
- 可运行 (runnable):线程被创建之后,调用Start()函数就到了这个状态。
- 运行 (running):start()函数之后,CPU切换到了这个线程开始执行里面的Run方法就称为运行状态。
- 阻塞 (blocked):阻塞状态是指线程因为某种原因放弃了cpu执行权,暂时停止运行。直到线程进入可运行(runnable)状态,才有机会再次获得cpu 执行权 转到运行(running)状态。
- 根据程序分别处于 Running 以及 IO Blocked 两种状态的时间占比,可分为两种:
- 计算密集型 ,程序执行时大部分时间处于 Running 状态;
- IO密集型 ,程序执行时大部分时间处于 IO Blocked 状态;
线程的调度
- 线程分类:
- 内核级线程:
内核线程建立和销毁都是由操作系统负责、通过系统调用完成,内核维护进程及线程的上下文信息以及线程切换。
程序一般不会直接使用内核线程,而是使用内核线程的一种高级接口-轻量级进程(Light-weight Process简称LWP ,是一种由内核支持的用户线程,每一个轻量级进程都与一个特定的内核线程关联。)。
优势:内核级线级能参与全局的多核处理器资源分配,充分利用多核 CPU 优势。
局限性:需要系统调度,系统调度代价大,需要在用户态和内核态来回切换;内核进程数量有限。
- 用户级线程
用户线程的创建、调度、同步和销毁全由用户空间的库函数完成,不需要内核的参与。内核的调度对象是进程本身,内核并不知道用户线程的存在。
优势:性能极强。用户线程的建立,同步,销毁和调度完全在用户态中完成,操作非常快,低消耗,无需切换内核态。
局限性:所有操作都需要自己实现,逻辑极其复杂。 用户线级线程只能参与竞争该进程的处理器资源,不能参与全局处理器资源的竞争。
- 线程的调度最常用的两种是:
-
分时调度
所有线程轮流使用 CPU 的使用权,平均分配每个线程占用 CPU 的时间。 -
抢占式调度
优先让优先级高的线程使用 CPU,如果线程的优先级相同,那么会随机选择一个(线程随机性)。
- Python 线程调度实现方式参考了分时调度的思路,只不过将时间片换成字节码 。当一个线程取得 GIL 全局锁并开始执行字节码时,对已执行字节码进行计数。当执行字节码达到一定数量,或者线程主动让出控制(time.sleep(), wait(), blocked IO等)时,线程主动释放 GIL 全局锁。
Python实现
- 多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响。而多线程中,静态变量,实例变量被线程共享,局部变量不被线程共享:
from threading import Thread
num = 0
def run_thread(name):
global num
num = num + 1
print('{} num: {}'.format(name, num))
if __name__=='__main__':
p1 = Thread(target=run_thread, args=('thread1',))
p2 = Thread(target=run_thread, args=('thread2',))
p1.start()
p2.start()
p1.join()
p2.join()
# thread1 num: 1
# thread2 num: 2
from multiprocessing import Process
num = 0
def run_proc(name):
global num
num = num + 1
print('{} num: {}'.format(name, num))
if __name__=='__main__':
p1 = Process(target=run_proc, args=('process1',))
p2 = Process(target=run_proc, args=('process2',))
p1.start()
p2.start()
p1.join()
p2.join()
# process1 num: 1
# process2 num: 1
- 线程之间共享数据最大的危险在于多个线程同时改一个变量:
from threading import Thread
num = 0
def change_num(name):
global num
num = num + 1
def run_thread(name):
for i in range(1000000):
change_num(name)
if __name__=='__main__':
p1 = Thread(target=run_thread, args=('thread1',))
p2 = Thread(target=run_thread, args=('thread2',))
p1.start()
p2.start()
p1.join()
p2.join()
print(num)
# 1851956
- num的输出值无法预料。原因是对于 num = num + 1 可以分为两步:1)计算num + n,存入该线程的临时变量中; 2)将临时变量的值赋给num。
- 由于线程之间是进行随机调度的,如果有多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期,我们因此也称为“线程不安全”。
- 可以使用互斥锁(Lock),同一时刻只允许一个线程执行操作
from threading import Thread, Lock
num = 0
def change_num(name, lock):
global num
lock.acquire()
num = num + 1
lock.release()
def run_thread(name, lock):
for i in range(1000000):
change_num(name, lock)
if __name__=='__main__':
lock = Lock()
p1 = Thread(target=run_thread, args=('thread1', lock))
p2 = Thread(target=run_thread, args=('thread2', lock))
p1.start()
p2.start()
p1.join()
p2.join()
print(num)
# 2000000
- 由于python GIL全局锁,python多线程并不能提升计算密集型任务的效率。这种情况下可以考虑使用多进程。
from threading import Thread
import time
def change_num(name):
nums = [0] * 100
for num in nums:
for i in range(1000000):
num = num + 1
if __name__=='__main__':
p1 = Thread(target=change_num, args=('thread1',))
start = time.time()
p1.start()
p1.join()
end = time.time()
print('runs %0.2f seconds.' % (end - start))
# runs 21.23 seconds.
from threading import Thread
import time
def change_num(name):
nums = [0] * 100
for num in nums:
for i in range(1000000):
num = num + 1
if __name__=='__main__':
p1 = Thread(target=change_num, args=('thread1',))
p2 = Thread(target=change_num, args=('thread2',))
p3 = Thread(target=change_num, args=('thread2',))
start = time.time()
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
end = time.time()
print('runs %0.2f seconds.' % (end - start))
# runs 19.49 seconds.
协程
协程
- 协程,又称微线程。协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行。
- 协程的调度完全由用户控制,协程拥有自己的寄存器上下文和栈,协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作用户空间栈,完全没有内核切换的开销。
- 协程适用于高并发IO密集型场景
Python实现
- python 2.5 中引入 yield/send 表达式用于实现协程
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.__next__()
n = 0
while n < 3:
n = n + 1
print('Producing %s...' % n)
r = c.send(n)
print('Consumer return: %s' % r)
c.close()
if __name__=='__main__':
c = consumer()
produce(c)
Producing 1...
Consuming 1...
Consumer return: 200 OK
Producing 2...
Consuming 2...
Consumer return: 200 OK
Producing 3...
Consuming 3...
Consumer return: 200 OK
- 如果一个函数中包含yield关键字,它就不是一个普通函数,而是一个生成器(generator)。调用函数就是创建了一个generator对象。
- 这个函数,在每次调用next()(python3.x中更名为next())的时候执行,遇到yield语句返回,再次执行时从上次返回的yield语句处继续执行。
- send() 和next() 一样,都能让生成器继续往下走一步(下次遇到yield停),但send()能传一个值,这个值作为yield表达式整体的结果
- 整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
- gevent是一个广泛使用的协程框架,它的一个特点是可以用同步的方法写异步应用,不需要写回调函数:
from gevent import monkey; monkey.patch_all()
import gevent
import time
def f(num):
print('start: ' + str(num))
time.sleep(3)
print('end: ' + str(num))
gevent.joinall([
gevent.spawn(f, 1),
gevent.spawn(f, 2),
gevent.spawn(f, 3),
])
start: 1
start: 2
start: 3
end: 1
end: 2
end: 3
-
我们之所以可以用阻塞的time.sleep()是因为gevent对其进行了monkey_patch,会将所有的socket patch 成非阻塞的,time.sleep()也就变成了gevent.sleep()。
-
gevent使用多路IO复用对文件描述符的事件监听,当处理一个socket链接时,就创建一个协程Greenlet去处理
-
当socket遇到阻塞的时候,比如等待数据的返回或者发送,此时gevent会为这个socket的fd在epoll上添加可读或者可写事件回调。然后,通过 get_hub().switch() 切换到主协程,去干其它事情。当该socket可读或者可写时,epoll会调用上述添加的回调函数,从而切换回socket的处理协程,从上次悬挂点接着往下执行。
-
gevent的monkey patch有个需要注意的地方,它不会patch c扩展模块中的socket。比如说MySQL连接,它仍然是阻塞的。
- python 3.5 之后还引入async/await。
1)
import asyncio
async def hw_async():
print('Hello world!')
loop = asyncio.get_event_loop()
loop.run_until_complete(hw_async())
loop.close()
- async 是明确将函数声明为协程的关键字。这样的函数执行时会返回一个协程对象。
- event loop是协程执行的控制点,如果希望执行协程,就需要用到它们。asyncio启动默认的event loop(asyncio.get_event_loop()), 调度并执行异步任务, 关闭event loop。loop.run_until_complete()这个函数是阻塞执行的, 直到所有的异步函数执行完毕。
2)
import asyncio
import datetime
from threading import currentThread
async def hw_async(num):
print(f"Hello world {num} begin at: {datetime.datetime.now().strftime('%S')},thread:{currentThread().name}")
await asyncio.sleep(3)
print(f"Hello world {num} end at: {datetime.datetime.now().strftime('%S')}")
loop = asyncio.get_event_loop()
tasks = [hw_async(1), hw_async(2),hw_async(3)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Hello world 3 begin at: 53,thread:MainThread
Hello world 1 begin at: 53,thread:MainThread
Hello world 2 begin at: 53,thread:MainThread
Hello world 3 end at: 56
Hello world 1 end at: 56
Hello world 2 end at: 56
-
从输出可以看出,三个不同的协程函数都是在MainThread线程完成的。并且异步执行。
-
await语法只能出现在通过async修饰的函数中,否则会报SyntaxError错误。且await后面的对象需要是一个awaitable,有三种主要类型: 协程, 任务 和 Future。
-
await io操作,此时,当前协程就会被挂起,时间循环转而执行其他协程。但并不是说所有协程里的await都会导致当前携程的挂起,如果跟的是我们定义的携程,则会执行这个携程,如果是asyncio模块制作者定义的固有协程,比如模拟io操作的asyncio.sleep,以及io操作,比如网络io:asyncio.open_connection这些,才会挂起当前携程。
-
loop.run_until_complete(asyncio.wait(tasks))运行时,会首先将tasks列表里的Coroutines先转换为future
什么时候使用进程,线程和协程
- I/O密集型:适合协程
- 计算密集型:适合多进程 (但进程数量存在限制)
- I/O密集型 + 计算密集型:多进程 + 协程
网友评论