python - OS相关

作者: Allenware | 来源:发表于2017-04-23 16:07 被阅读78次

    前言

    我是偏后台开发的coder,学到python的这里时尤其的关注。操作系统的相关接口在python是不是比linux C中要简洁的多。OS的概念不说了,这次笔记集中关注python中多进程、多线程、高并发、加锁同步、进程间通信等实现。

    Definition

    进程(process),在我的理解中,就是一个任务,是一段运行的程序。后台的童鞋应该知道其本质就是一个task_struct结构体,里面记载着程序运行需要的所有资源和他自身的信息。当他获得运行所需的内存、CPU资源等,也就是成为了一个running状态的进程。

    可以把进程理解为一个任务,那线程就是完成这个任务的执行流。线程是CPU调度的最小粒度。通常来说,现在的项目中,至少我接触的,一个进程中都包括着不止一个的线程。毕竟现在的OS都是SMP的,充分利用多核心提高程序效率应该是每个coder敲键盘时需要优先考虑的。

    多进程

    linux的内核向外提供了 fork() 这个系统调用来创建一个本进程的拷贝,当然往往fork()后都跟着 exec() 族系统调用,我们创建一个进程一般都是为了执行其他的代码程序。

    python的 os 模块封装了很多常用的系统调用,可以说是python中最常用的一个库了。举个栗子:

    import os
    
    print('Process (%s) start...' % os.getpid())
    
    pid = os.fork()
    if pid == 0:
        print('Child process (%s).' % os.getpid())
    else:
        print('Parent process (%s).' %  pid)
    

    fork() 会返回两个结果,父进程返回一个大于0的无符号数,子进程返回0。

    我们都知道socket()是有好几个步骤的,而对于web服务器,每天每时每分都有着成千上万的访问请求。如果是一个进程向外提供服务,那就是这个进程为第一个用户从创建socket到关闭,再为下一个用户提供服务。用户时排着队接受服务的,显然不符合逻辑。

    拿Apache举个栗子,它是多进程架构服务器的代表。

    1. 运行主程序,只负责server端socket的listen()accept(),当然主进程是一个守护进程
    2. 每当一个用户请求服务,就会调用fork(),在子程序中接受数据,read()或者write(),然后提供服务直至关闭
    3. 主进程还是要负责回收结束的子进程资源的

    伪代码如下:

    import os
    
    server_fd = socket()
    bind(server_fd,ip,port)
    listen(server_fd,MAX_PROCESS)
    While Online:
        connfd = accpet(server_fd)
        for each connfd:
            os.fork()
            // TODO
    close(server_fd)
    

    上面这段程序只适用linux平台,windows平台创建进程的方式并不是 fork() 调用。python中提供了multiprocesssing模块来兼容windows,比起fork(),代码的语义更好理解一些

    from multiprocessing import Process
    import os
    
    def run_proc(name):
        print('Child process %s (%s)...' % (name, os.getpid()))
    
    if __name__=='__main__':
        print('Parent process %s.' % os.getpid())
        #创建Process实例
        p = Process(target=run_proc, args=('test',))
        print('Child process will start.')
        p.start()
        p.join()
        print('Child process end.')
    

    这里的join语义和linux平台的多线程中的join语义很像,但效果其实是linux平台的wait

    有时候需要进程池,multiprocessing 也直接提供了pool用于创建。

    pool.apply(func,params) 是单进程阻塞模式
    pool.apply_async(func,params,callback)  是多进程异步模式
    pool.map(func,iter) 用于可迭代结构,阻塞式调用
    pool.map_async(func,iter,callback)
    

    一般情况下,还是把进程数控制成和CPU核数相同。pool结束调用pool.join()回收进程资源时,需要先pool.close()

    上面提到过,创建一个新进程的原因往往是为了加载新的代码,去执行新的任务。所以python封装了fork()和之后的exec族,提供subprocess模块,直接操作新的子进程。这个包,一般是用来执行外部的命令或者程序如shell命令,和os.system()类似。

    import subprocess
    
    r = subprocess.call(['ls','-l'])    #阻塞
    r = subprocess.call('ls -l',shell = True)
    r = subprocess.check_call(['ls','-l'])  #returncode不为0则raise CalledProcessError异常
    r = subprocess.check_output('ls -l',shell=True)
    r = subprocess.Popen(['ls','-l'])   #非阻塞,需主动wait
    
    r = subprocess.Popen(['ls','-l'],stdin=child1.stdout,stdout=subprocess.PIPE, stderr=subprocess.PIPE)    #设置标准输入输出出错的句柄
    out,err = r.communicate()   #继续输入,或者用来获得返回的元组(stdoutdata,stderrdata)
    

    手动继续输入的例子:

    import subprocess
    
    print('$ python')
    p = subprocess.Popen(['python'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, err = p.communicate(b"print('Hello,world')")
    print(output.decode('utf-8'))
    print('Exit code:', p.returncode)
    

    进程间通信

    multiprocessingQueue或者Pipe来帮助实现,类似linux中的Pipe,打开一条管道,一个进程往里面扔数据,一个从另一头捡数据。python中的Pipe是全双工管道,既可以读也可以写。可以通过Pipe(duplex=False)创建半双工管道。

    from multiprocessing import Pipe,Queue
    #实例
    q = Queue()
    p = Pipe()
    #写入数据
    q.put(value)
    p[0].send(value)
    #读数据
    q.get()
    p[1].recv()
    

    分别举个例子,用Queue

    from multiprocessing import Process, Queue
    import os, time, random
    
    def write(q):
        print('Process to write: %s' % os.getpid())
        for value in ['A','B', 'C']:
            print('Put %s to queue...' % value)
            q.put(value)
            time.sleep(random.random())
    
    def read(q):
        print('Process to read: %s' % os.getpid())
        while True:
            value = q.get(True)
            time.sleep(random.random())
            print('Get %s from queue.' % value)
    
    if __name__=='__main__':
        q = Queue()
        pw = Process(target=write, args=(q,))
        pr = Process(target=read, args=(q,))
    
        pw.start()
        pr.start()
        pw.join()
        pr.terminate()
    

    用Pipe:

    from multiprocessing import Process, Pipe
    import os, time, random
    
    def write(q):
        print('Process to write: %s' % os.getpid())
        for value in ['A','B', 'C']:
            print('Put %s to pipe...' % value)
            q.send(value)
            time.sleep(random.random())
    
    def read(q):
        print('Process to read: %s' % os.getpid())
        while True:
            value = q.recv()
            time.sleep(random.random())
            print('Get %s from pipe.' % value)
    
    if __name__=='__main__':
        p = Pipe()
        pw = Process(target=write, args=(p[0],))
        pr = Process(target=read, args=(p[1],))
    
        pw.start()
        pr.start()
        pw.join()
        time.sleep(2)
        pr.terminate()
    

    多线程

    有人会有疑问,问什么要在进程中开多个线程,多创建几个进程一起干活不就行了。其实这样是可以的,只不过进程这个单位有点大,比较占用资源,创建的时候开销比较大(尤其在windows系统下),进程多了CPU调度起来,在进程间切换也是非常耗时的。还有多任务协同合作时,需要数据交换,进程间通信也是开销,而一个进程中的线程是共享进程的内存空间的,可以直接交互。所以现在多线程的程序更加常见。

    不过多线程也是有弊端的,协同合作的多线程,有一个挂了,会影响到所有的其他线程,也就代表这个任务是做不下去了。进程因为有着独立的地址空间,所以一个进程死了对其他进程的影响可以说很小。

    python中提供了threading模块为多线程服务,threading.current_thread()返回当前线程,主线程名为MainThread

    import threading
    
    thread = threading.Thread(target=func,args=())
    thread.start()
    thread.join()
    

    多线程编程,最重要的就是同步和互斥,也就是各种锁的用法。为什么要用锁,后台的童鞋应该都懂,现在的SMP操作系统都是抢占式内核,也就是即使你不同的核共同工作时,很幸运的没有改乱一个共享变量,当然这就不可能了。当你的CPU时间片到时间了,或者需要内存或者IO资源,你被踢出了CPU的工作队列,你必须得在走的时候给你的资源把锁加上,下次再来接着做。线程同步的重点的是对共享资源的判断,和选择合适的锁。也就是对什么资源加锁和用什么锁。

    不过在python中很遗憾,多线程存在着天生的缺陷,因为有着GIL的存在,这是python解释器的设计缺陷。导致python程序在被解释时,只能有一个线程。不过,对于IO密集型的程序,多线程的设计还是很有帮助的。比如爬虫

    • 最常用的锁,类似 mutex
    • 条件变量,threading.Condition()会包含一个Lock对象,因为这两者一般都是配合使用的。
    • 信号量,threading.Semaphore()
    import threading
    
    lock = threading.Lock()
    lock.acquire()
    lock.realease()  #配合try...finally保证最后释放掉锁,防止死锁
    
    cond = threading.Condition()
    cond.wait()
    cond.notify()   cond.notify_all()
    
    sem = threading.Semaphore(NUM)
    sem.acquire()
    sem.realease()
    
    event = threading.Event()   #相当于没有lock的cond
    event.set(True)
    event.clear()
    

    假设以下的情况

    thread_func(params):
        web_res = params
        def func1(web_res):
            http = web_res.http
            TODO
        def func2(web_res):
            data = web_res.data
            TODO
        def func3(web_res):
            user = web_res.user
            TODO
    

    在一个线程中,又存在多个子线程或者函数时,需要把一个参数都传给它们时。可以通过唯一的id来区分出从全局变量自己的局部变量时。可以用ThreadLocal实现

    import threading 
    
    student = threading.local()
    
    def func(name):
        person = student.name  #需要之前关联过
    
    p1 = threading.Thread(target=func,argc='A')
    p1 = threading.Thread(target=func,argc='B')
    

    通过ThredLocal免去了我们亲自去字典中存取。通常用于web开发中的为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等。

    分布式进程

    分布式是为了在横向上提升整个系统的负载能力。python中multiprocessing模块中的manage子模块支持把多进程分布到不同的机器上。当然肯定存在一个master进程来负责任务的调度。依赖manage子模块,可以很轻松的写出分布式程序。

    比如爬虫,想要爬下豆瓣或者知乎这样网站的全部数据,用单机估计得花费好几年。可以把需要爬的网站的所有URL放在一个Queue中,master进程负责Queue的管理,可以将很多设备与master进程所在的设备建立联系,爬虫开始获取URL时,都从主机器获取。这样就能保证协同不冲突的合作。

    相关文章

      网友评论

        本文标题:python - OS相关

        本文链接:https://www.haomeiwen.com/subject/doryzttx.html