美文网首页
SpringBoot整合中间件RabbitMQ(六)

SpringBoot整合中间件RabbitMQ(六)

作者: Invi | 来源:发表于2019-05-06 09:58 被阅读0次

    使用AmqpAdmin创建,删除Queue,Exchanges,Binding。

    1.注入 AmqpAdmin

        @Autowired
        AmqpAdmin amqpAdmin;
    

    AmqpAdmin 是由 RabbitAutoConfiguration 自动配置并注册到容器中的,因此可以直接通过 @Autowired 注入使用。

    Path:org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration

    2.创建交换器Exchange

    比如创建:DirectExchange

    调用amqpAdmin的declareExchange方法。

    package com.invi;
    
    import com.invi.bean.Book;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Spring Boot test that drives the injected {@link AmqpAdmin} to declare
     * broker components.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Springboot02AmqpApplicationTests {
    
        // AmqpAdmin is used to create and delete queues, exchanges and bindings.
        @Autowired
        AmqpAdmin amqpAdmin;
    
        /**
         * Declares a direct exchange named "AmqpAdmin-DirectExchange" through
         * {@link AmqpAdmin#declareExchange}.
         */
        @Test
        public void createExchange() {
            amqpAdmin.declareExchange(new DirectExchange("AmqpAdmin-DirectExchange"));
        }
    }
    
    

    amqpAdmin.declareXXXX() 都是创建组件的方法哦。

    接口: org.springframework.amqp.core.Exchange

    实现类:

    • AbstractExchange (org.springframework.amqp.core)
      • DirectExchange (org.springframework.amqp.core)
      • FanoutExchange (org.springframework.amqp.core)
      • CustomExchange (org.springframework.amqp.core)
      • TopicExchange (org.springframework.amqp.core)
      • HeadersExchange (org.springframework.amqp.core)

    DirectExchange 可以通过构造方法定制:

    org.springframework.amqp.core.DirectExchange#DirectExchange(java.lang.String)

    package org.springframework.amqp.core;
    
    import java.util.Map;
    
    /**
     * Exchange whose {@link #getType()} reports {@code ExchangeTypes.DIRECT}.
     * Every constructor simply forwards its arguments to {@code AbstractExchange}.
     */
    public class DirectExchange extends AbstractExchange {
    
      /** Shared instance constructed with the empty string as its name. */
      public static final DirectExchange DEFAULT = new DirectExchange("");
    
    
      /**
       * Creates an exchange from a name only; all other settings are whatever the
       * single-argument {@code AbstractExchange} constructor selects.
       *
       * @param name the exchange name
       */
      public DirectExchange(String name) {
          super(name);
      }
    
      /**
       * Creates an exchange with explicit durability and auto-delete flags.
       *
       * @param name the exchange name
       * @param durable durability flag, forwarded to the superclass
       * @param autoDelete auto-delete flag, forwarded to the superclass
       */
      public DirectExchange(String name, boolean durable, boolean autoDelete) {
          super(name, durable, autoDelete);
      }
    
      /**
       * Creates an exchange with explicit flags and an argument map.
       *
       * @param name the exchange name
       * @param durable durability flag, forwarded to the superclass
       * @param autoDelete auto-delete flag, forwarded to the superclass
       * @param arguments extra declaration arguments, forwarded to the superclass
       */
      public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
          super(name, durable, autoDelete, arguments);
      }
    
      /**
       * @return the constant {@code ExchangeTypes.DIRECT}; declared {@code final} so
       *         subclasses cannot change the reported type
       */
      @Override
      public final String getType() {
          return ExchangeTypes.DIRECT;
      }
    
    }
    
    

    运行测试方法:

    Name Type Features Message rate in Message rate out
    (AMQP default) direct D
    AmqpAdmin-DirectExchange direct D
    amq.direct direct D
    amq.fanout fanout D
    amq.headers headers D
    amq.match headers D
    amq.rabbitmq.trace topic D I
    amq.topic topic D
    exchange.direct direct D 0.00/s 0.00/s

    发现已经创建好新的Exchange:AmqpAdmin-DirectExchange ,类型 direct !

    3.创建队列Queue

    编码:

    
        @Autowired
        AmqpAdmin amqpAdmin;
    
        /**
         * Declares the queue "AmqpAdmin-Queue" through {@code amqpAdmin.declareQueue},
         * passing {@code true} for the constructor's durable flag.
         */
        @Test
        public void createQueue() {
            // Queue attributes are customized through the constructor arguments.
            final boolean durable = true;
            amqpAdmin.declareQueue(new Queue("AmqpAdmin-Queue", durable));
        }
    

    Queue可以通过构造方法定制:

    package org.springframework.amqp.core;
    
    import java.util.Map;
    
    import org.springframework.util.Assert;
    
    /**
     * Queue definition holding a name, three boolean flags and an argument map.
     * NOTE(review): this listing is abridged (see the "......" marker near the end);
     * the five-argument constructor that the visible constructors delegate to is
     * not shown here.
     */
    public class Queue extends AbstractDeclarable {
    
      // Name of the queue.
      private final String name;
    
      // Durable flag.
      private final boolean durable;
    
      // Exclusive flag.
      private final boolean exclusive;
    
      // Auto-delete flag.
      private final boolean autoDelete;
    
      // Additional declaration arguments.
      private final java.util.Map<java.lang.String, java.lang.Object> arguments;
    
     
      /**
       * Creates a queue with the given name that is durable, non-exclusive and
       * non-auto-delete.
       *
       * @param name the name of the queue.
       */
      public Queue(String name) { this(name, true, false, false); }
    
     
      /**
       * Creates a non-exclusive, non-auto-delete queue with the given name and
       * durability flag.
       * NOTE(review): the trailing {@code null} is presumably the arguments map;
       * confirm against the elided five-argument constructor.
       *
       * @param name the name of the queue.
       * @param durable true if we are declaring a durable queue
       */
      public Queue(String name, boolean durable) { this(name, durable, false, false, null); }
        
    
      /**
       * Constructs a new queue, given a name, durability, exclusive and auto-delete flags.
       *
       * @param name the name of the queue.
       * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
       * @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the
       *        declarer's connection)
       * @param autoDelete true if the server should delete the queue when it is no longer in use
       */
      public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)  {
          this(name, durable, exclusive, autoDelete, null);
      }
    
      ......
    
    }
    
    
    

    运行测试方法:

    Overview Messages Message rates +/-
    Name Features State Ready Unacked Total incoming deliver / get ack
    AmqpAdmin-Queue D idle 0 0 0
    unionpaysmart D idle 1 0 1 0.00/s 0.00/s 0.00/s
    unionpaysmart.news D idle 1 0 1 0.00/s 0.00/s 0.00/s
    invi D idle 1 0 1 0.00/s 0.00/s 0.00/s
    invi.emps D idle 1 0 1 0.00/s 0.00/s 0.00/s
    invi.idea D idle 1 0 1 0.00/s 0.00/s 0.00/s
    invi.news D idle 0 0 0 0.00/s 0.00/s 0.00/s

    队列 AmqpAdmin-Queue 已经被创建。

    4.将交换器和队列绑定。

    编码:

    
        @Autowired
        AmqpAdmin amqpAdmin; 
    
        /**
         * Declares a binding whose destination is the queue "AmqpAdmin-Queue", whose
         * exchange is "AmqpAdmin-DirectExchange" and whose routing key is
         * "AmqpAdmin-routingKey".
         */
        @Test
        public void createBinding() {
            String queueName = "AmqpAdmin-Queue";
            String exchangeName = "AmqpAdmin-DirectExchange";
            String routingKey = "AmqpAdmin-routingKey";
            // No extra binding arguments are supplied, hence the trailing null.
            amqpAdmin.declareBinding(
                    new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null));
        }
    

    构造Binding:

    • String destination 目的地

      • 和哪个队列绑定:AmqpAdmin-Queue ,......
    • DestinationType destinationType 目的地类型( QUEUE, EXCHANGE;)

      • Binding.DestinationType.QUEUE
      • Binding.DestinationType.EXCHANGE
    • String exchange 交换器名

    • String routingKey 路由键 自定义名字

    • Map<String, Object> arguments 参数(头信息),没有的话传 null

    源码Binding类:

    package org.springframework.amqp.core;
    
    import java.util.Map;
    
    /**
     * Immutable value object describing a binding: a destination (with its type),
     * an exchange name, a routing key and an argument map. All fields are set once
     * in the constructor and exposed through getters only.
     */
    public class Binding extends AbstractDeclarable {
    
        /** Kind of destination a binding points at. */
        public enum DestinationType {
            QUEUE, EXCHANGE;
        }
    
        // Name of the destination.
        private final String destination;
    
        // Name of the exchange.
        private final String exchange;
    
        // Routing key of the binding.
        private final String routingKey;
    
        // Binding arguments; stored exactly as passed in (no copy, may be null).
        private final Map<String, Object> arguments;
    
        // Whether the destination is a QUEUE or an EXCHANGE.
        private final DestinationType destinationType;
    
        /**
         * Stores the given values as-is; no validation or defensive copying is done.
         *
         * @param destination the destination name
         * @param destinationType the destination type ({@code QUEUE} or {@code EXCHANGE})
         * @param exchange the exchange name
         * @param routingKey the routing key
         * @param arguments the binding arguments
         */
        public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
                Map<String, Object> arguments) {
            this.destination = destination;
            this.destinationType = destinationType;
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.arguments = arguments;
        }
    
        /** @return the destination name given at construction */
        public String getDestination() {
            return this.destination;
        }
    
        /** @return the destination type given at construction */
        public DestinationType getDestinationType() {
            return this.destinationType;
        }
    
        /** @return the exchange name given at construction */
        public String getExchange() {
            return this.exchange;
        }
    
        /** @return the routing key given at construction */
        public String getRoutingKey() {
            return this.routingKey;
        }
    
        /** @return the same map instance that was passed to the constructor */
        public Map<String, Object> getArguments() {
            return this.arguments;
        }
    
        /**
         * @return {@code true} when the destination type is {@code QUEUE}; the constant
         *         is the receiver of {@code equals}, so a null type yields {@code false}
         */
        public boolean isDestinationQueue() {
            return DestinationType.QUEUE.equals(this.destinationType);
        }
    
        /**
         * @return a description containing destination, exchange and routing key
         *         (destination type and arguments are not included)
         */
        @Override
        public String toString() {
            return "Binding [destination=" + this.destination + ", exchange=" + this.exchange + ", routingKey="
                        + this.routingKey + "]";
        }
    
    }
    
    

    运行前,Exchange: AmqpAdmin-DirectExchange 默认没有任何绑定:

    ... no bindings ...

    运行测试方法:

    To Routing key Arguments
    AmqpAdmin-Queue AmqpAdmin-routingKey

    至此已经完成了:AmqpAdmin-DirectExchange 和 AmqpAdmin-Queue的绑定。

    相关文章

      网友评论

          本文标题:SpringBoot整合中间件RabbitMQ(六)

          本文链接:https://www.haomeiwen.com/subject/jhnloqtx.html