美文网首页
RabbitMQ 高级特性

RabbitMQ 高级特性

作者: 左师兄zuosx | 来源:发表于2021-02-07 09:04 被阅读0次

RabbitMQ 高级特性

消息可靠性

我们可以从以下几方面来保证消息的可靠性:

  1. 客户端代码中的异常捕获,包括生产者和消费者
  2. AMQP/RabbitMQ的事务机制
  3. 发送端确认机制
  4. 消息持久化机制
  5. Broker端的高可用集群
  6. 消费者确认机制
  7. 消费端限流
  8. 消息幂等性

异常捕获机制

    先执行业务操作,业务操作成功后执行消息发送,消息发送过程通过 try catch 方式捕获异常,在异常处理的代码块中执行回滚业务操作或者执行消息重发操作等。这是一种最大努力确保的方式,并无法保证100%绝对可靠,意味这里没有异常并不代表消息就一定投递成功。
boolean  result = doBiz();
if (result){
    try{
        sendMsg();
    } catch (Exception e){
        // 业务回滚、消息重发
        rollbackBiz();
    }
}

AMQP/RabbitMQ事务机制

没有捕获到异常并不能代表消息就一定投递成功了。

一直到事务提交后都没有异常,确实说明消息是投递成功了,但是,这种方式在性能方面的开销比较大,一般不推荐使用。

mport com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author jie.luo
 * @since 2021/1/29
 */
public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 声明一个交换器
        channel.exchangeDeclare("ex.rb", BuiltinExchangeType.DIRECT);
        // 声明一个消息队列
        channel.queueDeclare("queue.rb", false, false, false, null);
        // 消息队列与交换器绑定
        channel.queueBind("queue.rb", "ex.rb", "key.rb");

        try {
            // 将channel设置为事务模式
            channel.txSelect();
            // 发送消息到交换器
            channel.basicPublish("ex.rb", "key.rb", null, "message".getBytes());
            // 提交事务,只有消息成功被Broker接收了才能提交成功
            channel.txCommit();
        } catch (Exception e) {
            // 事务回滚
            channel.txRollback();
        }

        channel.close();
        connection.close();

    }
}

发送端确认机制

    RabbitMQ 后来引入了一种轻量级的方式,叫发送方确认(publisher confirm)机制。生产者将信道设置成confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上发布的消息都会被指派一个唯一的消息ID(从1开始),一旦消息被投递到所有匹配的队列后(如果消息和队列都是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。
image-20210201142504039.png
    RabbitMQ 回传给生产者的确认消息中的 deliveryTag 字段包含了确认消息的序号,另外,通过设置 channel.basicAck 方法中的 mutilple 参数,表示到这个序号之前的所有消息是否都已经得到了处理了。生产者投递消息后并不需要一直阻塞这,可以继续投递下一条消息,并通过回调方式处理ack响应。如果 RabbitMQ 因为自身内部错误导致消息丢失等一次情况发送,就会响应一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该 nack 命令。

同步确认消息

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 同步确认消息
 *
 * @author jie.luo
 * @since 2021/1/29
 */
public class SyncPublisherConfirmsProducer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.confirmSelect();

        // 声明一个消息队列
        channel.queueDeclare("queue.pc", true, false, false, null);
        // 声明一个 交换器
        channel.exchangeDeclare("ex.pc", BuiltinExchangeType.DIRECT, true, false, null);
        // 将消息队列和交换器绑定,并制定绑卡键
        channel.queueBind("queue.pc", "ex.pc", "key.pc");

        String message = "hello publisher confirm";
        channel.basicPublish("ex.pc", "key.pc", null, message.getBytes());

        try {
            channel.waitForConfirmsOrDie(5_000);
            System.out.println("消息被确认:message = " + message);
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println("消息被拒绝! message = " + message);
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.err.println("在不是Publisher Confirms的通道上使用该方法");
        } catch (TimeoutException e) {
            e.printStackTrace();
            System.err.println("等待消息确认超时! message = " + message);
        }
        channel.close();
        connection.close();
    }

}

同步按批次确认消息

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 同步按批次确认消息
 *
 * @author jie.luo
 * @since 2021/1/29
 */
public class SyncPublisherConfirmsBatchProducer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.confirmSelect();

        // 声明一个消息队列
        channel.queueDeclare("queue.pc", true, false, false, null);
        // 声明一个 交换器
        channel.exchangeDeclare("ex.pc", BuiltinExchangeType.DIRECT, true, false, null);
        // 将消息队列和交换器绑定,并制定绑卡键
        channel.queueBind("queue.pc", "ex.pc", "key.pc");

        String message = "hello-";
        // 批处理的大小
        int batchSize = 10;
        // 用于对需要等待确认消息的计数
        int outStrandingConfirms = 0;
        for (int i = 0; i < 103; i++) {
            channel.basicPublish("ex.pc", "key.pc", null, (message + 1).getBytes());

            outStrandingConfirms++;
            if (outStrandingConfirms == batchSize) {
                // 此时已经有一个批次的消息需要同步等待broker的确认消息
                channel.waitForConfirmsOrDie(5_000);
                System.out.println("消息已经被确认了");
                outStrandingConfirms = 0;
            }
        }

        if (outStrandingConfirms > 0) {
            channel.waitForConfirmsOrDie(5_000);
            System.out.println("剩余消息已经被确认了");
        }

        channel.close();
        connection.close();
    }

}

通过回调确认消息

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 通过回调确认消息
 *
 * @author jie.luo
 * @since 2021/1/29
 */
public class AayncPublisherConfirmsProducer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.confirmSelect();

        // 声明一个消息队列
        channel.queueDeclare("queue.pc", true, false, false, null);
        // 声明一个 交换器
        channel.exchangeDeclare("ex.pc", BuiltinExchangeType.DIRECT, true, false, null);
        // 将消息队列和交换器绑定,并制定绑卡键
        channel.queueBind("queue.pc", "ex.pc", "key.pc");


        ConcurrentNavigableMap<Long, String> outStandingConfirms = new ConcurrentSkipListMap<>();

        // 设置channel的监听器,处理确认消息和不确认的消息
        channel.addConfirmListener(new ConfirmCallback() {
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("编号小于等于 " + deliveryTag + " 的消息都已经被确认了");
                    ConcurrentNavigableMap<Long, String> headMap = outStandingConfirms.headMap(deliveryTag, true);
                    // 清空 outStandingConfirms 中已经被确认的消息信息
                    headMap.clear();
                } else {
                    System.out.println("编号为:" + deliveryTag + " 的消息被确认");
                    // 移除已经被确认的消息
                    outStandingConfirms.remove(deliveryTag);
                }
            }
        }, new ConfirmCallback() {
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("编号小于等于 " + deliveryTag + " 的消息【不】确认");
                } else {
                    System.out.println("编号为:" + deliveryTag + " 的消息【不】确认");
                }
            }
        });

        for (int i = 0; i < 1000; i++) {
            // 获取下一条即将发送消息的消息id
            long nextPublishSeqNo = channel.getNextPublishSeqNo();
            String message = "message-" + 1;
            channel.basicPublish("ex.pc", "key.pc", null, message.getBytes());

            System.out.println("编号为:" + nextPublishSeqNo + " 的消息还未确认");

            outStandingConfirms.put(nextPublishSeqNo, message);
        }

        Thread.sleep(10000);

        channel.close();
        connection.close();
    }

}

Spring Boot 案例

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author jie.luo
 * @since 2021/2/1
 */
@RestController
public class RabbitDemoController {

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息确认:" + correlationData.getId() + " " + new String(correlationData.getReturnedMessage().getBody()));
                } else {
                    System.out.println(cause);
                }
            }
        });
    }

    @GetMapping("/sendMsg")
    public String sendMsg(@RequestParam int index) throws Exception {

        MessageProperties properties = new MessageProperties();
        properties.setCorrelationId("1234");
        properties.setConsumerTag("msg-" + index);
        properties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        properties.setContentEncoding("utf-8");
        // 设置消息持久化
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

        CorrelationData cd = new CorrelationData();
        cd.setId("msg" + index);
        cd.setReturnedMessage(new Message("这是msg1的响应信息".getBytes("utf-8"), null));

        Message message = new Message("这是等待确认的消息".getBytes("utf-8"), properties);

        this.rabbitTemplate.convertAndSend("ex.ca", "key.ca", message, cd);

        return "ok";
    }

}

消息持久化机制

    持久化是提高 RabbitMQ 可靠性的基础,否则当 RabbitMQ 遇到异常时(重启、端点、停机)数据将会丢失,主要从以下几方面来保障晓得的持久性:
  • Exchange持久化

    通过定义时设置 durable 参数为 ture 来保证 Exchange 相关的元数据不丢失

  • Queue持久化

    通过定义时设置 durable 参数为 ture 来保证 Queue 相关的元数据不丢失

  • 消息的持久化

    通过将消息的投递模式(BasicProperties 中的 deliveryMode 属性)设置为 2 ,即可实现消息的持久化,保证消息自身不丢失。

import com.rabbitmq.client.*;

/**
 * @author jie.luo
 * @since 2021/1/29
 */
public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 声明一个交换器
        channel.exchangeDeclare("ex.per", BuiltinExchangeType.DIRECT, true, false, null);
        // 声明一个消息队列
        channel.queueDeclare("queue.per", true, false, false, null);
        // 消息队列与交换器绑定
        channel.queueBind("queue.per", "ex.per", "key.per");


        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.contentEncoding("text/plain");
        // 消息持久化
        builder.deliveryMode(2);
        // 发送消息到交换器
        channel.basicPublish("ex.per", "key.per", null, "this is persistent message".getBytes());


        channel.close();
        connection.close();

    }
}

消费者确认机制

    如何保证消息被消费者成功消费?

    消息被消费者消费的过程中业务失败了但是消息已经出列了(被标记为已经消费了),我们又没有任何重试,那么结果通消息丢失没有什么分别。

    RabbitMQ 在消费端会有 Ack 机制,即消费端消息消费后,需要发送 Ack 确认报文给Broker端,告知自己是否已经消费完成,否则可能会一直重发消息到消息过期(AUTO模式)

    这也是 最终一致性、可恢复性 的基础。

消费者消息确认模式:

  • none模式

    消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表,再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险。

  • auto模式

    自动Ack模式,不主动捕获异常,当小费过程中出现异常时会将消息放回Queue中,然后消息会被重新分配到其他消费节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并指导消费完成返回Ack或一直到过期

  • manual模式

    手动Ack模式,消费者咨询控制流程并手动调用channel相关的方法返回Ack

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author jie.luo
 * @since 2021/1/29
 */
public class MyConsumer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();


        channel.queueDeclare("queue.ca", false, false, false, null);
        channel.basicConsume("queue.ca", false, "myConsumer", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {

                System.out.println(new String(body));
                // 确认消息
                // 第一个参数是消息的标签
                // 第二个参数标识确认多个消息还是一个消息,false-确认一个消息
//                channel.basicAck(envelope.getDeliveryTag(), false);

                // 用于拒收多条消息
                // 第一个参数是消息的标签
                // 第二个参数表示不确认多个消息还是一个消息,false-不确认一个消息
                // 第三个参数表示不确认的消息是否需要重新入列,然后重发,true-需要重新入列
//                channel.basicNack(envelope.getDeliveryTag(), false, true);

                // 用于拒收一条消息
                // 第二个参数表示不确认的消息是否需要重新入列,然后重发,true-需要重新入列
                channel.basicReject(envelope.getDeliveryTag(), true);
            }
        });


//        channel.close();
//        connection.close();
    }
}

消费端限流

在电商的秒杀活动中,活动一开始会有大量并发写请求到达服务端,需要对消息进行削峰处理,如何削峰?

当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应那就会很悲剧...

一、RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以继续或被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。

在 /etc/rabbitmq/rabbitmq.conf中配置磁盘可用空间大小

# 设置磁盘可用空间大小,单位字节。
# 当磁盘可用空间低于这个值的时候,发出磁盘警告,触发限流。
# 如果设置了相当大小,则忽略此绝对大小
disk_free_limit.absolute = 50000

# 使用计量单位,从RabbitMQ 3.6.0开始有效。对vm_memory_high_watermark同样有效
disk_free_limit.absolute = 500KB
disk_free_limit.absolute = 50mb
disk_free_limit.absolute = 50GB

# 还可以使用相对于总可用内存的相对值来设置。
# 注意:此相对值不要低于1.0!当磁盘可用空间低于总可用内存的2.0倍的时候,触发限流
disk_free_limit.relative = 2.0


# 内存限流阈值
# 0.4 表示阈值和总可用内存的比值。总可用内存表示操作系统给每个进程分配的大小,或实际内存大小
# 如32位Windows,系统给每个进程最大2GB的内存,则此比值表示阈值为820MB
# vm_memory_high_watermark.relative = 0.4

# 还可以直接通过绝对值限制可用内存的大小。单位字节。
# vm_memory_high_watermark.absolute = 1073741824

# 从RabbitMQ 3.6.0 开始,绝对值支持计量单位。如果设置了相对值,则忽略此绝对值。
# vm_memory_high_watermark.absolute = 2GB

# 支持的单位:

# k,kib: kibibytes (2^10 - 1,024 bytes )
# M,Mib: mebibytes (2^20 - 1,048,576 bytes)
# G,Gib: gibibytes (2^30 - 1,073,741,824 bytes)
# kb: kilobytes (10^3 - 1,000 bytes)
# MB: megabytes (10^6 - 1,000,000 bytes)
# GB: gigabytes (10^9 - 1,000,000,000 bytes)

二、RabbitMQ还默认提供一种基于 credit flow流控 机制,迷香每一个连接进行流控,当单个队列达到最大流速时,或者多个队列达到总流速是,都会触发流控。触发单个链接的流控可能是因为 connection、channel、queue的某一个过程处于flow状态,这些状态都可以从监控平台看到。

image-20210207102604446.png image-20210207102616332.png image-20210207102626470.png

三、RabbitMQ中有一种QoS保证机制,可以 **限制Channel上接收到的未被Ack的消息数量 **,如果超过这个数量限制RabbitMQ将不会再往消费端推送消息。这是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。 比较值得注意的是 QoS机制仅对于消费端推模式有效,对拉模式无效。

而且不支持NONE Ack模式。执行channel.basicConsume 方法之前通过 channel.basicQoS 方法可以设置该数量。消息的发送是异步的,消息的确认也是异步的。 在消费者消费慢的时候,可以设置Qos的prefetchCount,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消息而没有一个消息确认的时候,就停止发送。消费者确认一个,broker就发送一个,确认两 个就发送两个。换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount个。

如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都确认了。

    生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快且超过了下游的消费速度时就容易出现消息积压/堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急关等控制手段,避免超过Broker端的极限承载能力或者压垮下游消费者。  

    再看看下游,我们期望下游消费端能尽快消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端处理速度是最快、最稳定而且还相对均匀(比较理想化)。  

    **提升下游应用的吞吐量** 和 **缩短消费过程的耗时** ,优化主要以下几种方式:  

    1、有害应用程序的性能,缩短响应时间(需要时间)

    2、增加消费者节点实例(成本增加,而且提成数据库操作这些也可能是瓶颈)

    3、调整并发消费的线程数(线程数并非越大越好,需要大量延迟调优至合理值)
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
// 设置 channel 并发请求最大数
factory.setRequestedChannelMax(10);
// 设置自定义的线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
factory.setThreadFactory(threadFactory);
    @Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        // SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其
        // 转换为String类型的,没有content_type都按byte[]类型
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 设置并发线程数
        factory.setConcurrentConsumers(10);
        // 设置最大并发线程数
        factory.setMaxConcurrentConsumers(20);
        return factory;
    }
# 并行消费数
spring.rabbitmq.listener.simple.concurrency=5
# 最大并行消费数
spring.rabbitmq.listener.simple.max-concurrency=10

消息可靠性保障

消息可靠性保障:

  • 消息传输保障
  • 各种限流、应急手段
  • 业务层面的一些容错、补偿、异常重试等手段

消息可靠传输 一般是业务系统接入消息中间件时 首要考虑的问题,一般消息中间件的消息传输保障分为三个层级:

  • At most once

    最多一次。消息可能会丢失,但绝不会重复传输

  • At least once

    最少一次。消息绝不会丢失,但可能会重复传输

  • Exactly once

    恰好一次。每条消息肯定会被传输一次且仅传输一次

其中“最少一次”投递实现需要考虑以下几个方面内容:

  1. 消息生产者需要开启事务机制或者publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ中。
  2. 消息生产者需要配置使用mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
  3. 消息和队列都需要进行持久化处理,以确保 RabbitMQ服务器在遇到异常情况时不会造成消息丢失。
  4. 消费者在消费消息的同时需要将 autoAck 设置为 false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。

最多一次” 的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确保消息不会丢失。(估计有不少公司的业务系统都是这样的,想想都觉得可怕)

恰好一次” 是RabbitMQ目前无法保障的。

    考虑这样一种情况,消费者在消费完一条消息之后向RabbitMQ 发送确认Basic.Ack 命令,此时由于网络断开或者其他原因造成RabbitMQ 并没有收到这个确认命令,那么RabbitMQ 不会将此条消息标记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。  

    再考虑一种情况,生产者在使用publisher confirm机制的时候,发送完一条消息等待RabbitMQ返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样RabbitMQ 中就有两条同样的消息,在消费的时候消费者就会重复消费。  

消息冥等性处理

    刚刚我们讲到,追求高性能就无法保证消息的顺序,而追求可靠性那么就可能产生重复消息,从而导致重复消费...真是应证了那句老话:做架构就是权衡取舍。  

    RabbitMQ层面有实现“**去重机制**”来保证“**恰好一次**”吗?答案是并没有。而且这个在目前主流的消息中间件都没有实现。  

    借用淘宝沈洵的一句话:最好的解决办法就是不去解决。当为了在基础的分布式中间件中实现某种相对不太通用的功能,需要牺牲到性能、可靠性、扩展性时,并且会额外增加很多复杂度,最简单的办法就是**交给业务自己去处理**。事实证明,很多业务场景下是可以容忍重复消息的。例如:操作日志收集,而对一些金融类的业务则要求比较严苛  

一般解决重复消息的办法是,在消费端让我们消费消息的操作具备幂等性

幂等性问题并不是消息系统独有,而是(分布式)系统中普遍存在的问题。例如:RPC框架调用超后会重试,HTTP请求会重复发起(用户手抖多点了几下按钮)

    幂等(Idempotence)是一个数学上的概念,它是这样定义的:  

如果一个函数f(x) 满足:f(f(x)) = f(x),则函数f(x) 满足幂等性。这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。

一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。

对于幂等的方法,不用担心重复执行会对系统造成任何改变

举个简单的例子(在不考虑并发问题的情况下):

select * from xx where id=1
delete from xx where id=1

这两条sql语句就是天然幂等的,它本身的重复执行并不会引起什么改变。而update就要看情况的,

update xxx set amount = 100 where id =1

这条语句执行1次和100次都是一样的结果(最终余额都还是100),所以它是满足幂等性的。

update xxx set amount = amount + 100 where id =1

它就不满足幂等性的。

业界对于幂等性的一些常见做法:

  1. 借助数据库唯一索引,重复插入直接报错,事务回滚。还是举经典的转账的例子,为了保证不重复扣款或者重复加钱,我们这边维护一张“资金变动流水表”,里面至少需要交易单号、变动账户、变动金额等3个字段。我们选择交易单号和变动账户做联合唯一索引(单号是上游生成的可保证唯一性),这样如果同一笔交易发生重复请求时就会直接报索引冲突,事务直接回滚。现实中,数据库唯一索引的方式通常做为兜底保证;

  2. 前置检查机制。这个很容易理解,并且有几种实现办法。还是引用上面转账的例子,当我在执行更改账户余额这个动作之前,我得先检查下资金变动流水表(或者Tair中)中是否已经存在这笔交易相关的记录了, select * from xxx where accountNumber=xxx andorderId=yyy ,如果已经存在,那么直接返回,否则执行正常的更新余额的动作。为了防止并发问题,我们通常需要借助“排他锁”来完成。在支付宝有一条铁律叫:一锁、二判、三操作。当然,我们也可以使用乐观锁或CAS机制,乐观锁一般会使用扩展一个版本号字段做判断条件

  3. 唯一Id机制,比较通用的方式。对于每条消息我们都可以生成唯一Id,消费前判断Tair中是否存在(MsgId做Tair排他锁的key),消费成功后将状态写入Tair中,这样就可以防止重复消费了。

对于接口请求类的幂等性保证要相对更复杂,我们通常要求上游请求时传递一个类GUID的请求号(或TOKEN),如果我们发现已经存在了并且上一次请求处理结果是成功状态的(有时候上游的重试请求是正常诉求,我们不能将上一次异常/失败的处理结果返回或者直接提示“请求异常”,如果这样重试就变得没意义了)则不继续往下执行,直接返回“重复请求”的提示和上次的处理结果(上游通常是由于请求超时等未知情况才发起重试的,所以直接返回上次请求的处理结果就好了)。如果请求ID都不存在或者上次处理结果是失败/异常的,那就继续处理流程,并最终记录最终的处理结果。这个请求序号由上游自己生成,上游通用需要根据请求参数、时间间隔等因子来生成请求ID。同样也需要利用这个请求ID做分布式锁的KEY实现排他。

TTL机制

    TTL,Time to Live 的简称,即过期时间

    RabbitMQ 可以对**消息**和**队列**两个维度来设置TTL

    任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是被消费掉,那么需要有一种过期的机制来做兜底。

    目前有两种方式可以设置消息的TTL。
  1. 通过Queue属性设置,队列中所有消息都有相同的过期时间。

  2. 对消息自身进行单独设置,每条消息的TTL可以不同。

    如果两种方法一起使用,则消息的TTL以两者之间较小数字为准。通常来讲,消息在队列中的生成时间一旦超过设置的TTL值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然“死信”也可以被取出来消费的。

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

    channel.exchangeDeclare("ex.ttl",BuiltinExchangeType.DIRECT);

    // 创建队列(实际上使用的是AMQP default这个direct类型的交换器)
    // 设置队列属性
    Map<String, Object> arguments = new HashMap<>();
    // 设置队列的TTL
    arguments.put("x-message-ttl", 30000);
    // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
    arguments.put("x-expires", 10000);
    channel.queueDeclare("queue.ttl", false, false, false, arguments);

    channel.queueBind("queue.ttl","ex.ttl","key.ttl");

    for (int i = 0; i < 1000000; i++) {
        String message = "Hello World!" + i;
        channel.basicPublish(
                "ex.ttl",
                "key.ttl",
                new AMQP.BasicProperties().builder().expiration("30000").build(),
                message.getBytes()
        );
        System.out.println(" [X] Sent '" + message + "'");
    }
} catch (TimeoutException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

此外,还可以通过命令行方式设置全局TTL,执行如下命令:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

默认规则:

  • 如果不设置TTL,则表示此消息不会过期
  • 如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则消息会被立即丢弃

注意理解 message-ttl 、 x-expires 这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般TTL相关的参数单位都是**毫秒(ms) **

死信队列

在定义业务队列时,可以考虑制定一个 死信交换器,并绑定一个死信队列。当消息变成死信时,该消息就会发送到该死信队列上,这样方便我们查看消息失败的原因。

DLX、全称:Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter)之后,被重新发送到一个特殊的交换器(DLX)中,同时绑定DLX的队列就变成 死信队列

以下几种情况导致消息变成死信:

  1. 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false
  2. 消息过期
  3. 队列达到最大长度

对RabbitMQ来说,DLX是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正常消费(消费者调用了Basic.NackBasic.Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
    // 定义一个死信交换器(也是一个普通的交换器)
    channel.exchangeDeclare("exchange.dlx", "direct", true);
    // 定义一个正常业务的交换器
    channel.exchangeDeclare("exchange.biz", "fanout", true);
    Map<String, Object> arguments = new HashMap<>();
    // 设置队列TTL
    arguments.put("x-message-ttl", 10000);
    // 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
    arguments.put("x-dead-letter-exchange", "exchange.dlx");
    // 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
    arguments.put("x-dead-letter-routing-key", "routing.key.dlx.test");
    channel.queueDeclare("queue.biz", true, false, false, arguments);
    channel.queueBind("queue.biz", "exchange.biz", "");
    channel.queueDeclare("queue.dlx", true, false, false, null);
    // 死信队列和死信交换器
    channel.queueBind("queue.dlx", "exchange.dlx", "routing.key.dlx.test");
    channel.basicPublish("exchange.biz", "", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx.test".getBytes());
} catch (Exception e) {
    e.printStackTrace();
}

延迟队列

延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。

例如下面的业务场景:

    在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时间,如果15分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?  
  1. 可以用定时任务每分钟扫一次,发现有占座超过15分钟还没付款的就释放掉。但是这样做很低效,很多时候做的都是些无用功
  2. 可以用分布式锁、分布式缓存的被动过期时间,15分钟过期后锁也释放了,缓存key也不存在了
  3. 还可以用延迟队列,锁座成功后会发送1条延迟消息,这条消息15分钟后才会被消费,消费的过程就是检查这个座位是否已经是“已付款”状态

RabbitMQ 延迟队列可以使用 rabbitmq_delayed_message_exchange 插件来实现

这里和TTL方式有个很大的不同就是TTL存放消息在死信队列(delayqueue)里,而基于插件存放消息在延时交换机里(x-delayed-message exchange)

[图片上传失败...(image-103635-1612746288866)]

  1. 生产者将消息(msg)和路由键(route key)发送指定的延时交换器(exchange)上
  2. 延时交换器(exchange)存储消息等待消息到期根据路由键(route key)找到绑定自己的队列(queue)并把消息给它
  3. 队列(queue)再把消息发送给监听它的消费者(customer)

插件安装步骤:

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

image-20210207161651815.png
  1. 安装插件

    将插件拷贝到 rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins

  2. 启动插件

    rabbitmq-plugins list
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
  3. 重启 rabbitmq-server

    systemctl restart rabbitmq-server
    

案例

配置信息

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 延时队列配置
 *
 * @author jie.luo
 * @since 2021/2/1
 */
@Configuration
public class RabbitDelayConfig {

    @Bean
    public Queue queue() {
        return new Queue("queue.delay", true, false, false, null);
    }

    @Bean
    public Exchange exchange() {
        Map<String, Object> props = new HashMap<>();
        props.put("x-delayed-type", ExchangeTypes.DIRECT);
        return new CustomExchange("ex.delay", "x-delayed-message", true, false, props);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("key.delay").noargs();
    }

}

消息生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author jie.luo
 * @since 2021/2/1
 */
@RestController
public class RabbitDemoController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send-delay-message")
    public String sendDelayMsg(@RequestParam int delayTime) throws Exception {

        MessageProperties properties = new MessageProperties();
        // 指定消息延时时间
        properties.setHeader("x-delay", delayTime * 1000);

        String msg = "这是延迟消息,延迟时间: " + delayTime + " s";
        Message message = new Message(msg.getBytes("utf-8"), properties);

        this.rabbitTemplate.convertAndSend("ex.delay", "key.delay", message);

        return "ok";
    }

}

消费者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalDateTime;

/**
 * 通过监听器消费
 *
 * @author jie.luo
 * @since 2021/2/1
 */
@Component
public class BootConsumer {

    @RabbitListener(queues = "queue.delay")
    public void handlerDelayMessage(Message message, Channel channel) throws IOException {
        System.out.println("消费消息: " + new String(message.getBody()) + "     消费时间: " + LocalDateTime.now());

        // 手动确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

相关文章

  • RabbitMQ高级特性

    消息如何保障100%的投递成功? 什么是生产端的可靠性投递? 1、保障消息的成功发出 2、保障MQ节点的成功接收 ...

  • RabbitMQ高级特性

    0. 前言 本文内容分为如下三部分RabbitMQ高级特性 消息可靠性投递Consumer ACK消费端限流TTL...

  • RabbitMQ高级特性

  • RabbitMQ高级特性

    1、Confirm机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,会给我们生产者一个应答。生产者...

  • RabbitMq高级特性

    1.消息如何保证100%投递成功 在学习队列的时候,我想很多朋友都在考虑这个问题。在讲RabbitMq的消息可靠性...

  • RabbitMQ(二):RabbitMQ高级特性

    RabbitMQ是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用。作为一名合格的开发者,有...

  • RabbitMQ之八高级特性

    个人专题目录 1. RabbitMQ 高级特性 1.1 消息可靠性投递 在使用 RabbitMQ 的时候,作为消息...

  • RabbitMQ高级特性(1)

    一、生产端-可靠性投递 解决方案:消息落库,对消息状态进行打标 二、消费端-幂等性保障 在海量订单产生的业务高峰期...

  • RabbitMQ高级特性(2)

    一、消费端限流 1.什么是消费端的限流 假设一个场景,首先,Rabbitmq服务器有上万条未处理的消息,我们随便打...

  • RabbitMQ实战(三)-高级特性

    0 相关源码 1 你将学到 如何保证消息百分百投递成功 幂等性 如何避免海量订单生成时消息的重复消费 Confir...

网友评论

      本文标题:RabbitMQ 高级特性

      本文链接:https://www.haomeiwen.com/subject/esoztltx.html