美文网首页
C1-RabbitMQ(消息队列)--- 分解 B 2021年

C1-RabbitMQ(消息队列)--- 分解 B 2021年

作者: 鄙人_阿K | 来源:发表于2021-06-21 00:34 被阅读0次

    分布式框架中间件总纲

    https://www.jianshu.com/p/00aa796bb5b8

    友情链接(消息三解序)

    1、RabbitMQ(消息队列)--- 分解 A
    2、RabbitMQ(消息队列)--- 分解 B
    3、RabbitMQ(消息队列)--- 分解 C
    4、RabbitMQ(消息队列)--- 面试题

    本章目录

    一、发布确定
           1、发布确认原理
           2、发布确认的策略
                  开启发布确认的方法
                  单个确认发布
                  批量确认发布
                  异步确认发布
                  如何处理异步未确认消息
                  以上 3 种发布确认速度对比
    二、交换机
           1、Exchanges
           2、临时队列
           3、绑定(bindings)
           4、Fanout
           5、Direct exchange
           6、Topics
    三、死信队列
           1、死信的概念
           2、死信的来源
           3、死信实战
    四、延迟队列
           1、延迟队列概念
           2、延迟队列运用场景
           3、RabbitMQ 中的 TTL
           4、整合 springboot
           5、队列 TTL
           6、延时队列优化
           7、Rabbitmq 插件实现延迟队列

    一、发布确定

    1、发布确认原理

    生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消
    息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会
    发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,
    如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产
    者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的
    multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

    confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道
    返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方
    法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,
    生产者应用程序同样可以在回调方法中处理该 nack 消息。

    2、发布确认的策略

    2.1、开启发布确认的方法

    发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布
    确认,都需要在 channel 上调用该方法

            Channel channel = connection.createChannel ( );
            channel.confirmSelect ();
    

    2.2、单个确认发布

    这是一种简单的确认方式,它是一种 同步确认 发布的方式,也就是发布一个消息之后只有它
    被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认
    的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

    这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会
    阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某
    些应用程序来说这可能已经足够了。

    public class ReleaseConfirmation {
        
            public static void main(String[] args) throws Exception{
                // 1.单个确认发布
                ReleaseConfirmation.publishMessageIndividually();// 耗时27289ms
            }
        
            private static final int MESSAGE_COUNT = 999;
        
            public static void publishMessageIndividually() throws Exception {
        
                Channel channel = RabbitMqUtils.getChannel ( );
                String queryName = UUID.randomUUID().toString();
                channel.queueDeclare (queryName, false, false, false, null);
        
                // 开始发布确认
                channel.confirmSelect ( );
                long begin = System.currentTimeMillis ( );
        
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    String message = i + "";
                    // 发送消息
                    channel.basicPublish ("", queryName, null, message.getBytes ( ));
                    // 单个消息发送后,马上进行消息确认
                    boolean flag = channel.waitForConfirms ( );
                    if (flag) {
                        //System.out.println ("消息发送成功");
                    }
                }
        
                long end = System.currentTimeMillis ( );
                System.out.println ("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) +
                        "ms");
            }
        }
    

    2.3、批量确认发布

    上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地
    提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现
    问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种
    方案仍然是同步的,也一样阻塞消息的发布。

    public class ReleaseConfirmation {
    
        public static void main(String[] args) throws Exception {
            // 1.单个确认发布
            // ReleaseConfirmation.publishMessageIndividually ( );// 耗时27289ms
            // 2.批量确认发布
            ReleaseConfirmation.publishMessageIndividually2 ( );// 耗时415ms
            // 
        }
    
        private static final int MESSAGE_COUNT = 999;
    
        public static void publishMessageIndividually2() throws Exception {
    
            Channel channel = RabbitMqUtils.getChannel ( );
            String queryName = UUID.randomUUID ( ).toString ( );
            channel.queueDeclare (queryName, false, false, false, null);
    
            // 开始发布确认
            channel.confirmSelect ( );
            // 批量确认消息大小
            int batchSize = 100;
            long begin = System.currentTimeMillis ( );
    
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                // 发送消息
                channel.basicPublish ("", queryName, null, message.getBytes ( ));
                // 判断达到 100条消息的时候,批量确认一次
                if (i%batchSize==0) {
                    // 发布确认
                    channel.waitForConfirms ();
                }
            }
    
            long end = System.currentTimeMillis ( );
            System.out.println ("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) +
                    "ms");
        }
    
    
    

    2.4、异步确认发布

    异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,
    他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,
    下面就让我们来详细讲解异步确认是怎么实现的。


    image.png
     public static void main(String[] args) throws Exception {
            // 1.单个确认发布
            // ReleaseConfirmation.publishMessageIndividually ( );// 耗时27289ms
            // 2.批量确认发布
            // ReleaseConfirmation.publishMessageIndividually2 ( );// 耗时415ms
            // 3.异步确认发布
            ReleaseConfirmation.publishMessageIndividuallyAsync ();// 耗时133ms
        }
    
        private static final int MESSAGE_COUNT = 999;
    
        public static void publishMessageIndividuallyAsync() throws Exception {
    
            Channel channel = RabbitMqUtils.getChannel ( );
            String queryName = UUID.randomUUID ( ).toString ( );
            channel.queueDeclare (queryName, false, false, false, null);
            // 开始发布确认
            channel.confirmSelect ( );
            long begin = System.currentTimeMillis ( );
    
    
            /**
             * 消息确认成功的一个回调(函数式接口)
             */
            ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
                System.out.println ("确认的消息:" + deliveryTag);
    
            };
    
            /**
             * 消息确认失败的一个回调(函数式接口)
             * 1. 参数一:消息的标识(标记)
             * 2. 参数二:是否为批量确认
             *
             */
            ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
                System.out.println ("未确认的消息:" + deliveryTag);
            };
    
            // 消息的监听器:用于监听哪些成功了,哪些失败了
            channel.addConfirmListener (ackCallback, nackCallback);
    
    
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                // 发送消息
                channel.basicPublish ("", queryName, null, message.getBytes ( ));
            }
    
            long end = System.currentTimeMillis ( );
            System.out.println ("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) +
                    "ms");
        }
    
    

    2.5、 如何处理异步未确认消息

    最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,
    比如说用 ConcurrentLinkedQueue 或者 ConcurrentSkipListMap 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

     public static void main(String[] args) throws Exception {
            // 1.单个确认发布
            // ReleaseConfirmation.publishMessageIndividually ( );// 耗时27289ms
            // 2.批量确认发布
            // ReleaseConfirmation.publishMessageIndividually2 ( );// 耗时415ms
            // 3.异步确认发布
            ReleaseConfirmation.publishMessageIndividuallyAsync ();// 耗时133ms
        }
    
        private static final int MESSAGE_COUNT = 999;
    
        public static void publishMessageIndividuallyAsync() throws Exception {
    
            Channel channel = RabbitMqUtils.getChannel ( );
            String queryName = UUID.randomUUID ( ).toString ( );
            channel.queueDeclare (queryName, false, false, false, null);
            // 开始发布确认
            channel.confirmSelect ( );
            long begin = System.currentTimeMillis ( );
    
            // 线程安全且有序的的哈希表,适用于高并发场景
            ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<> ( );
    
            /**
             * 消息确认成功的一个回调(函数式接口)
             */
            ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
                // 2.处理异步未确认:删除掉已经确认的消息,剩下的就是未确认的消息了
                if (multiple) {// 消息是批量的
                    // headMap(),截取到指定集合
                    ConcurrentNavigableMap<Long, String> confirmed = map.headMap (deliveryTag);
                    confirmed.clear ();
                    System.out.println (map.size ()+":最终:"+map );
    
                }else {// 单个消息只删除对应的序号键
                    map.remove (deliveryTag);
    
                }
    
                System.out.println ("确认的消息:" + deliveryTag);
    
            };
    
            /**
             * 消息确认失败的一个回调(函数式接口)
             * 1. 参数一:消息的标识(标记)
             * 2. 参数二:是否为批量确认
             *
             */
            ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
                String message = map.get (deliveryTag);
                System.out.println ("未确认的消息序号:" + deliveryTag+",未确认的消息内容:"+message);
                // 再次发送失败的(单次),若是批量则拿map操作
                channel.basicPublish ("", queryName, null, message.getBytes ( ));
                
            };
    
            // 消息的监听器:用于监听哪些成功了,哪些失败了
            channel.addConfirmListener (ackCallback, nackCallback);
    
    
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                // 发送消息
                channel.basicPublish ("", queryName, null, message.getBytes ( ));
                // 1.处理异步未确认:记录所有要发送的消息,Key:序号,value:消息
                map.put (channel.getNextPublishSeqNo (),message);
            }
    
            long end = System.currentTimeMillis ( );
            System.out.println ("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) +
                    "ms");
    
        }
    

    2.6、以上 3 种发布确认速度对比

    单独发布消息:同步等待确认,简单,但吞吐量非常有限。
    批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。
    异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

    二、交换机

    假设工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-
    (我们将消息传达给多个消费者。这种模式称为 ”发布/订阅”)。
    为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消息,
    第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,

    1、Exchanges

    1.1、Exchanges概念

    RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递到了哪些队列中。

    相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来
    自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消
    息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。


    image.png

    1.2、Exchanges类型

    直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)

    1.3、无名Exchanges

    前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列,原因是因为我们使用的是默认交换,我们通过空字符串("")进行标识。


    image.png

    第一个参数是交换机的名称。空字符串表示默认或无名称交换机:
    消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话

    2、临时队列(未持久化)

    每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。

    创建临时队列的方式:

    String queueName = channel.queueDeclare().getQueue();
    

    创建出来之后长成这样:


    image.png

    3、绑定(bindings)

    什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队
    列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定


    image.png image.png

    1、通过客户端创建测试队列


    image.png

    2、通过客户端创建测试交换机、


    image.png
    3、用创建好的测试交换机绑定测试队列
    image.png

    4、Fanout(扇出,也称发布/订阅模式)

    Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的
    所有队列中。系统中默认有些 exchange 类型


    image.png
    根据下图的关系编写一个实战 demo:
    image.png

    Logs 和临时队列的绑定关系如下图:


    image.png

    ReceiveLogs01:消费者01代码

    package com.kk.rabbitmq.fanout;
    
    import com.kk.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class ReceiveLogs01 {
    
    
        // 交换机名称
        public static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel ( );
            // 声明一个交换机
            channel.exchangeDeclare (EXCHANGE_NAME,"fanout");
    
            // 声明一个临时队列,作用
            // 1、队列名随机
            // 2、当消费者断开与队列的连接时,队列就自动删除
            String queueName = channel.queueDeclare ( ).getQueue ( );
    
            // 绑定交换机与队列,参数
            // 1、队列
            // 2、交换机
            // 3、Routing key
            channel.queueBind (queueName,EXCHANGE_NAME,"");
            System.out.println ("等待消息接收,把接收到的消息打印出来.............." );
    
            DeliverCallback deliverCallback = (consumerTag,message)->{
            // 接收消息的回调
                System.out.println ("ReceiveLogs01,接收到的消息:"+ new String (message.getBody (),"UTF-8")+",当前时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date () ));
            };
    
            // 当消费者取消消息的回调
            CancelCallback cancelCallback=(consumerTag)->{
    
            };
    
            // 设置消息接收等回调
            // 1、队列名
            // 2、是否自动接收
            // 3、接收消息的回调
            // 4、当消费者取消消息的回调
            channel.basicConsume (queueName,true, deliverCallback,cancelCallback);
        }
    }
    
    
    

    ReceiveLogs01:消费者01代码,跟01一样

    EmitLog:生产者代码

    package com.kk.rabbitmq.fanout;
    import com.kk.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    import java.util.Scanner;
    public class EmitLog {
    
        // 交换机名称
        public static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel ( );
            // 声明一个交换机(已经被消费者声明出来,所以生产者不需要重复声明)
            //channel.exchangeDeclare (EXCHANGE_NAME, "fanout");
    
            Scanner sc = new Scanner(System.in);
            System.out.println("请输入信息");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);
            }
        }
    }
    
    

    扇出测试结果


    image.png image.png image.png

    5、Direct exchange(直接交换机,也称路由模式)

    5.1、简单说明和扇出的区别

    扇出的 routing key 都是相同的,所以生产者发出,两个消费者同时收到,
    而直接路由则是两个 routing key 不相同 ,可以直接交换机是扇出的变种

    5.2、介绍

    上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希
    望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志
    消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的
    广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的
    routingKey 队列中去。(针对性)


    image.png

    在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange,
    队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.

    在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列
    Q1。绑定键为 black 和 green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

    5.3、多重绑定
    image.png

    当然如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情
    况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多。

    5.4、实战:根据下图写出代码
    image.png
    image.png

    生产者:需要注意,这次写法和扇出不同,在生产者创建交换机,所以先启动生产者

    package com.kk.rabbitmq.exchange.direct;
    
    import com.kk.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import java.util.Scanner;
    public class DirectEmitLog {
    
        // 交换机名称
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel ( );
            // 声明一个交换机(已经被消费者声明出来,所以生产者不需要重复声明)
            channel.exchangeDeclare (EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            Scanner sc = new Scanner(System.in);
            System.out.println("请输入要发送的 routing key值");
            String message2 = null;
            while (sc.hasNext()) {
                System.out.println("请输入要发送的 routing key值");
    
                String message = sc.nextLine();
    
                if ("info".equals (message)){
                    System.out.println ("请输入要发送的内容" );
                    message2 = sc.nextLine();
                }else if ("warning".equals (message)){
                    System.out.println ("请输入要发送的内容" );
                    message2 = sc.nextLine();
                }
                // 参数二:发送时被绑定的 routing key
                channel.basicPublish(EXCHANGE_NAME, message, null, message2.getBytes("UTF-8"));
                System.out.println("生产者发出消息:" + message2+",给 "+message);
                continue;
            }
        }
    }
    
    

    消费者01

    package com.kk.rabbitmq.exchange.direct;
    
    import com.kk.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class DirectReceiveLogs01 {
        
        // 交换机名称
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel ( );
            // 声明一个交换机(交给生产者声明)
            //channel.exchangeDeclare (EXCHANGE_NAME,"fanout");
    
            // 声明一个临时队列,作用
            // 1、队列名随机
            // 2、当消费者断开与队列的连接时,队列就自动删除
            String queueName = channel.queueDeclare ( ).getQueue ( );
    
            // 绑定交换机与队列,参数
            // 1、队列
            // 2、交换机
            // 3、Routing key
            channel.queueBind (queueName,EXCHANGE_NAME,"info");
            System.out.println ("等待消息接收,把接收到的消息打印出来.............." );
    
            DeliverCallback deliverCallback = (consumerTag,message)->{
            // 接收消息的回调
                System.out.println ("ReceiveLogs01,接收到的消息:"+ new String (message.getBody (),"UTF-8")+",当前时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date () ));
            };
    
            // 当消费者取消消息的回调
            CancelCallback cancelCallback=(consumerTag)->{
    
            };
    
            // 设置消息接收等回调
            // 1、队列名
            // 2、是否自动接收
            // 3、接收消息的回调
            // 4、当消费者取消消息的回调
            channel.basicConsume (queueName,true, deliverCallback,cancelCallback);
        }
    }
    
    

    消费者02 同上

    测试结果


    image.png
    image.png
    image.png

    6、Topics(主题交换机=扇出+直接)

    6.1、主题交换机=扇出+直接

    简单来说就是,扇出只能广播全部,直接直接有选择的单个,
    主题交换机则把他们的有点融合了

    6.2、Topic 的规范

    1、routing_key 不能随意写,必须是一个单词列表,以点号分隔开
    eg:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit"

    2、单词列表最多不能超过 255 个字节

    3、
    *(星号)可以代替一个单词
    #(井号)可以替代零个或多个单词

    6.3、Topic 的规范案例

    (1)绑定的是中间带 orange 带 3 个单词的字符串
    (\*.orange.\*)
    (2)绑定的是 最后一个单词是 rabbit 的 3 个单词
    (*.*.rabbit)
    第一个单词是 lazy 的多个单词
    (lazy.#)

    (3)下图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的


    image.png

    quick.orange.rabbit    ------》    被队列 Q1Q2 接收到
    lazy.orange.elephant    ------》    被队列 Q1Q2 接收到
    quick.orange.fox    ------》    被队列 Q1 接收到
    lazy.brown.fox    ------》    被队列 Q2 接收到
    lazy.pink.rabbit    ------》    虽然满足两个绑定但只被队列 Q2 接收一次
    quick.brown.fox    ------》    不匹配任何绑定不会被任何队列接收到会被丢弃
    quick.orange.male.rabbit    ------》    是四个单词不匹配任何绑定会被丢弃
    lazy.orange.male.rabbit    ------》    是四个单词但匹配 Q2

    注意:

    当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
    如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

    6.4、Topic 实战

    生产者:

    package com.kk.rabbitmq.exchange.topic;
    
    import com.kk.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.Channel;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class TopicEmitLog {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel ( );
    
            // 声明交换机类型
            channel.exchangeDeclare (EXCHANGE_NAME,"topic");
    
            /**
             * Q1-->绑定的是
             * 中间带 orange 带 3 个单词的字符串(*.orange.*)
             * Q2-->绑定的是
             * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
             * 第一个单词是 lazy 的多个单词(lazy.#)
             *
             */
            Map<String, String> bindingKeyMap = new HashMap<> ();
            bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
            bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
            bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
            bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
            bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
            bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
            bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
            bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
    
            // 等待消费者启动
            Thread.sleep (10000);
    
            for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet ( )) {
                String bindingKey = bindingKeyEntry.getKey ( );
                String message = bindingKeyEntry.getValue ( );
                channel.basicPublish (EXCHANGE_NAME,bindingKey,null,message.getBytes ("utf-8"));
                System.out.println("生产者发出消息" + message);
            }
        }
    }
    
    

    消费者01:

    package com.kk.rabbitmq.exchange.topic;
    
    import com.kk.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class TopicReceiveLogs01 {
    
        // 交换机名称
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel ( );
            // 声明一个交换机(交给生产者声明)
            //channel.exchangeDeclare (EXCHANGE_NAME,"fanout");
    
            //声明 Q1 队列与绑定关系
            String queueName="Q1";
            channel.queueDeclare (queueName,false,false,false,null);
            channel.queueBind (queueName,EXCHANGE_NAME,"*.orange.*");
    
            System.out.println ("等待消息接收,把接收到的消息打印出来.............." );
    
            DeliverCallback deliverCallback = (consumerTag,message)->{
            // 接收消息的回调
                System.out.println ("ReceiveLogs01,接收到的消息:"+ new String (message.getBody (),"UTF-8")+",当前时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date () ));
            };
    
            // 当消费者取消消息的回调
            CancelCallback cancelCallback=(consumerTag)->{
    
            };
    
            // 设置消息接收等回调
            // 1、队列名
            // 2、是否自动接收
            // 3、接收消息的回调
            // 4、当消费者取消消息的回调
            channel.basicConsume (queueName,true, deliverCallback,cancelCallback);
        }
    }
    
    

    消费者02:

    package com.kk.rabbitmq.exchange.topic;
    
    import com.kk.rabbitmq.util.RabbitMqUtils;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class TopicReceiveLogs02 {
    
        // 交换机名称
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel ( );
            // 声明一个交换机(交给生产者声明)
            //channel.exchangeDeclare (EXCHANGE_NAME,"fanout");
    
            //声明 Q1 队列与绑定关系
            String queueName="Q2";
            channel.queueDeclare (queueName,false,false,false,null);
            channel.queueBind (queueName,EXCHANGE_NAME,"*.*.rabbit");
            channel.queueBind (queueName,EXCHANGE_NAME,"lazy.#");
    
            System.out.println ("等待消息接收,把接收到的消息打印出来.............." );
    
            DeliverCallback deliverCallback = (consumerTag,message)->{
            // 接收消息的回调
                System.out.println ("ReceiveLogs02,接收到的消息:"+ new String (message.getBody (),"UTF-8")+",当前时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date () ));
            };
    
            // 当消费者取消消息的回调
            CancelCallback cancelCallback=(consumerTag)->{
    
            };
    
            // 设置消息接收等回调
            // 1、队列名
            // 2、是否自动接收
            // 3、接收消息的回调
            // 4、当消费者取消消息的回调
            channel.basicConsume (queueName,true, deliverCallback,cancelCallback);
        }
    }
    
    

    测试结果:


    image.png image.png image.png

    三、死信队列

    1、死信队列概念

    先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
    解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息
    进行消费,但某些时候由于特定的原因 导致 queue 中的某些消息无法被消费,这样的消息如果没有
    后续的处理,就变成了死信,有死信自然就有了死信队列。

    应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
    消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时
    间未支付时自动失效

    2、死信队列原由

    1、消息 TTL 过期
    2、队列达到最大长度(队列满了,无法再添加数据到 mq 中)
    3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

    3、死信队列实战

    3.1、代码架构图

    image.png

    normal-queue(是普通队列),会因为这三种情况转发到dead_exchange(死信交换机),
    并且设置 routingKey(lisi(李四)),成为死信队列(dead-queue)

    3.2、消息TTL 过期

    消费者C1 代码,启动之后关闭该消费者 模拟其接收不到消息(第一个启动)

    // 消费者 1
    public class Consumer1 {
    
        // 普通交换机名称
        private final static String NORMAL_EXCHANGE = "normal_exchange";
        // 死信交换机名称
        private static final String DEAD_EXCHANGE = "dead_exchange";
        // 声明普通队列
        private final static String NORMAL_QUERY = "normal_query";
        // 声明死信队列
        private static final String DEAD_QUEUE = "dead_queue";
    
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel ( );
            // 声明死信交换机和普通交换机,类型为 direct
            channel.exchangeDeclare (NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare (DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            // 普通队列绑定 死信交换机信息
            Map<String, Object> params = new HashMap<> ( );
            // 普通队列设置 死信交换机
            params.put ("x-dead-letter-exchange", DEAD_EXCHANGE);
            // 普通队列设置 死信的 routingkey
            params.put ("x-dead-letter-routing-key", "lisi");
            // 声明普通队列 并且根据参数信息绑定 死信交换机
            channel.queueDeclare (NORMAL_QUERY,false,false,false,params);
            channel.queueBind (NORMAL_QUERY, NORMAL_EXCHANGE, "zhangsan");
    
    
            // 声明死信队列 并 绑定 死信交换机与 routingKey
            channel.queueDeclare (DEAD_QUEUE, false, false, false, null);
            channel.queueBind (DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
    
            System.out.println ("C1等待接受消息..." );
    
            channel.basicConsume (NORMAL_QUERY,true,(consumerTag,message)->{
                System.out.println (new java.lang.String (message.getBody (),"UTF-8"));
            },consumerTag -> {});
    
        }
    }
    
    

    启动C1后创建,普通和死信交换机已经对应的队列和绑定


    image.png

    生产者(先关闭已经启动好的C1,在启动生产者,模拟TTL过期)

    // 生产者
    public class Productr {
    
        // 普通交换机名称
        private final static String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel ( );
    
            // 死信消息 设置TTL值
            // 设置 10000ms 为 10s
    
            AMQP.BasicProperties preperties =
                    new AMQP.BasicProperties ( ).builder ( ).expiration ("10000").build ( );
    
            for (int i = 0; i < 11; i++) {
                String message = "info:"+i;
                channel.basicPublish (NORMAL_EXCHANGE,"zhangsan",preperties,message.getBytes ());
            }
        }
    }
    

    启动生产者之后,没有消费者消费,过期10s后进入了死信队列


    image.png

    消费者C2 代码(启动C2,消费死信队列的消息)

    // 消费者 2 负责接收死信队列的消息
    public class Consumer2 {
        // 声明死信队列
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel ( );
            System.out.println ("C2等待接受消息..." );
    
            // 接收死信队列的消息
            channel.basicConsume (DEAD_QUEUE,true,(consumerTag,message)->{
                System.out.println (new String (message.getBody (),"UTF-8"));
            },consumerTag -> {});
        }
    }
    

    3.3、队列达到最大长度(在TTL案例的代码上更改)

    改动消费者C1代码:增加队列(操作的第二步)


    image.png

    生产者:去掉TTL(启动生产者之前,先关闭已经启动的消费者1)(操作的第三步)


    image.png

    C2代码不变

    注意:启用前先删除原本普通队列(因为参数被修改了),多了LIM标签既队列长度限制(操作的第一步)
    image.png

    3.4、消息被拒(在TTL案例的代码上更改)

    改动消费者C1代码:拒绝和接收区分


    image.png

    生产者(没有TTL的设置)和C2不变

    先启动C1,然后生产者


    image.png

    四、延迟队列(死信队列中的一种)

    1、延迟队列概念

    延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望
    在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的
    元素的队列。

    2、延迟队列运用场景

    1.订单在十分钟之内未支付则自动取消
    2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
    3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
    4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
    5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

    这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:
    发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎
    使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果
    数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,
    如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支
    付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十
    分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万
    级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单
    的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。


    image.png

    3、RabbitMQ 中的 TTL

    TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,
    单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这
    条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的
    TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

    方式一:消息设置TTL,针对每条消息设置TTL


    image.png

    方式二:队列设置TTL


    image.png

    4、整合 springboot

    1、引入坐标

        <!--指定 jdk 编译版本-->
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>8</source>
                        <target>8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
        <dependencies>
            <!--RabbitMQ 依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <!--swagger-->
            <dependency>
                <groupId>io.springfox</groupId>
                <artifactId>springfox-swagger2</artifactId>
                <version>2.9.2</version>
            </dependency>
            <dependency>
                <groupId>io.springfox</groupId>
                <artifactId>springfox-swagger-ui</artifactId>
                <version>2.9.2</version>
            </dependency>
            <!--RabbitMQ 测试依赖-->
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    

    2、修改配置文件

    spring.rabbitmq.host=106.52.23.202
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=mykk
    spring.rabbitmq.password=aaa666
    
    

    3、添加Swagger 配置类

    package com.kk.rabbit.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import springfox.documentation.builders.ApiInfoBuilder;
    import springfox.documentation.service.ApiInfo;
    import springfox.documentation.service.Contact;
    import springfox.documentation.spi.DocumentationType;
    import springfox.documentation.spring.web.plugins.Docket;
    import springfox.documentation.swagger2.annotations.EnableSwagger2;
    
    @Configuration
    @EnableSwagger2
    public class SwaggerConfig {
    
        @Bean
        public Docket webApiConfig() {
            return new Docket (DocumentationType.SWAGGER_2)
                    .groupName ("webApi")
                    .apiInfo (webinfo ( ))
                    .select ( ).build ( );
        }
    
        
        // 文档注释信息,没什么用
        private ApiInfo webinfo() {
            return new ApiInfoBuilder ( )
                    .title ("rabbitmq 接口文档")
                    .description ("本文档描述了 rabbitmq 微服务接口定义")
                    .version ("1.0")
                    .contact (new Contact ("mykk", "http://jianshu.com","763856958@qq.com"))
                    .build ( );
        }
    }
    

    5、队列 TTL

    1、代码架构图(P是生产者,C是消费者,XA,XB,YD均为routingKey )
    创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交
    换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下:


    image.png

    2、配置文件类代码

    package com.kk.rabbit.config;
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class TtlQueueConfig {
    
        // 普通交换机
        private final static String X_EXCHANGE = "X";
        // 死信交换机
        private final static String Y_DEAD_LETTER_EXCHANGE = "Y";
        // 队列 A
        private final static String QUERY_A = "QA";
        // 队列 B
        private final static String QUERY_B = "QB";
        // 死信队列
        private final static String QUERY_DEAD_LETTER = "QD";
    
        // 声明 xExchange 用于后面注入
        @Bean("xExchange")
        public DirectExchange xExchange() {
            return new DirectExchange (X_EXCHANGE);
        }
    
        // 声明 yExchange 用于后面注入
        @Bean("yExchange")
        public DirectExchange yExchange() {
            return new DirectExchange (Y_DEAD_LETTER_EXCHANGE);
        }
    
        // 声明队列 A ttl 为 10s 并绑定到对应的死信交换
        @Bean("queryA")
        public Queue queueA(){
            Map<String, Object> args = new HashMap<> ( );
            // 声明当前队列绑定的死信交换机
            args.put ("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            // 声明当前队列绑定的死信 routingKey
            args.put ("x-dead-letter-routing-key","YD");
            // 声明当前队列的 TTL
            args.put ("x-message-ttl",10000);
    
            return QueueBuilder.durable (QUERY_A).withArguments (args).build ();
        }
    
        // 声明队列 B ttl 为 40s 并绑定到对应的死信交换
        @Bean("queryB")
        public Queue queueB(){
            Map<String, Object> args = new HashMap<> ( );
            // 声明当前队列绑定的死信交换机
            args.put ("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            // 声明当前队列绑定的死信 routingKey
            args.put ("x-dead-letter-routing-key","YD");
            // 声明当前队列的 TTL
            args.put ("x-message-ttl",40000);
    
            return QueueBuilder.durable (QUERY_B).withArguments (args).build ();
        }
    
    
        // 声明队列 A 绑定 X 交换机
        @Bean
        public Binding queueaBindingX(@Qualifier("queryA") Queue queueA,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind (queueA).to (xExchange).with ("XA");
        }
        // 声明队列 B 绑定 X 交换机
        @Bean
        public Binding queuebBindingX(@Qualifier("queryB") Queue queueB,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind (queueB).to (xExchange).with ("XB");
        }
    
        // 声明死信队列 QD
        @Bean("queueD")
        public Queue queueD(){
            return new Queue (QUERY_DEAD_LETTER);
        }
    
        // 声明死信队列 QD 的绑定关系
        @Bean
        public Binding deadLetterBindingQD(@Qualifier("queueD")Queue queueD,
                                           @Qualifier("yExchange")DirectExchange yExchange){
            return BindingBuilder.bind (queueD).to (yExchange).with ("YD");
        }
    }
    
    

    3、消息生产者

    package com.kk.rabbit.controller;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import java.util.Date;
    
    @Slf4j
    @RequestMapping("ttl")
    @RestController
    public class SendMsgController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("sendMsg/{message}")
        public void sendMsg(@PathVariable String message){
            log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date (), message);
            rabbitTemplate.convertAndSend ("X","XA","消息来自 ttl 为 10S 的队列: "+message);
            rabbitTemplate.convertAndSend ("X","XB","消息来自 ttl 为 40S 的队列: "+message);
        }
    
    }
    
    

    4、消息消费者

    package com.kk.rabbit.consumer;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import java.util.Date;
    
    @Slf4j
    @Component
    public class DeadLetterQueueConsumer {
        @RabbitListener(queues = "QD")
        public void receiveD(Message message, Channel channel) throws Exception{
            String msg = new String (message.getBody (), "utf-8");
            log.info("当前时间:{},收到死信队列信息{}", new Date ().toString(), msg);
        }
    }
    
    

    5、测试
    url输入内容:http://127.0.0.1:8080/ttl/sendMsg/阿k,你好呀

    输出结果:
    当前时间:Thu Jul 29 00:01:37 CST 2021,发送一条信息给两个 TTL 队列:阿k,你好呀
    当前时间:Thu Jul 29 00:01:47 CST 2021,收到死信队列信息消息来自 ttl 为 10S 的队列: 阿k,你好呀
    当前时间:Thu Jul 29 00:02:17 CST 2021,收到死信队列信息消息来自 ttl 为 40S 的队列: 阿k,你好呀

    6、总结和分析
    第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,
    然后被消费掉,这样一个延时队列就打造完成了

    不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S
    两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,如果是预定会议室然
    后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

    此时延迟队列的优化来了,宝

    6、延时队列优化(基于死信队列优化)

    1、代码架构图:新增了一个队列 QC,绑定关系如下,该队列不设置TTL 时间


    image.png

    2、配置文件类,扩展代码

    //------------------------------基于死信队列的延迟优化扩展---start-------------------
        // 普通队列(扩展)
        private final static String QUERY_C = "QC";
    
        // 声明队列 C ,绑定死信交换机(扩展)
        @Bean("queryC")
        public Queue queueC(){
            Map<String, Object> args = new HashMap<> ( );
            // 声明当前队列绑定的死信交换机
            args.put ("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            // 声明当前队列绑定的死信 routingKey
            args.put ("x-dead-letter-routing-key","YD");
            // 无声明TTL
            return QueueBuilder.durable (QUERY_C).withArguments (args).build ();
        }
        // 声明队列 C 绑定 X 交换机(扩展)
        @Bean
        public Binding queueaCindingX(@Qualifier("queryC") Queue queueC,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind (queueC).to (xExchange).with ("XC");
        }
    //------------------------------基于死信队列的延迟优化扩展---end---------------------
    
    

    3、消息生产者,扩展代码

    
        // 基于死信队列的 延迟优化测试
        @GetMapping("sendMsg/{message}/{ttlTime}")
        public void sendMsg(@PathVariable String message,
                            @PathVariable String ttlTime) {
            log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
            rabbitTemplate.convertAndSend ("X", "XC", message, msg -> {
                msg.getMessageProperties ( ).setExpiration (ttlTime);
                return msg;
            });
        }
    

    4、测试
    发送请求:

    1. http://127.0.0.1:8080/ttl/sendMsg/测试基于死信队列的延迟优化---1/20000
    2. http://127.0.0.1:8080/ttl/sendMsg/测试基于死信队列的延迟优化---2/2000

    输出结果:


    image.png

    5、测试后的分析:
    看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消
    息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,
    如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

    此刻这个问题就需要插件了,宝

    7、Rabbitmq 插件实现延迟队列

    概述:
    如果不能实现在消息粒度上的 TTL,并使其在设置的TTL 时间及时死亡,就无法设计成一个通用的延时队列。
    然后插件对交换机进行扩展,即可解决。

    1、安装延时队列插件

    第一步:官网上下载 https://www.rabbitmq.com/community-plugins.html,下载
    rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
    或者去QQ群:62263397获取

    第二步:将插件拷贝到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins 下
    命令: cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

    image.png

    第三步:安装插件,然后再重启 rabbitMQ
    安装插件命令(无需指定版本号):rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    重启命令:systemctl restart rabbitmq-server

    此时客户端就多了一个交换机类型


    image.png

    2、代码架构图
    在这里新增了一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:


    image.png

    3、配置文件类代码
    在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并
    不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,
    才投递到目标队列中。

    package com.kk.rabbit.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class DelayedQueueConfig {
    
        // 队列
        private final static String DELAYED_QUEUE_NAME = "delayed.queue";
        // 交换机
        private final static String DELAYED_EXCHANGE_NAME = "delayed.exchange";
        // routingKey
        private final static String DELAYED_ROUTING_KEY = "delayed.routingkey";
    
        // bean若没有指定名字,方法名默认就是
    
        // 声明延迟队列
        @Bean
        public Queue delayedQueue(){
            return new Queue (DELAYED_QUEUE_NAME);
        }
    
        // 声明交换机,基于插件的自定义交换机
        @Bean
        public CustomExchange delayedExchange(){
            Map<String, Object> arguments = new HashMap<> ( );
            // 设置延迟类型为直接类型
            arguments.put ("x-delayed-type","direct");
    
            /**
             * 1. 交换机名称
             * 2. 交换机类型
             * 3. 是否需要持久化
             * 4. 是否需要自动删除
             * 5. 其他参数
             */
            return new CustomExchange (DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
        }
    
        // 延迟队列绑定延迟交换机
        @Bean
        public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue")Queue queue,
                                           @Qualifier("delayedExchange") CustomExchange customExchange){
            // noargs 构建
            return BindingBuilder.bind (queue).to (customExchange).with (DELAYED_ROUTING_KEY).noargs ();
        }
    }
    
    

    4、生产者代码

        // 基于延迟交换机
        @GetMapping("sendDelayMsg/{message}/{delayTime}")
        public void sendMsg(@PathVariable String message,
                            @PathVariable Integer delayTime) {
    
            rabbitTemplate.convertAndSend ("delayed.exchange", "delayed.routingkey", message, correlationData -> {
                correlationData.getMessageProperties ().setDelay (delayTime);
                return correlationData;
            });
    
            log.info (" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new
                    Date ( ), delayTime, message);
        }
    

    5、消费者代码

        public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    
        @RabbitListener(queues = DELAYED_QUEUE_NAME)
        public void receiveDelayedQueue(Message message) throws Exception {
            String msg = new String (message.getBody (), "utf-8");
            log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
        }
    

    6、测试
    发送请求:

    1. http://127.0.0.1:8080/ttl/sendDelayMsg/测试基于插件的延迟交换机---1/20000
    2. http://127.0.0.1:8080/ttl/sendDelayMsg/测试基于插件的延迟交换机---2/2000

    输出结果:第二个消息被先消费掉了,符合预期


    image.png

    7、结论

    延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用
    RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正
    确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为
    单个节点挂掉导致延时队列不可用或者消息丢失。
    当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz
    或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

    相关文章

      网友评论

          本文标题:C1-RabbitMQ(消息队列)--- 分解 B 2021年

          本文链接:https://www.haomeiwen.com/subject/aypeyltx.html