美文网首页
RabbitMQ原生API实现消息自动过期

RabbitMQ原生API实现消息自动过期

作者: 砒霜拌辣椒 | 来源:发表于2020-09-27 22:37 被阅读0次

主要有2种方式:

  1. 指定一条消息的过期时间。
  2. 给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。

1、指定消息的过期时间

public class ProducerTTL {
    public static final String HOST = "148.70.153.63";
    public static final String USER_NAME = "libai";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(System.getProperty("password"));
        connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);

        // 创建连接和通道
        @Cleanup Connection connection = connectionFactory.newConnection();
        @Cleanup Channel channel = connection.createChannel();

        String exchangeName = "amq.direct", queueName = "TTLQueue", routingKey = "TTL";
        // 声明交换机(如果不存在才创建),交换机名称、类型、交换机是否持久化
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
        // 声明队列(如果不存在才创建),队列名称、队列是否持久化、是否排他(连接可见性)、是否自动删除(所有消费者断开连接后删除队列)、参数
        channel.queueDeclare(queueName, true, false, false, null);
        // 将队列和交换机绑定并指定路由键
        channel.queueBind(queueName, exchangeName, routingKey);

        // 发送消息
        String msg = "测试消息自动过期";
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(1) // 1-表示消息不做持久化,2-表示消息会持久化到磁盘(对性能会有些影响)
                .expiration("30000") // 设置消息过期时间,单位:毫秒
                .build();
        // 把消息发送到指定的交换机,交换机根据路由键推送到绑定的队列中;交换机名称、路由键、属性、消息字节
        channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
    }
}

这里构建一个AMQP.BasicProperties对象,设置过期时间,推送消息时传入该对象就可以了。

消息进入队列后,等待30秒后,消息自动过期就失效了。

消息过期

注意:
RabbitMQ只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。

2、给队列中的所有消息设置过期时间

需要把前面的TTLQueue队列删除,否则会报错。

public class ProducerTTL {
    public static final String HOST = "148.70.153.63";
    public static final String USER_NAME = "libai";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(System.getProperty("password"));
        connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);

        // 创建连接和通道
        @Cleanup Connection connection = connectionFactory.newConnection();
        @Cleanup Channel channel = connection.createChannel();

        String exchangeName = "amq.direct", queueName = "TTLQueue", routingKey = "TTL";
        // 声明交换机(如果不存在才创建),交换机名称、类型、交换机是否持久化
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
        Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-message-ttl", 30 * 1000); // 设置队列里消息的ttl的时间30s
        // 声明队列(如果不存在才创建),队列名称、队列是否持久化、是否排他(连接可见性)、是否自动删除(所有消费者断开连接后删除队列)、参数
        channel.queueDeclare(queueName, true, false, false, argMap);
        // 将队列和交换机绑定并指定路由键
        channel.queueBind(queueName, exchangeName, routingKey);

        // 发送消息
        String msg = "测试消息自动过期";
        // 把消息发送到指定的交换机,交换机根据路由键推送到绑定的队列中;交换机名称、路由键、属性、消息字节
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
    }
}

声明1个x-message-ttl的属性,同时设置过期时间。在创建队列时,传入该参数。
凡是推送到该队列中的所有消息,都会有一个30秒后过期的属性。

可以看到创建的队列有TTL的特性,表示该队列中的消息会自动过期。

TTL队列

Springboot整合RabbitMQ(四)——设置消息过期时间TTL

代码地址

相关文章

网友评论

      本文标题:RabbitMQ原生API实现消息自动过期

      本文链接:https://www.haomeiwen.com/subject/hrlruktx.html