生产者根据消息key以及“路由策略”将消息发往对应topic分区,消息数据在broker服务端按照指定备份策略“持久化”保存起来,消费者不断从服务端拉取消息消费,并提交“消费位移”,整个过程需要关注消息的“重复消费”以及“丢失”问题。
路由策略
hash、轮询or自己实现
Java客户端默认的生产者路由策略的实现类org.apache.kafka.clients.producer.internals.DefaultPartitioner。默认策略为:如果指定了partition就直接发送到该分区;如果没有指定分区但是指定了key,就按照key的hash值选择分区;如果partition和key都没有指定就使用轮询策略。而且如果key不为null,那么计算得到的分区号会是所有分区中的任意一个;如果key为null并且有可用分区时,那么计算得到的分区号仅为可用分区中的任意一个。
注意kafka只保证分区层面的消息有序性,topic层面并不保证。
消息持久化
Kafka 只对“已提交”的消息(committed message)做“有限度”的持久化保证。
Topic分区Partion,每个分区又有多个副本Repication,消息在broker中持久化到日志文件,多个follower副本异步从leader副本中同步消息。
如何持久化:每一个topic的分区对应一个目录,该分区的消息追加写到该目录下日志文件,用户可以分配日志保留策略和切割策略。由于采用了顺序写,也就是WAL技术(类似mysql里redolog的使用),性能较高。
消费位移提交
每个消息在服务端对应一个位移Offset,服务端利用位移查找消息。另外消费者需要将自己消费位移在合适时机提交,下次消费时从上次消费位移之后继续消费,即使有宕机。
老版本位移提交依赖于zk,由于zk其实不适合频繁写入,所以新版本kafka,把位移管理重新实现;消费者将消费位移信息写入特殊topic :__consumer_offsets。位移主题的 Key 中保存 3 部分内容::<Group ID,主题名,分区号 >
- 自动提交
消费者poll时提交上一次的位移,另外也有后台线程每隔xx时间(auto.commit.interval.ms)提交一次位移,保证最小的提交间隔。 - 手动提交(同步提交、异步提交)
注意关注CommitFailedException异常 - 细粒度提交
commitSync(Map<TopicPartition, OffsetAndMetadata>)
消息丢失
- 生产者异步发送,消息还没发出时就挂掉
需要关注异常回调 - 消息poll后就先提交位移,消息的处理过程中消费者挂掉
消息处理完成才提交位移(也会引入重复消费) - unclean配置为true,让落后的broker成为leader,丢失消息
在可用性与数据一致性之间做出抉择,若不允许数据丢失,则unclean配置置为false。
另外在生产端以及broker服务端合理配置,设置ack=all,并保证replication.factor > min.insync.replicas>1,保证数据一直冗余备份 - 老版本broker分区之间数据同步机制导致
社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。
消息重复消费
- 网络问题导致生产端未收到broker回复的ack,引起重发
- 消费者消费过程中挂掉,未提交消费位移,导致重复消费
- 消费reblance,一个正在消费信息时reblance,消息被另一个消费者又消费
可以采用幂等性producer,并且在消费端业务处理逻辑上也支持幂等性
消息交付可靠性保障
消息交付有三种语义:
- 最多一次
- 至少一次
- 精准一次
kafka默认是至少一次语义,想要实现精准一次,需要保证幂等性。幂等性就是任意多次执行所产生的影响均与一次执行的影响相同。
kafka提供了幂等性producer以及事务性producer。幂等性producer底层保证了消息去重,但是其只保证了单分区,单会话上的幂等性。kafka用事务性producer解决上述局限,Kafka 自 0.11 版本开始提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
另外即使kafka保证了消息唯一性,在极端情况下消费端也还会有重复消费问题,消费代码逻辑也需要保证幂等性。
另外补充一点,RocketMQ也实现了事务,但是其事务更倾向于保证本地其他操作与发送消息操作之间原子性。kafka的事务更倾向于保证多条消息原子性地写入到目标分区。
需要注意的细节
-
生产者批量发送、消费者批量拉取
提升吞吐量,降低网络损耗,但是提高了延时 -
压缩
时间换空间,cpu换io;同样提升吞吐量,降低网络损耗,但是提高了延时
尽量避免生产端与消费端采用不同压缩算法,这样会造成broker端解压再重新压缩
正常情况下,broker只解压对消息做简单校验。 -
broker端对采用零拷贝技术(从PageCache到socket缓冲区)
提升broker端处理效率,提升吞吐量、降低延时。
尽量保证生产者、消费者、broker服务端版本一致,防止零拷贝技术失效。
网友评论