在系统中使用中间件进行消息传递的时候,最头疼的问题就是消息丢失了,虽然我们知道中间件一般都提供了消息持久化和消息确认重试的机制,但是如果要和业务功能结合起来的话,这些往往是不够用的,接下来我会和大家分享下,在我接触过的系统中是如何保证消息可靠性的:
保证消息可靠性的架构图 在这里插入图片描述
思路讲解
结合上面的图,我们来了解下详细的处理流程
涉及到的组件介绍:
- Q1: 业务消息队列,被业务消费者监听
- Q2: 消费者收到消息后会发送一个确认消息到此队列中,这个队列被回调检查服务监听
- Q3: 接收延迟消息的队列,被回调检查服务监听,用来实现超时重试的机制
- Producter: 消息的生产者,也就是我们的应用
- DB: 包括业务数据库、生产者消息数据库、消费者消息数据库
- 回调检查服务:确认消息是否超时消费,如果超时,则会通知生产者重新发送消息,否则将消息写入到消费消息数据库
- 定时检查服务:通过比对生产者消息数据库和消费者数据库,比对那些消息已经发送但是还没有到我们的消费消息数据库中,如果存在,则通知生产者重新发送消息
过程讲解
- 当我们的app应用也就是消息的生产者发送消息之前,首先将消息保存一份到消息数据库中;
- 将消息存到数据库之后,生产者会将消息分别发送到两条消息队列中,第一个是将消息立即发送到我们的业务队列Q1中,这个队列会被业务消费者监听,第二个是发送一个延迟消息到Q3队列中,被回调检查服务监听;
- 业务消费者监听到了生产者发送的消息,如果处理成功,则会发送一个确认消息到Q2队列,Q2队列也被回调检查服务监听;
- 回调检查服务的处理过程是这样的:
- 如果接收到Q2队列的消息,则直接把消息保存到消费消息数据库中
- 如果收到Q3延迟队列的消息,则会检查消费消息数据库中是否已经存在该消息消费成功确认的记录,如果存在,则不做任何处理;如果没有记录,就代表该消息在规定时间内没有被业务消费者进行消费,则当作消息消费超时处理,这个时候会通知生产者重新发送一条该消息到队列中;
- 定时检查服务处理过程是这样的:
- 通过比对生产者消息数据库和消费者消息数据库,然后提取消费者消息数据库中不存在的数据进行下一步处理;
- 判断该消息发送时间和当前时间相差的间隔是否大于我们预定的超时限制,然后将超时的消息通知生产者进行重新发送到队列中进行处理;
喜欢的话,请收藏、点赞哦
网友评论