美文网首页
rabbitMQ可靠性投递

rabbitMQ可靠性投递

作者: 靈08_1024 | 来源:发表于2019-02-19 10:00 被阅读12次

可靠性投递

关于消息的可靠性投递,从以下几个方面来着手:

  • 保障消息成功发出;
  • 保障MQ成功接收;
  • 发送方接收到MQ响应;
  • 完善的消息补偿机制(如重试机制);

本文关于消息投递的两种方式:消息落库打标二次消息机制

消息落库打标

1)发送方将消息存到RDB中(如MySQL);该步骤可以通过事务来确保 业务数据消息数据 落库成功;
2)发送方发送消息给MQ broker;
3)MQ broker返回给发送方确认ACK(rabbitmq自带的confirm机制);
4)修改消息数据库;
5)分布式定时任务会检查消息数据库,取出其中的异常数据(如超时状态为修改),进入重试阶段;
6)重试阶段,针对于每个任务都有一个业务数据来关联重试次数。超过次数,报告失败;

其中,在第1)步,如果持久化失败,会快速失败,便于进行下一次重试。
在第3)步,如果出现网络抖动或者异常,即MQ broker返回给发送方失败。会导致该消息数据长时间在数据库中没有被更新,即状态还是没有被MQ broker确认的状态。所以需要有另外的定时任务去检查消息入库时长,超过一定时间会踢出任务,进入步骤5)。

缺点:因为在第一步需要对数据库进行2次持久化操作,还需要额外的事务来保证。所以对于并发高的环境,其实还是需要深思熟虑的。


二次消息机制

其中涉及服务:上游发送方,MQ broker,下游接收方,回调方。


image.png

1)业务数据落库,落库完毕再发送消息,此处发送2条消息;
2)MQ broker的消息到下游服务;
3)confirm消息:下游服务消费后,重新生成一条消息,投递给MQ broker;
4)此时callback service监听下游服务生成的消息,对收到的消息进行持久化,保存到DB中;
5)callback service监听发送方的第二条消息,并去数据库中进行查询,有即处理好了,没有即处理异常;
6)第5)步异常后,进入该步骤。带着相关业务信息通知发送方重新发送该消息。

在第1)步中,第一条消息是发送给下游业务方的,为立即发送;第二条消息延迟5分钟后进行发送,是发送给callback service来作为检查条件,查看下游服务是否处理完毕该消息。

相比于第一种方案,在主业务流程中减少了DB操作,将对消息的保存移动到了callback service中。

幂等性

幂等性,就是一条数据无论经过多少次该方法的操作,对于我们拿到的结果都是一样的。

常见的幂等性解决方案:

  • 唯一ID+指纹码,利用数据主键来去重;
    其中的指纹码,比如时间戳,业务规则,或者相关业务字段等。
    先查询,后插入。如果根据唯一ID+指纹码查询出数量为0,而在插入时提示已经插入而失败了,此时放弃该操作,说明有别的服务器已经替我们做了这些事情。
    好处:实现简单。
    坏处:高并发DB写入瓶颈。
    解决方案:分库分表。
  • 利用redis原子性去重。
    分布式锁和exists操作。

return消息机制

在某些情况下,我们发送的消息,当前的exchange或routing不存在或者不可达,这个时候就需要return listener。
通过设置mandatory属性来完成。该属性为true,当消息不可达时,会触发return机制;为false时,该消息删除,不报告错误。

//第三个参数就是mandatory:true开启,false关闭
channel.basicPublish(exchangeName, routingKey, true,null, msg.getBytes());

消息限流机制

在消费者里面代码如下设置:

            //prefetchSize  消息大小限制,一般设置为0,表示不限制
            //prefetchCount  一次最多处理多少条消息,一般设置为1。在有这么多消息返回ack后,该consumer阻塞
            //global  true在channel上限制,false在consumer上限制。一般设置为false
            channel.basicQos(0,1,false);
            //autoAck是否自动签收
            channel.basicConsume(queueName, false, new MyConsumer(channel));

在自定义的类MyConsumer中,添加如下配置:

public class MyConsumer extends DefaultConsumer {

    private Channel channel ;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //如果上面的basicQos中第二个数值设置为1,则此处第二个值设置为false,大于1则设置为true。表示是否多个ack返回。
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

相关文章

  • RabbitMQ消息可靠性投递分析

    1、消息可靠性投递分析 由RabbitMQ消息发送的过程,我们可以知道如果要保证消息的可靠性投递,必须要下保证如下...

  • RabbitMQ之八高级特性

    个人专题目录 1. RabbitMQ 高级特性 1.1 消息可靠性投递 在使用 RabbitMQ 的时候,作为消息...

  • RabbitMQ 可靠性投递

    起因:在实际项目开发过程中,需要使用RabbitMQ来实现消息队列的功能,但仅仅实现功能之后并不能对自己满足,既然...

  • rabbitMQ可靠性投递

    可靠性投递 关于消息的可靠性投递,从以下几个方面来着手: 保障消息成功发出; 保障MQ成功接收; 发送方接收到MQ...

  • rabbitmq 可靠性投递(四)之实现可靠性投递

    前言 在之前介绍了可靠性投递方案和项目搭建::https://juejin.im/post/5c3f43dae51...

  • RabbitMq笔记

    1、AMQP 2、rabbitmq安装启动 3、消息的可靠性投递 4、消息端-幂等性保障 5、Return 消息机...

  • rabbitMQ重点篇

    各种MQ的比较 rabbitMQ的优点?答:跨平台跨语言,开源,可靠性投递,返回模式,与spring的整合很好,集...

  • RabbitMQ实现消息可靠性投递

    rabbitmq的消息可靠性投递提现在两个方面,分别是生产者端和消费者端的可靠性控制 1.生产者端 生产端可靠性一...

  • RabbitMQ高级特性

    0. 前言 本文内容分为如下三部分RabbitMQ高级特性 消息可靠性投递Consumer ACK消费端限流TTL...

  • RabbitMQ高级篇一 本章导航及BAT大厂如何保障生产端可靠

    RabbitMQ高级篇一 本章导航及BAT大厂如何保障生产端可靠性投递 一:本章导航 从本章节开始我们就要进入ra...

网友评论

      本文标题:rabbitMQ可靠性投递

      本文链接:https://www.haomeiwen.com/subject/rauisqtx.html