一、背景
项目开发中免不了各模块或系统之间进行消息通信,目前热门的消息中间件有Redis、RabbitMQ、Kafka、RocketMQ等等。以上几种组件中Redis在消息队列方面表现还可以,但是如果涉及发布订阅功能,就不行了,最近项目就使用了redis的发布订阅,每秒只能发出几千条,虽然目前绰绰有余,但是瓶颈可以预期。
其余的几种都是比较重量级的消息中间件,什么跨平台、分布式、集群、支持N种协议等等,很高大全,我们可能就只使用了其中1、2个功能。严格来说,项目中集成这几种MQ的工作量是不小的,对于中小型系统来说,可能维护MQ稳定的工作量都比项目还大,难度也高,所有功能用全了的程序员恐怕不多。
从长远考虑出发,选择重量级MQ恐怕是板上钉钉的事,但是项目一开始就上这几种,我觉得那也是欠缺考虑的。如果项目根本不要求跨机器通信,那杀鸡就不要用牛刀了。比如,你只是在模块之间、线程之间、进程之间,或者是在同一主机的各种不同系统之间,其实都可以不用重量级MQ。当然你使用了也没事,看个人选择。
最近的项目有这么个场景,采集近所有底层设备,每个设备有点3000个,总共20多万个点需要采集上来。刚开始使用了Redis的发布订阅,但是程序毫无疑问地挂了,根本带不起来;因为程序启动时每个点的值都是从0变成N,就需要发消息出来,那一开始消息是很多的,redis根本处理不完,而且有很高频率的超时断线。以至于想换RabbitMQ,后来想想还是算了,因为那样增加项目难度不说,后期维护也是个难题。
说到底这是模块之间的通信,是主程序(Winform)调用采集C++的DLL类库,发出消息后主程序和web端订阅,在主程序与DLL这边,在DLL方法上增加一个回调函数就搞定了,完全不用走消息中间件,Web端要哪些点的实时值就先ASK,先请求需要看哪些点,如何在主程序这边发布那些点的实时值消息,这样发布订阅的数据量少了2、3个数量级不止。
二、需求
针对上边的业务场景,因为是模块之间的线程间通信,这样搞问题不大;如果是进程之间也要那么高频率的通信,那就不好办了,我们不想使用重量级MQ,又想高频率传输消息,怎么办呢?网上搜索了一番,貌似没看到有成熟的速度又快、体量又小,部署又简单的中间件。
所以在下不才,针对这个问题抛砖引玉,做一个demo出来供大家讨论一下。
三、原理
应题,就是使用内存映射来做同一个机器下各种消息的通信,内存映射比较适合做消息队列,因为消息可以持久化在本地,没读完下次进来还可以接着读。
我预想是这样设计:
1、发布订阅涉及到2个主要方法:Publish(string channel)、Subscribe(string channel, Callback callback);
2、为每个channel生成一个文件:channel.db,默认每个db可以存储1000个同类型的结构体消息作为消息队列,从头部写入,尾部读出。
每个db文件前面留一个索引区作为发布方与订阅方各自的读写位置。发布与订阅前,先读写这个索引区,因为是一对一读写,所以可以完美避开读写锁,大大提高性能。
3、针对一对多需求,单独设计一个config.db文件存储种channel与其相关订阅信息,大概原理图如下:
4、解决读写不加锁问题
我们看结构体:SIndex有三个属性
1) WriteIndex 记录发布方(Pubish)最后写入数据的位置
2) ReadIndex 记录订阅方(Subscribe)最后读取数据的位置
3) Over 表示WriteIndex已达到队列最大值,再WriteIndex小于等于队列最大值前,读写如下图:
WriteIndex达到最大值后再往下写Over就要取反,如由False变为True。WriteIndex=0
如果此时没有订阅方,那新消息就会被抛弃,因为已无空间存储。
4) 如果ReadIndex数值到队列最大值,Over也取反,此时ReadIndex = 0,读写又变成图1所示
5) 读写过程中并不存在互斥的情况,只要管理好读写位置,就可以避免加锁。
四、接口设计
4.1、主要参数定义
#define FM_MAX_CHANNEL 100 // 暂定最多100个不同频道
#define FM_MAX_SUBSCRIBE 3 // 暂定最多3个订阅用户
#define FM_MAX_ROWS 1000 // 暂定最多队列大小为1000
#define FM_DISCONNECT_TIME 5000 // 超过5000毫秒无心跳更新视为订阅断开
#define FM_KEEP_CONN_CYCLE 1000 // 保持心跳连接的时间周期
#define FM_NOTHING -1 // 空白,数组为0等
#define FM_WORD_SIZE sizeof(WORD) // WORD长度
#define FM_INDEX_SIZE sizeof(SIndex) // SIndex长度
4.2、结构体
View Code
4.3、主要方法
// 发布信息
template<typename T>
int Publish(const char *channel, T* data);
// 订阅信息
template<typename T>
void Subscribe(const char *channel, SubscribeCallBackHandle callback);
五、代码实现
5.1 、FMDBManager,主要管理内存映射相关操作,因为是读写位置不一样,所以不需要加互斥量
View Code
5.2、FMDBClient,内存映射客户端,主要封装Publish与Subscribe方法给前端调用,屏蔽复杂性
View Code
请注意上边控制读写的2个方法
bool CanWrite(SIndex *sIndex)
{
int nextWriteIndex = sIndex->WriteIndex + 1;
if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;
return nextWriteIndex != sIndex->ReadIndex;
}
bool CanRead(SIndex *sIndex)
{
if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;
else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;
}
我们可以分析一下,下一个WriteIndex值如果大于队列最大值 WriteIndex置0,下一个WriteIndex数值如果不等于
正在读的位置ReadIndex就能写;如果WriteIndex没有超出最大值,只要ReadIndex小于等于WriteIndex就能读,
如果超出,就判断ReadIndex大于WriteIndex就能读。WriteIndex与ReadIndex数值在Publish与Subscribe中维护
5.3、建立新线程获取最新订阅的客户端信息,这个功能主要是动态地像多个Subscribe端发生消息,比如订阅发生在发布之后,
也应该能收到消息。
View Code
六、Demo测试
6.1、Producer.cpp
1 #include "pch.h"
2 #include "../FMDB.h"
3
4 using namespace std;
5
6 int main()
7 {
8 FMClient * client = new FMClient();
9
10 int times = 0;
11 int index = 0;
12 int total = 0;
13 UINT structSize = sizeof(SPerson);
14 DWORD dwStartTmp = GetTickCount();
15
16 while(TRUE)
17 {
18 times++;
19 if(index == 0)
20 {
21 dwStartTmp = GetTickCount();
22 }
23
24 SPerson sPerson = { 0 };
25 sPerson.Idx = index;
26 sprintf_s(sPerson.Name, "Name.%d", index);
27 sPerson.Age = index;
28
29 if(client->Publish("Person", &sPerson) == enumSuccess)
30 {
31 if(index % 2 == 0) total = total + sPerson.Idx;
32 else total = total - sPerson.Idx;
33
34 index++;
35 if(index % 50000 == 0)
36 printf_s("发送条数: %d, 耗时:%d \n", index, (GetTickCount() - dwStartTmp));
37 }
38
39 if(index >= 2000000) break;
40 }
41
42 printf_s("调用次数: %d, 成功条数: %d, 检验值: %d \n", times, index, total);
43 system("pause");
44 }
6.2、Consumer.cpp
1 #include "pch.h"
2 #include "../FMDB.h"
3
4 using namespace std;
5
6 int index = 0;
7 int total = 0;
8 DWORD dwStartTmp = GetTickCount();
9
10 int SubscribeCallback(void *msg)
11 {
12 SPerson * person = (SPerson *)msg;
13
14 if(index == 0)
15 {
16 dwStartTmp = GetTickCount();
17 }
18
19 if(index % 2 == 0) total = total + person->Idx;
20 else total = total - person->Idx;
21
22 index++;
23 if(index % 50000 == 0)
24 {
25 printf("接收条数: %d, 耗时:%d, Idx:%d, Name:%s, Age:%d\n",
26 index, (GetTickCount() - dwStartTmp), person->Idx, person->Name, person->Age);
27 }
28
29 if(index >= 2000000)
30 {
31 return enumBreak;
32 }
33
34 return enumSuccess;
35 };
36
37 int main()
38 {
39 FMClient * client = new FMClient();
40 client->Subscribe("Person", SubscribeCallback);
41
42 printf("接收条数: %d, 检验值: %d \n", index, total);
43 system("pause");
44 }
6.3、运行,测试用例中使用了向队列发送200万条数据,消息大小128字节,订阅端也是接受到200万数据后退出,并且打印检验值。
1) 检验值计算:0+1-2+3-4+ --------- - 2000000 = -1000000,如果队列运行正常,那两边的检验值应该都是是 -1000000.
2) 每5万条打印一次日志,运行情况如下
一对一方式运行三次,分别耗时(毫秒):2886、2979、2871
3) 一对二方式运行三次,分别耗时(毫秒):4087、4009、4040
4)运行过程中产生的文件
6.4、200万数据一对一耗时近3秒,貌似也不是非常快是不是?但是这就是最大速度了吗?
当然不是哦,别忘了这是debug版本,我们切换到release版本看速度会不会有所提升。
一对一运行三次耗时分别是:1224、1373、1326
厉害了,
SPerson结构体128字节,每秒可以处理180万数据,当然实际运用肯定达不到,因为处理其他业务逻辑也要耗时间。
网友评论