kafka无消息丢失配置

作者: 献给记性不好的自己 | 来源:发表于2017-04-21 17:00 被阅读458次

    kafka在配置不合理的情况,会丢失消息,当业务的需求不允许消息丢失的场景,就需要配置kafka的一些参数来保证消息不丢失,但是鱼和熊掌不可兼得,所以如果保证消息不丢失,就要牺牲TPS(吞吐量)
    kafka消息丢失分为两种,一是Producer丢失消息,二是Consumer端丢失消息

    一、Producer端配置

    1、acks设置为-1

    设置为0表示producer不需要等待任何确认收到的信息,副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)
    设置为1意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失
    设置为-1意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。

    2、retries设置大一些

    设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。

    3、max.in.flight.requests.per.connection设置为1

    kafka可以在一个connection中发送多个请求,叫作一个flight,这样可以减少开销,但是如果产生错误,可能会造成数据的发送顺序改变,默认是5 (修改)
    这个参数其实为了避免消息乱序,如果你的场景不需求消息顺序,可以不设置此值,设置此值为1表示kafka broker在响应请求之前client不能再向同一个broker发送请求

    4、使用KafkaProducer.send(record, callback)方法而不是send(record)方法

    自定义回调逻辑处理消息发送失败

            >producer.send(new ProducerRecord<String, String>(topic, key, msg),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata,
                                Exception exception) {
    //处理消息发送失败的业务逻辑
                        }
                    });
    

    5、min.insync.replicas > 1

    消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数。与acks配合使用

    6、replication.factor > min.insync.replicas

    如果两者相等,当一个副本挂掉了分区也就没法正常工作了。通常设置replication.factor = min.insync.replicas + 1即可

    二、Consumer端

    消费者端防止消息丢失相对简单,在消息处理完成前就提交了offset这样就会有可能造成消息丢失,所以要在消息处理完成后在提交offset
    首先要关闭enable.auto.commit=false,此参数默认是true,所以需要改成false,然后需要在你消费消息后在提交offset

    相关文章

      网友评论

        本文标题:kafka无消息丢失配置

        本文链接:https://www.haomeiwen.com/subject/tevdzttx.html