美文网首页
消息队列-3 如何保证消息可靠性(RabbitMQ)

消息队列-3 如何保证消息可靠性(RabbitMQ)

作者: NeXt4 | 来源:发表于2020-12-24 16:57 被阅读0次

    消息丢失的场景

    • 消息发送时消息丢失
    • 路由消息时消息丢失
    • 消息未持久化消息丢失
    • 消费消息时消息丢失

    消息发送可靠性

    AMQP协议提供的一个事务机制
    一般不使用,影响吞吐量

    发送方确认机制(publisher confirm)

    首先生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一deliveryTag和multiple参数),这就使得生产者知晓消息已经正确到达了目的地了。

    Confirm模式有三种方式实现

    1. 串行confirm模式:producer每发送一条消息后,调用waitForConfirms()方法,等待broker端confirm,如果服务器端返回false或者在超时时间内未返回,客户端进行消息重传。
    2. 批量confirm模式:producer每发送一批消息后,调用waitForConfirms()方法,等待broker端confirm。
    3. 异步confirm模式:提供一个回调方法,broker confirm了一条或者多条消息后producer端会回调这个方法。 我们分别来看看这三种confirm模式

    异步Confirm模式

    package com.rabbitmq.client;
    import java.io.IOException;
    
    /**
     * Implement this interface in order to be notified of Confirm events.
     * Acks represent messages handled successfully; Nacks represent
     * messages lost by the broker.  Note, the lost messages could still
     * have been delivered to consumers, but the broker cannot guarantee
     * this.
     */
    public interface ConfirmListener {
        void handleAck(long deliveryTag, boolean multiple) throws IOException;
        void handleNack(long deliveryTag, boolean multiple) throws IOException;
    }
    
    

    这里就需要在发送消息之前将消息存储起来,以便于在ConfirmListener中处理消息发送成功和失败的情况,可以存储到数据库或者Redis中。

    如果发送失败则需要进行消息重发,重试超过一定次数后仍然失败则需要记录日志,告警,人工处理。

    消息端如何保证消息可靠性

    手动确认机制

    消费者消费完毕后手动地向 Broker 发送确认通知,Broker 收到确认通知后再从队列中删除对应的消息。

    重试

    在消息消费处理逻辑中加入重试机制,以处理一些被调用服务网络抖动等情况导致的消息消费失败的情况。

    如何重试超过一定次数后仍然失败则将消息发送到死信队列。

    死信队列

    1、消息被否定确认使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false

    2、消息在队列中的时间超过了设置的TTL(time to live)时间。

    3、消息数量超过了队列的容量限制。

    当一个队列中的消息满足上述三种情况任一个时,该消息就会从原队列移至死信队列,若改队列没有绑定死信队列则消息被丢弃。

    死信队列和普通的业务队列没有什么差别,只不过是业务上创建用来存储处理失败的消息的队列。所以其工作方式也和业务队列相同,死信仍然需要交换机的转发到达死信队列。

    根据实际的业务情况,我们可以创建专门的死信消费者对死信进行处理,或者进行人工补偿。

    如何保证消息100%被消费

    举个例子,用户注册赠送积分,这里赠送积分是通过消息队列进行解耦。

    解决方案一、消息落库 + 定时任务 + 幂等 + 重试 + 人工补偿

    用户表 + 消息表,在同一个事务中存储用户注册数据和赠送积分数据。

    在事务之外执行消息发送,通过发送端confirm机制保证消息发送成功。

    消费端消费消息,消费完成后进行手动ack, 这里也会出现ack时消息队列server突然宕机的情况,这时就需要保证消费端消费消息需要实现幂等(因为消息会被重发)。消息消费成功后将消息表中的消息状态设置为完成。

    定时任务,定时扫描未处理的消息,进行消息重发,重发超过一定次数后标记为失败,转人工处理。

    解决方案二、延迟投递 + 回调检查
    上游服务完成业务处理后,发送两条消息,一条给下游服务进行业务处理,如赠送积分业务,另一条给callback服务。
    下游服务接收到业务消息并处理完成之后就直接发送一条消息给callback服务,callback服务接收到消息后就知道刚才有一条消息被成功处理了,callback服务把这条消息持久到数据库中,当上游服务之前发送的延迟消息到达callback服务时进行数据库检查,如果存在则说明消息被成功消费了,如果不存在则通过PRC调用通知上游服务有消息没有处理,上游服务重新发送业务消息和延迟确定消息进行重试。

    Step 1: 上游服务业务处理
    上游服务  --- 【a.业务消息】       ---->  下游服务
    上游服务  --- 【b.延迟确认消息】    ---->  callback服务
    
    Step 2: 下游服务业务处理
    下游服务  --- 消费a消息  ---- 【c.消费确认消息】 ---> callback服务
    
    Step 3: 消息处理情况持久化
    callback服务  --- 消费【c. 消费确认消息】 ---持久化到DB(a消息已被成功消费)
    
    Step 4: 上游服务检查消息处理情况
    callback服务  --- 消费【b.延迟确认消息】 --- 检查DB
    
    Step 5: 重试
    callback服务  --- 检查DB 通过 --- 完成
    callback服务  --- 检查DB 不通过 --- RPC通知上游服务
    

    虽然这种方案也是无法做到 100% 的可靠传递,在特别极端的情况,还是需要定时任务和补偿机制进行辅助。但是该方案的核心是减少数据库操作,这个点很重要,因为这是在高并发的场景下,主要考虑性能。当然我们还是要补偿机制,即可以做到最终一致性。

    RabbitMQ 100% 投递成功方案详解

    相关文章

      网友评论

          本文标题:消息队列-3 如何保证消息可靠性(RabbitMQ)

          本文链接:https://www.haomeiwen.com/subject/kxflnktx.html