美文网首页
MQ随记(2)

MQ随记(2)

作者: 喧嚣城外 | 来源:发表于2019-02-14 16:08 被阅读0次
    如何保证消息不会被重复消费(保证消息消费时的幂等性)
    kafka
    • 按照数据进入kafka的顺序,kafka会给每条数据分配一个offset代表这个数据代号
    • 消费者会提交offset,告诉kafka已消费到多少offset条数据。
    • zk记录消费者当前消费到多少offset条消息。

    遇到的坑:
    消费者不是消费完就提交offset的,而是定时定期提交。
    消费者如果在准备提交offset时,但是还未提交,消费者被重启了,那么此时消费过的数据offset还没有提交,kafka也就不知道已经消费了哪些条消息,一旦消费者启动,就会重复消费。

    幂等性:通俗的讲,就是一个数据或者一个请求,重复几次,确保对应数据不回改变,不能出错。

    保证幂等性结合业务来思考,以下是几个思路:

    • 比如消费到数据用来写库,先查询,如果有了就不插入,update一下。
    • 比如redis,没问题了,每次都set,天然幂等性。
    • 使用唯一键,重复插入报错。

    如何确保消息可靠性传输(如何处理消息丢失问题)?
    rabbitmq

    1.写消息过程,消息都没到rabbitmq在网络传输过程中就丢失了,或者消息到了rabbitmq,但是内部出现错误没有保存下来。
    2.rabbitmq接收到消息后先暂时存在内存中,结果消息还没有被消费,rabbitmq自己挂掉了,导致内存中的消息搞丢。
    3.消费者消费到这个消息,还没有来得及处理,自己挂掉了,但是rabbitmq以为这个消息已经被消费掉了。

    解决写消息丢失:

    • 把channel设置为confirm模式。
    • 发送一个消息。
    • 发送完消息就不管了。
    • rabbitmq如果接收了这条消息,就回调生产者本地的接口
    • 如果接收失败,回调生产者本地的失败接口
    channel.confirm;
    //发送消息
    //回调接口
    public void ack(String message){
    }
    public void nack(String massage){
        //重新发送
    }
    

    rabbitmq生产者这块如果要保证消息不丢失,一般是用connfirm机制,异步的模式,你发送消息后不回阻塞,直接发下一个,吞吐量高一些。

    解决mq自身丢失问题:
    开启消息持久化。

    解决消费者丢失问题:
    关闭autoAck机制,每次自己确定处理完再发送ack给rabbitmq。

    kafka

    消费端弄丢数据:
    唯一可能导致消费者弄丢数据的情况,也就是说,你那个消费到了这个消息,然后消费者那边自动提交了offset,让kafka以为已经消费好了这个消息,但是在准备处理时,消费端挂了,此时消息丢失。
    解决方法:kafka会自动提交offset,那么只要关闭自动提交offset,在处理完后自己手动提交offset,就可以保证数据不丢失,但是此时还是会遇到重复消费问题,自己保证幂等性即可。

    kafka弄丢了数据:
    kafka某个broker宕机,然后重新选举出来的partiton的leader时。如果此时其他follower刚好还有些数据没有同步,结果此时leader挂了,然后选举出某个follower成了leader,就造成数据丢失。
    所以此时一般 要求设置一下 4个参数。

    • 给这个topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少两个副本。
    • 在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知一到一个follower还跟自己保持联系,没有掉队。才能确保leader挂了还有一个follower吧。
    • 在producer端设置acks=all:这个要求每条数据,必须写入所有replica之后才能认为写成功。
    • 在producer端设置retries=MAX(很大很大的值,无限重试的意思)
      这样配置后,至少可以在kafka broker端保证leader所在broker发生故障,进行leader切换时,数据不回丢失。

    生产者会不会丢失数据:
    如果按上述思路配置ack=all,一定不会丢失,因为leader接收到消息,所有follower都同步到了消息之后,才认为本次写入成功,如果没有满足这个条件,生产者会自动不断重试,重试无限次。


    如何保证消息消费顺序正确
    rabbitmq如何保证
    给每个消费者开一个queue 图片02.png
    kafka如何保证

    写入一个partition是有顺序的,生产者在写时,可以指定一个key,比如说指定某个订单id作为key,这个订单相关数据一定会被分发到一个partition中去。
    partition只能被一个消费者消费。
    可确保消费者以顺序取出。
    但是可能会出现问题:
    消费者内部多线程,消费者内部可能造成顺序不一致。


    图片02.png
    如何解决消息队列的延时以及过期失效时间?消息队列满了以后怎么处理?有几百万消息持续积压了几个小时,怎么解决?
    如果消息积压了几百万或者上千万数据,及时消费者恢复了,也需要大概1小时时间才可以恢复过来。

    一般这个时候就需要紧急扩容了,具体操作及思路如下:

    • 先修复consumer的问题,确保恢复消费速度,然后将consumer都停掉
    • 新建一个topic,partition是原来的10倍,临时建好原先10倍或者20倍的queue数量
    • 然后写一个临时的分发数据的consumer程序,这个程序部署上去消息积压的数据,消息之后不做耗时处理,直接均匀轮询的写入临时建好的10倍数量的queue
    • 接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。
    • 这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度消费。
    • 等快速消费完积压数据后,得恢复原先部署架构,重新用原先的consumer机器来消费消息。
    第二个坑(设置消息过期时间)

    等过了高峰期后,这个时候开始写程序,将丢失的那批数据,写一个临时程序,一点点查出来,然后重新灌入mq里,进行补偿。

    第三个坑(mq快满了)

    如果走的方式是消息积压在mq里,那么如果你很长时间都没有处理掉,此时导致mq都快写满了,临时写程序,消费一个丢弃一个,尽快消费掉所有消息,然后写程序进行补偿。


    如何设计一个消息队列的架构
    • 首先这个mq得支持可伸缩性,需要时可快速扩容,就可以增加吞吐量和容量。参照kafka的设计理念,broker->topic->partition,每个partition放一个机器,就存一部分数据。如果资源不够,给topic增加partition,然后数据迁移,增加机器。
    • 其次考虑mq数据落地磁盘,才能保证数据不会丢失,顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写性能是很高的,kafka思路。
    • 考虑mq可用性。参考kafka高可用保障机制。多副本->leader&follower->broker挂了重新选举leader即可对外服务。
    • 能不能支持数据0丢失,参考kafka数据零丢失方案。

    相关文章

      网友评论

          本文标题:MQ随记(2)

          本文链接:https://www.haomeiwen.com/subject/jdveeqtx.html