美文网首页
IPC学习之POSIX消息队列

IPC学习之POSIX消息队列

作者: 哈莉_奎茵 | 来源:发表于2018-06-25 17:35 被阅读0次

之前进程间通信的知识都是应付面试的时候临时补的,最近还是要通过码代码来学习下,主要参考《Linux/Unix系统编程手册》,本来照着这本书的示例一路敲下来就足够了,但是书上给的都是比较完整的例子(毕竟便于即敲即用嘛),这里我尽量简化。

1. 准备

这里给出下文可能用到的变量的定义

mqd_t mqd;  // 消息队列句柄,可用于IO多路复用
struct mq_attr attr;  // 消息队列属性,下面为字段说明,[]内为可以设置/获取该字段的API
                      // .mq_flags 0或者O_NONBLOCK [mq_getattr(), mq_setattr()]
                      // .mq_maxmsg 最大消息数量 [mq_open(), mq_getattr()]
                      // .mq_msgsize 最大消息大小 [mq_open(), mq_getattr()]
                      // .mq_curmsgs 队列中消息数量 [mq_getattr()]
const char* mq_name = "/mq";  // 消息队列名称,必须以'/'开头

消息队列的maxmsgmsgsize都是在创建时指定的,之后不能改变。是否非阻塞读写则可以手动设定。消息数量也是只读的,是随着消息队列的读取/写入而改变的,每次写入则加1,每次读取则减1。
另外,相关API都是返回-1作为错误码,之后API说明略去检查返回值的步骤。
gcc编译时需要加上-lrt选项,动态链接到共享库librt.so

2. 基本API

创建消息队列

// 参数2和3同系统调用open(2),参数4设置自定义消息队列属性,若为NULL则不设置,也可以不要参数4
mqd = mq_open(mq_name, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, &attr);

生产者

下列代码发送3个消息到队列中

mqd = mq_open(mq_name, O_WRONLY | O_NONBLOCK);
const char* messages[] = {"msg-1", "msg-2", "msg-3"};
int priorities[] = {4, 0, 6};
for (int i = 0; i < 3; ++i)
    mq_send(mqd, messages[i], strlen(messages[i]), priorities[i]);

注意对现有的消息队列调用mq_open时,第2个参数不带O_CREAT时不需要指定权限。
发送时需要指定优先级,消息按照优先级降序存在消息队列中,因此队列中的消息依次是"msg-3", "msg-1", "msg-2"

消费者

读取消息队列中所有消息并显示优先级

mqd = mq_open(mq_name, O_WRONLY | O_NONBLOCK);
mq_getattr(mqd, &attr);  // 取得队列属性,从而定义合适大小的缓冲区
char* buffer = new buffer[attr.mq_msgsize + 1];
unsigned int priority;
ssize_t num_read;
while ((num_read = mq_receive(mqd, buffer, attr.mq_msgsize, &priority)) != -1) {
    buffer[num_read] = '\0';
    printf("[%2u] %s\n", priority, buffer); 
}
// 若errno != EAGAIN则需要处理错误,非阻塞模式下若队列为空errno会被设置为EAGAIN
delete[] buffer;

另外关于EINTR错误,若设置信号处理器时指定了SA_RESTART标志,则mq_receivemq_send会自动重启,所以无需代码处理。但是这针对的是阻塞式I/O,参考man 7 signal如下内容

If a blocked call to one of the following interfaces is interrupted by a signal handler, then the call will be automatically restarted after the signal handler returns if the SA_RESTART flag was used; otherwise the call will fail with the error EINTR

至于非阻塞式I/O,个人觉得根本就不会被打断,因为没有意义,但是实际情况还是得看内核怎么实现,有篇讨论可以参考。EINTR and non-blocking calls

超时读写

mq_timedsend()mq_timedreceive相比mq_send()mq_receive()仅仅多出一个参数const struct timespec *abs_timeout,用于指定超时时间,仅仅在O_NONBLOCK标记不起作用时才有效。

 struct timespec
   {     
     __time_t tv_sec;        /* Seconds.  */
     __syscall_slong_t tv_nsec;  /* Nanoseconds.  */                                                                                                             
   };    

注意该参数设置的是绝对时间,因此可以用clock_gettime()来获取CLOCK_REALTIME时钟的当前值,并在该值上加上所需的时间量来生成一个恰当初始化的timespec结构。

2. 消息通知

示例程序的流程是顺序的,先创建消息队列,再写入若干消息,再依次读取所有消息。实际上用于进程间通信时,写者和读者是并发执行的,即生产者-消费者问题。
POSIX消息队列提供了消息通知API,在队列为空时若有新的消息到来,能够接收通知。通过下列API可以注册通知,指定具体通知方式(比如信号)后,若新消息到来使得队列从空变成非空,就会调用自定义的通知处理函数。

int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

以下几点需要特别注意

  • 任一时刻只能有1个进程能够向特定消息队列注册通知,如果已经存在,再次注册会失败,errno被置为EBUSY
  • 若对非空队列注册通知,只有等到队列被清空后,新消息到来时才能发出通知。
  • 若向注册进程发送了一个通知之后就会删除注册信息,这样其他进程就可以向队列注册通知。
  • 若有其他进程调用mq_receive()发生阻塞,则有新消息到来时,其他进程接收消息,注册进程继续等待通知。
  • 可以传入NULL来撤销通知。

通过man 7 sigevent可以查看该结构的详细信息

       union sigval {          /* 通知传递的数据 */
           int     sival_int;         /* Integer value */
           void   *sival_ptr;         /* Pointer value */
       };

       struct sigevent {
           int          sigev_notify; /* 通知方法 */
           int          sigev_signo;  /* 通知信号 */
           union sigval sigev_value;  /* 通知传递的数据 */
           void       (*sigev_notify_function) (union sigval);
                            /* 线程通知的函数 (SIGEV_THREAD) */
           void        *sigev_notify_attributes;
                            /* 线程通知的线程属性 (SIGEV_THREAD) */
           pid_t        sigev_notify_thread_id;
                            /* 用于接收信号的线程ID (SIGEV_THREAD_ID) */
       };

其中sigev_notify指定通知的方法,有SIGEV_NONE(不作任何处理)、SIGEV_SIGNAL(使用信号通知)和SIGEV_THREAD(使用线程通知)。
用示例代码解释典型用法(忽略了错误处理),预定义变量如下

struct sigevent sev;
constexpr int NOTIFY_SIG = SIGUSR1;  // 自定义通知信号的种类

使用信号通知的框架如下

int main() {
    // ...
    signal(SIGUSR1, [](int){});  // 仅仅用于跳出sigsuspend

    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = NOTIFY_SIG;
    mq_notify(mqd, &sev);

    sigset_t empty_mask;
    sigemptyset(&empty_mask);
    while (true) {
        sigsuspend(&empty_mask);
        mq_notify(mqd, &sev);
        // TODO: 处理由空变为非空的消息队列
    }
}

使用线程通知的框架如下

static void notifySetup(mqd_t& mqd);  // 注册通知线程,函数的前置声明

static void notifyFunc(union sigval sv) {
    auto& mqd = *reinterpret_cast<mqd_t*>(sv.sival_ptr);
    notifySetup(mqd);
    // TODO: 处理由空变为非空的消息队列
}

static void notifySetup(mqd_t& mqd) {
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = notifyFunc;
    sev.sigev_notify_attributes = NULL;
    sev.sigev_value.sival_ptr = &mqd;
    mq_notify(mqd, &sev);
}

int main() {
    // ...
    notifySetup(mqd);
    pause();  // 主线程永远中止,因为定时器通知是在一个单独的线程中调用notifyFunc()来分发的
}

3. 相关系统命令

常见的命令,ipcs显示IPC状态,ipcrm释放IPC。但是它们都只能用于System V IPC,对POSIX IPC没有作用。POSIX IPC被实现成了虚拟文件系统的文件,可以直接使用lsrm来列出和删除这些文件,挂载在/dev/目录下,因此可以用常见的Linux命令来管理。
/dev/mqueue/xxx:记录了消息队列/xxx的状态,这也解释了为何IPC名字要以/开头。

# cat /dev/mqueue/mq
QSIZE:13         NOTIFY:0     SIGNO:10    NOTIFY_PID:30527 

其中QSIZE为所有未消费的消息大小之和,并且进程30527在等待信号10(SIGUSR1)的通知。这是我在启动之前的mq_notify示例程序监听所致。
如果rm掉该文件,那么之后消息队列也无效了。
/proc/sys/fs/mqueue/下记录了一些文件

  • msg_default:消息队列默认属性的mq_maxmsg字段的值;
  • msg_max:消息队列默认属性的mq_maxmsg字段的上限;
  • msgsize_default:消息队列默认属性的mq_msgsize字段的值;
  • msgsize_max:消息队列默认属性的mq_msgsize字段的上限;
  • queues_max:系统可以创建的消息队列个数上限。
# tail -n +1 /proc/sys/fs/mqueue/* | grep -v "^$"
==> /proc/sys/fs/mqueue/msg_default <==
10
==> /proc/sys/fs/mqueue/msg_max <==
10
==> /proc/sys/fs/mqueue/msgsize_default <==
8192
==> /proc/sys/fs/mqueue/msgsize_max <==
8192
==> /proc/sys/fs/mqueue/queues_max <==
256

其中tail+NUM代表打印第NUM行开始的内容。

4. I/O多路复用

POSIX消息队列相比System V消息队列的最大优点就是,mqd_t类型的句柄可以被select/poll/epoll监听。
示例代码epoll_consumer.cc

5. 总结

消息队列本质是消息组成的链表,允许进程以消息的形式交换数据,和数据报socket一样。不同于TCP的流式传输,消息具有边界,N次写入对应N次读取,若读取缓冲区太小,则剩余的部分会被舍弃,而不会留给下次继续读。
队列具有容量上限和单条消息的大小上限,队列填满时,写入操作会被阻塞,非阻塞模式下会失败并设置errnoEAGAIN,因此采用非阻塞模式能用循环读取整个队列而进行后续操作。单次发送的数据超过消息大小上限时,会发送失败。
相比System V消息队列,POSIX消息队列的优点是:

  1. 支持I/O多路复用;
  2. 支持队列从空变为非空时的消息异步通知。
  3. 如同其他POSIX IPC,POSIX消息队列维护了引用计数,支持安全地删除。

缺点在于可移植性较差,因为诞生较晚。大多情况下这不是问题,不维护老代码的话完全可以用POSIX消息队列。另一方面POSIX消息队列严格按照优先级排序,System V消息队列支持按照消息的类型字段来读取消息。

相关文章

  • IPC学习之POSIX消息队列

    之前进程间通信的知识都是应付面试的时候临时补的,最近还是要通过码代码来学习下,主要参考《Linux/Unix系统编...

  • 进程间通信(下)

    消息队列 在UNP第二卷中详细介绍了两种消息队列:Posix消息队列和System V消息队列。这两种消息队列很相...

  • linux -11-IPC(共享内存和消息队列)

    XSI IPC之共享内存 和 消息队列(有固定的套路) 共享内存/消息队列/信号量集 遵循相同的规范,因此编程上有...

  • Linux IPC之消息队列

    通过ipcs可以查看消息队列新增头文件消息队列中消息的新增和删除是对等的,内部维护一个链表,插入一个...

  • 进程通信之消息队列

    ftok键值获取 创建消息队列 IPC对象机制shell命令 消息队列的控制 发送 实例 接收 实例

  • IPC-消息队列

    消息队列概念 消息队列是IPC(进程间通信,inter process communication)中常用的一种方...

  • 进程间通信(5)-IPC通信

    IPC通信(Inter-Process Communication) 三种IPC对象: 共享内存、消息队列、信号灯...

  • XSI IPC之消息队列详解

    命令 ipcs 查看消息队列,共享内存,信号量数组的信息命令 ipcrm -q id 销毁一个消息队列 消息队...

  • Linux-C-day-4-进程间通信-消息队列||信号量 ||

    消息队列 消息队列是在内核中实现的,并且是具有一定的优先级的一种进程间通信模型 POSIX PIC消息队列 在un...

  • Android Binder进程间通信机制

    1、概述 Linux传统IPC机制主要有已下几种:管道、消息队列、共享内存Socket等。消息队列和管道采用存储-...

网友评论

      本文标题:IPC学习之POSIX消息队列

      本文链接:https://www.haomeiwen.com/subject/iawyyftx.html