美文网首页
死信队列 DLX,延时队列实现

死信队列 DLX,延时队列实现

作者: 爱吃豆包 | 来源:发表于2023-03-22 08:58 被阅读0次

    死信队列:(英文名)DLX, Dead-Letter-Exchange

    利用DLX, 当消息在一个队列中变成死信(dead message) 之后,它能被重新publish到另一个Exchange, 这个Exchange就是DLX

    什么是死信队列?

    就是你的消息发布后,没有消费者去消费!就变成了死信了!在任何MQ产品中都有这个情况!

    在RabbitMQ中,变成死信队列后,它会把这个消息重新publish到另一个Exchange中,这个Exchange就是DLX

    消息变成死信有以下几种情况:

    1.消息被拒绝(basic.reject / basic.nack) 并且requeue=fasle (队列重发设为了false)

    2.消息TTL过期,(超过任何消息有效限制之后,就变成了死信

    3.队列达到最大长度。(如果消息的最大大小满了,后面的消息就被送进死信队里中)

    步骤:

    1.DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性(这个Exchange只需要正常的去定义就好, 和平常没却别)

    2.当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列

    3.可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0以前支持的immediate参数的功能!

    死信队列的设置:

    首先需要设置死信队列的Exchange和Queue,然后进行绑定:

    Exchange: dlx.exchange
    Queue: dlx.queue
    RoutingKey:#

    然后正常的声明交换机、队列、绑定,只不过我们需要在队列加上一个参数:
    argument.put("x-dead-letter-exchange", “dlx.exchange”)
    这个参数表示,当消息正常的路由到队列的时候,但是没有一个消费者去消费的话,就会被重新路由到 dlx.exchange 交换机上
    这样消息在过期,requeue、队列在达到最大长度时,消息就可以直接路由到死信队列!

    延时队列实现

    可以通过死信队列来实现!给消息设置 TTL 有效期时间,比如设置 10 秒,那么在 10 秒后,这个消息成为了死信,然后会转发到另外一个 交换机上进行消息处理!

    生产者

    package com.example.rabbitmqapi.dlx;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 死信队列 DLX
     *
     *      消息变成死信有以下几种情况:
     *         1.消息被拒绝(basic.reject / basic.nack) 并且requeue=fasle (队列重发设为了false)
     *         2.消息TTL过期,(超过任何消息有效限制之后,就变成了死信)
     *         3.队列达到最大长度。(如果消息的最大大小满了,后面的消息就被送进死信队里中)
     *
     * 消费者
     *
     *      消费端 ACK 和 重回队列
     *
     * @author weiximei
     */
    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            // 1. 创建连接工厂并设置属性
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.72.138");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            // 2. 创建连接
            Connection connection = factory.newConnection();
    
            // 3. 创建channel
            Channel channel = connection.createChannel();
    
    
            // 4. 声明Exchange
    
            String exchangeName = "test_dlx_exchange";
            String exchangeType = "topic";
            String routingKey = "dlx.#";
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
    
            // 5. 声明消息队列
            String queueName = "test_dlx_queue";
    
            /**
             * 声明死信队里的属性,出现死信后,会重新发送到这个 dlx.exchange 上
             */
            Map<String,Object> agruments = new HashMap<>();
            agruments.put("x-dead-letter-exchange", "dlx.exchange");
    
            // 需要把 agruments 属性,要设置到声明队列上
            channel.queueDeclare(queueName, true, false, false, agruments);
    
            // 6. 绑定队列和Exchange
            channel.queueBind(queueName, exchangeName, routingKey);
    
    
            /**
             * 声明死信队列
             *
             *      接下来,就可以更具接收到的死信消息,做相应的处理, 也就做一个消息的接收,处理消息(消费者)
             *      在这里只是声明了,并没对消息做处理
             *
             */
            channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
            channel.queueDeclare("dlx.queue", true, false, false, null);
            channel.queueBind("dlx.queue", "dlx.exchange", "#"); // # 表示所有队里都匹配接收
    
    
            // 7. 设置消费者为自定义的消费者, 手工签收ACK,则必须 autoAck=false
            channel.basicConsume(queueName, false, new MyConsumer(channel));
    
        }
    
    }
    
     
    
    

    消费者

     
    package com.example.rabbitmqapi.dlx;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 死信队列 DLX
     *
     *
     * 生产者
     *
     *      在消费端的 ACK 和 重回队列
     *
     * @author weiximei
     */
    public class Producer {
    
        public static void main(String[] args) throws Exception {
            // 1. 创建ConnectionFactory, 并设置属性
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.72.138");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            // 2. 创建连接
            Connection connection = factory.newConnection();
    
            // 3. 创建channel
            Channel channel = connection.createChannel();
    
    
            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.saye";
    
            // 发送消息
            String msg = "自定义消费者, 消息发送 : Hello, DLX ";
            for (int i = 0; i < 5; i++) {
    
                // 携带额外的参数
                Map<String,Object> headerMap = new HashMap<>();
                headerMap.put("num", i);
    
                // 添加额外的属性
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                        .deliveryMode(2) // 是否持久化 1 不持久化,2 持久化
                        .contentEncoding("UTF-8")  // 设置字符集
                        .headers(headerMap).build();
    
                /**
                 * mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
                 */
                // exchange 表示交换机, 不设置交换机就输入空字符串, 就表示走第一个默认的交换机(AMQP default), 也就是说 routingKet 会和交换机绑定一起
                // routingKet 表示key键,发送到哪一个队列
                // mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
                // props 表示消息的其他属性 (BasicProperties)
                // body 表示消息内容
                channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
            }
    
            // 关闭连接
            channel.close();
            connection.close();
    
        }
    
    }
    
    
    

    自定义消费者

    package com.example.rabbitmqapi.dlx;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    
    /**
     * 自定义消费者
     *
     * @author weiximei
     */
    public class MyConsumer extends DefaultConsumer {
    
        private Channel channel;
    
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        // 第一参数:consumerTag 同一个会话, consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1,可以做此消息处理通道的名字。
        //      因此 envelope.deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)。
        // 第二个参数: envelope 表示消息类型信息
        // 第三个参数:properties 表示消息路由头的其他属性等
        // 第三个参数: body 消息内容
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("-------------自定义消费者------------");
            System.out.println("consumerTag : " + consumerTag);
            System.out.println("envelope : " + envelope);
            System.out.println("properties : " + properties);
            System.out.println("body : " + new String(body));
    
            // 获取消息的额外参数
            Integer num = (Integer) properties.getHeaders().get("num");
            // 比如我让这个的额外参数等于 0 的时候,就进行重回队列
            if (num == 0) {
                // deliveryTag 参数是消息唯一标识
                // multiple 是否是批量的,一般为false
                // requeue 是否重回队列, true 是,false 不是
                channel.basicNack(envelope.getDeliveryTag(), false, true);
            } else {
    
                /**
                 * 手工签收, 第二个参数表示是否批量签收
                 */
                // envelope.getDeliveryTag() 是消息的唯一标识
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
    
        }
    }
     
    
    

    相关文章

      网友评论

          本文标题:死信队列 DLX,延时队列实现

          本文链接:https://www.haomeiwen.com/subject/jgcokdtx.html