美文网首页
RocketMQ 8.可靠性消息投递

RocketMQ 8.可靠性消息投递

作者: 香沙小熊 | 来源:发表于2021-01-14 15:33 被阅读0次

方案1:消息落库

消息落库重发是基于RocketMQ的confirm机制,在消息发送失败后自动重发。该方案只能保证消息从生产者到MQ之间的可靠性投递

image.png
image.png
  • Step 1: 首先把消息信息(业务数据)存储到数据库中,紧接着,我们再把这个消息记录也存储到一张消息记录表里(或者另外一个同源数据库的消息记录表)

  • Step 2:发送消息到MQ Broker节点(采用confirm方式发送,会有异步的返回结果)

  • Step 3、4:生产者端接受MQ Broker节点返回的Confirm确认消息结果,然后进行更新消息记录表里的消息状态。比如默认Status = 0 当收到消息确认成功后,更新为1即可!

  • Step 5:但是在消息确认这个过程中可能由于网络闪断、MQ Broker端异常等原因导致 回送消息失败或者异常。这个时候就需要发送方(生产者)对消息进行可靠性投递了,保障消息不丢失,100%的投递成功!(有一种极限情况是闪断,Broker返回的成功确认消息,但是生产端由于网络闪断没收到,这个时候重新投递可能会造成消息重复,需要消费端去做幂等处理)所以我们需要有一个定时任务,(比如每5分钟拉取一下处于中间状态的消息,当然这个消息可以设置一个超时时间,比如超过1分钟 Status = 0 ,也就说明了1分钟这个时间窗口内,我们的消息没有被确认,那么会被定时任务拉取出来)

  • Step 6:接下来我们把中间状态的消息进行重新投递 retry send,继续发送消息到MQ ,当然也可能有多种原因导致发送失败

  • Step 7:我们可以采用设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)。
    数据库库表结构:订单表和消息记录表

-- 表 order 订单结构
CREATE TABLE IF NOT EXISTS `t_order` (
  `id` varchar(128) NOT NULL, -- 订单ID
  `name` varchar(128), -- 订单名称 其他业务熟悉忽略
  `message_id` varchar(128) NOT NULL, -- 消息唯一ID
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 表 broker_message_log 消息记录结构
CREATE TABLE IF NOT EXISTS `broker_message_log` (
  `message_id` varchar(128) NOT NULL, -- 消息唯一ID
  `message` varchar(4000) DEFAULT NULL, -- 消息内容
  `try_count` int(4) DEFAULT '0', -- 重试次数
  `status` varchar(10) DEFAULT '', -- 消息投递状态  0 投递中 1 投递成功   2 投递失败
  `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',  -- 下一次重试时间 或 超时时间
  `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 创建时间
  `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 更新时间
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

创建订单方法
@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;
    
    @Autowired
    private RocketMQProducer rocketMQProducer;
    
    
    //创建订单
    public void createOrder(Order order) {
        //使用当前时间当做订单创建时间
        Date orderTime = new Date();
        //插入业务数据
        orderMapper.insert(order);
        //插入消息记录表数据
        BrokerMessageLog brokerMessageLog = new BrokerMessageLog();
        //消息唯一ID
        brokerMessageLog.setMessageId(order.getMessageId());
        //保存消息整体
        brokerMessageLog.setMessage(JSONObject.toJSONString(order));
        //设置消息状态为0 表示发送中
        brokerMessageLog.setStatus("0");
        //设置消息未确认超时时间窗口为一分钟
        brokerMessageLog.setNextRetry(DateUtils.addMinutes(orderTime, Constants.ORDER_TIMEOUT));
        brokerMessageLog.setCreateTime(new Date());
        brokerMessageLog.setUpdateTime(new Date());
        brokerMessageLogMapper.insert(brokerMessageLog);
        
        //发送消息
        rocketMQProducer.sendOrder(order);

    }
}
消息生产者
@Component
@Component
public class RocketMQProducer {

    private static final String PRODUCER_GROUP_NAME = "producer_group_name";
    private final DefaultMQProducer producer;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;


    private RocketMQProducer() {
        this.producer = new DefaultMQProducer(PRODUCER_GROUP_NAME);
        this.producer.setNamesrvAddr(NAMESRV_ADDR);
        start();
    }

    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public SendResult sendMessage(Message message) {
        SendResult sendResult = null;
        try {
            sendResult = this.producer.send(message);
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return sendResult;
    }

    public void shutdown() {
        this.producer.shutdown();
    }

    public void sendOrder(Order order) {

        //1.创建消息
        Message message = new Message("test_quick_topic",// 主题
                "TagA",// 标签
                "KeyA",// 用户自定义的key,唯一的标识
                FastJsonConvertUtil.convertObjectToJSON(order).getBytes()); //消息内容实体(byte[])

        try {
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.err.println("msgId: " + sendResult.getMsgId() + ", status: " + sendResult.getSendStatus());

                    Order object = FastJsonConvertUtil.convertJSONToObject(message.getBody().toString(), Order.class);

                    //如果confirm返回成功 则进行更新
                    brokerMessageLogMapper.changeBrokerMessageLogStatus(object.getMessageId(), Constants.ORDER_SEND_SUCCESS, new Date());

                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                    //失败则进行具体的后续操作:重试 或者补偿等手段
                    System.err.println("异常处理...");
                }
            });
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
定时任务
@Component
public class RetryMessageTasker {
    @Autowired
    private RocketMQProducer rocketMQProducer;
     
    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;
     
    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    public void reSend(){
        System.err.println("----------------定时任务开始----------------");
        //pull status = 0 and timeout message 
        List<BrokerMessageLog> list = brokerMessageLogMapper.query4StatusAndTimeoutMessage();
        list.forEach(messageLog -> {
            if(messageLog.getTryCount() >= 3){
                //update fail message 
                brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLog.getMessageId(), Constants.ORDER_SEND_FAILURE, new Date());
            } else {
                // resend 
                System.out.println("进来更改次数");
                brokerMessageLogMapper.update4ReSend(messageLog.getMessageId(),  new Date());
                Order reSendOrder = JSONObject.parseObject(messageLog.getMessage(), Order.class);
                try {
                    rocketMQProducer.sendOrder(reSendOrder);
                } catch (Exception e) {
                    e.printStackTrace();
                    System.err.println("-----------异常处理-----------");
                }
            }            
        });
    }
}
方案1的优点:
  • 能保证消息可靠性投递
方案1 的缺点:
  • 发送消息前需要2次DB操作,影响并发性能
  • 只能保证消息从生产者到MQ的可靠性投递

方案2:二次确认检测

二次确认检测是基于延时投递机制实现的,该方案能够保证消息从生成者端到消费者的可靠性投递。
同时对比方案一,生产者操作数据库的次数相对较少,并发性能较高。
image.png
image.png
  • 1.先将业务数据进行入库,然后生产端将消息发送出去,注意一定是等数据库操作完成之后再去发送消息

  • 2.在发送消息之后,紧接着生产端再次发送一条消息(Second Send Delay Check),即延迟消息投递检查,这里需要设置一个延迟时间,比如5分钟之后进行投递

  • 3.消费端去监听指定队列,将收到的消息进行处理

  • 4.处理完成之后,发送一个confirm消息,也就是回送响应,但是这里响应不是正常的ACK,而是重新生成一条消息,投递到MQ中

  • 5.上面的Callback service是一个单独的服务,其实它扮演了方案一的存储消息的DB角色,它通过MQ去监听下游服务发送的confirm消息,如果Callback service收到confirm消息,那么就对消息做持久化存储,即将消息持久化到DB中

  • 6.5分钟之后延迟消息发送到MQ了,然后Callback service还是去监听延迟消息所对应的队列,收到Check消息后去检查DB中是否存在消息,如果存在,则不需要做任何处理,如果不存在或者消费失败了,那么Callback service就需要主动发起RPC通信给上游服务,告诉它延迟投递的这条消息没有找到,需要重新发送,生产端收到信息后就会重新查询业务消息然后将消息发送出去

方案2的缺点:
  • 方案2不一定能保障百分百投递成功,但是基本上可以保障大概99.9%的消息是OK的,有些特别极端的情况只能是人工去做补偿了,或者使用定时任务去做
方案2的优点:
  • 方案2主要目的是为了减少数据库操作,提高并发量,在高并发场景下,最关心的不是消息100%投递成功,而是一定要保证性能,保证能抗得住这么大的并发量,所以能减少数据库的操作就尽量减少,可以异步的进行补偿

特别感谢:

阿神

相关文章

网友评论

      本文标题:RocketMQ 8.可靠性消息投递

      本文链接:https://www.haomeiwen.com/subject/sganoktx.html