美文网首页
Python 进程学习

Python 进程学习

作者: vckah | 来源:发表于2018-07-10 16:14 被阅读0次

    Python 中多进程是由 multiprocessing 模块提供的

    import time
    from multiprocessing import Process
    
    def test(i):
        print("子进程开始")
        print(i)
        time.sleep(2)
        print("子进程结束")
    
    if __name__ == "__main__":
        p_ls = []
        for i in range(3):
            p  = Process(target=test, args=(i, ))
            p.start()
            p_ls.append(p)
        for i in range(3):
            p_ls[i].join(()
    

    还有一种方法,使用类:

    from multiprocessing import Process
    
    class MyProcess(Process):
        def __init__(self):
            super().__init__()
        def run():
            pass
    
    p = MyProcess()
    p.start()
    p.join()
    

    终止一个进程:p.terminate()
    判断一个进程是否存活: p.is_alive
    查看一个进程的 id:p.pid
    查看进程对象名称:p.name
    守护进程会随着主进程的代码执行完毕而结束
    p.start() 之前设置 p.daemon = True
    加锁。涉及到数据改动的时候,目的是保证数据安全。

    from multiprocessing import  Lock
    lock = Lock()
    lock.acquire()
    lock.release()
    

    进程同步控制 ---- 锁,信号量,事件

    • 信号量 相当于多个锁,可以规定获取锁个数的大小。其内部相当于加了一个计数器,acquire() 的时候计数器 -1,release() +1
    from multiprocessing import Process, Semaphore
    
    def test(i, sem):
        sem.acquire()
        pass
        sem.release()
    
    if __name__ =="__main__":
        sem = Semaphore(4)
        for i in range(4):
            p = Process(target=test, args=(i, sem,))
            p.start()
    
    • 事件
      一个信号可以使所有的进程都进入阻塞状态,也可以解除
      创建之初默认是阻塞的
    from multiprocessing import Event
    
    e = Event()
    e.is_set()      --> False
    e.set()         --> 将事件的状态设置为 True
    e.wait()        --> 依据 is_set 的值来决定是否阻塞,相当于监视 is_set 的状态,然后根据其状态来做某些事情
    e.clear()       --> 将事件的状态设置为 False 阻塞
    

    例如一个简单的红绿灯事件

    import time
    import random
    from multiprocessing import Process, Event
    
    def car(e, i):
            if not e.is_set():
                print('car %s 在等待'%(i) )
                e.wait()      --> 阻塞,直到得到事件状态变为 True
            print('car %s 通过'%i)
    
    def light(e):
        while True:
            if e.is_set():        --> 开始时是 False,所以绿灯亮
                e.clear()      
                print("红灯亮了")
            else:
                e.set()            --> 设置为 True,红灯亮
                print("绿灯亮了")
            time.sleep(2)
    
    if __name__ == "__main__":
        e = Event()
        traffic = Process(target=light, args=(e, ))
        traffic.start()
        for i in range(10):
            c = Process(target=cars, args=(e, i))
            c.start()
            time.sleep(random.randint())
    

    进程间通信 IPC ----- 队列和管道

    • 队列
    # 先进先出
    from multiprocessing import Queue
    q = Queue(5)
    q.put(1)
    q.get()
    q.full()        -->> 队列是否满了
    q.empty()    -->> 判断队列是否空
    q.get_nowait()    ->> 如果队列空了,那么它会报错
    

    消费者与生产者

    import time
    import random
    from multiprocessing import Process, Queue
    
    def producer(q):
        for i in range(10):
            time.sleep(random.randint(1, 2))
            food = "产品%s"%i
            print('生产者生产了一个东西')
            q.put(food)
    
    def consumer(q):
        while True:
            food = q.get()
            if food is None:
                break
            print(food)
            time.sleep(random.random())
    
    if __name__ == "__main__":
        q = Queue()
        p = Process(target=produce, args=(q,))
        p.start()
        c = Process(target=consume, args=(q, ))
        c.start()
        p.join()
        q.put(None)
    

    但是上面的代码有一个问题,就是 q.put(None) 的频繁,有几个消费者,那么就需要向队列中 put 几个值,这种情况可以使用 JoinableQueue

    from multiprocessing import JoinableQueue
    def producer(q):
        xxxx
        q.join()    -->>  阻塞,直到队列中的所有数据全被处理完毕
    def consumer(q):
         xxxx
        q.task_done()
    if __name__ == "__main__":
        q = JoinableQueue()
        c.daemon = True
    
    • 管道
    from multiprocessing import Pipe
    conn1, conn2 = Pipe()
    conn1.send('12345')
    conn2.recv()
    

    进程池

    主要是为了效率问题,开启单个进程会耗费很多资源。进程池就是先创建好多个进程,然后从进程池中取到进程,提高操作系统调度进程的利用率。一般 CPU 个数 + 1 是最大开启进程的数量。

    from multiprocessing import Pool
    
    def func(n):
        for i in range(10):
            n += 1
            print(n)
    
    if __name__ == "__main__":
        pool = Pool(5)
        pool.map(func, <可迭代对象>range(100))
        # map 是自带 close 和 join 的,最后一次返回结果
        res_lst = []
    # 还有一个 apply 方法
        for i in range(20):
            p.apply(func, args=(i, ))        --> 同步提交的方式
            res = p.apply_sync(func, args=(i, ))      --> 异步提交方式
            # res.get()     --> 阻塞等待结果
            # 如果需要不阻塞,那么可以这样做
            res_lst.append(res)
        # 最后可以得到结果
        for res in res_lst:
            print(res.get())
        # 与 apply_async 配合使用
        p.close()    --> 结束进程池接受任务
        p.join()     --> 感知进程池中的任务结束
    

    进程池实现 socket 服务端

    import socket
    from multiprocessing import Pool
    
    def func(conn):
        conn.send(b'hello')
        msg = conn.recv(1024).decode('utf-8')
        print(msg)
        con.close()
    
    if __name__ == "__main__":
         p = Pool(5)
        sk = socket.socket()
        sk,bind(('127.0.0.1', 8000))
        sk.listen()
        while True:
            conn, addr = sk.accept()
            p.apply_async(func, args=(conn, ))
        sk.close()
    

    另外进程池还有回掉函数,意思就是在 任务执行完毕后执行指定的回调函数,任务返回的数据作为回调函数的参数。回调函数的参数来源只有进程返回的数据。回调函数是在主进程中执行的

    def func1():
        pass
        return x
    def func2(x):
        pass
        return 
    
    if __name__ == "__main__":
        pool = Pool(5)
        p.apply_async(func, args=(), callback=func2)
    

    回调函数经常用于爬虫,因为网络延迟和下载很耗时。
    例如

    import requests
    from multiprocessing import Pool
    
    def get(url):
        res = requests.get(url)
        if res.status_code == 200:
            return url, res.content
    def call_back(args):
        url, content =  args
        print(url, len(content))
    
    if __name__ == "__main__":
        url_lst = []
        p = Pool(5)
        for url in url_lst:
            p.apply_async(get, args=(url,), callback=call_back) 
        p.close()
        p.join()
    

    相关文章

      网友评论

          本文标题:Python 进程学习

          本文链接:https://www.haomeiwen.com/subject/odyfpftx.html