1. 数据可靠性
Kafka作为一个商业级消息中间件,消息可靠性的重要性可想而知。本文从Producter 往Broker发送消息、Topic分区副本以及Leader选举几个角度介绍数据的可靠性。
1.1 Topic 分区副本
在Kafka 0.8.0 之前,Kafka是没有副本的概念的,那时候人们只会用Kafka存储一些不重要的数据,因为没有副本,数据很可能会丢失。但是随着业务的发展,支持副本的功能越来越强烈,所以为了保证数据的可靠性,Kafka从0.8.0版本开始引入了分区副本(详情请参见 KAFKA-50)。也就是说每个分区可以人为的配置几个副本(比如创建主题的时候指定 replication-factor
,也可以在Broker级别进行配置 default.replication.factor
),一般会设置为3。
Kafka可以保证单个分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。在众多的分区副本里面有一个副本是Leader,其余的副本是Follower,所有的读写操作都是经过Leader进行的,同时Follower会定期地去Leader上的复制数据。当Leader挂了的时候,其中一个Follower会重新成为新的Leader。通过分区副本,引入了数据冗余,同时也提供了Kafka的数据可靠性。
Kafka的分区多副本架构是Kafka可靠性保证的核心,把消息写入多个副本可以使Kafka在发生崩溃时仍能保证消息的持久性。
1.2 Producer往Broker发送消息
如果我们要往Kafka对应的主题发送消息,我们需要通过Producer完成。前面我们讲过Kafka主题对应了多个分区,每个分区下面又对应了多个副本;为了让用户设置数据可靠性, Kafka在Producer里面提供了消息确认机制。也就是说我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在定义Producer时通过acks
参数指定(在 0.8.2.X 版本之前是通过 request.required.acks
参数设置的,详见 KAFKA-3043)。这个参数支持以下三种值:
-
acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入Kafka。在这种情况下还是有可能发生错误,比如发送的对象无能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在acks=0模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式, 一定会丢失一些消息。
-
-acks = 1:意味若 Leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的Leader选举,生产者会在选举时收到一个LeaderNotAvailableException异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的Leader那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入Leader,但在消息被复制到 Follower副本之前Leader发生崩溃。
-
acks = all(这个和 request.required.acks = -1 含义一样):意味着Leader在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和
min.insync.replicas
参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。
根据实际的应用场景,我们设置不同的
acks
,以此保证数据的可靠性。
另外,Producer发送消息还可以选择同步(默认,通过producer.type=sync
配置) 或者异步(producer.type=async
)模式。如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将producer.type
设置为sync。
1.3 Leader选举
在介绍Leader选举之前,让我们先来了解一下ISR(in-sync replicas)
列表。每个分区的Leader会维护一个ISR列表,ISR列表里面就是Follower副本的Borker编号,只有跟得上Leader的Follower副本才能加入到ISR里面,这个是通过replica.lag.time.max.ms
参数配置的,只有ISR里的成员才有被选为Leader的可能。
所以当Leader挂掉了,而且unclean.leader.election.enable=false
的情况下,Kafka 会从ISR列表中选择第一个Follower作为新的Leader,因为这个分区拥有最新的已经committed的消息。通过这个可以保证已经committed的消息的数据可靠性。
综上所述,为了保证数据的可靠性,我们最少需要配置一下几个参数:
- Producer级别:acks=all(或者 request.required.acks=-1),同时发生模式为同步 producer.type=sync。
- Topic级别:设置 replication.factor>=3,并且min.insync.replicas>=2。
- Broker级别:关闭不完全的Leader选举,即unclean.leader.election.enable=false。
2. 数据一致性
这里介绍的数据一致性主要是说不论是老的Leader还是新选举的Leader,Consumer都能读到一样的数据。那么Kafka是如何实现的呢?
假设分区的副本为 3,其中副本0是Leader,副本1和副本2是Follower,并且在ISR列表里面。虽然副本0已经写入了Message4,但是Consumer只能读取到Message2。因为所有的ISR都同步了Message2,只有High Water Mark
以上的消息才支持Consumer读取,而High Water Mark取决于ISR列表里面偏移量最小的分区,对应于上图的副本 2,这个很类似于木桶原理。
这样做的原因是还没有被足够多副本复制的消息被认为是 “不安全” 的,如果Leader发生崩溃,另一个副本成为新Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前Leader(副本 0) 读取并处理了Message4,这个时候Leader挂掉了,选举了副本1为新的Leader,这时候另一个消费者再去从新的Leader读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。
当然,引入了High Water Mark机制,会导致Broker间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms
参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。
网友评论