美文网首页
进程间通信

进程间通信

作者: 遇明不散 | 来源:发表于2019-04-19 10:24 被阅读0次

    进程间通信

    • 进程空间相对独立,资源无法相互获取,此时在不同进程间通信需要专门方法
    • 进程间通信就是在不同的进程间进行数据的传输

    进程间通信方式

    文件

    不推荐,文件和磁盘交互慢,数据不安全

    管道
    • 在内存中开辟一个管道空间,生成管道操作对象,对多个进程可见,多个进程使用"同一个"管道对象进行操作即可实现通信
    • 管道允许在进程之间按先进先出的方式传送数据,是进程间通信的一种常见方式
    管道基本函数
    fd1,fd2 = Pipe(duplex = True)
    # 功能:创建一个管道
    # 参数:默认为True,表示双向管道;设置为False则为单向管道
    # 返回值:返回两个管道流对象,分别表示管道的两端
    #      如果是双向管道,都可以读写
    #      如果是单向管道,则第一个对象只能接受,第二个对象只能发送,即fd1只读,fd2只写
    
    fd.recv()
    # 功能:从管道读取信息
    # 返回值:读取到的内容
    # 如果管道为空则阻塞
    
    fd.send(data)
    # 功能:向管道发送内容
    # 参数:要发送的内容
    # 可以发送python数据类型,字符串、数字、列表等多种类型的数据
    # 一次recv()只能接受一次send()的内容
    
    管道实现示例
    # pipe.py
    from multiprocessing import Process,Pipe
    import os,time
    
    # 双向管道,fd1,fd2均可收发
    fd1,fd2 = Pipe()
    # 单向管道,fd1只能接收,fd2只能发送
    # fd1,fd2 = Piple(False)
    
    def fun(name):
        time.sleep(2)
        fd1.send('hello %s' % name)
    
        # 若为单向管道
        # fd2.send('hello %s' % name)
    
        # 发送其他数据类型
        # fd1.send([1,2,3,4,5])   
    
    jobs = []
    
    for i in range(5):
        p = Process(target = fun,args = (i,))
        jobs.append(p)
        p.start()
    
    for i in range(5):
        data = fd2.recv()
        # 若为单向管道
        # data = fd1.recv()
        print(data)
    
    for i in jobs:
        i.join()
    
    消息队列
    • 在内存中开辟一个队列数据结构模型,用来存放消息
    • 任何拥有队列对象的进程都可以进行消息的存放和取出
    • 取出顺序和存入顺序保持一致,即先进先出
    消息队列基本函数
    q = Queue(maxsize = 0)
    # 功能:创建消息队列
    # 参数:表示最多存放多少消息,默认表示根据内存分配存储
    # 返回: 队列对象
    
    q.put(data,[block,timeout])
    # 功能:向队列存储消息
    # 参数:data 要存的内容,可以使用数字、列表、字符串等
    #       block 默认队列满时会阻塞,设置为False则非阻塞
    #       timeout 超时时间
    
    data = q.get([block,timeout])
    # 功能:获取队列消息
    # 参数:block 默认队列满时会阻塞,设置为False则非阻塞
    #      timeout 超时时间
    # 返回值: 返回取出的内容
    
    # 判断队列是否为满
    q.full()   
    
    # 判断队列是否为空
    q.empty()  
    
    # 判断队列中消息数量
    q.qsize()  
    
    # 关闭队列
    q.close()  
    
    消息队列示例
    # queue1.py 基本函数使用
    from multiprocessing import Queue
    from time import sleep
    
    # 创建消息队列对象
    q = Queue(3)
    
    q.put(1)
    sleep(0.5)
    print(q.empty())  # False
    q.put(2)
    print(q.full())   # False
    q.put(3)
    print(q.qsize())  # 3
    # q.put(4,True,2) # 超时等待
    print(q.get())    # 1
    q.close()         # 关闭队列
    
    # queue2.py 进程间通信
    from multiprocessing import Queue,Process
    from time import sleep
    
    q = Queue()
    
    def fun1():
        sleep(1)
        q.put({'a':1,'b':2})
    
    def fun2():
        sleep(2)
        print('Get message {} from another process'.format(q.get()))
    
    p1 = Process(target = fun1)
    p2 = Process(target = fun2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    q.close()
    
    共享内存
    • 在内存空开辟一块空间,对多个进程可见,进程可以写入输入,但是每次写入的内容会覆盖之前的内容
    • 由于没有对内存进行格式化的修饰,所以存取快效率高
    共享内存基本函数
    obj = Value(ctype,obj)
    # 功能:开辟共享内存空间
    # 参数:ctype  要存储的数据类型,C语言类型  
    #      obj  共享内存的初始化数据
    # 返回:共享内存对象
    
    obj = Array(ctype,obj)
    # 功能:开辟一个共享内存空间
    # 参数:ctype 要存储的数据类型,C语言类型   
    #      obj 初始化存入的内容,比如列表,字符串,要求列表中的数据类型相同
    #          如果是整数则表示开辟空间的个数
    # 返回值:返回共享内存对象,可以通过遍历获取每个元素的值
    #        如果存入的是字符串,obj.value 表示字符串的首地址
    
    共享内存实现实例
    # value.py
    from multiprocessing import Process,Value
    import time
    import random
    
    # 创建共享内存
    money = Value('i',2000)
    
    def deposite():
        for i in range(100):
            time.sleep(0.05)
            money.value += random.randint(1,200)
    
    def withdraw():
        for i in range(100):
            time.sleep(0.05)
            money.value -= random.randint(1,180)
    
    d = Process(target = deposite)
    w = Process(target = withdraw)
    d.start()
    w.start()
    d.join()
    w.join()
    
    print('Remaining:',money.value)
    
    # array.py
    from multiprocessing import Array,Process
    import time
    
    # 创建共享内存
    # share_memory = Array('i',[1,2,3,4,5])
    # share_memory = Array('u',['a','b','c'])
    # share_memory = Array('c',b'hello')
    
    # 创建共享内存,开辟5个整形空间
    share_memory = Array('i',5)
    
    def fun():
        for i in share_memory:
            print(i)
        # 修改共享内存中的值
        share_memory[1] = 1
    
    p = Process(target = fun)
    p.start()
    p.join()
    
    # 只有字符串的时候可以这样用
    # print(share_memory.value)
    
    管道、消息队列、共享内存比较
    比较 管道 消息队列 共享内存
    开辟空间 内存中 内存中 内存中
    读写方式 双向/单向 先进先出 覆盖之前内容
    效率 一般 一般 较快
    是否需要同步互斥 不需要 不需要 需要
    应用 多用于父子进程 广泛灵活 需要注意进行互斥操作
    信号
    • 一个进程向另一个进程发送一个信号来传递某种讯息,接受者根据接收到的信号进行相应的行为
    • 进程间通信中唯一种异步通信方法
    • kill -l 查看系统信号
    • kill -signame PID 给进程号PID的进程发送signame信号
    同步执行与异步执行
    • 同步执行:按照顺序逐句执行,一步完成再做下一步
    • 异步执行:在执行过程中利用内核记录延迟发生或者准备处理的事件,这样不影响应用层的持续执行,当事件发生时再由内核告知应用层处理
    常见信号
    • SIGHUP 连接断开
    • SIGINT Ctrl-C
    • SIGQUIT Ctrl-\
    • SIGTSTP Ctrl-Z
    • SIGKILL 终止一个进程
    • SIGSTOP 暂停一个进程
    • SIGALRM 时钟信号
    • SIGCHLD 子进程状态改变时给父进程发出
    信号基本函数
    import signal
    import os
    
    os.kill(pid,sig)
    # 功能:向指定的进程发送信号
    # 参数:pid 目标进程
    #      sig 要发送的信号
    
    signal.alarm(sec)
    # 功能:向自身发送时钟信号SIGALRM,终止进程运行,非阻塞
    # 参数:sec 时钟时间,表示多少秒后自身会收到这个信号
    # 进程中只能有一个时钟,第二个会覆盖第一个时间
    
    signal.pause()
    # 功能:阻塞等待接收一个信号
    
    信号处理函数
    import signal
    signal.signal(signum,handler)
    # 功能:处理信号
    # 参数:signum  要处理的信号
    #      handler 信号的处理方法 
    #        SIG_DFL  表示使用默认的方法处理
    #        SIG_IGN  表示忽略这个信号
    #        func     传入一个函数,表示用指定函数处理
    #          def func(sig,frame)
    #              sig:捕获到的信号
    #              frame:信号对象
    
    • signal函数也是一个异步处理信号函数
    • SIGKILLSIGSTOP不能被signal函数处理
    • 僵尸进程的信号处理方案,父进程中signal(SIGCHLD,SIG_IGN)
    信号实现示例
    # signal.py
    import signal
    import os
    from time import sleep
    
    def handler(sig,frame):
        if sig == signal.SIGALRM:
            print('Get a alarm signal')
        elif sig == signal.SIGINT:
            # print('Get a ctrl+c signal')
            os.kill(os.getpid(),signal.SIGKILL)
    
    signal.alarm(5)
    
    # 默认信号处理方式
    # signal.signal(signal.SIGALRM,signal.SIG_DFL)
    
    # 忽略信号
    # signal.signal(signal.SIGALRM,signal.SIG_IGN)
    
    # 信号处理函数处理
    signal.signal(signal.SIGALRM,handler)
    signal.signal(signal.SIGINT,handler)
    
    while True:
        sleep(1)
        print('Waiting for a signal ...')
    
    信号量

    给定一个数量,对多个进程可见,且多个进程都可以操作。进程通过对数量多少的判断执行各自的行为。

    信号量基本函数
    from multiprocessing import Semaphore
    
    sem = Semaphore(num)
    # 功能:创建信号量
    # 参数:信号量初始值
    # 返回:信号量对象
    
    # 获取信号量值
    sem.get_value() 
    
    # 将信号量减1,当信号量为0会阻塞
    sem.acquire() 
    
    # 将信号量加1
    sem.release() 
    
    信号量实现示例
    # sem.py
    from multiprocessing import Semaphore,Process
    import os
    from time import sleep
    
    sem = Semaphore(3)
    
    def fun():
        print('进程%d等待信号量'%os.getpid())
    
        # 消耗一个信号量
        sem.acquire()
        print('进程%d消耗信号量'%os.getpid())
    
        sleep(3)
    
        # 增加一个信号量
        sem.release()
        print('进程%d添加信号量'%os.getpid())
    
    jobs = []
    
    for i in range(4):
        p = Process(target = fun)
        jobs.append(p)
        p.start()
    
    for i in jobs:
        i.join()
    
    print(sem.get_value())
    

    同步和互斥

    临界资源

    对多个进程或者线程都可见的资源,容易产生争夺,这类资源称为临界资源

    临界区

    对临界资源进行操作的代码区域称为临界区

    解决资源争夺

    同步或者互斥

    同步

    同步是一种合作关系,为完成某种任务而建立的多个进程或者线程之间的协调关系、次序等,传递消息告知资源占用情况

    互斥

    互斥是一种制约关系,当一个进程或线程进入临界区后会进行枷锁操作,此时其他进程(线程)无法进如临界区,只有当该进程(线程)使用后进行解锁,其他人才可以使用。这种技术往往是通过阻塞完成。

    进程间同步互斥方法

    事件

    e.wait() 产生一种阻塞,直到eset之后才结束阻塞
    e.set()e进行set操作,wait不再阻塞
    e.is_set() 判断e是否被设置的状态
    e.clear()e变成没有设置的状态

    lock = Lock() 创建锁进程
    lock.acquire() 给临界区上锁
    lock.release() 给临界区解锁
    with lock 实现加锁解锁
    具体实现上,acquire()为一个条件阻塞函数,当有任意一个进程先进行了acquire操作后,其他进程再企图进行acquire操作时就会阻塞,直到lock对象被release后其他进程才可进行下一次acquire操作

    相关文章

      网友评论

          本文标题:进程间通信

          本文链接:https://www.haomeiwen.com/subject/vznfgqtx.html