1. 实现原理
Kafka主题中的每个分区都有一个预写日志(write-ahead log),我们写入Kafka的消息就存储在这里面。这里面的每条消息都有一个唯一的偏移量,用于标识它在当前分区日志中的位置。如下图所示:
Kafka中的每个主题分区都被复制了n次,其中的n是主题的复制因子(replication factor)。这允许Kafka在集群服务器发生故障时自动切换到这些副本,以便在出现故障时消息仍然可用。Kafka的复制是以分区为粒度的,分区的预写日志被复制到n个服务器。 在n个副本中,一个副本作为Leader,其他副本成为Follower。顾名思义,Producer只能往Leader分区上写数据(读也只能从Leader分区上进行),Follower只按顺序从Leader上复制日志。
日志复制算法(log replication algorithm)必须提供的基本保证是,如果它告诉客户端消息已被提交,而当前Leader出现故障,新选出的Leader也必须具有该消息。在出现故障时,Kafka 会从挂掉Leader的 ISR 里面选择一个Follower作为这个分区新的Leader;换句话说,是因为这个Follower是跟上Leader写进度的。
每个分区的Leader会维护一个in-sync replica
(同步副本列表,又称 ISR)。当 Producer往Broker发送消息,消息先写入到对应Leader分区上,然后复制到这个分区的所有副本中。只有将消息成功复制到所有同步副本(ISR)后,这条消息才算被提交。由于消息复制延迟受到最慢同步副本的限制,因此快速检测慢副本并将其从ISR 中删除非常重要。
2. 副本与Leader失去同步的原因?
-
慢副本(Slow replica):
Follower Replica
在一段时间内一直无法赶上Leader的写进度。造成这种情况的最常见原因之一是Follower Replica上的 I/O 瓶颈,导致它持久化日志的时间比它从Leader消费消息的时间要长。 -
卡住副本(Stuck replica):
Follower Replica
在很长一段时间内停止从Leader获取消息。这可能是因为GC停顿,或者副本出现故障。 -
刚启动副本(Bootstrapping replica):当用户给某个主题增加副本因子时,新的
Follower Replicas
是不同步的,直到它跟上Leader的日志。
当副本落后于Leader分区时,这个副本被认为是不同步或滞后的。在Kafka 0.8.2 中,副本的滞后于Leader是根据
replica.lag.max.messages
或replica.lag.time.max.ms
来衡量的。
- replica.lag.max.messages:用于检测慢副本(slow replica)。
- replica.lag.time.max.ms:用于检测卡住副本(stuck replica)。
3. 如何确认某个副本处于滞后状态?
通过replica.lag.time.max.ms
来检测卡住副本(Stuck replica)在所有情况下都能很好地工作。它跟踪Follower副本没有向Leader发送获取请求的时间,通过这个可以推断Follower是否正常。另一方面,使用消息数量检测不同步慢副本(Slow replica)的模型只有在为单个主题或具有同类流量模式的多个主题设置这些参数时才能很好地工作,但我们发现它不能扩展到生产集群中所有主题。
在我之前的示例的基础上,如果主题foo以2 msg/sec的速率写入数据,其中Leader收到的单个批次通常永远不会超过3条消息,那么我们知道这个主题的replica.lag.max.messages
参数可以设置为4。为什么? 因为我们以最大速度往Leader写数据并且在Follower副本复制这些消息之前,Follower的日志落后于Leader不超过3条消息。同时,如果主题foo的Follower副本始终落后于Leader超过3条消息,则我们希望Leader删除慢速Follower副本以防止消息写入延迟增加。
这本质上是replica.lag.max.messages
的目标,能够检测始终与Leader不同步的副本。假设现在这个主题的流量由于峰值而增加,生产者最终往主题foo发送了一批包含4条消息,等于replica.lag.max.messages = 4
的配置值。此时,两个Follower副本将被视为与Leader不同步,并被移除ISR。
但是,由于两个Follower副本都处于活动状态,因此它们将在下一个fetch请求中赶上Leader的日志结束偏移量并被添加回ISR。如果生产者继续向Leader发送大量的消息,则将重复上述相同的过程。这证明了Follower副本进出ISR时触发不必要的错误警报的情况。
replica.lag.max.messages
参数的核心问题是,用户必须猜测如何配置这个值,因为我们不知道Kafka的传入流量到底会到多少,特别是在网络峰值的情况下。
4. 一个参数搞定一切
我们意识到,检测卡住或慢速副本真正重要的事情,是副本与Leader不同步的时间。我们删除了通过猜测来设置的replica.lag.max.messages
参数。现在,我们只需要在服务器上配置replica.lag.time.max.ms
参数即可,这个参数的含义为副本与Leader不同步的时间。
- 如果副本未能在
replica.lag.time.max.ms
时间内发送fetch请求,则会将其视为已死的副本并从ISR中删除。 - 如果副本落后于Leader的时间超过
replica.lag.time.max.ms
,则认为它太慢并且从ISR中删除。
因此,即使在峰值流量下,生产者往Leader发送大量的消息,除非副本始终和Leader保持replica.lag.time.max.ms
时间的落后,否则它不会随机进出ISR。
网友评论