在 Kafka 工作机制 一文提及了 Kafka 消息的不可靠性。本文就 Kafka 消息的三种不可靠性(重复、丢失、乱序),分析它们出现的内部原因和解决办法。
作者:王克锋
出处:https://kefeng.wang/2017/11/22/kafka-reliability/
版权:自由转载-非商用-非衍生-保持署名,转载请标明作者和出处。
1 Kafka 消息的问题
Kafka就比较适合高吞吐量并且允许少量数据丢失的场景,如果非要保证“消息只读取一次”,可以使用JMS。
参考: http://blog.csdn.net/u012050154/article/details/78592854
Kafka Producer 消息发送有两种方式(配置参数 producer.type
):
-
producer.type=sync
(默认值): 后台线程中消息发送是同步方式,对应的类为 kafka.producer.SyncProducer; -
producer.type=async
: 后台线程中消息发送是异步方式,对应的类为 kafka.producer.AyncProducer;优点是可批量发送消息(消息个数达到batch.num.messages=200
或时间达到 “ 时发送)、吞吐量佳,缺点是发送不及时可能导致丢失;
对于同步方式(producer.type=sync
)?Kafka Producer 消息发送有三种确认方式(配置参数 acks
):
-
acks=0
: producer 不等待 Leader 确认,只管发出即可;最可能丢失消息,适用于高吞吐可丢失的业务; -
acks=1
(默认值): producer 等待 Leader 写入本地日志后就确认;之后 Leader 向 Followers 同步时,如果 Leader 宕机会导致消息没同步而丢失,producer 却依旧认为成功; -
acks=all/-1
: producer 等待 Leader 写入本地日志、而且 Leader 向 Followers 同步完成后才会确认;最可靠。
Kafka Consumer 有两个接口:
- Low-level API: 消费者自己维护 offset 等值,可以完全控制;
- High-level API: 封装了对 parition 和 offset 的管理,使用简单;可能遇到 Consumer 取出消息并更新了 offset,但未处理消息即宕机,从而相当于消息丢失;
- 最多一次 -消息可能会丢失,但永远不会重新发送。
consumer.poll(); consumer.commitOffset(); processMsg(messages);
- 至少一次 -消息永远不会丢失,但可能会重新传递。
consumer.poll(); processMsg(messages); consumer.commitOffset();
- 恰恰一次 - 这就是人们真正想要的,每条信息只传递一次。以事务来保证。
2 消息重复
- 根本原因:已经消费了数据,但是 offset 没提交。
- 外在原因:(1)消费数据后、提交 offset 前,线程被杀;
(2)设置 offset 为自动提交,consumer.close() 之前 consumer.unsubscribe();
(3)consumer 取了一批数据,尚未处理完毕时,达到了session.timeout.ms
,导致没有接收心跳而挂掉,自动提交offset失败,下次会重复消费本批消息; - 解决办法:(1)唯一 ID 保存在外部介质中,每次消费时根据它判断是否已处理;
(2)如果在统计用,丢失几条关系不大,则无需理会;
(3)如果消费者来不及处理,可以这样优化:增加分区以提高并行能力;增加消费者线程;关闭自动提交enable.auto.commit=false
3 消息丢失
- 根本原因:已经提交了 offset,但数据在内存中尚未处理,线程就被杀掉。
同步模式下,确认机制设置为-1(不可为1),即让消息写入Leader和Follower之后再确认消息发送成功;
异步模式下,设置为不限制阻塞超时时间(不可为acks=0),当缓冲区满时不清空缓冲池,而是让生产者一直处于阻塞状态;
4 消息乱序
传统的队列,在并行处理时,由于网络故障或速度差异,尽管服务器传递是有序的,但消费者接收的顺序可能不一致;
Kafka 在主题内部有分区,并行处理时,每个分区仅由消费者组中的一个消费者使用,确保了消费者是该分区的唯一读者,并按顺序使用这些数据。
但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序,除非只提供一个分区。
网友评论