美文网首页
使用内存映射开发高性能进程间消息通信组件

使用内存映射开发高性能进程间消息通信组件

作者: 编辑小猿 | 来源:发表于2019-07-31 15:11 被阅读0次

    一、背景

    项目开发中免不了各模块或系统之间进行消息通信,目前热门的消息中间件有Redis、RabbitMQ、Kafka、RocketMQ等等。以上几种组件中Redis在消息队列方面表现还可以,但是如果涉及发布订阅功能,就不行了,最近项目就使用了redis的发布订阅,每秒只能发出几千条,虽然目前绰绰有余,但是瓶颈可以预期。

    其余的几种都是比较重量级的消息中间件,什么跨平台、分布式、集群、支持N种协议等等,很高大全,我们可能就只使用了其中1、2个功能。严格来说,项目中集成这几种MQ的工作量是不小的,对于中小型系统来说,可能维护MQ稳定的工作量都比项目还大,难度也高,所有功能用全了的程序员恐怕不多。

    从长远考虑出发,选择重量级MQ恐怕是板上钉钉的事,但是项目一开始就上这几种,我觉得那也是欠缺考虑的。如果项目根本不要求跨机器通信,那杀鸡就不要用牛刀了。比如,你只是在模块之间、线程之间、进程之间,或者是在同一主机的各种不同系统之间,其实都可以不用重量级MQ。当然你使用了也没事,看个人选择。

    最近的项目有这么个场景,采集近所有底层设备,每个设备有点3000个,总共20多万个点需要采集上来。刚开始使用了Redis的发布订阅,但是程序毫无疑问地挂了,根本带不起来;因为程序启动时每个点的值都是从0变成N,就需要发消息出来,那一开始消息是很多的,redis根本处理不完,而且有很高频率的超时断线。以至于想换RabbitMQ,后来想想还是算了,因为那样增加项目难度不说,后期维护也是个难题。

    说到底这是模块之间的通信,是主程序(Winform)调用采集C++的DLL类库,发出消息后主程序和web端订阅,在主程序与DLL这边,在DLL方法上增加一个回调函数就搞定了,完全不用走消息中间件,Web端要哪些点的实时值就先ASK,先请求需要看哪些点,如何在主程序这边发布那些点的实时值消息,这样发布订阅的数据量少了2、3个数量级不止。

    二、需求

    针对上边的业务场景,因为是模块之间的线程间通信,这样搞问题不大;如果是进程之间也要那么高频率的通信,那就不好办了,我们不想使用重量级MQ,又想高频率传输消息,怎么办呢?网上搜索了一番,貌似没看到有成熟的速度又快、体量又小,部署又简单的中间件。

    所以在下不才,针对这个问题抛砖引玉,做一个demo出来供大家讨论一下。

    三、原理

    应题,就是使用内存映射来做同一个机器下各种消息的通信,内存映射比较适合做消息队列,因为消息可以持久化在本地,没读完下次进来还可以接着读。

    我预想是这样设计:

    1、发布订阅涉及到2个主要方法:Publish(string channel)、Subscribe(string channel, Callback callback);

    2、为每个channel生成一个文件:channel.db,默认每个db可以存储1000个同类型的结构体消息作为消息队列,从头部写入,尾部读出。

    每个db文件前面留一个索引区作为发布方与订阅方各自的读写位置。发布与订阅前,先读写这个索引区,因为是一对一读写,所以可以完美避开读写锁,大大提高性能。

    3、针对一对多需求,单独设计一个config.db文件存储种channel与其相关订阅信息,大概原理图如下:

    4、解决读写不加锁问题

    我们看结构体:SIndex有三个属性

    1) WriteIndex 记录发布方(Pubish)最后写入数据的位置

    2) ReadIndex 记录订阅方(Subscribe)最后读取数据的位置

    3) Over 表示WriteIndex已达到队列最大值,再WriteIndex小于等于队列最大值前,读写如下图:

    WriteIndex达到最大值后再往下写Over就要取反,如由False变为True。WriteIndex=0

    如果此时没有订阅方,那新消息就会被抛弃,因为已无空间存储。

    4) 如果ReadIndex数值到队列最大值,Over也取反,此时ReadIndex = 0,读写又变成图1所示

    5) 读写过程中并不存在互斥的情况,只要管理好读写位置,就可以避免加锁。

    四、接口设计

    4.1、主要参数定义

    #define FM_MAX_CHANNEL 100 // 暂定最多100个不同频道

    #define FM_MAX_SUBSCRIBE 3 // 暂定最多3个订阅用户

    #define FM_MAX_ROWS 1000 // 暂定最多队列大小为1000

    #define FM_DISCONNECT_TIME 5000 // 超过5000毫秒无心跳更新视为订阅断开

    #define FM_KEEP_CONN_CYCLE 1000 // 保持心跳连接的时间周期

    #define FM_NOTHING -1 // 空白,数组为0等

    #define FM_WORD_SIZE sizeof(WORD) // WORD长度

    #define FM_INDEX_SIZE sizeof(SIndex) // SIndex长度

    4.2、结构体

    View Code

    4.3、主要方法

    // 发布信息

    template<typename T>

    int Publish(const char *channel, T* data);

    // 订阅信息

    template<typename T>

    void Subscribe(const char *channel, SubscribeCallBackHandle callback);

    五、代码实现

    5.1 、FMDBManager,主要管理内存映射相关操作,因为是读写位置不一样,所以不需要加互斥量

    View Code

    5.2、FMDBClient,内存映射客户端,主要封装Publish与Subscribe方法给前端调用,屏蔽复杂性

    View Code

    请注意上边控制读写的2个方法

    bool CanWrite(SIndex *sIndex)

    {

    int nextWriteIndex = sIndex->WriteIndex + 1;

    if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;

    return nextWriteIndex != sIndex->ReadIndex;

    }

    bool CanRead(SIndex *sIndex)

    {

    if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;

    else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;

    }

    我们可以分析一下,下一个WriteIndex值如果大于队列最大值 WriteIndex置0,下一个WriteIndex数值如果不等于

    正在读的位置ReadIndex就能写;如果WriteIndex没有超出最大值,只要ReadIndex小于等于WriteIndex就能读,

    如果超出,就判断ReadIndex大于WriteIndex就能读。WriteIndex与ReadIndex数值在Publish与Subscribe中维护

    5.3、建立新线程获取最新订阅的客户端信息,这个功能主要是动态地像多个Subscribe端发生消息,比如订阅发生在发布之后,

    也应该能收到消息。

    View Code

    六、Demo测试

    6.1、Producer.cpp

    1 #include "pch.h"

    2 #include "../FMDB.h"

    3

    4 using namespace std;

    5

    6 int main()

    7 {

    8 FMClient * client = new FMClient();

    9

    10 int times = 0;

    11 int index = 0;

    12 int total = 0;

    13 UINT structSize = sizeof(SPerson);

    14 DWORD dwStartTmp = GetTickCount();

    15

    16 while(TRUE)

    17 {

    18 times++;

    19 if(index == 0)

    20 {

    21 dwStartTmp = GetTickCount();

    22 }

    23

    24 SPerson sPerson = { 0 };

    25 sPerson.Idx = index;

    26 sprintf_s(sPerson.Name, "Name.%d", index);

    27 sPerson.Age = index;

    28

    29 if(client->Publish("Person", &sPerson) == enumSuccess)

    30 {

    31 if(index % 2 == 0) total = total + sPerson.Idx;

    32 else total = total - sPerson.Idx;

    33

    34 index++;

    35 if(index % 50000 == 0)

    36 printf_s("发送条数: %d, 耗时:%d \n", index, (GetTickCount() - dwStartTmp));

    37 }

    38

    39 if(index >= 2000000) break;

    40 }

    41

    42 printf_s("调用次数: %d, 成功条数: %d, 检验值: %d \n", times, index, total);

    43 system("pause");

    44 }

    6.2、Consumer.cpp

    1 #include "pch.h"

    2 #include "../FMDB.h"

    3

    4 using namespace std;

    5

    6 int index = 0;

    7 int total = 0;

    8 DWORD dwStartTmp = GetTickCount();

    9

    10 int SubscribeCallback(void *msg)

    11 {

    12 SPerson * person = (SPerson *)msg;

    13

    14 if(index == 0)

    15 {

    16 dwStartTmp = GetTickCount();

    17 }

    18

    19 if(index % 2 == 0) total = total + person->Idx;

    20 else total = total - person->Idx;

    21

    22 index++;

    23 if(index % 50000 == 0)

    24 {

    25 printf("接收条数: %d, 耗时:%d, Idx:%d, Name:%s, Age:%d\n",

    26 index, (GetTickCount() - dwStartTmp), person->Idx, person->Name, person->Age);

    27 }

    28

    29 if(index >= 2000000)

    30 {

    31 return enumBreak;

    32 }

    33

    34 return enumSuccess;

    35 };

    36

    37 int main()

    38 {

    39 FMClient * client = new FMClient();

    40 client->Subscribe("Person", SubscribeCallback);

    41

    42 printf("接收条数: %d, 检验值: %d \n", index, total);

    43 system("pause");

    44 }

    6.3、运行,测试用例中使用了向队列发送200万条数据,消息大小128字节,订阅端也是接受到200万数据后退出,并且打印检验值。

    1) 检验值计算:0+1-2+3-4+ --------- - 2000000 = -1000000,如果队列运行正常,那两边的检验值应该都是是 -1000000.

    2) 每5万条打印一次日志,运行情况如下

    一对一方式运行三次,分别耗时(毫秒):2886、2979、2871

    3) 一对二方式运行三次,分别耗时(毫秒):4087、4009、4040

    4)运行过程中产生的文件

    6.4、200万数据一对一耗时近3秒,貌似也不是非常快是不是?但是这就是最大速度了吗?

    当然不是哦,别忘了这是debug版本,我们切换到release版本看速度会不会有所提升。

    一对一运行三次耗时分别是:1224、1373、1326

    厉害了,

    SPerson结构体128字节,每秒可以处理180万数据,当然实际运用肯定达不到,因为处理其他业务逻辑也要耗时间。

    相关文章

      网友评论

          本文标题:使用内存映射开发高性能进程间消息通信组件

          本文链接:https://www.haomeiwen.com/subject/eqhtdctx.html