美文网首页RabbitMQ学习RabbitMQ
RabbitMQ笔记十四:ReturnListener

RabbitMQ笔记十四:ReturnListener

作者: 二月_春风 | 来源:发表于2017-10-22 12:20 被阅读116次

    ReturnListener

    mandatory

    This flag tells the server how to react if the message cannot be routed to a queue.if this flag is set,the server will return an unroutable message with a return method.if this flag is zero,the server silently drops the message
    如果mandatory有设置,则当消息不能路由到队列中去的时候,会触发return method。如果mandatory没有设置,则当消息不能路由到队列的时候,server会删除该消息。

    immediate

    This flag tell the server how to react if the message cannot be routed to a queue consumer immediately.if the flag is set ,the server will return an undeliverable message with a return method.if this flag is zero,the server will queue the message ,but with no guarantee that it will ever be consumed.
    如果有设置immediate,则当消息路由到队列的时候,没有消费者的时候,会触发return method。如果immediate标识是0,则服务器就会将消息加入队列,但是不能保证这个消息能被消费。

    注意
    immediate属性在Rabbitmq3.x的时候,被废弃了。
    因为这个关键字违背了生产者和消费者之间解耦的特性,因为生产者不关心消息是否被消费者消费掉,所以这个字段被drop掉了。

    RabbitMQ java client 相关API

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    public class Send {
        public static void main(String[] args) throws Exception{
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //或者
            connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
    
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
    
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode,
                                         String replyText,
                                         String exchange,
                                         String routingKey,
                                         AMQP.BasicProperties properties,
                                         byte[] body)
                        throws IOException {
                    System.out.println("=========handleReturn===method============");
                    System.out.println("replyText:"+replyText);
                    System.out.println("exchange:"+exchange);
                    System.out.println("routingKey:"+routingKey);
                    System.out.println("message:"+new String(body));
                }
            });
    
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
                    contentEncoding("UTF-8").build();
    
            //第三个参数是设置的mandatory
            channel.basicPublish("zhihao.direct.exchange","log.aaa",true,properties,"注册验证码".getBytes());
    
            TimeUnit.SECONDS.sleep(10);
    
            channel.close();
            connection.close();
        }
    }
    
    zhihao.direct.exchange的路由信息

    因为路由不到正确的队列所以触发了ReturnListener方法的回调,控制台上打印:

    replyText:NO_ROUTE
    exchange:zhihao.direct.exchange
    routingKey:log.aaa
    message:注册验证码
    

    spring-amqp的相关api

    配置:

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            factory.setPublisherReturns(true);
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
            //rabbitTemplate.setMandatoryExpression(new SpelExpressionParser().parseExpression("(1+2) > 3")); //表达式的值为false
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
                @Override
                public void returnedMessage(Message message,
                                            int replyCode,
                                            String replyText,
                                            String exchange,
                                            String routingKey){
                    System.out.println("============returnedMessage==method=========");
                    System.out.println("replyCode: "+replyCode);
                    System.out.println("replyText: "+replyText);
                    System.out.println("exchange: "+exchange);
                    System.out.println("routingKey: "+routingKey);
                }
            });
            return rabbitTemplate;
        }
    }
    

    发送消息

    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
    
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
            System.out.println(rabbitTemplate);
    
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.getHeaders().put("desc","消息发送");
            messageProperties.getHeaders().put("token","234sdfsdf3r342dsfd1232");
    
            //路由不到指定的队列
            rabbitTemplate.convertAndSend("zhihao.direct.exchange","log.aaa","hello welcome");
    
            TimeUnit.SECONDS.sleep(10);
    
            context.close();
        }
    }
    
    zhihao.direct.exchange的路由信息

    rabbitTemplate.setReturnCallback的方法会被执行,控制台打印:

    ============returnedMessage==method=========
    replyCode: 312
    replyText: NO_ROUTE
    exchange: zhihao.direct.exchange
    routingKey: log.aaa
    

    总结

    • 设置factory.setPublisherReturns(true);
    • rabbitTemplate.setMandatory(true);或rabbitTemplate.setMandatoryExpression(new SpelExpressionParser().parseExpression("(1+2) > 2")); //表达式的值为true
      才会触发returnCallback回调方法的执行。

    相关文章

      网友评论

        本文标题:RabbitMQ笔记十四:ReturnListener

        本文链接:https://www.haomeiwen.com/subject/sbivextx.html