前言
这篇文章我们讨论一下消息中间件如何保证消息不丢失,其实这是分布式系统面临的一个基本问题。
分布式系统必然涉及网络数据传输和数据存储,在传输方面数据会面临网络异常,
在存储方面数据会面临单点故障、磁盘损坏以及服务重启等故障,从而导致数据丢失。
所以,如何确保数据不丢失便成了所有分布式系统面临的一个基本问题。
接下来,我们以消息中间件为例,首先讨论一下消息丢失的原因,然后讨论它的解决方案,最后我们在简单了解一下Kafka具体是如何解决消息丢失的问题。
问题
avatar上图是最简单的消息生产与消费模型图,图中生产者表示消息的发送方,消费者表示消息的接受方,箭头表示的是网络连接,中间的消息中间件用于传输和存储消息。
我们知道,当消息从生产者发送到消息中间件的过程中,可能会因网络问题导致消息发送失败,如果没有其它措施,那么消息就会丢失。
即使消息成功到达消息中间件,也会因为存储方面的问题,导致消息丢失,比如:不对消息持久化处理而是只存在内存中,那么服务重启便会导致消息丢失,即使做持久化处理存储在磁盘,也有可能因磁盘故障导致消息丢失。
退一步说,如果传输和存储都正常,那么也会因为消费机制的问题,导致消息丢失,比如:消息中间件向消费者投递消息但消费者还未确认消息时就将数据删除或更新消息的消费偏移量。
所以,任意一款消息中间件无论是Kafka、Rabbitmq还是RocketMQ,为了保证消息不丢失,都必须处理这三方面的问题:网络、存储、消费机制。
方案
针对以上三个方面的问题,各消息中间件解决问题的逻辑都是一样的,都是采用确认机制解决网络传输的问题,采用持久化和副本解决存储的问题,采用提交机制解决消费机制方面的问题。
下面,我们分别来看一下对应问题的解决方案。
确认机制
avatar从上面的问题分析中,我们已经知道:生产者将消息发往消息中间件的过程中,可能会因网络问题导致消息丢失;
之所以说可能丢失,是因为一次完整的消息发送过程包含发送和响应二个阶段,如果消息在发送阶段发生网络异常,那么消息会丢失,但如果消息在响应阶段出现网络异常,那么消息未必丢失。
因此,为了保证消息不丢失,可能的解决方式有两种:第一、出现异常时重新发送消息不管消息之前是否发送成功,第二、出现异常时先查询消息状态然后再根据情况选择是否重试。
因为,消息的数据量不是很大,所以一般情况下消息中间件会选择第一种方式:当生产者没有收到消息中间件(接收者)的确认回复(ACK)时,生产者就不断地进行重试,直到收到ACK或者重试超过一定的阀值为止。
ACK是一种消息发送确认机制——发送者和接收者约定,在消息发送过程中,如果接收者收到了某条信息,那么就向发送者回复一个ack字段,用该字段用来标示消息是否成功被接收者处理或存储。
持久化
通过确认机制我们可以确保消息可以成功到达消息中间件,但如果消息中间件存储消息的方式是采用内存,或者是异步定时持久化到磁盘,那么消息也是有可能丢失的。
因为,当服务器重启时,内存中的数据便会丢失,即使是异步刷盘也会造成部分数据来不及持久化而丢失,之所以将消息存在内存中是为了提高消息发送和消费的处理效率。
因此,为了保证数据(消息)不在消息中间件中丢失同时又能保证消息的发送和消费效率,大多数消息中间件以及数据库都会采用追加日志的方式顺序地将变化的数据先写入到日志文件中。
这样,通过顺序地将数据写入日志文件中,既保证了数据持久化也保证了数据处理的效率。
副本
虽然持久化可以保证发送成功的消息不丢失,但是如果出现磁盘损坏或者单点故障的情况,那么相当于消息丢失了。
因此,在其它服务器上冗余相应的数据,便成了分布式数据库软件,解决单点故障的唯一方案。
实现方式有两种一种是客户端往不同的数据服务器发送同一份数据,直到这些服务器都收到为止;另一个方式是将数据发往集群中的Leader节点,然后由Leader同步到Follower中,然后再响应客户端的请求。
其中第二种方式是主流的数据副本的处理方式,其实现原理也很简单,如下图所示:
当消息中间件中的Leader节点收到Producer发过来的消息后,它会将该消息同步给它的Follower节点,当所有的Follower节点都收到了同步的数据后,Leader便响应Producer的请求。
当然,同步的Follower数量以及是同步进行还是异步,这些要根据实际情况来决定。
提交机制
当保证了消息在生产者和消息中间件中不会丢失后,最后我们还需要保证在消费者消费消息的时候消息也"不丢失"。
这里的不丢失,其实是每一条消息都要能被消费者消费不能出现遗漏,这就和具体中间件的消费机制有关。
最常见的一种消息消费机制,是通过Offset来维护消费者消费消息的进度。
比如说,消息中间件中有一个消息队列A,里面有从0到100编好序号的100条消息,有一个消费者B,它要从队列中获取消息;
假设,在没有Offset的情况,消费者B从0开始每次拉取一条处理一条,当它拉去到50时,此时消费者B挂了,重启之后从什么地方开始?
也许你会说记录一下消息的消费进度?对,这个消费进度就是Offset,但它不能由消费者进行维护而是由消息中间件进行维护。
那么接着的问题便是,消息中间件在什么时候跟新Offset呢?如果在消息中间件将消息投递给消费者后,但消费者还未处理之前,消息中间件就跟新offset,
那么在消费者挂了重启后,就会出现某些消息没有消费到的情况。显然,只能将Offset的更新即提交Offset的权限交给消费者,这样消费者消费完数据后主动请求更新Offset便不会导致消息消费不到的情况出现。
Kafka
Kafka的配置中有一配置项spring.kafka.producer.acks,它有三个可选值如下所示:
ack=0:相当于不需要消息确认,发出去之后就不管Broker是否收到,哪怕网络异常丢失也不管。
ack=1(默认):只要Leader节点收到了消息,就返回ACK,不管Follower有没有收到。
ack=-1(ALL):要求Leader和所有正常的Follower都收到,才放回ACK。
Kafka的Offset提交机制有两种方式,一种是自动提交,一种是手动提交。
自动提交是在一定时间间隔后,Kafka客户端将当前消费者的Offset进度,在后端自动提交给Broker,这不仅会存在消息丢失的可能,而且还有可能导致消息被重复消费。
因此,对消息丢失敏感的应用都会选择手动提交,虽然也有可能导致消息重复消费,但最少可以保证消息没有丢失的可能。
Kafka在持久化和副本方面的处理方式,大致的逻辑上面已经提到过了,具体实现细节比较复杂,由于篇幅有限就不详细细说了。
总结
为了保证消息不丢失,那么生产者在消息发送的时候需要有确认机制,消费者在消费消息的时候需要有提交机制,以及消息中间件需要对消息做持久化处理以及采用多副本策略存储消息,这样就可以保证绝大多数的消息是不会丢失的。
之所以说绝大多少,是因为消息在生产者这边如果采用的是异步的方式而且没有做持久化存储,那么消息也是有丢失的可能的。
网友评论