美文网首页
RocketMQ 保证消息不丢失

RocketMQ 保证消息不丢失

作者: markeNick | 来源:发表于2021-06-26 11:17 被阅读0次

    消息从生产到消费,一共经历三个阶段:

    • 生产:Producer创建消息,发送至Broker

    • 存储:Broker将受到的消息存储到磁盘中

    • 消费:Consumer从Broker拉取消息

    要保证消息不丢失就需要解决这三个阶段的消息丢失

    示意图如下

    RocketMQ消息流传的三个阶段.png

    生产阶段

    生产者只要接收到返回的ack,就代表这个阶段的消息未丢失。

    生产者通过网络将消息发送到Broker,然后等待Broker响应ack,此时的网络是不可靠的,极有可能导致消息发不出去,或者Broker在ack时网络故障导致生产者收不到ack

    这个阶段有三种发送消息方式:

    同步:同步发送消息的时候就会阻塞并等待Broker返回ack

    异步:异步发送消息,然后在回调函数中得知Broker是否ack

    单向:单向发送消息,只管发送,不管结果,因此无法保证消息不丢失

    Broker返回的ack状态如下:

    • SendStatus.SEND_OK:发送成功

    • SendStatus.FLUSH_DISK_TIMEOUT:消息发送成功,但刷盘超时

    • SendStatus.FLUSH_SLAVE_TIMEOUT:消息发送成功,但同步到Slave超时

    • SendStatus.SLAVE_NOT_AVAILABLE:消息发送成功,但此时Slave不可用

    发送消息如果失败或者超时,Producer的send方法支持自动重试,默认重试2次,可以通过api修改

    // 设置同步重试次数
    producer.setRetryTimesWhenSendFailed(3);
    
    // 设置异步重试次数
    producer.setRetryTimesWhenSendAsyncFailed(3);
    
    • 如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
    • 如果本身向broker发送消息产生超时异常,就不会再重试。

    另一种情况,Broker宕机了,一般生产的Broker是集群部署,有多个master和多个slave节点,当消息发送到某个节点的Broker上,然后宕机,producer收到响应失败,会自动重试。

    总结:Producer如何保证发送阶段消息可达

    失败会自动重试,重试后仍然失败,那么Producer会知道消息没发送成功,这个时候可以进行补偿,或者业务做兜底处理。

    Broker是集群部署,高可用,挂了一个节点仍然可以提供服务

    存储阶段

    Broker收到消息后是先存储在内存中的,然后再持久化到磁盘,Broker刚收到Producer消息存储在内存中,然后发生宕机,就会导致消息丢失

    RocketMQ的持久化消息有两种方式:

    同步刷盘:Broker收到消息后会在持久化到磁盘完成后才发送ack

    异步刷盘:Broker收到消息存到内存后返回ack,然后Broker定期将一组消息持久化到磁盘

    默认是异步刷盘,要保证存储阶段不丢失消息,可以修改为同步刷盘,即确保消息持久化后再ack

    # 默认是:ASYNC_FLUSH,异步刷盘
    flushDiskType = SYNC_FLUSH 
    

    即使使用了同步刷盘,但是Broker刷盘后,磁盘坏了,也会导致消息丢失,不过这种几率应该比较小。

    解决方法就是:不仅同步刷盘,并且保证主从同步后,再ack

    master端

    # 设置同步刷盘才返回ack给producer
    flushDiskType = SYNC_FLUSH
    # 设置同步消息给salve
    brokerRole = SYNC_MASTER
    

    slave端

    # 角色为salve
    brokerRole = slave
    # 设置同步刷盘才返回ack给master
    flushDiskType = SYNC_FLUSH
    

    总结:想要在存储阶段保证消息不丢失,可以同步刷盘和主从同步后再发送ack,但是性能肯定会差

    消费阶段

    在消费时失败了也会导致消息丢失,这个阶段采用重试也可以解决消息不丢失。

    所以必须在业务逻辑完成再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS

    否则,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后重试即可

    相关文章

      网友评论

          本文标题:RocketMQ 保证消息不丢失

          本文链接:https://www.haomeiwen.com/subject/hixzyltx.html