在使用消息队列的过程中,可能会碰到需要实现延迟消息的功能。延迟消息是指因特殊原因,消息需要在指定的时间才发布。比如发布一个半小时后才开始的活动、执行定时作业等。
本文将从设计上阐述如何实现延迟消息功能,分别使用如下两种方案来实现:
-
基于MySQL+Redis实现
-
基于RocketMQ实现
基于MySQL+Redis实现
直接上图:
延迟消息-Redis实现.png整个设计中,参与者包括以下几部分:
消息发布方:负责消息的生产。
消息消费方:负责消息的接受以及后续业务处理。
MySQL数据库:存储消息的内容,并生产消息ID。
延迟消息处理服务:负责接收延迟消息,并将消息存储到MySQL以及Redis中。同时需要负责延迟消息的处理实现。是最核心的部分。
Redis数据库:负责保存延迟消息列表,就绪消息列表。
下面,将重点分析Redis以及延迟消息处理服务设计。
Redis延迟消息存储设计
在Redis中将延迟消息按执行时间分为两个队列,一个是Delay Queue,一个是Ready Queue。
Delay Queue
Delay Queue 使用的是Redis的Zset来保存,其中Key为消息ID,Score为执行时间的时间戳。所有消息都是先保存到该消息队列中的。
Ready Queue
Ready Queue 可以使用List或Set来保存,设计成每1秒一个Ready Queue,比如key=1589722632,表示存储消息发送时间为“2020-05-17 21:37:12”的所有消息。
虚拟时间轮
同时业务可以按实际情况来约定维护就绪时长,比如5分钟。使用“虚拟时间轮”来表示该就绪时长,在时间轮中,每一格代表一个Ready Queue,若就绪时长5分钟,则时间轮就包含300个格子。
随着时间移动,在时间轮体现为不断的从最近(接近当前时间)的一格中取出所有消息来发送到消费方。
延迟消息处理服务设计
延迟消息处理服务核心的功能为两个定时线程:查询线程,发送线程,两个线程均为每秒执行一次。
查询线程
查询线程工作主要为从Delay Queue查询属于就绪时长内的消息,若就绪时长为5分钟,则查找延迟时间为当前时间+5分钟内的所有消息。
使用Zset来存储最主要是使用排序功能,能方便的查找就绪时长内的消息,同时消息的插入时间复杂度为O(logn)。
每秒执行一次主要是因为消息一直动态更新,需要保证所有消息均能即时发送。
发送线程
发送线程工作主要是从“虚拟时间轮”中取出最近的一格的所有消息,也就是延迟发送时间等于当前秒的所有消息。
查缺补漏线程
在实际过程中,为了避免因网络或处理性能的原因,造成历史时间轮中的消息没有发送的情况。需要额外启动一个线程,负责从MySQL中读取发送时间到,却没有发送的消息进行异常重发。当然,这个消息若一直发送失败也不会永久保存,我们一般会设定一个阈值。
以上就是基于MySQL和Redis实现延迟消息的基本设计,整体实现难度不大。主要缺点个人认为有两个:
-
在消息量较大时,MySQL将成为性能瓶颈。
-
同样在消息量较大时,使用Zset来存储消息,因为Redis底层是使用数组来存储的,频繁的插入和删除数据都有可能导致频繁的数组扩容,而数组扩容是需要申请一块更大的内存空间的。因此对内存的分配要合理评估。
基于RocketMQ实现
直接上图:
延迟消息-Rocket实现.png基于RocketMQ的实现本质上和MySQL+Redis的实现是类似的,都是将所有延迟消息分为延迟队列和就绪队列来实现。
但是这种方案巧妙的利用了RocketMQ的主题以及主题消息队列来实现。下面准备从消息的整个历程来说明下设计方案。
1、消息发布
首先,需要在消息发布方(Producer)实现单独的消息发送组件。在发送延迟消息前,需要在消息的定常部分设置延迟消息的标志。
然后,将消息发送的Topic重写为独立的延迟消息Topic,比如Topic1。
其次,在延迟消息Topic中,消息队列按照消息延迟发送时间分为多个消息队列,比如每30分钟一个一个消息队列。消息发送组件需要根据延迟的消息,选择需要发送的队列。
最后,将消息投递给MQServer,这时候消息发布的Topic和消息队列已被改写,当然还需要在消息中备份原有的主题信息。
2、消息转发
在MQServer端仍是按既有的逻辑,消息被发送到了延迟消息Topic中。
3、延迟消息Topic消费
单独实现一个Consumer,订阅延迟消息Topic。在收到延迟消息通知后,从延迟消息Topic中消费新的消息。然后类似在MySQL+Redis中一样,维护了一个就绪时长内的“虚拟时间轮”。时间轮中每一格的数据都是改时间需要发送的消息。
因为Consumer是单独实现的业务服务,消息也是存放在内存中的,所以为了保证消息不会因服务宕机而丢失,就绪时长不宜过长。
最后,在Consumer中,也需要单独实现消息发送组件,负责将到点的消息取出,并将消息的Topic重置为初始的目标Topic,然后将延迟消息的标志改为否。将消息重新发送给MQServer。
4、初始Topic消费
最终消息的消费业务服务将会收到上一步重新发送过来的消息,然后进行业务处理。
以上就是基于RocketMQ实现延迟消息的设计,整体思路变化不大。主要有以下的改进:
-
无需引入额外的存储服务,消息就存储在RocketMQ中,而RocketMQ是支持亿级消息量的存储挤压的。
-
整个设计实现对上下游服务和RocketMQ都是透明的。
网友评论