美文网首页
第三章-高级特性6: TTL队列/消息 死信队列

第三章-高级特性6: TTL队列/消息 死信队列

作者: yanghx | 来源:发表于2019-03-28 23:09 被阅读0次

    TTL

    • TTL 是Time To Live 的缩写。也就是生存时间
    • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。
    • RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除,

    死信队列

    死信队列: DLX ,Dead-Letter-Exchange

    • 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange.这个Exchange就是DLX

    消息变成死信有以下几种情况

    • 消息被拒绝(basic.reject/ basic.nack)并且 requeue=false(没有被重回队列)
    • 消息TTL过期
    • 队列到达最大长度

    描述

    • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
    • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
    • 可以监听这个队列中消息做相应的处理。这个特性可以弥补RabbitMQ 3.0前支持的immediate参数功能。

    死信队列设置

    • 首先需要设置死信队列的Exchange和queue.然后进行绑定:

      • Exchange: dlx.exchange
      • Queue: dlx.queue
      • RoutingKey: #
    • 然后我们正常声明交换机,队列,绑定,只不过我们需要在队列加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange");

    • 这样消息在过期,requeue(重回队列) ,队列在达到最大长度时,消息就可以直接路由到死信队列!

    
    
    
    /**
     * 死信队列
     *
     * @author yangHX
     * createTime  2019/3/28 22:51
     */
    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.sve";
            String msg = "Hello RabbitMQ DLX Message";
    
            for (int i = 0; i < 5; i++) {
                AMQP.BasicProperties build = new AMQP.BasicProperties().builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF-8")
                        .expiration("10000")
                        .build();
                channel.basicPublish(exchangeName, routingKey, build, msg.getBytes());
            }
        }
    }
    
    
    /**
     * 死信队列
     *
     * @author yangHX
     * createTime  2019/3/28 22:55
     */
    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = RabbitMqUtil.getConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
    
            //这就是一个普通的交换机,和队列 以及路由
            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.#";
            String queueName = "test_dlx_queue";
    
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    
            HashMap<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", "dlx.exchange");
    
            //这个arguments属性,要设置到声明队列上
            channel.queueDeclare(queueName, true, false, false, arguments);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //要进行死信队列的声明
            channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
            channel.queueDeclare("dlx.queue", true, false, false, null);
            channel.queueBind("dlx.queue", "dlx.exchange", "#");
    
            channel.basicConsume(queueName, true, new MyConsumer(channel));
    
    
        }
    }
    
    
    
    
    /**
     * @author yangHX
     * createTime  2019/3/28 23:02
     */
    public class MyConsumer extends DefaultConsumer {
        Channel channel;
    
        /**
         * Constructs a new instance and records its association to the passed-in channel.
         *
         * @param channel the channel to which this consumer is attached
         */
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("-----------consume message----------");
            System.err.println("consumerTag: " + consumerTag);
            System.err.println("envelope: " + envelope);
            System.err.println("properties: " + properties);
            System.err.println("body: " + new String(body));
        }
    }
    
    
    
    
    image.png

    相关文章

      网友评论

          本文标题:第三章-高级特性6: TTL队列/消息 死信队列

          本文链接:https://www.haomeiwen.com/subject/kcdhbqtx.html