美文网首页
基于共享内存的无锁消息队列设计

基于共享内存的无锁消息队列设计

作者: 钟离惜 | 来源:发表于2021-01-21 23:01 被阅读0次

一、前言

在前一篇文章中有讲到如何利用CAS原理实现无锁同步,这篇文章就是借用CAS和共享内存实现进程间的高速通信。

消息队列是一种进程、系统间进行通信的常用组件。应用程序通过读写出入队列的消息来通信,实现异步通信,简化系统设计;消息队列还能作为缓存,提高系统吞吐量。

想必大家对环形队列的实现有很清楚的了解,消息队列头主要就是包含一个当前队列的头部位置和尾部位置的信息,环形队列可以存放一定数量的POD类型结构体数据,用于push和pop。

二、性能和需求

2.1 支持多写一读

根据应用的需要,一个消息队列可能有多个写者。负责上行数据处理的通信进程也可以使用多线程方式,以便充分利用CPU资源。

2.2 高吞吐量

开源消息队列按照是否存在中间汇聚节点可分为两类:点对点的如ZeroMQ、nanomsg等,分布式的使用汇聚节点的如Kafka、ActiveMQ、RabbitMQ、NTAS等。

点对点的消息队列由于网络结构简单,吞吐量较高,ZeroMQ吞吐量在50万笔/秒以上,nanomsg在200万笔/秒以上。

使用中间汇聚节点的消息队列,由于结构复杂,同时保证消息的可靠以及持久化,吞吐量较低,其中,NATS在20万笔/秒,Kafka在10万笔/秒,ActiveMQ、RabbitMQ 在2-3万笔/秒。

虚拟机测试环境下吞吐量可轻松达到10W以上。

2.3 低时延

ZeroMQ在万兆网卡下,最低时延为30μs;在进程间消息通信工作模式下,最低时延为13μs[4]。Kafkas时延为2-3ms[5]。HP-UX上系统级消息队列最低时延为50μs。

相较于ZeroMQ、Kafka等消息队列产品,在单机上实现的消息队列应该有更低的响应时延。

预期的目标,消息队列时延在10μs以下是比较理想的。

三、关键技术

3.1 基于共享内存的存储和通信

操作系统提供的进程间通信机制有文件、socket、消息队列、管道、共享内存等。其中,共享内存是最快的IPC机制。

共享内存映射到进程空间后,数据可以直接从共享内存进行读写,不需要执行系统调用进行数据的传输。因此,避免了其它进程间通信机制必须的用户态/内核态切换以及用户空间与内核空间的数据拷贝。

由于消息队不需支持跨主机通信,所以可以采用共享内存进行进程间的通信。

3.2 无锁互斥访问

允许多个写者,采用CAS无锁算法访问尾部位置。
只允许一个读者,不需要对头部位置进行枷锁。

3.3 确保在写的时候不会被读

在每个单元设置一个独立的标志位,一段时间内最多有两个进程访问,一个读一个写,只有写进程写完之后才会把这个标志位置为1,初始化时共享内存区域会用0初始化,当读线程读时如果该标志为1才会读出数据,读完之后会把标志位再置为0。

代码中这个是否被设置的标志位没有加锁,可以看文章单线程读单线程写一个变量是否一定要加锁
简要来说就是因为bool只有一个字节,是一个CPU指令周期(等于总线宽度也就是系统位数,比如64位系统总线宽度是64个字节,一次可以读64字节数据,这也是需要内存对齐的原因)绝对能处理完的。
最坏的情况是读写进程都在访问这个变量,写进程要把这个bool改成true,写进程把这个bool类型的标志位读到CPU寄存器,然后将寄存器里的值设置为true,然后再写入内存,只要没写入内存,这个变量都是false,可以任读进程去读,因为读进程读到false的话是不会去读队列里的数据的。
其次在读进程要将其要修改为false的时候,此时写进程还在处理tail,不存在并发问题。

四、测试结果

测试环境:R5 2600下Centos7 2核4线程2G内存虚拟机

测试一:一个进程写,一个进程读:


测试二:四个进程写,一个进程读:


五、代码示例
shm_queue.hshm_queue.cpp主要就是共享内存无锁队列的实现。
timer.h是个简单的计时器类。
main1.cpp是先启动的用来初始化共享内存,并用来读的进程。
main2.cpp是先用来写的进程,前面测试中可以起多个进程来访问。

//shm_queue.h
#ifndef _SHM_QUEUE_H_
#define _SHM_QUEUE_H_

#include <string>

//only used for POD struct.
//not safety for multithreading.
//safety for multiprocess.
//one process read and multiprocess write

//permission of the process to memory.
enum ShmPermission
{
    SHM_READ,   //read only
    SHM_WRITE,  //read and write
};

int shm_create(int key, size_t size, size_t capacity, ShmPermission shmPermission, bool initialize = false);
int shm_detach(int key);
int shm_delete(int key);
int shm_push(int key, void* p);
int shm_pop(int key, void* p);
int shm_popout(int key);

#endif // !_SHM_QUEUE_H_
//shm_queue.cpp
#include "shm_queue.h"

#include <sys/shm.h>
#include <unordered_map>
#include <string.h>

struct shmqueue
{
    size_t  size;       //size of one cell
    size_t  length;     //total num of cell in queue
    int     shmid;
    void*   pshm;       //each cell has a set bool flag in it's tail
    size_t* head;       //pointer at the head position
    size_t* tail;       //pointer at the tail position
};

static std::unordered_map<int, shmqueue> g_shm;
static std::unordered_map<int, shmqueue>::iterator ite;

int shm_create(int key, size_t size, size_t length, ShmPermission shmPermission, bool initialize)
{
    if (size == 0 || length == 0)
    {
        return -1;
    }
    if (g_shm.find(key) != g_shm.end())
    {
        return -1;
    }
    int shmid = 0;
    size_t total = (size + sizeof(bool)) * length + 2 * sizeof(size_t);
    int shmflg = (shmPermission == SHM_READ) ? 0444 : 0644;
    shmflg = initialize ? (shmflg | IPC_CREAT) : shmflg;
    shmid = shmget(key, total, shmflg);
    if (shmid == -1)
    {
        return -1;
    }
    void* p = shmat(shmid, NULL, 0);
    if (p == (void*)-1)
    {
        return -1;
    }
    if (initialize)
    {
        memset(p, 0, total);
    }
    g_shm[key] = { size, length, shmid, p,
        (size_t*)(reinterpret_cast<size_t>(p) + (size + sizeof(bool)) * length),
        (size_t*)(reinterpret_cast<size_t>(p) + (size + sizeof(bool)) * length + sizeof(size_t)) };
    return 0;
}

int shm_detach(int key)
{
    ite = g_shm.find(key);
    if (ite == g_shm.end())
    {
        return -1;
    }
    if (shmdt(ite->second.pshm) != 0)
    {
        return -1;
    }
    g_shm.erase(key);
    return 0;
}

int shm_delete(int key)
{
    ite = g_shm.find(key);
    if (ite == g_shm.end())
    {
        return -1;
    }
    if (shmctl(ite->second.shmid, IPC_RMID, NULL) != 0)
    {
        return -1;
    }
    g_shm.erase(key);
    return 0;
}

int shm_push(int key, void* p)
{
    if (p == NULL)
    {
        return -1;
    }
    ite = g_shm.find(key);
    if (ite == g_shm.end())
    {
        return -1;
    }
    size_t tail = *ite->second.tail;
    size_t next = (tail == (ite->second.length - 1)) ? 0 : (tail + 1);
    while (!__sync_bool_compare_and_swap(ite->second.tail, tail, next))
    {
        tail = *ite->second.tail;
        next = (tail == (ite->second.length - 1)) ? 0 : (tail + 1);
    }
    memcpy((void*)(reinterpret_cast<size_t>(ite->second.pshm) + tail * (ite->second.size + sizeof(bool))), p, ite->second.size);
    *(bool*)(reinterpret_cast<size_t>(ite->second.pshm) + *ite->second.tail * (ite->second.size + sizeof(bool)) + ite->second.size) = true;
    return 0;
}

int shm_pop(int key, void* p)
{
    if (p == NULL)
    {
        return -1;
    }
    ite = g_shm.find(key);
    if (ite == g_shm.end())
    {
        return -1;
    }
    if (*ite->second.tail == *ite->second.head)
    {
        return -1;
    }
    while (*(bool*)(reinterpret_cast<size_t>(ite->second.pshm) + *ite->second.head * (ite->second.size + sizeof(bool)) + ite->second.size) == false);
    memcpy(p, (void*)(reinterpret_cast<size_t>(ite->second.pshm) + *ite->second.head * (ite->second.size + sizeof(bool))), ite->second.size);
    return 0;
}

int shm_popout(int key)
{
    ite = g_shm.find(key);
    if (ite == g_shm.end())
    {
        return -1;
    }
    if (*ite->second.tail == *ite->second.head)
    {
        return -1;
    }
    *(bool*)(reinterpret_cast<size_t>(ite->second.pshm) + *ite->second.head * (ite->second.size + sizeof(bool)) + ite->second.size) = 0;
    *ite->second.head = (*ite->second.head == (ite->second.length - 1)) ? 0 : (*ite->second.head + 1);
    return 0;
}
//timer.h
#ifndef _TIMER_
#define _TIMER_

#include <sys/time.h>

class CUseTime
{
public:
    CUseTime()
    {
        gettimeofday(&tv, NULL); 
        m_start_time = tv.tv_sec * 1000 + tv.tv_usec / 1000;
    }
    virtual ~CUseTime() {}
public:
    long UseTime() 
    { 
        gettimeofday(&tv, NULL);
        return tv.tv_sec * 1000 + tv.tv_usec / 1000 - m_start_time;
    }
    void Restart()
    {
        gettimeofday(&tv, NULL);
        m_start_time = tv.tv_sec * 1000 + tv.tv_usec / 1000;
    }
private:
    struct timeval tv;
    long m_start_time;
};

#endif // !_TIMER_
//main1.ccp
#include "shm_queue.h"

#include <iostream>
using namespace std;
#include "timer.h"

struct Student
{
    int sex;
    char name[100];
};

int main(int argc, char *argv[])
{
    int key = 5555;
    Student stu;
    struct timeval tv;
    int times = 0;
    CUseTime useTime;
    if (0 == shm_create(key, sizeof(Student), 100, SHM_WRITE, true))
    {
        while (1)
        {
            if (0 == shm_pop(key, &stu))
            {
                shm_popout(key);
                gettimeofday(&tv, NULL);
                ++times;
            }
            if (useTime.UseTime() >= 1000)
            {
                printf("one second to receive %d message\n", times);
                times = 0;
                useTime.Restart();
            }
        }
    }
    else
    {
        cout << "shm_create falied." << endl;
    }

    shm_detach(key);
    shm_delete(key);

    return 0;
}
//main2.cpp
#include "shm_queue.h"

#include <iostream>
#include <unistd.h>
#include <string.h>
using namespace std;
#include <sys/time.h>

struct Student
{
    int sex;
    char name[100];
};

int main(int argc, char *argv[])
{
    int key = 5555;
    int sex = 1;
    struct timeval tv;
    struct timeval tv1;
    gettimeofday(&tv1, NULL);
    if (0 == shm_create(key, sizeof(Student), 100, SHM_WRITE))
    {
        while (1)
        {
            {
                Student stu{ sex, "xiaoming" };
                shm_push(key, &stu);
                gettimeofday(&tv, NULL);
                printf("push time:%ld name %s  sex %d \n", tv.tv_sec * 1000 + tv.tv_usec / 1000, stu.name, sex++);
            }
            if ((tv.tv_sec * 1000 + tv.tv_usec / 1000) - (tv1.tv_sec * 1000 + tv1.tv_usec / 1000) > 10000)
            {
                printf("ten seconds concurrency times: %d\n", sex);
                break;
            }
        }
    }
    else
    {
        cout << "shm_create falied." << endl;
    }
    shm_detach(key);

    sleep(3);

    return 0;
}

参考文章
【共享内存】基于共享内存的无锁消息队列设计

相关文章

  • 基于共享内存的无锁消息队列设计

    一、前言 在前一篇文章中有讲到如何利用CAS原理[https://www.jianshu.com/p/207951...

  • (1)进程间几种通信方式

    管道、消息队列、共享内存、信号量、信号、socket 要知道管道、消息队列、共享内存的本质:内存本质、效率以及传输...

  • linux -11-IPC(共享内存和消息队列)

    XSI IPC之共享内存 和 消息队列(有固定的套路) 共享内存/消息队列/信号量集 遵循相同的规范,因此编程上有...

  • Unix进程间通信比较

    通信方式共享内存 (速度最快、需要进程处理同步)命名管道消息队列SOCKETRPC 同步方式:文件锁信号量MONI...

  • 并发编程

    1.进程间的通讯方式 1.1 共享内存1.2 消息队列1.3 管道1.4 信号量(相当于锁机制) 2. 线程...

  • XSI IPC之消息队列详解

    命令 ipcs 查看消息队列,共享内存,信号量数组的信息命令 ipcrm -q id 销毁一个消息队列 消息队...

  • 原子操作

    无锁 通过CAS+volatile来实现 CAS特点 基于乐观锁设计,不怕修改共享变量,修改了也没事,继续等待就可...

  • 进程通信方式

    管道及有名管道信号消息队列共享内存信号量套接字

  • Android Binder进程间通信机制

    1、概述 Linux传统IPC机制主要有已下几种:管道、消息队列、共享内存Socket等。消息队列和管道采用存储-...

  • 深入浅出Binder进程间通信机制

    1、概述 Linux传统IPC机制主要有已下几种:管道、消息队列、共享内存Socket等。消息队列和管道采用存储-...

网友评论

      本文标题:基于共享内存的无锁消息队列设计

      本文链接:https://www.haomeiwen.com/subject/diadzktx.html