1、生产者
1.1 丢失原因:
- kafka生产端异步发送消息后,不管broker是否响应,立即返回,伪代码producer.send(msg),由于网络抖动,导致消息压根就没有发送到broker端;
- kafka生产端发送消息超出大小限制,broker端接到以后没法进行存储;
1.2 解决方案:
- 1、生产者调用异步回调消息。伪代码如下: producer.send(msg,callback);
- 2、生产者增加消息确认机制,设置生产者参数:acks = all。partition的leader副本接收到消息,等待所有的follower副本都同步到了消息之后,才认为本次生产者发送消息成功了;
- 3、生产者设置重试次数。比如:retries>=3,增加重试次数以保证消息的不丢失;
- 4、定义本地消息日志表,定时任务扫描这个表自动补偿,做好监控告警。
- 5、后台提供一个补偿消息的工具,可以手工补偿。
2、Broker
2.1 丢失原因:
kafka broker集群接收到数据后会将数据进行持久化存储到磁盘,消息都是先写入到页缓存,然后由操作系统负责具体的刷盘任务或者使用fsync强制刷盘,如果此时Broker宕机,且选举一个落后leader副本很多的follower副本成为新的leader副本,那么落后的消息数据就会丢失。
2.2 解决方案:
-
1、同步刷盘(不太建议)。同步刷盘可以提高消息的可靠性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。但是会严重影响性能。
-
2、利用partition的多副本机制(建议)
unclean.leader.election.enable=false:数据丢失太多的副本不能选举为leader副本,防止落后太多的消息数据而引起丢失;
replication.factor >= 3:消息分区的副本个数,这个值建议设为>=3;
min.insync.replicas >1:消息写入多少副本才算已提交,这个值必须大于1,这个是要求一个leader 至少感知到有至少一个 follower还跟自己保持联系;(replication.factor>min.insync.replicas 这样消息才能保存成功)
3、消费者
3.1 丢失原因:
- 1、消费者配置了offset自动提交参数。enable.auto.commit=true。
- 2、消息者收到了消息,进行了自动提交offset,kafka以为消费者已经消费了这个消息,但其实刚准备处理这个消息,还没处理完成,消费者自己挂了,此时这条消息就会丢失。
- 3、多线程消费消息,某个线程处理消息出现异常,还是会出现自动提交offset。
3.2 解决方案:
- 1、消费者关闭自动提交,采用手动提交offset。通过配置参数:enable.auto.commit=false,关闭自动提交offset,在完成业务逻辑以后手动提交offset,这样就不会丢失数据。
- 2、消费者多线程处理业务逻辑,等待所有线程处理完成以后,才手工提交offset。
- 3、消费者消费消息需要进行幂等处理,防止重复消费。
4、假如kafka挂了,如何保证高可用?
消息生产服务A 所有消息入库,然后通过 定时任务job 直接调用 消息消费服务B。
网友评论