美文网首页
rabbitmq延时任务(插件实现,过期时间不会堵塞)

rabbitmq延时任务(插件实现,过期时间不会堵塞)

作者: 寂静的春天1988 | 来源:发表于2019-05-31 10:32 被阅读0次

    使用插件模式实现延时任务
    RabbitConfig类:创建延时交换机和队列,并将二者绑定

    package com.plugins.config;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMQ configuration for the delayed-message plugin approach.
     * <p>
     * Declares three beans: a durable queue, a custom exchange of type
     * {@code x-delayed-message}, and the binding that connects them through
     * a fixed routing key.
     *
     * @author yl
     */
    @Configuration
    public class RabbitConfig {
    
        /** Name of the delayed exchange used for group-buying timeouts. */
        public static final String DELAY_GROUP_ACTIVITTY_EXCHANGE = "DELAY_GROUP_ACTIVITTY_EXCHANGE";
    
        /** Name of the queue that receives group-buying timeout messages. */
        public static final String DELAY_GROUP_ACTIVITTY_QUEUE = "DELAY_GROUP_ACTIVITTY_QUEUE";
    
        /** Routing key that binds the queue to the delayed exchange. */
        public static final String DELAY_GROUP_ACTIVITTY_ROUTING_KEY = "DELAY_GROUP_ACTIVITTY_ROUTING_KEY";
    
        /**
         * Declares the queue consumed when a delay has elapsed.
         *
         * @return a durable queue named {@link #DELAY_GROUP_ACTIVITTY_QUEUE}
         */
        @Bean
        public Queue delayGroupActivityQueue() {
            final boolean durable = true;
            return new Queue(DELAY_GROUP_ACTIVITTY_QUEUE, durable);
        }
    
        /**
         * Declares the delayed-message exchange (not a dead-letter exchange).
         * <p>
         * The exchange type is {@code x-delayed-message} and the
         * {@code x-delayed-type} argument is set to {@code direct}.
         *
         * @return a durable, non-auto-delete custom exchange
         */
        @Bean
        public CustomExchange delayGroupActivityExchange() {
            final boolean durable = true;
            final boolean autoDelete = false;
            Map<String, Object> exchangeArguments = new HashMap<>();
            exchangeArguments.put("x-delayed-type", "direct");
            return new CustomExchange(
                    DELAY_GROUP_ACTIVITTY_EXCHANGE, "x-delayed-message", durable, autoDelete, exchangeArguments);
        }
    
        /**
         * Binds the queue to the delayed exchange.
         *
         * @return the binding using {@link #DELAY_GROUP_ACTIVITTY_ROUTING_KEY} and no extra arguments
         */
        @Bean
        public Binding bindingNotify() {
            Queue targetQueue = delayGroupActivityQueue();
            CustomExchange delayedExchange = delayGroupActivityExchange();
            return BindingBuilder
                    .bind(targetQueue)
                    .to(delayedExchange)
                    .with(DELAY_GROUP_ACTIVITTY_ROUTING_KEY)
                    .noargs();
        }
    }
    

    Receiver:消费者类

    package com.plugins.config;
    
    import java.io.IOException;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    
    import com.rabbitmq.client.Channel;
    
    /**
     * Consumer for the delayed group-activity queue.
     */
    @Component
    public class Receiver {

        /**
         * Handles one message from {@link RabbitConfig#DELAY_GROUP_ACTIVITTY_QUEUE}:
         * decodes the body as UTF-8 text, prints it, then acknowledges the delivery.
         *
         * @param message the delivered message
         * @param channel the channel the message arrived on, used for the manual ack
         * @throws IOException if the acknowledgement cannot be sent
         */
        @RabbitListener(queues = RabbitConfig.DELAY_GROUP_ACTIVITTY_QUEUE)
        public void get(Message message, Channel channel) throws IOException {
            String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            // Print the decoded text; calling toString() on the byte[] would only
            // print the array's identity (e.g. "[B@1a2b3c"), not the payload.
            System.out.println(msg);
            // Acknowledge only this delivery (multiple = false).
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    

    Sender:生产者类

    package com.plugins.config;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    @Service
    public class Sender {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        
        /**
         * 通过延迟消息插件发动延迟消息
         * @param msg
         * @param expiration
         */
        public void sendDelayMessageByPlugins(Long groupActivityId,Long expiration){
             //消息发送失败返回到队列中, yml需要配置 publisher-returns: true
    //      rabbitTemplate.setMandatory(true);
    //        // 消息返回, yml需要配置 publisher-returns: true
    //      rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    //            String correlationId = message.getMessageProperties().getCorrelationId();
    //            System.out.println("消息发送失败"+replyText+exchange+routingKey);
    //        });
    
            //绑定异步监听回调函数
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                   System.out.println("成功");
                } else {
                    System.out.println("失败");
                }
            });
            rabbitTemplate.convertAndSend(RabbitConfig.DELAY_GROUP_ACTIVITTY_EXCHANGE,RabbitConfig.DELAY_GROUP_ACTIVITTY_ROUTING_KEY, groupActivityId,(message)->{
                System.out.println("过期时间"+expiration); 
                message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间
                 return message;
            });
            
        }
    }
    

    生产者类中之所以把那段代码注释掉,是因为使用延时插件时,即使消息发送成功,也会触发 setReturnCallback 注册的回调(该回调本应只在消息未能路由到队列时才触发)。目前不知道该如何解决这个问题;延时插件的文档中说明该类型的交换机不支持 mandatory 标志,因为消息会先暂存在交换机中,到期后才投递到队列。而使用死信队列的方式则不会有该问题。

    相关文章

      网友评论

          本文标题:rabbitmq延时任务(插件实现,过期时间不会堵塞)

          本文链接:https://www.haomeiwen.com/subject/ranpgqtx.html