前言
我是偏后台开发的coder,学到python的这里时尤其的关注。操作系统的相关接口在python是不是比linux C中要简洁的多。OS的概念不说了,这次笔记集中关注python中多进程、多线程、高并发、加锁同步、进程间通信等实现。
Definition
进程(process),在我的理解中,就是一个任务,是一段运行的程序。后台的童鞋应该知道其本质就是一个task_struct结构体,里面记载着程序运行需要的所有资源和他自身的信息。当他获得运行所需的内存、CPU资源等,也就是成为了一个running状态的进程。
可以把进程理解为一个任务,那线程就是完成这个任务的执行流。线程是CPU调度的最小粒度。通常来说,现在的项目中,至少我接触的,一个进程中都包括着不止一个的线程。毕竟现在的OS都是SMP的,充分利用多核心提高程序效率应该是每个coder敲键盘时需要优先考虑的。
多进程
linux的内核向外提供了 fork()
这个系统调用来创建一个本进程的拷贝,当然往往fork()后都跟着 exec()
族系统调用,我们创建一个进程一般都是为了执行其他的代码程序。
python的 os 模块封装了很多常用的系统调用,可以说是python中最常用的一个库了。举个栗子:
import os
print('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid == 0:
print('Child process (%s).' % os.getpid())
else:
print('Parent process (%s).' % pid)
fork()
会返回两个结果,父进程返回一个大于0的无符号数,子进程返回0。
我们都知道socket()是有好几个步骤的,而对于web服务器,每天每时每分都有着成千上万的访问请求。如果是一个进程向外提供服务,那就是这个进程为第一个用户从创建socket到关闭,再为下一个用户提供服务。用户时排着队接受服务的,显然不符合逻辑。
拿Apache举个栗子,它是多进程架构服务器的代表。
- 运行主程序,只负责server端socket的
listen()
和accept()
,当然主进程是一个守护进程 - 每当一个用户请求服务,就会调用
fork()
,在子程序中接受数据,read()
或者write()
,然后提供服务直至关闭 - 主进程还是要负责回收结束的子进程资源的
伪代码如下:
import os
server_fd = socket()
bind(server_fd,ip,port)
listen(server_fd,MAX_PROCESS)
While Online:
connfd = accpet(server_fd)
for each connfd:
os.fork()
// TODO
close(server_fd)
上面这段程序只适用linux平台,windows平台创建进程的方式并不是 fork()
调用。python中提供了multiprocesssing
模块来兼容windows,比起fork()
,代码的语义更好理解一些
from multiprocessing import Process
import os
def run_proc(name):
print('Child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
#创建Process实例
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
这里的join
语义和linux平台的多线程中的join语义很像,但效果其实是linux平台的wait
有时候需要进程池,multiprocessing
也直接提供了pool
用于创建。
pool.apply(func,params) 是单进程阻塞模式
pool.apply_async(func,params,callback) 是多进程异步模式
pool.map(func,iter) 用于可迭代结构,阻塞式调用
pool.map_async(func,iter,callback)
一般情况下,还是把进程数控制成和CPU核数相同。pool结束调用pool.join()
回收进程资源时,需要先pool.close()
上面提到过,创建一个新进程的原因往往是为了加载新的代码,去执行新的任务。所以python封装了fork()
和之后的exec族,提供subprocess
模块,直接操作新的子进程。这个包,一般是用来执行外部的命令或者程序如shell命令,和os.system()
类似。
import subprocess
r = subprocess.call(['ls','-l']) #阻塞
r = subprocess.call('ls -l',shell = True)
r = subprocess.check_call(['ls','-l']) #returncode不为0则raise CalledProcessError异常
r = subprocess.check_output('ls -l',shell=True)
r = subprocess.Popen(['ls','-l']) #非阻塞,需主动wait
r = subprocess.Popen(['ls','-l'],stdin=child1.stdout,stdout=subprocess.PIPE, stderr=subprocess.PIPE) #设置标准输入输出出错的句柄
out,err = r.communicate() #继续输入,或者用来获得返回的元组(stdoutdata,stderrdata)
手动继续输入的例子:
import subprocess
print('$ python')
p = subprocess.Popen(['python'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b"print('Hello,world')")
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
进程间通信
用multiprocessing
的Queue
或者Pipe
来帮助实现,类似linux中的Pipe
,打开一条管道,一个进程往里面扔数据,一个从另一头捡数据。python中的Pipe是全双工管道,既可以读也可以写。可以通过Pipe(duplex=False)
创建半双工管道。
from multiprocessing import Pipe,Queue
#实例
q = Queue()
p = Pipe()
#写入数据
q.put(value)
p[0].send(value)
#读数据
q.get()
p[1].recv()
分别举个例子,用Queue
from multiprocessing import Process, Queue
import os, time, random
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A','B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
time.sleep(random.random())
print('Get %s from queue.' % value)
if __name__=='__main__':
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()
pr.terminate()
用Pipe:
from multiprocessing import Process, Pipe
import os, time, random
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A','B', 'C']:
print('Put %s to pipe...' % value)
q.send(value)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.recv()
time.sleep(random.random())
print('Get %s from pipe.' % value)
if __name__=='__main__':
p = Pipe()
pw = Process(target=write, args=(p[0],))
pr = Process(target=read, args=(p[1],))
pw.start()
pr.start()
pw.join()
time.sleep(2)
pr.terminate()
多线程
有人会有疑问,问什么要在进程中开多个线程,多创建几个进程一起干活不就行了。其实这样是可以的,只不过进程这个单位有点大,比较占用资源,创建的时候开销比较大(尤其在windows系统下),进程多了CPU调度起来,在进程间切换也是非常耗时的。还有多任务协同合作时,需要数据交换,进程间通信也是开销,而一个进程中的线程是共享进程的内存空间的,可以直接交互。所以现在多线程的程序更加常见。
不过多线程也是有弊端的,协同合作的多线程,有一个挂了,会影响到所有的其他线程,也就代表这个任务是做不下去了。进程因为有着独立的地址空间,所以一个进程死了对其他进程的影响可以说很小。
python中提供了threading模块为多线程服务,threading.current_thread()
返回当前线程,主线程名为MainThread
import threading
thread = threading.Thread(target=func,args=())
thread.start()
thread.join()
多线程编程,最重要的就是同步和互斥,也就是各种锁的用法。为什么要用锁,后台的童鞋应该都懂,现在的SMP操作系统都是抢占式内核,也就是即使你不同的核共同工作时,很幸运的没有改乱一个共享变量,当然这就不可能了。当你的CPU时间片到时间了,或者需要内存或者IO资源,你被踢出了CPU的工作队列,你必须得在走的时候给你的资源把锁加上,下次再来接着做。线程同步的重点的是对共享资源的判断,和选择合适的锁。也就是对什么资源加锁和用什么锁。
不过在python中很遗憾,多线程存在着天生的缺陷,因为有着GIL的存在,这是python解释器的设计缺陷。导致python程序在被解释时,只能有一个线程。不过,对于IO密集型的程序,多线程的设计还是很有帮助的。比如爬虫
- 最常用的锁,类似 mutex
- 条件变量,
threading.Condition()
会包含一个Lock
对象,因为这两者一般都是配合使用的。 - 信号量,
threading.Semaphore()
import threading
lock = threading.Lock()
lock.acquire()
lock.realease() #配合try...finally保证最后释放掉锁,防止死锁
cond = threading.Condition()
cond.wait()
cond.notify() cond.notify_all()
sem = threading.Semaphore(NUM)
sem.acquire()
sem.realease()
event = threading.Event() #相当于没有lock的cond
event.set(True)
event.clear()
假设以下的情况
thread_func(params):
web_res = params
def func1(web_res):
http = web_res.http
TODO
def func2(web_res):
data = web_res.data
TODO
def func3(web_res):
user = web_res.user
TODO
在一个线程中,又存在多个子线程或者函数时,需要把一个参数都传给它们时。可以通过唯一的id来区分出从全局变量自己的局部变量时。可以用ThreadLocal
实现
import threading
student = threading.local()
def func(name):
person = student.name #需要之前关联过
p1 = threading.Thread(target=func,argc='A')
p1 = threading.Thread(target=func,argc='B')
通过ThredLocal免去了我们亲自去字典中存取。通常用于web开发中的为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等。
分布式进程
分布式是为了在横向上提升整个系统的负载能力。python中multiprocessing模块中的manage子模块支持把多进程分布到不同的机器上。当然肯定存在一个master进程来负责任务的调度。依赖manage子模块,可以很轻松的写出分布式程序。
比如爬虫,想要爬下豆瓣或者知乎这样网站的全部数据,用单机估计得花费好几年。可以把需要爬的网站的所有URL放在一个Queue中,master进程负责Queue的管理,可以将很多设备与master进程所在的设备建立联系,爬虫开始获取URL时,都从主机器获取。这样就能保证协同不冲突的合作。
网友评论