美文网首页
RabbitMQ原生API实现消息自动过期

RabbitMQ原生API实现消息自动过期

作者: 砒霜拌辣椒 | 来源:发表于2020-09-27 22:37 被阅读0次

    主要有2种方式:

    1. 指定一条消息的过期时间。
    2. 给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。

    1、指定消息的过期时间

    public class ProducerTTL {
        public static final String HOST = "148.70.153.63";
        public static final String USER_NAME = "libai";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(AMQP.PROTOCOL.PORT);
            connectionFactory.setUsername(USER_NAME);
            connectionFactory.setPassword(System.getProperty("password"));
            connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);
    
            // 创建连接和通道
            @Cleanup Connection connection = connectionFactory.newConnection();
            @Cleanup Channel channel = connection.createChannel();
    
            String exchangeName = "amq.direct", queueName = "TTLQueue", routingKey = "TTL";
            // 声明交换机(如果不存在才创建),交换机名称、类型、交换机是否持久化
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
            // 声明队列(如果不存在才创建),队列名称、队列是否持久化、是否排他(连接可见性)、是否自动删除(所有消费者断开连接后删除队列)、参数
            channel.queueDeclare(queueName, true, false, false, null);
            // 将队列和交换机绑定并指定路由键
            channel.queueBind(queueName, exchangeName, routingKey);
    
            // 发送消息
            String msg = "测试消息自动过期";
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(1) // 1-表示消息不做持久化,2-表示消息会持久化到磁盘(对性能会有些影响)
                    .expiration("30000") // 设置消息过期时间,单位:毫秒
                    .build();
            // 把消息发送到指定的交换机,交换机根据路由键推送到绑定的队列中;交换机名称、路由键、属性、消息字节
            channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
        }
    }
    

    这里构建一个AMQP.BasicProperties对象,设置过期时间,推送消息时传入该对象就可以了。

    消息进入队列后,等待30秒后,消息自动过期就失效了。

    消息过期

    注意:
    RabbitMQ只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。

    2、给队列中的所有消息设置过期时间

    需要把前面的TTLQueue队列删除,否则会报错。

    public class ProducerTTL {
        public static final String HOST = "148.70.153.63";
        public static final String USER_NAME = "libai";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(AMQP.PROTOCOL.PORT);
            connectionFactory.setUsername(USER_NAME);
            connectionFactory.setPassword(System.getProperty("password"));
            connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);
    
            // 创建连接和通道
            @Cleanup Connection connection = connectionFactory.newConnection();
            @Cleanup Channel channel = connection.createChannel();
    
            String exchangeName = "amq.direct", queueName = "TTLQueue", routingKey = "TTL";
            // 声明交换机(如果不存在才创建),交换机名称、类型、交换机是否持久化
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
            Map<String, Object> argMap = new HashMap<>();
            argMap.put("x-message-ttl", 30 * 1000); // 设置队列里消息的ttl的时间30s
            // 声明队列(如果不存在才创建),队列名称、队列是否持久化、是否排他(连接可见性)、是否自动删除(所有消费者断开连接后删除队列)、参数
            channel.queueDeclare(queueName, true, false, false, argMap);
            // 将队列和交换机绑定并指定路由键
            channel.queueBind(queueName, exchangeName, routingKey);
    
            // 发送消息
            String msg = "测试消息自动过期";
            // 把消息发送到指定的交换机,交换机根据路由键推送到绑定的队列中;交换机名称、路由键、属性、消息字节
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        }
    }
    

    声明1个x-message-ttl的属性,同时设置过期时间。在创建队列时,传入该参数。
    凡是推送到该队列中的所有消息,都会有一个30秒后过期的属性。

    可以看到创建的队列有TTL的特性,表示该队列中的消息会自动过期。

    TTL队列

    Springboot整合RabbitMQ(四)——设置消息过期时间TTL

    代码地址

    相关文章

      网友评论

          本文标题:RabbitMQ原生API实现消息自动过期

          本文链接:https://www.haomeiwen.com/subject/hrlruktx.html