美文网首页
rabbitmq延时队列

rabbitmq延时队列

作者: 禅兜 | 来源:发表于2018-11-26 14:17 被阅读0次

    延时队列
    在实际业务场景中可能会用到延时消息发送,例如支付场景,准时支付、超过未支付将执行不同的方案,其中超时未支付可以看做一个延时消息。
    RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。利用RabbitMQ的这种特性,应该可以实现很多现实中的业务,我们可以发挥想象。
    rabbitmq-delayed-message-exchange,我们也可以使用插件来实现延时队列。利用TTL、DLX实现的延时队列可以中断,使用插件实现的延时队列是否可以中断?.该功能是已插件的形式实现的。在做实验时确保有安装过该插件
    查询插件列表:

    rabbitmq-plugins list

    image.png
    如果没有安装插件
    下载参考:https://blog.csdn.net/youjin/article/details/82586888

    参考:https://blog.csdn.net/azhegps/article/details/53815117
    https://www.cnblogs.com/haoxinyue/p/6613706.html

    使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现上述需求

    过期方式有两种:
    1.队列过期时间,发送该队列(queue1)的消息都是统一过期时间,过期后会进入死信队列(queue2)
    2.消息过期时间,如果依次发送了10s,5s,3s的三条消息到该队列(queue1)的消息,队列会在10s过期才将消息从
    queue1转发到queue2死信队列中(在队列queue1没有消费的情况下),监听队列queue2的消费者,这个时候才会消费到信息,
    除非发送的消息达到queue1的
    消费者消费的时候,也会在10s后才会消费这个过期消息,只有当过期的消息到了队列的顶端(队首),
    才会被真正的丢弃或者进入死信队列(queue2)
    如果发送的是5s,10s,3s的消息,则5s先消费,再消费10s和3s消息

    实例:
    1.先创建exchange:test.message
    并绑定队列:my.timeout.message
    routing: my.routing.key

    2.创建队列:my.timeout.message
    x-dead-letter-exchange: timeout-exchange
    x-dead-letter-routing-key: test.timeout.message
    durable: true

    image.png

    3.创建消费过期转发exchange:timeout-exchange,类型fanout方式

    并绑定队列:my.timeout.message
    4.创建队列:my.timeout.message


    image.png

    5.发送者

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import com.rabbitmq.client.AMQP.BasicProperties;  
    import com.rabbitmq.client.AMQP.BasicProperties.Builder;  
    import com.rabbitmq.client.BuiltinExchangeType;  
    import com.rabbitmq.client.Channel;  
    import com.rabbitmq.client.Connection;  
      
    /**
     * https://www.cnblogs.com/haoxinyue/p/6613706.html
     * 发送消息类 
     * @author
     * 
     */  
    public class Send{
        private static final String EXCHANGE_NAME = "test.message";
    
        private final static String QUEUE_NAME = "my.timeout.message";
    
        private static final String ROUTKEY="my.routing.key";
        /** 
         * 在topic转发器的基础上练习延时转发,发送消息时指定消息过期时间 
         * 消息已发送到queue上,但未有consumer进行消费 
         * @param object 消息主体 
         * @throws IOException 
         */  
        public static void sendAToB(String object) throws Exception{
            Connection conn=MqManager.newConnection();  
            Channel channel=conn.createChannel();  
            //声明headers转发器  
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS,true);
            //定义headers存储的键值对  
            Map<String, Object> headers=new HashMap<String, Object>();  
            headers.put("key", "123456");  
            headers.put("token", "654321");
          Map<String, Object> args = new HashMap<String, Object>();
           // args.put("x-message-ttl",12000); //消息过去
            args.put("x-dead-letter-exchange", "timeout-exchange");//过期消息转向路由
            args.put("x-dead-letter-routing-key", "test.timeout.message");//过期消息转向路由相匹配routingkey
           channel.queueDeclare(QUEUE_NAME, true, false, false, args);
            //把键值对放在properties  
            Builder properties=new BasicProperties.Builder();  
            properties.headers(headers);  
            properties.deliveryMode(2);//持久化
            //指定消息过期时间为12秒,队列上也可以指定消息的过期时间,两者以较小时间为准  
            properties.expiration("12000");//(12000)延时12秒,不会及时删除(在consuemr消费时判定是否过期,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列)
            channel.basicPublish(EXCHANGE_NAME,ROUTKEY ,properties.build(),object.getBytes());
            System.out.println("Send '"+object+"'"+",时间:"+DateUtils.dataToStr(new Date()));
            channel.close();  
            conn.close();  
        }
        public static void main(String[] args) throws Exception {  
            sendAToB("我开始测试延时消息了3!");
        }  
    } 
    

    6.延期消费者

    import java.io.IOException;
    import java.util.Date;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;  
    import com.rabbitmq.client.Channel;  
    import com.rabbitmq.client.Connection;  
    import com.rabbitmq.client.Consumer;  
    import com.rabbitmq.client.DefaultConsumer;  
    import com.rabbitmq.client.Envelope;  
      
    /** 
     * 延时消息处理类 
     * @author  
     * <p>
     *     即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。
     *     如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。
     *     参考官方文档发现“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有当过期的消息到了队列的顶端(队首),
     *     才会被真正的丢弃或者进入死信队列。所以在考虑使用RabbitMQ来实现延迟任务队列的时候,需要确保业务上每个任务的延迟时间是一致的。
     *    如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列
     *
     *
     * </p>
     */
    
    public class DelayRecv {
        private static final String EXCHANGE_NAME = "timeout-exchange";
        private final static String QUEUE_NAME = "my.timeout.message2";
        private static final String ROUTKEY="test.timeout.message";
        /** 
         * 创建队列并声明consumer用于处理转发过来的延时消息 
         * @throws Exception 
         */  
        public static void delayRecv() throws Exception{  
            Connection conn=MqManager.newConnection();  
            final  Channel channel=conn.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true);
            String queueName=channel.queueDeclare().getQueue();
            System.out.println("队列名称"+queueName);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTKEY);
            Consumer consumer=new DefaultConsumer(channel){  
                @Override  
                public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
                    String mes= new String(body);
                    System.out.println(envelope.getRoutingKey()+":delay Received :'"+mes+"' done,接受时间:"+DateUtils.dataToStr(new Date()));
                   // channel.basicAck(envelope.getDeliveryTag(), true);
    //                if(mes.indexOf("8")>0){
    //                    System.out.println("测试回滚");
    //                    channel.basicNack(envelope.getDeliveryTag(),true, true);
    //                    System.out.println("测试回滚完成");
    //                }
                }  
            };
            System.out.println("执行开始 ");
            //关闭自动应答机制,默认开启;这时候需要手动进行应该  
          channel.basicConsume(QUEUE_NAME, true, consumer);
    
            System.out.println("执行完了");
        }  
          
        public static void main(String[] args) throws Exception {  
            delayRecv();  
        }  
      
    } 
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     */
    public class MqManager {
    
        public static Connection newConnection()throws Exception{
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            // 获取到连接以及mq通道
            Connection connection = factory.newConnection();
            return connection;
        }
    }
    

    sprintboot实现方式请见:https://www.jianshu.com/p/e75dab831d95

    相关文章

      网友评论

          本文标题:rabbitmq延时队列

          本文链接:https://www.haomeiwen.com/subject/qslgnftx.html