美文网首页RabbitMQ学习RabbitMQ
RabbitMQ笔记六:Spring AMQP简介与quick

RabbitMQ笔记六:Spring AMQP简介与quick

作者: 二月_春风 | 来源:发表于2017-09-25 22:22 被阅读332次

    Spring AMQP

    先对本片博客进行总结,可直接跳过总结看下面的博客正文。

    总结:
    spring-amqp二个核心类RabbitAdmin和RabbitTemplate类
    1.RabbitAdmin类完成对Exchange,Queue,Binging的操作,在容器中管理了RabbitAdmin类的时候,可以对Exchange,Queue,Binging进行自动声明。
    2.RabbitTemplate类是发送和接收消息的工具类。(下一篇博客具体讲解)

    简介

    Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。 它提供了一个“模板”(template)作为发送和接收消息的高级抽象。 它还通过“侦听器容器(listener container)”为消息驱动的POJO提供支持。 这些库促进AMQP资源的管理,同时促进使用依赖注入和声明式配置。 在所有这些情况下,您将看到与Spring框架中的JMS支持的相似之处。

    Spring AMQP包括两个部分;spring-amqp是对amqp的一些概念的一些抽象。spring-rabbit是对AMQP的实现RabbitMQ的实现。

    特征

    1. 异步处理消费消息的一个监听容器(Listener container
    2. 使用RabbitTemplate类的实例来发送和接收消息。
    3. 使用RabbitAdmin去自动声明队列(queues),交换机(exchanges),绑定(bindings

    spring-amqp模块是对AMQP协议的一个抽象和封装。所以说对所有的AMQP的实现都进行的抽象和封装,比如
    org.springframework.amqp.core.Binding:绑定的封装,类型有QUEUEEXCHANGE
    org.springframework.amqp.core.Exchange:其有基本的四种实现

    Exchange的实现

    org.springframework.amqp.core.Message:消息是由属性和body构成,将属性也封装成一个对象MessageProperties。
    org.springframework.amqp.core.MessageProperties:对消息属性进行了抽象。
    org.springframework.amqp.core.Queue:队列的封装。

    还有对消息的转换进行了封装,相关的类在org.springframework.amqp.support.converter包下面。(下面的博客会专门讲解消息转换converter的一些实现)。

    spring-rabbit模块是建立在springspring-amqpamqp-client(rabbitmq java client)之上的,是具体操作RabbitMQ的,底层对Rabbitmq的操作是使用amqp-client的。

    二个核心类,一个是org.springframework.amqp.rabbit.core.RabbitAdminorg.springframework.amqp.rabbit.core.RabbitTemplate

    spring-rabbit对日志进行了扩展,可以将日志发送到mq中。

    Demo

    加入spring-amqp依赖:

        <dependencies>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.7.3.RELEASE</version>
            </dependency>
        </dependencies>
    

    RabbitmqAdmin使用

    容器中纳入ConnectionFactory和RabbitAdmin管理

    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            return new RabbitAdmin(connectionFactory);
        }
    }
    

    应用类,使用RabbitAdmin进行Exchange,Queue,Binding操作

    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @ComponentScan
    public class Application {
        public static void main(String[] args) {
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
            System.out.println(rabbitAdmin);
    
            //创建四种类型的Exchange,可重复执行
            rabbitAdmin.declareExchange(new DirectExchange("zhihao.direct.exchange",true,false));
            rabbitAdmin.declareExchange(new TopicExchange("zhihao.topic.exchange",true,false));
            rabbitAdmin.declareExchange(new FanoutExchange("zhihao.fanout.exchange",true,false));
            rabbitAdmin.declareExchange(new HeadersExchange("zhihao.header.exchange",true,false));
    
            //删除Exchange
            //rabbitAdmin.deleteExchange("zhihao.header.exchange");
    
            //定义队列
            rabbitAdmin.declareQueue(new Queue("zhihao.debug",true));
            rabbitAdmin.declareQueue(new Queue("zhihao.info",true));
            rabbitAdmin.declareQueue(new Queue("zhihao.error",true));
    
            //删除队列
            //rabbitAdmin.deleteQueue("zhihao.debug");
    
            //将队列中的消息全消费掉
            rabbitAdmin.purgeQueue("zhihao.info",false);
    
            //绑定,指定要绑定的Exchange和Route key
            rabbitAdmin.declareBinding(new Binding("zhihao.debug",Binding.DestinationType.QUEUE,
                    "zhihao.direct.exchange","zhihao.hehe",new HashMap()));
    
            rabbitAdmin.declareBinding(new Binding("zhihao.info",Binding.DestinationType.QUEUE,
                    "zhihao.direct.exchange","zhihao.haha",new HashMap()));
    
            rabbitAdmin.declareBinding(new Binding("zhihao.error",Binding.DestinationType.QUEUE,
                    "zhihao.direct.exchange","zhihao.welcome",new HashMap()));
    
    
            //绑定header exchange
            Map<String,Object> headerValues = new HashMap<>();
            headerValues.put("type",1);
            headerValues.put("size",10);
    
            //whereAll指定了x-match:   all参数
            rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")).
                    to(new HeadersExchange("zhihao.header.exchange")).whereAll(headerValues).match());
    
            //whereAll指定了x-match:   any参数
            rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.info")).
                    to(new HeadersExchange("zhihao.header.exchange")).whereAny(headerValues).match());
    
    
            //进行解绑
            rabbitAdmin.removeBinding(BindingBuilder.bind(new Queue("zhihao.info")).
                  to(new TopicExchange("zhihao.direct.exchange")).with("zhihao.info"));
    
            //声明topic类型的exchange
            rabbitAdmin.declareExchange(new TopicExchange("zhihao.hehe.exchange",true,false));
            rabbitAdmin.declareExchange(new TopicExchange("zhihao.miao.exchange",true,false));
    
            //exchange与exchange绑定
            rabbitAdmin.declareBinding(new Binding("zhihao.hehe.exchange",Binding.DestinationType.EXCHANGE,
                    "zhihao.miao.exchange","zhihao",new HashMap()));
    
            //使用BindingBuilder进行绑定
            rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")).
                    to(new TopicExchange("zhihao.topic.exchange")).with("zhihao.miao"));
    
            //rabbitAdmin.declareBinding(new Binding("amq.rabbitmq.trace",Binding.DestinationType.EXCHANGE,
                    //"amq.rabbitmq.log","zhihao",new HashMap()));
    
            context.close();
    
        }
    
    }
    
    

    Exchange ,Queue,Binding的自动声明

    1. 直接把要自动声明的组件Bean纳入到spring容器中管理即可。
      自动声明发生的rabbitmq第一次连接创建的时候。如果系统从启动到停止没有创建任何连接,则不会自动创建。

    2. 自定声明支持单个和多个。

    自动声明Exchange

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class DeclareConfig {
    
        //声明direct类型的Exchange
        @Bean
        public Exchange directExchange(){
            return new DirectExchange("zhihao.direct.exchange",true,false);
        }
    
        //声明topic类型的Exchange
        @Bean
        public Exchange topicExchange(){
            return new TopicExchange("zhihao.topic.exchange",true,false);
        }
    
        //声明fanout类型的Exchange
        @Bean
        public Exchange fanoutExchange(){
            return new FanoutExchange("zhihao.fanout.exchange",true,false);
        }
    
        //声明headers类型的Exchange
        @Bean
        public Exchange headersExchange(){
            return new HeadersExchange("zhihao.header.exchange",true,false);
        }
    }
    

    配置类,在spring容器中纳入ConnectionFactory实例和RabbitAdmin实例

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            return new RabbitAdmin(connectionFactory);
        }
    }
    

    启动应用类,自动声明发生的rabbitmq第一次连接创建的时候。如果系统从启动到停止没有创建任何连接,则不会自动创建。

    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    @ComponentScan
    public class Application {
        public static void main(String[] args) {
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
            //使得客户端第一次连接rabbitmq
            context.getBean(RabbitAdmin.class).getQueueProperties("**");
            context.close();
        }
    }
    
    控制台显示已经声明创建

    队列的自动声明

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class DeclareConfig {
    
        @Bean
        public Queue debugQueue(){
            return new Queue("zhihao.debug",true);
        }
    
        @Bean
        public Queue infoQueue(){
            return new Queue("zhihao.info",true);
        }
    
        @Bean
        public Queue errorQueue(){
            return new Queue("zhihao.error",true);
        }
    }
    

    上面的Application和DeclareConfig不列举出来了,执行Application应用启动类,查看web管控台的队列生成。

    Queue自动创建声明

    绑定的自动生成
    DeclareConfig类中,

    import org.springframework.amqp.core.Binding;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    
    @Configuration
    public class DeclareConfig {
    
        @Bean
        public Binding binding(){
            return new Binding("zhihao.debug",Binding.DestinationType.QUEUE,
                    "zhihao.direct.exchange","zhihao.debug",new HashMap());
        }
    
        @Bean
        public Binding binding2(){
            return new Binding("zhihao.info",Binding.DestinationType.QUEUE,
                    "zhihao.direct.exchange","zhihao.info",new HashMap());
        }
    
        @Bean
        public Binding binding3(){
            return new Binding("zhihao.error",Binding.DestinationType.QUEUE,
                    "zhihao.direct.exchange","zhihao.error",new HashMap());
        }
    }
    
    

    上面的Application和DeclareConfig不列举出来了,执行Application应用启动类,查看web管控台的Binding生成。

    Binding自动创建声明

    一次性生成多个queue,exchange,binding

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    
    @Configuration
    public class DeclareConfig {
    
        @Bean
        public List<Queue> queues(){
            List<Queue> queueList = new ArrayList<>();
            queueList.add(new Queue("chao.wang.debug",true));
            queueList.add(new Queue("chao.wang.info",true));
            queueList.add(new Queue("chao.wang.error",true));
            return queueList;
        }
    
        @Bean
        public List<Exchange> exchanges(){
            List<Exchange> exchangeList = new ArrayList<>();
            exchangeList.add(new TopicExchange("chao.wang.debug.topic.exchange",true,false));
            exchangeList.add(new TopicExchange("chao.wang.info.topic.exchange",true,false));
            exchangeList.add(new TopicExchange("chao.wang.error.topic.exchange",true,false));
            return exchangeList;
        }
    
        @Bean
        public List<Binding> bindings(){
            List<Binding> bindingList = new ArrayList<>();
            bindingList.add(BindingBuilder.bind(new Queue("chao.wang.debug")).
                    to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.#"));
            bindingList.add(BindingBuilder.bind(new Queue("chao.wang.info")).
                    to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.*"));
            bindingList.add(BindingBuilder.bind(new Queue("chao.wang.error")).
                    to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.error.*"));
            return bindingList;
        }
    }
    

    上面的Application和DeclareConfig不列举出来了,执行Application应用启动类,查看web管控台Exchange,Queue,Binding都已经生成。

    注意
    当声明队列是以amp开头的时候,队列是不能创建声明的。

    @Bean
    public Queue amqQueue(){
       return new Queue("amp.log",true);
    }
    

    总结

    自动声明的一些条件

    • 要有连接(对rabbitmq的连接)
    • 容器中要有org.springframework.amqp.rabbit.core.RabbitAdmin的实例
    • RabbitAdminautoStartup属性必须为true。
    • 如果ConnectionFactory使用的是CachingConnectionFactory,则cacheMode必须是CachingConnectionFactory.CacheMode.CHANNEL(默认)。
    • 所要声明的组件(QueueExchangeBinding)的shouldDeclare必须是true(默认就是true
    • Queue队列的名字不能以amq.开头。

    注意:QueueExchangeBinding都直接或者间接的继承Declarable,而Declarable中定义了shouldDeclare的方法。

    自动声明源码分析

    org.springframework.amqp.rabbit.core.RabbitAdmin实现InitializingBean接口,在BeanFactory设置完所有属性之后执行特定初始化(afterPropertiesSet方法)

    RabbitAdminafterPropertiesSet方法,

    @Override
        public void afterPropertiesSet() {
    
            synchronized (this.lifecycleMonitor) {
    
              //autoStartup属性的值为false的时候,直接return
                if (this.running || !this.autoStartup) {
                    return;
                }
    
              //connectionFactory实例如果是CachingConnectionFactory,并且CacheMode是CacheMode.CONNECTION也会return下面不执行了。
                if (this.connectionFactory instanceof CachingConnectionFactory &&
                        ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
                    this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
                    return;
                }
    
              //连接的监听器
                this.connectionFactory.addConnectionListener(new ConnectionListener() {
    
                    // Prevent stack overflow...
                    private final AtomicBoolean initializing = new AtomicBoolean(false);
    
                    @Override
                    public void onCreate(Connection connection) {
                        if (!initializing.compareAndSet(false, true)) {
                            // If we are already initializing, we don't need to do it again...
                            return;
                        }
                        try {
                            //执行这个方法
                            initialize();
                        }
                        finally {
                            initializing.compareAndSet(true, false);
                        }
                    }
    
                    @Override
                    public void onClose(Connection connection) {
                    }
    
                });
    
                this.running = true;
    
            }
        }
    

    RabbitAdmininitialize方法,声明所有exchanges, queuesbindings

    /**
         * Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
         * (but unnecessary) to call this method more than once.
         */
        public void initialize() {
    
            if (this.applicationContext == null) {
                this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
                return;
            }
    
            this.logger.debug("Initializing declarations");
            //得到容器中所有的Exchange
            Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
                    this.applicationContext.getBeansOfType(Exchange.class).values());
            //得到容器中所有的Queue
            Collection<Queue> contextQueues = new LinkedList<Queue>(
                    this.applicationContext.getBeansOfType(Queue.class).values());
           //得到容器中所有的Binding
            Collection<Binding> contextBindings = new LinkedList<Binding>(
                    this.applicationContext.getBeansOfType(Binding.class).values());
    
           //获取容器中所有的Collection,如果容器中所有元素是Exchange,Queue或者Binding的时候将这些实例也加入到spring容器中。
            @SuppressWarnings("rawtypes")
            Collection<Collection> collections = this.applicationContext.getBeansOfType(Collection.class, false, false)
                    .values();
            for (Collection<?> collection : collections) {
                if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) {
                    for (Object declarable : collection) {
                        if (declarable instanceof Exchange) {
                            contextExchanges.add((Exchange) declarable);
                        }
                        else if (declarable instanceof Queue) {
                            contextQueues.add((Queue) declarable);
                        }
                        else if (declarable instanceof Binding) {
                            contextBindings.add((Binding) declarable);
                        }
                    }
                }
            }
    
           //进行了filter过滤,
            final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
            final Collection<Queue> queues = filterDeclarables(contextQueues);
            final Collection<Binding> bindings = filterDeclarables(contextBindings);
    
            for (Exchange exchange : exchanges) {
                if ((!exchange.isDurable() || exchange.isAutoDelete())  && this.logger.isInfoEnabled()) {
                    this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
                            + exchange.getName()
                            + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
                            + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
                            + "reopening the connection.");
                }
            }
    
            for (Queue queue : queues) {
                if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
                    this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
                            + queue.getName()
                            + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
                            + queue.isExclusive() + ". "
                            + "It will be redeclared if the broker stops and is restarted while the connection factory is "
                            + "alive, but all messages will be lost.");
                }
            }
    
            this.rabbitTemplate.execute(new ChannelCallback<Object>() {
                @Override
                public Object doInRabbit(Channel channel) throws Exception {
                   //声明exchange,如果exchange是默认的exchange那么也不会声明。
                    declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
                    //声明队列,如果队列名以amq.开头的也不会进行声明
                    declareQueues(channel, queues.toArray(new Queue[queues.size()]));
                    declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
                    return null;
                }
            });
            this.logger.debug("Declarations finished");
    
        }
    

    filterDeclarables方法过滤一些ExchangeQueueBinding,因为这三个类都是继承Declarable这个类

    
        private <T extends Declarable> Collection<T> filterDeclarables(Collection<T> declarables) {
            Collection<T> filtered = new ArrayList<T>();
            for (T declarable : declarables) {
                Collection<?> adminsWithWhichToDeclare = declarable.getDeclaringAdmins();
                //shouldDeclare属性必须是true,否则就会被过滤掉了
                if (declarable.shouldDeclare() &&
                    (adminsWithWhichToDeclare.isEmpty() || adminsWithWhichToDeclare.contains(this))) {
                    filtered.add(declarable);
                }
            }
            return filtered;
        }
    
    

    声明Exchanges

        private void declareExchanges(final Channel channel, final Exchange... exchanges) throws IOException {
            for (final Exchange exchange : exchanges) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("declaring Exchange '" + exchange.getName() + "'");
                }
    
                //不是默认的Exchange
                if (!isDeclaringDefaultExchange(exchange)) {
                    try {
                       //是否是delayed类型的Exchange
                        if (exchange.isDelayed()) {
                            Map<String, Object> arguments = exchange.getArguments();
                            if (arguments == null) {
                                arguments = new HashMap<String, Object>();
                            }
                            else {
                                arguments = new HashMap<String, Object>(arguments);
                            }
                            arguments.put("x-delayed-type", exchange.getType());
                           //调用exchangeDeclare进行声明
                            channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(),
                                    exchange.isAutoDelete(), exchange.isInternal(), arguments);
                        }
                        else {
                           //调用exchangeDeclare进行声明
                          channel.exchangeDeclare(exchange.getName(), exchange.getType(), exchange.isDurable(),
                                    exchange.isAutoDelete(), exchange.isInternal(), exchange.getArguments());
                        }
                    }
                    catch (IOException e) {
                        logOrRethrowDeclarationException(exchange, "exchange", e);
                    }
                }
            }
        }
    

    声明Queue队列

    private DeclareOk[] declareQueues(final Channel channel, final Queue... queues) throws IOException {
            List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length);
            for (int i = 0; i < queues.length; i++) {
                Queue queue = queues[i];
                //队列不以amq.开头的队列才能进行声明
                if (!queue.getName().startsWith("amq.")) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("declaring Queue '" + queue.getName() + "'");
                    }
                    try {
                        try {
                           //进行队列声明
                            DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
                                    queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
                            declareOks.add(declareOk);
                        }
                        catch (IllegalArgumentException e) {
                            if (this.logger.isDebugEnabled()) {
                                this.logger.error("Exception while declaring queue: '" + queue.getName() + "'");
                            }
                            try {
                                if (channel instanceof ChannelProxy) {
                                    ((ChannelProxy) channel).getTargetChannel().close();
                                }
                            }
                            catch (TimeoutException e1) {
                            }
                            throw new IOException(e);
                        }
                    }
                    catch (IOException e) {
                        logOrRethrowDeclarationException(queue, "queue", e);
                    }
                }
                this.logger.debug("Queue with name that starts with 'amq.' cannot be declared.");
            }
            return declareOks.toArray(new DeclareOk[declareOks.size()]);
    }
    

    binding声明:

        private void declareBindings(final Channel channel, final Binding... bindings) throws IOException {
            for (Binding binding : bindings) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Binding destination [" + binding.getDestination() + " (" + binding.getDestinationType()
                            + ")] to exchange [" + binding.getExchange() + "] with routing key [" + binding.getRoutingKey()
                            + "]");
                }
    
                try {
                   //QUEUE类型的绑定
                    if (binding.isDestinationQueue()) {
                       //并且不是绑定到默认的Default Exchange
                        if (!isDeclaringImplicitQueueBinding(binding)) {
                             //绑定队列
                            channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
                                    binding.getArguments());
                        }
                    }
                    else {
                        //Exchange类型的绑定
                        channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
                                binding.getArguments());
                    }
                }
                catch (IOException e) {
                    logOrRethrowDeclarationException(binding, "binding", e);
                }
            }
        }
    

    参考资料
    官网
    github地址
    官网文档

    相关文章

      网友评论

        本文标题:RabbitMQ笔记六:Spring AMQP简介与quick

        本文链接:https://www.haomeiwen.com/subject/xucqextx.html