美文网首页
Spring Boot 消息

Spring Boot 消息

作者: 虫儿飞ZLEI | 来源:发表于2019-03-03 16:55 被阅读0次

1. JMS && AMQP对比

批注 2019-03-03 151511.jpg

2 RabbitMQ

image.png

direct 点对点
fanout、topic、headers 发布订阅

image.png image.png

2.1 Exchange类型

image.png

消息包含的路由键指向哪个队列就给到哪个队列

image.png

广播给所有队列

image.png

根据路由键配合匹配规则

3 rabbit MQ的Java代码

官网:http://www.rabbitmq.com/tutorials/tutorial-one-java.html

导入maven

    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.6.0</version>
    </dependency>

3.1 hello模式

就是直接发送给队列,直接从队列接收消息,不经过Exchange

3.1.1 发送消息

package com.zl.hello;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zl.ConnectUtil;

public class Sender {

    //队列的名字
    private final static String QUEUE = "testhello";

    public static void main(String[] args) throws Exception{

        //获取连接
        Connection connect = ConnectUtil.getConnect();
        //创建通道
        Channel channel = connect.createChannel();

        //申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
        //参数1:队列的名字
        //参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
        //参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
        //参数4:是否自动删除
        //参数5:一些其他的参数
        channel.queueDeclare(QUEUE,false,false,false,null);

        //发送数据
        //参数1:交换机名,这里hello模式直接发到队列上,不需要交换机
        //参数2:队列名
        //参数3:属性(待查)
        //参数4:消息内容
        channel.basicPublish("",QUEUE,null,"发送的消息".getBytes());

        //关闭连接
        channel.close();
        connect.close();
    }

}

3.1.2 接收消息

package com.zl.hello;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeliverCallback;
import com.zl.ConnectUtil;

public class Receiver {

    //队列的名字
    private final static String QUEUE = "testhello";

    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connect = ConnectUtil.getConnect();
        //创建通道
        Channel channel = connect.createChannel();

        //申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
        //参数1:队列的名字
        //参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
        //参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
        //参数4:是否自动删除
        //参数5:一些其他的参数
        channel.queueDeclare(QUEUE,false,false,false,null);


        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //接收消息
        //true:自动应答,应答了,表示消息被消费了
        channel.basicConsume(QUEUE,true,deliverCallback, consumerTag -> { });
    }
}

3.1.3 运行结果

3.2 work模式

还是直接发送到队列里面,当一个队列有多个消费者订阅时,消息会发送给多个消费者,但是一个消息只能给一个消费者,所以结果就是一个消费者收到一部分消息,另一个消费者收到另一部分消息

3.2.1 发送者

发送100条

package com.zl.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zl.ConnectUtil;

public class Sender {

    //队列的名字
    private final static String QUEUE = "testwork";

    public static void main(String[] args) throws Exception{

        //获取连接
        Connection connect = ConnectUtil.getConnect();
        //创建通道
        Channel channel = connect.createChannel();

        //申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
        //参数1:队列的名字
        //参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
        //参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
        //参数4:是否自动删除
        //参数5:一些其他的参数
        channel.queueDeclare(QUEUE,false,false,false,null);

        //发送数据
        //参数1:交换机名,这里hello模式直接发到队列上,不需要交换机
        //参数2:队列名
        //参数3:属性(待查)
        //参数4:消息内容
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("",QUEUE,null,("发送的消息"+i).getBytes());
        }

        //关闭连接
        channel.close();
        connect.close();
    }

}

3.2.2 消费者

消费者1:

package com.zl.work;

import com.rabbitmq.client.*;
import com.zl.ConnectUtil;

import java.io.IOException;

public class Receiver1 {

    //队列的名字
    private final static String QUEUE = "testwork";

    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connect = ConnectUtil.getConnect();
        //创建通道
        Channel channel = connect.createChannel();

        //申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
        //参数1:队列的名字
        //参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
        //参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
        //参数4:是否自动删除
        //参数5:一些其他的参数
        channel.queueDeclare(QUEUE,false,false,false,null);


        DefaultConsumer consumer = new DefaultConsumer(channel) {

            //收到消息的回调
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1收到的消息:"+new String(body));
                //确认应答
                //参数2:false:表示确认收到消息,true:拒绝收到消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //接收消息
        //false:手动确认,代表收到消息需要手动告诉服务器收到信息了
        channel.basicConsume(QUEUE,false,consumer);
    }
}

消费者2

package com.zl.work;

import com.rabbitmq.client.*;
import com.zl.ConnectUtil;

import java.io.IOException;

public class Receiver2 {

    //队列的名字
    private final static String QUEUE = "testwork";

    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connect = ConnectUtil.getConnect();
        //创建通道
        Channel channel = connect.createChannel();

        //申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
        //参数1:队列的名字
        //参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
        //参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
        //参数4:是否自动删除
        //参数5:一些其他的参数
        channel.queueDeclare(QUEUE,false,false,false,null);


        DefaultConsumer consumer = new DefaultConsumer(channel) {

            //收到消息的回调
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2收到的消息:"+new String(body));
                //确认应答
                //参数2:false:表示确认收到消息,true:拒绝收到消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //接收消息
        //false:手动确认,代表收到消息需要手动告诉服务器收到信息了
        channel.basicConsume(QUEUE,false,consumer);
    }
}

3.2.3运行结果

消费者1部分截图
消费者2部分截图

这里分配给两个消费者的消息是一样多的,但是如果这两个消费者消费的速度不一样呢?一个消费的快,一个消费的慢,他两个分配到的消息数量还是一样的吗?

是的,不管你有没有处理完消息,它都会把消息分配给你,你自己在那里慢慢处理

如果想要实现消费快的处理多一点数据,消费慢的处理少一点数据,怎么实现呢?

只需要在消费者代码中添加一句:

channel.basicQos(1);

告诉服务器,在没有确认当前消息完成之前,不要发送过来新的消息
这样就实现了按照消费能力分配数据量

3.3 publish模式/发布订阅模式/广播模式

发送消息到交换机,多个队列绑定到这个交换机,绑定的队列都会收到消息,绑定对应队列的消费者就收到了消息

3.3.1 消费者1

package com.zl.publish;

import com.rabbitmq.client.*;
import com.zl.ConnectUtil;

import java.io.IOException;

public class Receiver1 {

    //交换机的名字
    private static final String EXCHANGE_NAME = "testexchange";
    private static final String QUEUE = "testqueue1";

    public static void main(String[] args) throws Exception {
        Connection connect = ConnectUtil.getConnect();
        Channel channel = connect.createChannel();

        channel.queueDeclare(QUEUE,false,false,false,null);

        //绑定队列到交换机
        //参数:队列名,交换机名,路由键
        channel.queueBind(QUEUE,EXCHANGE_NAME,"");

        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1收到的消息:"+new String(body));
                //确认应答
                //参数2:false:表示确认收到消息,true:拒绝收到消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE,false,consumer);
    }
}

3.3.2 消费者2

package com.zl.publish;

import com.rabbitmq.client.*;
import com.zl.ConnectUtil;

import java.io.IOException;

public class Receiver2 {

    //交换机的名字
    private static final String EXCHANGE_NAME = "testexchange";
    private static final String QUEUE = "testqueue2";

    public static void main(String[] args) throws Exception {
        Connection connect = ConnectUtil.getConnect();
        Channel channel = connect.createChannel();

        channel.queueDeclare(QUEUE,false,false,false,null);

        //绑定队列到交换机
        //参数:队列名,交换机名,路由键
        channel.queueBind(QUEUE,EXCHANGE_NAME,"");

        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2收到的消息:"+new String(body));
                //确认应答
                //参数2:false:表示确认收到消息,true:拒绝收到消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE,false,consumer);
    }
}

3.3.3 发送者

package com.zl.publish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zl.ConnectUtil;

public class Sender {

    //交换机的名字
    private static final String EXCHANGE_NAME = "testexchange";

    public static void main(String[] args) throws Exception {
        Connection connect = ConnectUtil.getConnect();
        Channel channel = connect.createChannel();

        //申明交换机,类型是fanout(fanout是发布订阅模式,也就是广播)
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");


        //如果发布消息时,没有消费者,数据会丢失
        //参数1:交换机的名字
        //参数2:路由键,不需要
        //参数3:附加的属性
        //参数4:消息
        channel.basicPublish(EXCHANGE_NAME,"",null,"发布订阅模式的消息".getBytes());

        channel.close();
        connect.close();
    }
}

3.3.4 运行结果

都收到了


image.png
image.png

3.4 routing模式

发送消息时,指定路由键,队列绑定到交换机时也指定路由键,只有路由键匹配了才会收到消息(一个消费者可以指定多个路由键)

3.4.1 发送者

package com.zl.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zl.ConnectUtil;

public class Sender {

    //交换机的名字
    private static final String EXCHANGE_NAME = "testrouting";

    public static void main(String[] args) throws Exception {
        Connection connect = ConnectUtil.getConnect();
        Channel channel = connect.createChannel();

        //路由格式的交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");

        channel.basicPublish(EXCHANGE_NAME,"key1",null,"routing模式的消息".getBytes());

        channel.close();
        connect.close();
    }
}

3.4.2 消费者1

package com.zl.routing;

import com.rabbitmq.client.*;
import com.zl.ConnectUtil;

import java.io.IOException;

public class Receiver1 {

    //交换机的名字
    private static final String EXCHANGE_NAME = "testrouting";
    private static final String QUEUE = "testroutqueue1";

    public static void main(String[] args) throws Exception {
        Connection connect = ConnectUtil.getConnect();
        Channel channel = connect.createChannel();

        channel.queueDeclare(QUEUE,false,false,false,null);

        //绑定队列到交换机
        //参数:队列名,交换机名,路由键
        //只有对应了key1,才会收到消息
        channel.queueBind(QUEUE,EXCHANGE_NAME,"key1");
        //可以绑定多个路由键
        channel.queueBind(QUEUE,EXCHANGE_NAME,"key2");

        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1收到的消息:"+new String(body));
                //确认应答
                //参数2:false:表示确认收到消息,true:拒绝收到消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE,false,consumer);
    }
}

3.4.3 消费者2

package com.zl.routing;

import com.rabbitmq.client.*;
import com.zl.ConnectUtil;

import java.io.IOException;

public class Receiver2 {

    //交换机的名字
    private static final String EXCHANGE_NAME = "testrouting";
    private static final String QUEUE = "testroutqueue1";

    public static void main(String[] args) throws Exception {
        Connection connect = ConnectUtil.getConnect();
        Channel channel = connect.createChannel();

        channel.queueDeclare(QUEUE,false,false,false,null);

        //绑定队列到交换机
        //参数:队列名,交换机名,路由键
        //只有对应了key1,才会收到消息
        channel.queueBind(QUEUE,EXCHANGE_NAME,"key1");
        //可以绑定多个路由键
        channel.queueBind(QUEUE,EXCHANGE_NAME,"key3");

        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1收到的消息:"+new String(body));
                //确认应答
                //参数2:false:表示确认收到消息,true:拒绝收到消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE,false,consumer);
    }
}

运行结果不看了

3.5 topic模式

和routing模式差不多,只是路由键不是精确匹配,而是用通配符

# 匹配一个或多个单词,* 匹配一个单词

3.5.1 生产者

package com.zl.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zl.ConnectUtil;

public class Sender {

    //交换机的名字
    private static final String EXCHANGE_NAME = "testtopic";

    public static void main(String[] args) throws Exception {
        Connection connect = ConnectUtil.getConnect();
        Channel channel = connect.createChannel();


        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        channel.basicPublish(EXCHANGE_NAME,"key.aaa.bbb",null,"topic模式的key.aaa.bbb消息".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"key.ccc",null,"topic模式的key.ccc消息".getBytes());

        channel.close();
        connect.close();
    }
}

3.5.2 消费者1

package com.zl.topic;

import com.rabbitmq.client.*;
import com.zl.ConnectUtil;

import java.io.IOException;

public class Receiver1 {

    //交换机的名字
    private static final String EXCHANGE_NAME = "testtopic";
    private static final String QUEUE = "testtopicqueue1";

    public static void main(String[] args) throws Exception {
        Connection connect = ConnectUtil.getConnect();
        Channel channel = connect.createChannel();

        channel.queueDeclare(QUEUE,false,false,false,null);

        channel.queueBind(QUEUE,EXCHANGE_NAME,"key.#");


        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("当前消费者的路由键是key.#,收到的消息:"+new String(body));
                //确认应答
                //参数2:false:表示确认收到消息,true:拒绝收到消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE,false,consumer);
    }
}

3.5.3 消费者2

package com.zl.topic;

import com.rabbitmq.client.*;
import com.zl.ConnectUtil;

import java.io.IOException;

public class Receiver2 {

    //交换机的名字
    private static final String EXCHANGE_NAME = "testtopic";
    private static final String QUEUE = "testtopicqueue2";

    public static void main(String[] args) throws Exception {
        Connection connect = ConnectUtil.getConnect();
        Channel channel = connect.createChannel();

        channel.queueDeclare(QUEUE,false,false,false,null);

        channel.queueBind(QUEUE,EXCHANGE_NAME,"key.*");


        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("当前消费者的路由键是key.*,收到的消息:"+new String(body));
                //确认应答
                //参数2:false:表示确认收到消息,true:拒绝收到消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE,false,consumer);
    }
}

3.5.4 运行结果

# 和 * 的区别就是匹配一个还是多个单词


image.png
image.png

3.6 持久化消息

首先搞清楚几个问题,消息是存在队列里面的,如果一个交换机没有绑定的队列,那么往这个交换机里面发送消息会丢失,当绑定了队列,那么消息在队列里面,这样即使没有消费者,这个队列里面的数据还在,但是如果服务重启了,这个队列里面的数据也就丢失了,那么想要服务重启,队列里面的数据还在的话,可以做如下操作

3.6.1 生产者

        //看这里的第二个参数
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);

        //看这里的第三个参数
        channel.basicPublish(EXCHANGE_NAME,"abc",MessageProperties.PERSISTENT_TEXT_PLAIN,"持久化的消息".getBytes());

3.6.2 消费者

package com.zl.persisit;

import com.rabbitmq.client.*;
import com.zl.ConnectUtil;

import java.io.IOException;

public class Receiver {

    private static final String EXCHANGE_NAME = "testpersisit";
    private static final String QUEUE = "testpersisitqueue";

    public static void main(String[] args) throws Exception {
        Connection connect = ConnectUtil.getConnect();
        Channel channel = connect.createChannel();

        //这一这里的第二个参数true
        channel.queueDeclare(QUEUE,true,false,false,null);
        
        channel.queueBind(QUEUE,EXCHANGE_NAME,"abc");

        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者收到的消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE,false,consumer);
    }
}

3.6.3 结果

生产者生产一个消息以后,重启rabbitmq,然后再启动消费者,以后可以获取到之前生产者生产的消息

4 Spring boot使用rabbitmq

4.1 导包

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
缺少log的maven,这里忽略

4.2 配置

spring.rabbitmq.host=118.24.44.169
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

4.3 使用

发送时,直接发给交换机Exchange,具体交换机将数据给绑定的哪个队列,由交换机自己判断

    /**
     * 1、单播(点对点)
     */
    @Test
    public void contextLoads() {
        //Message需要自己构造一个;定义消息体内容和消息头
        //rabbitTemplate.send(exchage,routeKey,message);

        //object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;
        //rabbitTemplate.convertAndSend(exchage,routeKey,object);
        Map<String,Object> map = new HashMap<>();
        map.put("msg","这是第一个消息");
        map.put("data", Arrays.asList("helloworld",123,true));
        rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",map);
        //对象被默认序列化以后发送出去
        rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",new Book("西游记","吴承恩"));

    }

    /**
     * 2.广播,广播不需要路由键
     */
    @Test
    public void sendMsg(){
        rabbitTemplate.convertAndSend("exchange.fanout","",new Book("红楼梦","曹雪芹"));
    }

接收,订阅的是队列

    @Test
    public void receive(){
        Object o = rabbitTemplate.receiveAndConvert("atguigu.news");
        System.out.println(o.getClass());
        System.out.println(o);
    }

3.4 存json数据

默认存储数据是序列化后的数据,如果想将存储的内容改成json内容,需要更改MessageConverter

@Configuration
public class MyAMQPConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

3.5 @RabbitListener和@EnableRabbit

  • 在运行主方法上加上
@EnableRabbit  //开启基于注解的RabbitMQ模式
@SpringBootApplication
public class Springboot02AmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(Springboot02AmqpApplication.class, args);
    }
}
  • 接收数据
    @RabbitListener(queues = "atguigu.news")
    public void receive(Book book){
        System.out.println("收到消息:"+book);
    }

    @RabbitListener(queues = "atguigu")
    public void receive02(Message message){
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    }

发送数据还是像上文那么发

3.6 AmqpAdmin 创建和删除Queue、Exchange、Binding

不管是发送还是接收都需要rabbitmq里面有对应和Queue、Exchange、Binding,可以手动在rabbitmq的管理页面创建,也可以在代码中创建

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange(){

        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
        amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.routingkey",null));
        System.out.println("创建完成");

相关文章

网友评论

      本文标题:Spring Boot 消息

      本文链接:https://www.haomeiwen.com/subject/elopuqtx.html