美文网首页RabbitMQ学习RabbitMQ
RabbitMQ笔记十七: Alternate Exchange

RabbitMQ笔记十七: Alternate Exchange

作者: 二月_春风 | 来源:发表于2017-10-29 19:31 被阅读131次

    Alternate Exchange

    Rabbitmq自己扩展的功能,不是AMQP协议定义的。
    Alternate Exchange属性的作用:创建Exchange时可以为它指定Alternate Exchange属性;发送消息时,如果根据route key无法把消息路由到任何队列,消息就会被转交给Alternate Exchange属性所指定的Exchange。

    创建一个Fanout类型的Exchange

    自动声明带有Alternate Exchange的Exchange:

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Spring configuration that declares a topic exchange carrying an
     * {@code alternate-exchange} argument, together with the binding of the
     * {@code zhihao.miao.pay} queue to that exchange.
     */
    @Configuration
    public class MQConfig {

        /** Creates the caching connection factory from the broker URI. */
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
            cachingFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return cachingFactory;
        }

        /** Exposes a {@link RabbitAdmin} built on the given connection factory. */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            return new RabbitAdmin(connectionFactory);
        }

        /**
         * Topic exchange {@code zhihao.miao.exchange.pay} whose
         * {@code alternate-exchange} argument names
         * {@code zhihao.miao.exchange.order}.
         */
        @Bean
        public Exchange exchange(){
            Map<String,Object> arguments = new HashMap<>();
            arguments.put("alternate-exchange","zhihao.miao.exchange.order");

            boolean durable = true;
            boolean autoDelete = false;
            return new TopicExchange("zhihao.miao.exchange.pay",durable,autoDelete,arguments);
        }

        /**
         * Binds queue {@code zhihao.miao.pay} to {@code zhihao.miao.exchange.pay}
         * with the routing pattern {@code zhihao.miao.pay.*} and no extra arguments.
         */
        @Bean
        public Binding binding(){
            Map<String,Object> noArguments = new HashMap<>();
            return new Binding(
                    "zhihao.miao.pay",
                    Binding.DestinationType.QUEUE,
                    "zhihao.miao.exchange.pay",
                    "zhihao.miao.pay.*",
                    noArguments);
        }

        /** Exposes a {@link RabbitTemplate} built on the given connection factory. */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            return new RabbitTemplate(connectionFactory);
        }
    }
    

    应用启动类

    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    /**
     * Starts the Spring context, forces one broker round trip through
     * {@link RabbitAdmin}, and shuts the context down again.
     */
    @ComponentScan
    public class Application {
        public static void main(String[] args) {
            // try-with-resources guarantees the context is closed even when the
            // broker call below throws (e.g. the broker is unreachable).
            try (AnnotationConfigApplicationContext context =
                         new AnnotationConfigApplicationContext(Application.class)) {
                // Makes the client open its first connection to RabbitMQ.
                context.getBean(RabbitAdmin.class).getQueueProperties("**");
            }
        }
    }
    

    启动应用启动类之后生成一个带有alternate-exchange属性的Exchange

    生成了一个带有alternate-exchange属性的Exchange

    zhihao.miao.exchange.pay是个包含alternate-exchange属性的topic类型的exchange(route key是zhihao.miao.pay.*,队列名是zhihao.miao.pay),alternate-exchange属性指定的是fanout类型的exchange,exchange的名称是zhihao.miao.exchange.order(绑定到zhihao.miao.order队列)

    如果正确的路由(符合zhihao.miao.pay.*)规则,则zhihao.miao.pay队列接收到消息。如果是不正确的路由(不符合zhihao.miao.pay.*)规则,则路由到zhihao.miao.exchange.pay Exchange指定的alternate-exchange属性的Exchange中。

    测试

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Minimal Spring AMQP configuration: a connection factory plus the
     * {@link RabbitAdmin} and {@link RabbitTemplate} built on top of it.
     */
    @Configuration
    public class MQConfig {

        /** Creates the caching connection factory from the broker URI. */
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
            cachingFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return cachingFactory;
        }

        /** Exposes a {@link RabbitAdmin} built on the given connection factory. */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            return new RabbitAdmin(connectionFactory);
        }

        /** Exposes a {@link RabbitTemplate} built on the given connection factory. */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            return new RabbitTemplate(connectionFactory);
        }
    }
    

    启动应用类:

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Sends one message to {@code zhihao.miao.exchange.pay} with a routing key
     * that matches the {@code zhihao.miao.pay.*} pattern.
     */
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            // try-with-resources guarantees the context is closed even when
            // sending fails or the sleep is interrupted.
            try (AnnotationConfigApplicationContext context =
                         new AnnotationConfigApplicationContext(Application.class)) {
                RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
                System.out.println(rabbitTemplate);

                // Explicit charset so the payload bytes do not depend on the
                // platform default encoding.
                byte[] body = "hello,zhihao.miao".getBytes(java.nio.charset.StandardCharsets.UTF_8);

                MessageProperties messageProperties = new MessageProperties();
                messageProperties.setContentType("json");

                Message message = new Message(body,messageProperties);

                rabbitTemplate.send("zhihao.miao.exchange.pay","zhihao.miao.pay.aaa",message);

                // Presumably keeps the connection open briefly before shutdown.
                TimeUnit.SECONDS.sleep(3);
            }
        }
    }
    

    此时消息发送到名为zhihao.miao.exchange.pay的Exchange,Route key是zhihao.miao.pay.aaa,符合zhihao.miao.pay.*规则,所以能正确地路由到zhihao.miao.pay队列中。

    当指定的Route key不能正确路由时,消息会被转发到名为zhihao.miao.exchange.order的Exchange;因为这个Exchange是fanout类型,所以消息能路由到zhihao.miao.order队列中。

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Sends one message to {@code zhihao.miao.exchange.pay} with a routing key
     * that does not match the {@code zhihao.miao.pay.*} pattern.
     */
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            // try-with-resources guarantees the context is closed even when
            // sending fails or the sleep is interrupted.
            try (AnnotationConfigApplicationContext context =
                         new AnnotationConfigApplicationContext(Application.class)) {
                RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
                System.out.println(rabbitTemplate);

                // Explicit charset so the payload bytes do not depend on the
                // platform default encoding.
                byte[] body = "hello,zhihao.miao".getBytes(java.nio.charset.StandardCharsets.UTF_8);

                MessageProperties messageProperties = new MessageProperties();
                messageProperties.setContentType("json");

                Message message = new Message(body,messageProperties);

                // This key cannot be routed, so the message goes to the exchange
                // configured in the alternate-exchange argument.
                rabbitTemplate.send("zhihao.miao.exchange.pay","hehe.zhihao.miao",message);

                // Presumably keeps the connection open briefly before shutdown.
                TimeUnit.SECONDS.sleep(3);
            }
        }
    }
    

    一般alternate-exchange属性的值最好是fanout类型的exchange,否则消息还会根据route key在alternate-exchange属性指定的exchange上再做一次匹配和路由。而如果指定的是fanout类型的exchange,就不需要再匹配route key。

    alternate-exchange配置的Exchange也不能正确路由

    示例说明

    创建了一个带有alternate-exchange属性的topic类型Exchange,其alternate-exchange属性指定的exchange也是topic类型。如果消息的route key既不能被原Exchange路由,也不能被alternate-exchange指定的Exchange路由,这个消息就会丢失。此时可以借助mandatory标志触发return机制(return callback),让生产端得知这条消息没有被路由到任何队列。

    创建Exchange binding关系 binding关系

    配置:

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Spring AMQP wiring for this example: connection factory, admin and
     * template. No exchanges, queues or bindings are declared here.
     */
    @Configuration
    public class MQConfig {

        /** Creates the caching connection factory from the broker URI. */
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory brokerConnections = new CachingConnectionFactory();
            brokerConnections.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return brokerConnections;
        }

        /** Exposes a {@link RabbitAdmin} built on the given connection factory. */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            return new RabbitAdmin(connectionFactory);
        }

        /** Exposes a {@link RabbitTemplate} built on the given connection factory. */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            return new RabbitTemplate(connectionFactory);
        }
    }
    

    正常路由到Exchange名为head.info路由的队列中。

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Sends one message to the {@code head.info} exchange with routing key
     * {@code head.info.a}.
     */
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            // try-with-resources guarantees the context is closed even when
            // sending fails or the sleep is interrupted.
            try (AnnotationConfigApplicationContext context =
                         new AnnotationConfigApplicationContext(Application.class)) {
                RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
                System.out.println(rabbitTemplate);

                // Explicit charset so the payload bytes do not depend on the
                // platform default encoding.
                byte[] body = "hello,zhihao.miao".getBytes(java.nio.charset.StandardCharsets.UTF_8);

                MessageProperties messageProperties = new MessageProperties();
                messageProperties.setContentType("json");

                Message message = new Message(body,messageProperties);

                // Expected to be routed normally by the head.info exchange to
                // its bound queue (bindings are created outside this code).
                rabbitTemplate.send("head.info","head.info.a",message);

                // Presumably keeps the connection open briefly before shutdown.
                TimeUnit.SECONDS.sleep(3);
            }
        }
    }
    

    路由到Exchange名为head.info指定的alternate-exchange配置的head.error所路由的队列中。

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Sends one message to the {@code head.info} exchange with routing key
     * {@code head.error.a}.
     */
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            // try-with-resources guarantees the context is closed even when
            // sending fails or the sleep is interrupted.
            try (AnnotationConfigApplicationContext context =
                         new AnnotationConfigApplicationContext(Application.class)) {
                RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
                System.out.println(rabbitTemplate);

                // Explicit charset so the payload bytes do not depend on the
                // platform default encoding.
                byte[] body = "hello,zhihao.miao".getBytes(java.nio.charset.StandardCharsets.UTF_8);

                MessageProperties messageProperties = new MessageProperties();
                messageProperties.setContentType("json");

                Message message = new Message(body,messageProperties);

                // Not routable by head.info itself; expected to be forwarded to
                // its alternate exchange (head.error) and routed from there.
                rabbitTemplate.send("head.info","head.error.a",message);

                // Presumably keeps the connection open briefly before shutdown.
                TimeUnit.SECONDS.sleep(3);
            }
        }
    }
    

    二者都不符合则消息丢失。可以在生产端开启publisher returns(mandatory + return callback)来感知这种情况:因为消息没有被路由到任何队列,所以会触发return method。

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Sends one message to the {@code head.info} exchange with routing key
     * {@code header.debug.a}, then waits before shutting down.
     */
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            // try-with-resources guarantees the context is closed even when
            // sending fails or the sleep is interrupted.
            try (AnnotationConfigApplicationContext context =
                         new AnnotationConfigApplicationContext(Application.class)) {
                RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
                System.out.println(rabbitTemplate);

                // Explicit charset so the payload bytes do not depend on the
                // platform default encoding.
                byte[] body = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);

                MessageProperties messageProperties = new MessageProperties();
                messageProperties.setContentType("json");

                Message message = new Message(body,messageProperties);

                // Expected to match neither head.info nor its alternate
                // exchange, so the message is unroutable.
                rabbitTemplate.send("head.info","header.debug.a",message);

                // Presumably leaves time for a return callback to fire before
                // shutdown -- TODO confirm.
                TimeUnit.SECONDS.sleep(30);
            }
        }
    }
    

    配置:

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Spring AMQP configuration with publisher returns enabled: the template
     * publishes with the mandatory flag and prints details of returned messages.
     */
    @Configuration
    public class MQConfig {

        /** Creates the caching connection factory with publisher returns switched on. */
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
            cachingFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            cachingFactory.setPublisherReturns(true);
            return cachingFactory;
        }

        /** Exposes a {@link RabbitAdmin} built on the given connection factory. */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            return new RabbitAdmin(connectionFactory);
        }

        /**
         * Builds a mandatory-publishing {@link RabbitTemplate} whose return
         * callback prints the reply code, reply text, exchange and routing key.
         */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMandatory(true);
            // The returned message itself is not printed, only its metadata.
            template.setReturnCallback((returned, code, text, sourceExchange, key) -> {
                System.out.println("===========消息无法被路由=========");
                System.out.println("replyCode: "+code);
                System.out.println("replyText: "+text);
                System.out.println("exchange: "+sourceExchange);
                System.out.println("routingKey: "+key);
            });
            return template;
        }
    }
    

    总结

    • 建议Alternate Exchange的类型是fanout,防止出现路由失败。
      fanout exchange一般不需要指定Alternate Exchange属性。
    • 如果一个Exchange指定了Alternate Exchange,那就意味着,只有当该Exchange和Alternate Exchange都无法路由消息的时候,才会触发return method

    参考资料

    Alternate Exchange官网

    相关文章

      网友评论

        本文标题:RabbitMQ笔记十七: Alternate Exchange

        本文链接:https://www.haomeiwen.com/subject/gbwauxtx.html