美文网首页
Unix进程间通信详解

Unix进程间通信详解

作者: FengyunSky | 来源:发表于2020-04-17 10:48 被阅读0次

主要讲解进程间通信方式,包含管道 FIFO命名管道 消息队列 同步(互斥锁、条件变量、信号量、读写锁、fcntl记录上锁) 共享内存RPC远程调用,像基于tcp udp套接字、域套接字的已在《unix网络编程》中讲解;

IPC对象的可持续性:包含进程可持续、内核可持续、文件可持续;

image.png
fork exec _exit系统调用对IPC对象的影响如下:
image.png

posix IPC

posix ipc文件路径类型系统调用函数命名都以相应的IPC类型相关,如消息队列为mq_xxx、信号量sem_xxx、共享内存shm_xxx

文件路径名称需要符合规范:

  • 长度不超过最大长度限制;
  • 需要以/为前缀,且该文件具有相应的权限;
  • 名字中额外的/斜杠符需要又程序实现定义;

对于创建或者打开IPC文件描述符系统调用,如mq_open sm_open shm_open,需要指定文件模式,如O_WRONLY O_RDONLY O_RDWR等,且需要具备相应的权限;

image.png

管道

管道是最古老的IPC技术,其中包括无命名管道(用于存在亲缘关系的进程通信)和命名管道(可用于非亲缘关系进程通信),为半双工通信,也存在全双工(需要系统支持);

对于非命名管道系统调用如下:

#include <unistd.h>

int pipe(int fd[2]);    //若成功返回0,失败返回-1

通常用法是父子进程各关闭一方读及另一方写,形成半双工通信;


进程间管道.png

对于命名管道FIFO,具体系统调用如下:

#include <sys/types.h>
#include <sys/stat.h>

int mkfifo(const char *pathname, mod_t mod); //若成功返回0,出错返回-1

默认隐含使用O_CREAT | O_EXCL,若指定路径不存在,则会创建管道文件(其中mod为模式位);若指定管道文件已存在,则返回EEXIST错误,若返回此错误需要调用open打开;

管道文件open或者fopen打开,都只能只读或者只写模式,因为管道是半双工通信;

image.png
对管道或者FIFO,write写数据总是往末尾添加数据,read读数据总是从头返回数据;如果对管道或者FIFO调用lseek,就会返回ESPIPE错误;

对于管道文件,可指定为非阻塞模式,可通过open指定O_NOBLOCK标志位,或者使用fcntl函数修改当前的文件阻塞模式位,具体的非阻塞标志O_NOBLOCK对管道和FIFO影响如下:

image.png
对于具体的读或者写操作,需要注意:
  • read读取的不一定是传入的字节数,有可能小于该值,具体返回的为管道或者FIFO可读的字节数;

  • write写操作若一次性写入字节数小于等于PIPE_BUF(该值posix规定不小于512)管道缓存长度,则内核保证为原子性,即两方同时写操作,只会先写入一方所有数据,再写入另一方数据(若阻塞模式则能容纳则写入,不能容纳则等待可读空间能容纳再写,若非阻塞,则见下文),不会导致任何一方数据乱序;但若写入字节数超过PIPE_BUF则不保证原子性,原子性是内核上锁保护实现;

  • 对于O_NOBLOCK标志不影响write写操作的原子性,原子性完全是所请求写入的字节数是否小于等于PIPE_BUF(不是管道剩余可用空间),非阻塞写具体情况如下:

    1. 若写入字节数小于等于PIPE_BUF,若有足够空间容纳写入数据,则所有数据立即写入返回;若不能容纳,则对于非阻塞立即返回EAGAIN错误(因为内核无法接收部分数据的同时又保证写入的原子性);
    2. 若写入字节数大于PIPE_BUF,则写入管道或者FIFO能容纳的字节数,且该值作为write返回值;若管道或者FIFO已满,则返回EAGAIN错误;
  • 管道或FIFO保证读写不会同时进行,因为为半双工;

  • 如果一个没有为读打开的管道或FIFO写入,那么内核将产生一个SIGPIPE信号;

  • 若管道或FIFO为阻塞模式,open打开管道读或者写会阻塞该管道被写或者读为止,假设客户-服务模式存在两个管道文件同时读或者写,会一直阻塞导致死锁;

  • 多客户-单服务模式下,若实现服务端不受客户端连接关闭导致的read为0结束,需要同时读写打开管道文件;

    readfifo = open(SERV_FIFO, O_RDONLY, 0);
    dummyfd = open(SERV_FIFO, O_WDONLY, 0);
    

    FIFO`相比普通文件通信优势:

  • FIFO由内核控制,效率更高,管道文件被关闭会自动清空内容(随进程持续性),而普通文件操作是随文件系统的持续性;

  • FIFO由内核控制单向流动,半双工通信,而文件操作需要控制时序实现锁,避免同时读或同时写,还要处理对端关闭的通知;

  • FIFO若写入字节数小于PIPE_BUF缓存区大小,保持原子性,但文件操作需要自己去实现;

  • ...

使用实例

popen函数则使用管道来获取shell执行结果,shell多指令操作,也使用管道;

使用限制

系统对管道或FIFO限制如下:

  • 进程打开的最大的描述符数(可通过sysconf函数查询,一般都很大,osx10.15为256)
  • PIPE_BUF大小限制,可通过pathconf或者fpathconf查询(一般为512)

消息边界

管道为无边界通信,对端无法感知发送的字节数,若支持边界,可通过如下方式:

  • 使用分隔符(如回车符 http就是使用回车符再加换行符CR/CF来分割文本记录)

  • 添加消息长度,通过结构体来指定发送数据的长度及类型(放在消息首部),对端需要先读取消息首部确定消息长度,再读取有效数据;

    //如书中例子的结构体结构
    struct mesg_t {
      long mesg_len;
      long mesg_type;
      char mesg_data[MAXMESGDATA];
    }
    
  • 每次连接一个记录:应用通过关闭与其对端的连接来指示一个记录的结束,如http1.0;

消息队列

消息队列类似消息链表,不同于管道/FIFO,每个消息都是一个记录(包含了消息优先级或者类型、消息长度、消息数据),消息队列是随内核持续,且往消息队列中添加消息,无需对端打开接收消息;

posix ipcsystem v ipc消息存在许多类似,但存在如下差异:

  • posix ipc获取消息返回的是优先级最高的消息,而system v ipc消息队列的读则可以返回任意指定优先级的消息;
  • posix ipc提供了往空消息队列中添加消息的异步通知机制,可允许产生一个信号或者启动一个线程(调用线程入口函数),而system v ipc未提供如此机制;

posix ipc提供的接口函数如下:

mq_open打开或者创建消息队列对象,返回消息队列描述符,其中指定了消息队列的name路径;

mq_close关闭当前进程的消息队列描述符,但不是删除内核消息队列对象;

mq_unlink删除消息队列链接的文件,消息队列也存在引用计数概念,需要文件引用计数为0时,内核才会删除链接的消息队列文件并删除内核中的消息队列;

mq_getattr mq_setattr获取/设置消息队列的属性,属性包括消息队列flag标志(0或者O_NOBLOCK非阻塞标志)、消息队列的最大消息数、消息队列中消息的最大字节数及当前消息队列中消息数;

mq_send发送消息队列消息,需指定消息的长度及优先级;

mq_recv接收消息队列中优先级最高的消息,并且会返回消息的优先级;

mq_notify建立消息的异步信号通知,mq_wait阻塞等待信号事件;

posix ipc消息队列限制,可通过sysconf获取:

  • 消息队列中的最大消息数;
  • 消息的最大字节数;

Mac mach消息队列

Mac未提供posix及systemv 消息队列接口,但通过mach port消息实现了类似消息队列,不过未提供上层API;

mac 内核框架
image.png
OSX和iOS操作系统中核心为Darwin,具体的Darwin架构如下图:
image.png
image.png
Darwin架构为层次化架构,包含了XNU内核、unix shell环境及各种框架及库;其中XNU(XNU's not UNIX)核心包含了:
  • Mach微内核

    mach微内核不同于宏内核(所有内核实现都在同一内核空间中,优点是内核内存访问速度快、效率高),只实现操作系统最核心的功能(如内存管理和任务调度),其他的服务都有外部实现(通常为用户态程序),且外部服务通信需要通过消息传递形式,彼此是隔离状态

    优点:1)代码量小,可以遍历所有的代码路径,验证功能的正确性;2)稳定性和健壮性,只实现最基础的功能,其他功能都由外部服务实现,保证最小的故障及崩溃;3)灵活性,方便移植;

    缺点:性能,由于使用消息传递机制需要内存复制操作及数次上下文切换;

    但XNU内核属于混合内核,内核的最核心部分支持底层服务,包括调用、内存管理和虚拟内存,类似微内核;其他的外部服务也在内核态,且在同一内存空间中;

    image.png
    实现:
  • 进程和线程抽象

  • 虚拟内存管理

  • 任务调度

  • 进程间通信和消息传递

  • BSD层

    集成自FreeBSD保持了完全兼容,但也存在一些变化;是建立在Mach之上,提供了更高层次的抽象,可靠现代化的API,保证了POSIX兼容;

    实现:

    • unix进程模型
    • POSIX线程模型及其相关的原语
    • UNIX用户和组
    • 网络协议栈
    • 文件系统访问
    • 设备访问
  • libkern层

    提供了一个内建的、自包含的c++库,可实现面向对象内核开发,不仅使用c及汇编语言,开发效率更高;

  • I/O Kit

    I/O Kit设备驱动程序框架;

Mach概述

Mac系统调用提供了一套BSD封装的POSXI系统调用接口,也提供了Mach核心的系统调用接口,且该接口仍可在用户态访问,如top命令;

mach中所有东西都是通过自己的对象实现,进程(mach中称为任务)、线程和虚拟内存都是对象,且都有自己的属性;其独特之处在于:对象之间的通信是通过消息传递的形式实现的,且不同对象间不能直接相互调用;消息是以FIFO形式可靠传输;

mach消息

mach消息是Mach IPC的核心基础,消息可以在在两个端口(或称为端点)之间传递,端点可以是单主机也可以是远程机器,并解决了消息参数串行化、对齐、填充和字节顺序问题;

mach_port_t port;

typedef struct
{
  mach_msg_size_t msgh_descriptor_count;
} mach_msg_body_t;

typedef struct 
{
  mach_msg_bits_t   msgh_bits;
  mach_msg_size_t   msgh_size;
  mach_port_t       msgh_remote_port;
  mach_port_t       msgh_local_port;
  mach_port_name_t  msgh_voucher_port;
  mach_msg_id_t     msgh_id;
} mach_msg_header_t;

struct {
    mach_msg_header_t header;
    mach_msg_body_t body;
    mach_msg_type_descriptor_t type;
    mach_msg_trailer_t trailer;
} message;

使用mach_msg_send mach_msg_receive发送/接收消息(都是通过mach_msg接口完成的),见mach_msg.c源码

Core FoundationFoundation为Mach端口提供了高级API。在内核基础上封装的CFMachPort / NSMachPort可以用做runloop源,尽管CFMachPort / NSMachPort有利于的是两个不同端口之间的通讯同步,具体见进程间通信 (OSX/iOS)

//server.c
#include <CoreFoundation/CoreFoundation.h>
//接收到mach port消息的回调
CFDataRef myCallBack(CFMessagePortRef local, SInt32 msgid, CFDataRef data, void *info) {
     char *message = "Thanks for calling!";
     CFDataRef returnData = CFDataCreate(NULL, (const UInt8 *)message, strlen(message)+1);
     printf("here is our received data: %s\n", CFDataGetBytePtr(data));
     return returnData;  // as stated in header, both data and returnData will be released for us after callback returns
}

int main() {
        //创建本地端口
     CFMessagePortRef local = CFMessagePortCreateLocal(NULL, CFSTR("MyPort"), myCallBack, NULL, false);
    //创建本地端口runloop源
     CFRunLoopSourceRef source = CFMessagePortCreateRunLoopSource(NULL, local, 0);
    //添加到runloop中
     CFRunLoopAddSource(CFRunLoopGetCurrent(), source, kCFRunLoopDefaultMode);
     CFRunLoopRun();    // will not return as long as message port is still valid and source remains on run loop
     CFRelease(local);
}

//client.c
#include <CoreFoundation/CoreFoundation.h>

int main() {
    //创建远端端口对象
     CFMessagePortRef remote = CFMessagePortCreateRemote(NULL, CFSTR("MyPort"));
     char *message = "Hello, world!";
     CFDataRef data, returnData = NULL;
     data = CFDataCreate(NULL, (const UInt8 *)message, strlen(message)+1);
    //只要完成指定远端的端口,装载数据,还有设置发送与接收的超时时间的操作。剩下就由CFMessagePortSendRequest来接管了
     if (kCFMessagePortSuccess == CFMessagePortSendRequest(remote, 0, data, 1, 1, kCFRunLoopDefaultMode, &returnData) && NULL != returnData) {
         printf("here is our return data: %s\n", CFDataGetBytePtr(returnData));
         CFRelease(returnData);
     }
     CFRelease(data);
     CFRelease(remote);
}

//Makefile
XCODE_BASE=/Applications/Xcode.app/Contents
SIMULATOR_BASE=$(XCODE_BASE)/Developer/Platforms/MacOSX.platform
FRAMEWORKS=$(SIMULATOR_BASE)/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX.sdk/System/Library/Frameworks/
INCLUDES=$(SIMULATOR_BASE)/Developer/SDKs/MacOSX.sdk/usr/include/

all: client server

client: client.c
    clang -I$(INCLUDES) -F$(FRAMEWORKS) -o client client.c -framework CoreFoundation -target x86_64-apple-macos10.15 -isysroot /Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.15.sdk

server: server.c
    clang -I$(INCLUDES) -F$(FRAMEWORKS) -o server server.c -framework CoreFoundation -target x86_64-apple-macos10.15 -isysroot /Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.15.sdk

CVE-2016-7637---再谈Mach IPC

再看CVE-2016-1757浅析mach message的使用

macOS IPC Study Notes

线程进程间同步

互斥锁 条件变量

互斥锁可实现线程的互斥或者进程的互斥(需要设置互斥锁进程共享,即进程都可访问到该互斥锁),互斥锁分为静态初始化和动态初始化,具体如下:

//静态初始化
static pthread_mutext_t mutex = PTHREAD_MUTEX_INIT;

//动态初始化
pthread_mutex_t mutex = pthraad_mutext_init();

使用完成需要去销毁,pthread_mutex_destroy()*

上锁、解锁函数如下:

#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mptr);
int pthread_mutex_trylock(pthrad_mutex_t *mptr);
int pthread_mutex_unlock(pthrad_mutex_t *mptr);
//mac平台暂不支持如下api
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict tsptr);//功能与lock等价,但设置了超时时间,若超时时间到,未加锁,则返回ETIMEOUT

其中pthread_mutex_trylock为非阻塞函数,若该互斥锁已上锁,则返回EBUSY错误;

使用时需要小心避免线程对同一互斥锁上锁两次,或者两个互斥锁相互加锁导致相互等待,进而引发死锁

条件变量提供了多个线程会和的场合,需要配合互斥锁使用,互斥锁用于上锁,条件变量用于等待,具体函数如下:

#include <pthread.h>

//初始化及销毁
static pthread_cond_t cond = PTHREAD_COND_INIT;//静态初始化
//动态初始化
int pthread_cond_init(pthread_cond_t *cptr, 
                      const pthread_condattr_t *restrict attr);//attr为NULL,则为默认属性(私有)
int pthread_cond_destroy(pthread_cond_t *cond);

//等待条件变量
int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);
int pthread_cond_timedwait(pthread_cond_t *cptr, 
                           pthread_mutex_t *mptr,
                           const struct timespec *restrict tsptr);
//唤醒等待条件的线程
int pthread_cond_signal(pthread_cond_t *cptr);//至少唤醒一个以上的线程,posix规范,且该信号不是unix SIG_xxx信号(mac只能唤醒一个为最先添加到等待条件队列的线程)
int pthread_cond_broadcast(pthread_cond_t *cptr);//唤醒所有等待条件的线程

pthread_cond_signalapi描述如下:

These two functions are used to unblock threads blocked on a condition variable.

The pthread_cond_signal() call unblocks at least one of the threads that are blocked on the specified condition variable cond (if any threads are blocked on cond).

The pthread_cond_broadcast() call unblocks all threads currently blocked on the specified condition variable cond.

If more than one thread is blocked on a condition variable, the scheduling policy determines the order in which threads are unblocked. When each thread unblocked as a result of a pthread_cond_signal() orpthread_cond_broadcast() returns from its call to *pthread_cond_wait() or *pthread_cond_timedwait(), the thread owns the mutex with which it called *pthread_cond_wait() or *pthread_cond_timedwait(). The thread(s) that are unblocked contend for the mutex according to the scheduling policy (if applicable), and as if each had called *pthread_mutex_lock().*****

*The pthread_cond_signal() or pthread_cond_broadcast() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() orpthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behaviour is required, then that mutex is locked by the thread callingpthread_cond_signal() or pthread_cond_broadcast().*****

The pthread_cond_signal() and pthread_cond_broadcast() functions have no effect if there are no threads currently blocked on cond.

使用pthread_cond_wait时需要给mutext互斥锁上锁,因为该函数内部会通过互斥锁锁定,自动把调用线程添加到等待条件的线程列表上,然后对互斥量解锁,保证陷入wait调用后到调用线程被加入到唤醒队列之间的原子性,避免唤醒丢失引发无法唤醒(可见Calling pthread_cond_signal without locking mutex);并且需要获取到pthread_cond_signal 唤醒信号及mutex互斥锁并重新lock该互斥锁后才会返回;

image.png

如上若在条件条件判断完成后,另一线程B改变该判断条件为真,且发送条件变量信号,就会导致线程A一直等待(即信号丢失);因此:

  • pthread_cond_wait等待线程发送信号时,需要循环判断条件是否满足,避免虚假唤醒;

  • pthread_cond_wait需要传入互斥锁,且需要对该互斥锁加锁,避免唤醒信号丢失;

  • pthread_cond_signal线程发送信号且修改判断条件变量时,也需要加锁,防止信号丢失;

  • 确保互斥锁锁定在更改条件和发送信号代码路径中(如右侧代码),并且pthread_cond_signal后再pthread_cond_unlock解锁互斥锁;

这涉及到线程调度问题,若线程Bunlocksignal有可能在unlock后发生了线程调度切换,另一个线程C unlocksignal唤醒等待的线程A,线程A处理完成后继续等待,此时线程调度到B继续signal信号发送,而此时线程A被唤醒但条件不满足继续wait等待,引发线程B发送“虚假”唤醒信号;条件变量signal与unlock的顺序

> posix描述:
> 
> The *pthread_cond_signal()* or *pthread_cond_broadcast()* functions may be called by a thread whether or not it currently owns the mutex that threads calling *[pthread_cond_wait()](https://pubs.opengroup.org/onlinepubs/007908799/xsh/pthread_cond_wait.html) or *[pthread_cond_timedwait()](https://pubs.opengroup.org/onlinepubs/007908799/xsh/pthread_cond_timedwait.html) have associated with the condition variable during their waits; however, if predictable scheduling behaviour is required, then that mutex is locked by the thread calling*pthread_cond_signal()* or *pthread_cond_broadcast()*.**
> 
> **翻译:posix明确允许在`pthread_cond_unlock`后调用`pthread_cond_signal`,若需要可预见的调度行为,那么调用`pthread_cond_signal`必须锁住该互斥锁;**
属性
共享属性
#include <pthread.h>

//属性获取及修改函数
int pthread_mutexattr_getpshared(const pthread_mutexattr_t *attr, int *valptr);
int pthread_mutexattr_setshared(pthread_mutexattr_t *attr, int value);
int pthread_condattr_getpshared(const phread_condattr_t *attr, int *valptr);
int pthread_condattr_setpshared(pthread_condattr_t *attr, int value);
                                                                                                //成功均返回0,出错则为整的Exx;

其中get函数返回由valptr执行的整数的这个属性的当前值,set函数设置属性的当前值,value可以是PTHREAD_PROCESS_PRIVATE或者PTHREAD_PROCESS_SHARED,默认为前者(进程内不同线程共享),后者为进程间共享属性;对于mac系统,不支持该接口,因此跨平台需要通过sysconf检查是否支持;

若设置为PTHREAD_PROCESS_SHARED进程间共享,则需要对互斥锁放在共享内存,且分配的共享内存必须大于等于sizeof(pthread_mutex_t)

进程间共享需要注意:

  • 若进程持有互斥锁期间进程终止(可能是异常终止),内核不会自动释放进程所持有的锁;

    对于读写锁posix信号量也存在此情况,进程终止内核总是自动清理的唯一同步锁类型是fcntl记录锁;

  • 多线程中一个线程持有互斥锁期间终止(另一线程取消pthread_cancelpthread_exit资源退出,或者异常终止),对于pthread_exit自愿退出,程序自身清楚还持有一个互斥锁;对于pthread_cancel线程取消,可通过注册线程清理程序来清理;但对于异常终止,会导致上面的情况,不会自动释放;

  • 若内核存在自动清理,也能存在进程终止导致的数据异常,进而引发其他进程逻辑错误,因此需要处理这种异常情况;

健壮性属性
int pthread_mutexattr_getrobust(const pthread_mutexattr_t *restrict attr,
                                                                    int *restrict robust);
int pthread_mutexattr_setrobust(pthread_mutexattr_t *attr,
                                                                    int robust);
                                                                                            //成功返回0,出错返回Exx

int pthread_mutex_consistent(pthread_mutex_t *mutex);

健壮性属性包括PTHREAD_MUTEX_STALLED(默认)和PTHREAD_MUTEX_ROBUST

PTHREAD_MUTEX_STALLED属性,若持有互斥锁的进程终止(不管是异常还是正常),未清理互斥锁,会导致等待该互斥锁的进程一直阻塞;

PTHREAD_MUTEX_ROBUST属性,会解决异常进程持有互斥锁未释放终止的问题,若持有互斥锁的拥有者终止未对该锁解锁,会导致另一个线程调用pthread_mutex_lock获取该锁且返回EOWNERDEAD错误,指明该锁之前的拥有者已经不存在且处于“不一致”的状态;需要调用下面的函数pthread_muext_consistent来指明该互斥锁状态在解锁之前是一致的,否则会导致该互斥锁处于永久不可用状态;

若在线程解锁该互斥锁后调用pthread_mutex_consistent,导致其他试图获取该互斥锁的线程返回ENOTRECOVERABLE,表明该互斥锁已经不可用,只能调用pthread_mutex_destroy销毁,因此,需要解锁该互斥锁前调用pthread_mutex_consistent

Mac系统暂不支持该属性!

类型属性
int pthread_mutex_gettype(const pthread_mutexattr_t *restrict attr, int *restrict type);
int pthread_mutext_settype(pthread_mutexattr_t *attr, int type);
                                                                                                    //成功返回0,否则错误Exx

主要类型包括如下:

  • PTHREAD_MUTEX_NORMAL

    一种标准的互斥锁类型,不做任何的错误检测和死锁检测,如果调用线程锁定状态下再次lock就会导致死锁;若已经unlock解锁再次unlock就会导致未定义的行为;

  • PTHREAD_MUTEX_ERRORCHECK

    会进行错误检测,包括死锁,解锁另一个已经锁定的该类型的互斥锁,重复解锁,都会返回错误;

  • PTHREAD_MUTEX_RECURSIVE

    递归锁(又称可重入锁),可对已经上锁的线程再次上锁,存在引用计数概念,需要解锁同样的次数才可解锁;若另一线程对已经上递归锁的线程进行解锁,或,重复解锁,都会导致错误;

  • PTHREAD_MUTEX_DEFAULT

    此类型锁提供默认的特性和行为,不同系统默认行为不同;对于Mac系统,重复加锁,解锁,或者线程对对已经上锁的另一线程解锁,都会出现未定义的行为

Mac支持该属性!

读写锁

#include <pthread.h>

//静态初始化
static pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
//动态初始化
int pthread_rwlock_init(pthread_rwlock_t *rwlock);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

int pthread_rwlock_rdlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_unlock(pthread_rwlock_t *rwptr);

int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwptr);

读写锁,可实现并发读、读写互斥、写写互斥,这种对于某个给定资源的共享访问也称为共享-独占上锁,获取一个读写锁用于读称为共享锁,获取一个读写锁用于写称为独占锁

属性方法:

int pthread_rwlockattr_getshared(const pthread_rwlockarrt_t *attr, int *valptr);
int pthread_rwlockattr_setshared(pthread_rwlockattr_t *attr, int value);

value属性值包括PTHREAD_PROCESS_PRIVATE(默认)和PTHREAD_PROCESS_SHARED共享属性(可实现进程间共享);

Mac支持读写锁进程间共享!

自旋锁

自旋锁互斥锁类似,互斥锁会让进程或者线程休眠,但自旋锁会一直独占着cpu,因此自旋锁一般用于独占cpu时间短的操作及非抢占性内核(对于分时或者抢占性内核,若出现调度自旋锁仍在上锁就会导致休眠),适合底层实现其他锁的原语,不太适合用户层以避免一直占用着cpu;

自旋锁避免上下文切换导致的效率问题,对于需要独占cpu极小时间,又不会让内核调度的话,使用自旋锁效率较高。但对于现代处理器上下文越来越快,自旋锁只有在特定场景下才有用。

#include <pthread.h>
//初始化,pshared同其他同步锁值相同:PTHREAD_PROCESS_SHARED表示进程间共享,默认PTHREAD_PROCESS_PRIVATE
int pthread_spin_init(pthread_spin_t *lock, int pshared);
int pthread_spin_destroy(pthread_spin_t *lock);

int pthread_spin_lock(pthread_spin_t *lock);
int pthrad_spin_trylock(pthread_spin_t *lock);
int pthread_spin_unlock(pthread_spin_t *lock);
                                                                //成功返回0,出错返回Exx

Mac不支持自旋锁接口

屏障

屏障(也称栅栏)可实现线程的并行同步机制,类似设置一道屏障(或者栅栏),等待所有人(或者任务)完成到达屏障前都需要休眠,直至所有人(或任务)完成,都是被唤醒放行;其中pthread_join等待另一线程完成后再继续,也算是屏障,不过是计数为2;但屏障可实现任意数目的线程等待,直至所有线程完成处理工作。

#include <pthread.h>

int pthread_barrier_init(pthread_barrier_t *restrict barrier,
                                                    pthread_barrierattr_t *attr,                //指定属性,null默认属性
                                                    unsigned int count);                                //指定数量
int pthread_barrier_destroy(pthread_barrier_t *barrier);
                                                                                                                //成功返回0,出错返回Exxx

int pthread_barrier_wait(pthread_barrier_t *barrier);
                //成功返回0或者PTHREAD_BARRIER_SERIAL_THREAD,出错返回Exx

//属性,同上PTHREAD_PROCESS_SHARED表示进程间共享,默认PTHREAD_PROCESS_PRIVATE
int pthread_barrierattr_getpshared(pthread_barrier_t *restrict attr,
                                    int *restrict pshared);
int pthread_barrierattr_setpshared(pthread_barrier_t *attr,
                                    int pshared);

pthread_barrier_wait若线程未达到指定的数目则休眠,直到达到指定数目,则所有线程被唤醒;对于一个任意线程,最后一个执行调用的则返回PTHREAD_BARRIER_SERIAL_THREAD,其他返回0;一旦到达指定数目计数,屏障可被重用,除非使用pthread_barrier_destroy销毁,再初始化pthread_barrier_init初始化为新的指定计数,否则屏障计数不会被改变;

记录锁

读写锁能实现进程间同步,但存在进程异常终止内核不会自动释放锁;记录锁解决了进程间同步自动释放问题,并且fork子进程记录锁不会被继承,exec创建新的程序可以继承原执行程序的记录锁,不过可设置FD_CLOEXEC执行时关闭标志位,则会自动关闭相应文件的记录锁;

#include <fcntl.h>

struct flock {
  short l_type;     //F_RDLCK, F_WRLCK, F_UNLCK
  short l_whence;   //SEEK_SET, SEEK_CUR, SEEK_END
  off_t l_start;    //相对l_whence偏移字节数
  off_t l_len;      //锁定长度,0为整个文件
  pid+t l_pid;      //用于F_GETLK命令选项返回当前进程id
}

cmd:
F_GETLK, F_SETLK, F_SETLKW(如果记录锁被上锁则调用进程会被阻塞直至解锁)

int fcntl(int fd, int cmd, ../*struct flock *flockptr */);

读写锁相同,也存在两种类型的锁:共享读锁独占写锁,锁机制一致;

记录锁存在两种类型锁:建议性锁强制性锁,不过要看系统是否支持;

建议性锁就是系统不保证对文件已上锁的进程不会被其他进程干扰;强制性锁顾名思义系统会检查所有的文件操作系统调用,如read write,是否干扰了某个进程持有的某个锁;

开启特定文件的强制性锁需满足:

  • 关闭组执行位
  • 打开设置组ID位SGID

典型应用场景:进程单例

信号量

信号量是一种用于不同进程或者单个进程不用线程同步手段的原语,不同于管道、FIFO、消息队列这些进程间通信方式,它是一个计数器(随内核持续),用于共享数据对象的原子访问;

信号量也可以实现互斥锁的进程或者线程的同步(设定计数器值为1,称为二值信号量),但两者存在区别:

  • 互斥锁上锁/解锁需要在同一个线程,而信号量不存在此限制;
  • 互斥锁+条件变量实现同步存在条件变量信号丢失的情况,而信号量有一个与之关联的状态(引用计数),且随内核存在(即使进程崩溃也依然不会丢失);

posix提供了两种类型信号量:命名信号量和基于内存的信号量(Mac不支持);

image.png image.png
#include <semaphore.h>

//oflag可为0、O_CREAT 或 O_CREAT|O_EXCL,mode及value只有创建信号量时才需要,且创建时需要添加读写权限,因为信号量需要修改文件来改变计数器;
sem_t *sem_open(const char *name, int oflag, .../* mode_t mode, unsigned int value */);

//进程终止会自动关闭已打开的信号量,unlink也存在引用计数概念,需要最后一个close后方可删除内核文件
int sem_close(sem_t *sem);
int sem_unlink(sem_t *sem);

int sem_wait(sem_t *sem);//计数器减1,若为0,则阻塞
int sem_trywait(sem_t *sem);//若失败返回EAGAIN
int sem_post(sem_t *sem);//计数器加1,触发sem_wait阻塞的进程或线程继续执行
int sem_getvalue(sem_t *sem);//Mac不支持

/***** 基于内存的信号量(Mac不支持) *****/
//shared为0时,线程共享;shared为1,则进程间共享
int sem_init(sem_t *sem, int shared, unsigned int value);
int sem_destroy(sem_t *sem);

sem_open创建信号量指定name文件,在文件系统中无法查看到(mac)*

信号量进程间共享,可通过命名信号量(sem_open指定相同的名字即可)来共享,fork子进程也可以使用父进程打开的任何信号量;

基于内存的信号量是随内存持续,只要基于内存的信号量的内存区保持有效,则该信号量一直存在;

若信号量计数器值已为0,继续sem_wait调用,进程/线程仍会等待该信号量,且计数器仍会-1,semp_post只会释放其中一个等待的进程/线程,直至计数器>0,所有等待该信号量的进程/线程不再等待信号量继续执行;

image.png

探讨 iOS 开发中各种锁

深入理解 iOS 开发中的锁

共享内存

共享内存是最快的IPC,其有效避免了数据在内核的流动次数,可直接在通信进程共享数据;主要的函数接口如下:

#include <sys/mman.h>

void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);
                                                                                                //成功返回被映射区的起始地址,失败返回MAP_FAILED
int munmap(void *addr, size_t len);
int msync(void *addr, size_t len, int flags);
                                                                                                //成功返回0,失败返回-1

其中mmap可实现如下三种类型的映射(以下都可实现无亲缘关系进程间共享内存):

  • 使用普通文件以提供内存映射I/O(内存映射文件);

  • 使用特殊文件以提供内存映射(内存映射匿名文件);

    如/dev/zero块设备,提供匿名文件的内存映射;

  • 使用shm_open以提供POSIX共享内存区对象;

内存映射文件类型

具体的mmap参数解释如下:

  • addr指定描述符fd应被映射的进程内空间的起始地址,通常为NULL(由内核自动去选择起始地址),若此地址超过描述符fd空间地址,会存在异常(如SIGBUS 总线错误SIGSEGV段错误);

  • len是映射到调用进程地址空间的字节数,不能为0;

  • prot数据保护参数,包括PROT_NONE数据不可访问,PROT_READ数据可读,PROT_WRITE数据可写,PROT_EXEC数据可执行;通常为PROT_REEAD | PROT_WRITE

  • flags为共享标志,包括

    • MAP_SHARED变动是共享的,调用进程对被映射数据所做的修改对于共享该对象的所有进程可见,且改变了其底层支撑对象(或是一个文件,或者是共享内存区);
    • MAP_PRIVATE变动是私有的,调用进程对被映射数据所做的修改只对自己可见,且不会改变其底层支撑对象;
    • MAP_FIXED准确的解释第一个addr参数,对于可移植考虑,不建议使用;
    • MAP_ANON提供匿名存储映射,不需要指定普通文件且打开,需要将描述符指定为-1Mac支持!
    • ...

    进程间通信,通常使用MAP_SHARED选项;

  • fd描述符,通常为普通文件或者块设备,但不能为套接字或者终端的描述符,否则失败;函数成返回后,close描述符对建立的映射无影响;

  • offset偏移,通常为0,由内核去自动选择起始地址;

munmap函数会删除映射关系,其中addr参数为mmap返回的起始地址,len是映射区的大小;若删除映射关系后再次调用该函数,则调用进程会产生一个SIGSEGV信号(前提是映射区地址不再被mmap重用);

msync函数同步映射数据到被映射文件,其中flags包括MS_ASYNC执行异步写,MS_SYNC执行同步写,MS_INVALIDATE使高速缓存的数据无效;MS_ASYNCMS_SYNC必须指定其一,差别在于MS_SYNC会阻塞同步完成后返回,而MS_ASYNC写操作已由内核排入队列,就立即返回;MS_INVALIDATE使所有的缓存数据无效;

与普通文件共享相比,共享内存(文件形式)的I/O操作都在内核中完成,不需要调用read write等操作,但若被映射文件共享期间,若映射区域数据被非共享进程修改,会导致映射数据异常,因此需要添加文件锁,避免文件被其他进程异常修改。

共享内存不同进程数据共享,因此需要数据同步,通常使用信号量;因信号量随内核持续,为避免进程异常未删除或者同步信号量,建议sem_open打开信号量后sem_unlink删除该信号量(因信号量已被进程打开,真正删除是进程终止),如下:

sem_t *sem =  sem_open(xxx);
sem_unlink(sem);//需要注意,若是无亲缘关系进程间共享信号量,则unlink会导致不同进程不是共享的同一个信号量

共享内存区对象类型

shm_open打开或创建共享内存区对象;

#include <sys/mman.h>

/**** posix api****/
//name可以是个名称,也可以是个路径
int shm_open(const char *name, int oflag, mode_t mode);
                                                                                            //成功返回非负描述符,失败-1
int shm_unlink(const char *name);
                                                                                            //成功返回0,失败返回-1
/**** systemv api ****/
#include <sys/shm.h>

//创建共享内存对象并返回标识符
int shmget(key_t key, size_t size, int oflag);
                                                                                            //成功返回共享内存对象,失败-1
//把一个共享内存区链接到调用进程的地址空间
void *shmat(int shmid, const void *shmaddr, int flag);
                                                                                            //成功返回映射区的起始地址,失败-1
//类似close
int shmdt(const void *shmaddr);
//获取已存在的共享内存区的大小,或者删除共享内存区对象
int shmctl(int shmid, int cmd, struct shmid_ds *buff);
                                                                                            //成功0,失败-1

内存映射文件不同,共享内存区对象使用shm_open指定的name不需要是个文件,可以是个名称,随内核持续;相同之处是,都需要使用mmap来映射内存,使用相同;

shm_open参数如下:

  • name可以是个名称,也可以是文件;
  • oflag必须包含O_RDONLY或者O_RDWR其中之一,还可以包含O_CREAT O_EXCL O_TRUNC,标志与open函数类似;

shm_unlinksem_unlink使用;

虽然内存映射文件及共性内存区对象都可以使用无亲缘关系进程共享内存,但通常使用共享内存区对象(相比映射文件,该类型存在内存,不受文件系统影响),来实现无亲缘关系进程的共享;

mac既支持posix接口,又支持systemv接口!

内存映射大小修改/获取函数

#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>

int ftruncate(int fd, off_t len);
int fstat(int fd, struct stat *buf);

可使用ftruncate来修改内存区映射的大小,可以通过fstat获取内存映射大小;

信号量、共享内存对象、消息对象,随内核持续的,命名需要使用不同的名字,建议在命名后添加后缀 .mq .sem .shm

RPC(Remote Procedure Call, 远程过程调用)

RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。

主要用于分布式系统或者集群部署,来实现功能扩展或计算力扩展;

RPC涉及到如下问题:

  • 网络传输,不同主机间建立通信传输需要调用的函数名称及参数,主要是通过在客户端和服务器之间建立TCP连接,远程过程调用的所有交换的数据都在这个连接里传输。连接可以是按需连接,调用结束后就断掉,也可以是长连接,多个远程过程调用共享同一个连接。大部分RPC框架都使用TCP协议,但也可以使用UDPgRPC使用了HTTP2
  • 寻址问题,底层RPC框架需要寻找对端主机的地址及端口,并传输函数名称及参数,对端主机获取函数名称及参数后,需要查找相对应的函数地址;
  • 序列化和反序列化,调用端需要将参数序列化(如转化为字节流)发送,对端收到数据后需要反序列化将数据还原为原有格式的内存表达方式,进行本地调用,再将结果返回到调用端;
    image.png

锁的实现原理

  • 关中断,保护临界区
  • 锁总线,被锁的内存区不能访问
  • 锁调度器,如uc/os
  • 任务控制块添加等待条件的标记,若条件未到达则休眠,进行上下文切换
  • 自旋忙等待,占用cpu,不会触发上下文切换
  • ...

相关文章

网友评论

      本文标题:Unix进程间通信详解

      本文链接:https://www.haomeiwen.com/subject/dmfevhtx.html