如何保证消息不会被重复消费(保证消息消费时的幂等性)
kafka
- 按照数据进入kafka的顺序,kafka会给每条数据分配一个offset代表这个数据代号
- 消费者会提交offset,告诉kafka已消费到多少offset条数据。
- zk记录消费者当前消费到多少offset条消息。
遇到的坑:
消费者不是消费完就提交offset的,而是定时定期提交。
消费者如果在准备提交offset时,但是还未提交,消费者被重启了,那么此时消费过的数据offset还没有提交,kafka也就不知道已经消费了哪些条消息,一旦消费者启动,就会重复消费。
幂等性:通俗的讲,就是一个数据或者一个请求,重复几次,确保对应数据不回改变,不能出错。
保证幂等性结合业务来思考,以下是几个思路:
- 比如消费到数据用来写库,先查询,如果有了就不插入,update一下。
- 比如redis,没问题了,每次都set,天然幂等性。
- 使用唯一键,重复插入报错。
如何确保消息可靠性传输(如何处理消息丢失问题)?
rabbitmq
1.写消息过程,消息都没到rabbitmq在网络传输过程中就丢失了,或者消息到了rabbitmq,但是内部出现错误没有保存下来。
2.rabbitmq接收到消息后先暂时存在内存中,结果消息还没有被消费,rabbitmq自己挂掉了,导致内存中的消息搞丢。
3.消费者消费到这个消息,还没有来得及处理,自己挂掉了,但是rabbitmq以为这个消息已经被消费掉了。
解决写消息丢失:
- 把channel设置为confirm模式。
- 发送一个消息。
- 发送完消息就不管了。
- rabbitmq如果接收了这条消息,就回调生产者本地的接口
- 如果接收失败,回调生产者本地的失败接口
channel.confirm;
//发送消息
//回调接口
public void ack(String message){
}
public void nack(String massage){
//重新发送
}
rabbitmq生产者这块如果要保证消息不丢失,一般是用connfirm机制,异步的模式,你发送消息后不回阻塞,直接发下一个,吞吐量高一些。
解决mq自身丢失问题:
开启消息持久化。
解决消费者丢失问题:
关闭autoAck机制,每次自己确定处理完再发送ack给rabbitmq。
kafka
消费端弄丢数据:
唯一可能导致消费者弄丢数据的情况,也就是说,你那个消费到了这个消息,然后消费者那边自动提交了offset,让kafka以为已经消费好了这个消息,但是在准备处理时,消费端挂了,此时消息丢失。
解决方法:kafka会自动提交offset,那么只要关闭自动提交offset,在处理完后自己手动提交offset,就可以保证数据不丢失,但是此时还是会遇到重复消费问题,自己保证幂等性即可。
kafka弄丢了数据:
kafka某个broker宕机,然后重新选举出来的partiton的leader时。如果此时其他follower刚好还有些数据没有同步,结果此时leader挂了,然后选举出某个follower成了leader,就造成数据丢失。
所以此时一般 要求设置一下 4个参数。
- 给这个topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少两个副本。
- 在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知一到一个follower还跟自己保持联系,没有掉队。才能确保leader挂了还有一个follower吧。
- 在producer端设置acks=all:这个要求每条数据,必须写入所有replica之后才能认为写成功。
- 在producer端设置retries=MAX(很大很大的值,无限重试的意思)
这样配置后,至少可以在kafka broker端保证leader所在broker发生故障,进行leader切换时,数据不回丢失。
生产者会不会丢失数据:
如果按上述思路配置ack=all,一定不会丢失,因为leader接收到消息,所有follower都同步到了消息之后,才认为本次写入成功,如果没有满足这个条件,生产者会自动不断重试,重试无限次。
如何保证消息消费顺序正确
rabbitmq如何保证
给每个消费者开一个queue 图片02.pngkafka如何保证
写入一个partition是有顺序的,生产者在写时,可以指定一个key,比如说指定某个订单id作为key,这个订单相关数据一定会被分发到一个partition中去。
partition只能被一个消费者消费。
可确保消费者以顺序取出。
但是可能会出现问题:
消费者内部多线程,消费者内部可能造成顺序不一致。
图片02.png
如何解决消息队列的延时以及过期失效时间?消息队列满了以后怎么处理?有几百万消息持续积压了几个小时,怎么解决?
如果消息积压了几百万或者上千万数据,及时消费者恢复了,也需要大概1小时时间才可以恢复过来。
一般这个时候就需要紧急扩容了,具体操作及思路如下:
- 先修复consumer的问题,确保恢复消费速度,然后将consumer都停掉
- 新建一个topic,partition是原来的10倍,临时建好原先10倍或者20倍的queue数量
- 然后写一个临时的分发数据的consumer程序,这个程序部署上去消息积压的数据,消息之后不做耗时处理,直接均匀轮询的写入临时建好的10倍数量的queue
- 接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。
- 这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度消费。
- 等快速消费完积压数据后,得恢复原先部署架构,重新用原先的consumer机器来消费消息。
第二个坑(设置消息过期时间)
等过了高峰期后,这个时候开始写程序,将丢失的那批数据,写一个临时程序,一点点查出来,然后重新灌入mq里,进行补偿。
第三个坑(mq快满了)
如果走的方式是消息积压在mq里,那么如果你很长时间都没有处理掉,此时导致mq都快写满了,临时写程序,消费一个丢弃一个,尽快消费掉所有消息,然后写程序进行补偿。
如何设计一个消息队列的架构
- 首先这个mq得支持可伸缩性,需要时可快速扩容,就可以增加吞吐量和容量。参照kafka的设计理念,broker->topic->partition,每个partition放一个机器,就存一部分数据。如果资源不够,给topic增加partition,然后数据迁移,增加机器。
- 其次考虑mq数据落地磁盘,才能保证数据不会丢失,顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写性能是很高的,kafka思路。
- 考虑mq可用性。参考kafka高可用保障机制。多副本->leader&follower->broker挂了重新选举leader即可对外服务。
- 能不能支持数据0丢失,参考kafka数据零丢失方案。
网友评论