美文网首页
Spring Boot 消息

Spring Boot 消息

作者: 虫儿飞ZLEI | 来源:发表于2019-03-03 16:55 被阅读0次

    1. JMS && AMQP对比

    批注 2019-03-03 151511.jpg

    2 RabbitMQ

    image.png

    direct 点对点
    fanout、topic、headers 发布订阅

    image.png image.png

    2.1 Exchange类型

    image.png

    消息包含的路由键指向哪个队列就给到哪个队列

    image.png

    广播给所有队列

    image.png

    根据路由键配合匹配规则

    3 rabbit MQ的Java代码

    官网:http://www.rabbitmq.com/tutorials/tutorial-one-java.html

    导入maven

        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.6.0</version>
        </dependency>
    

    3.1 hello模式

    就是直接发送给队列,直接从队列接收消息,不经过Exchange

    3.1.1 发送消息

    package com.zl.hello;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.zl.ConnectUtil;
    
    public class Sender {
    
        //队列的名字
        private final static String QUEUE = "testhello";
    
        public static void main(String[] args) throws Exception{
    
            //获取连接
            Connection connect = ConnectUtil.getConnect();
            //创建通道
            Channel channel = connect.createChannel();
    
            //申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
            //参数1:队列的名字
            //参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
            //参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
            //参数4:是否自动删除
            //参数5:一些其他的参数
            channel.queueDeclare(QUEUE,false,false,false,null);
    
            //发送数据
            //参数1:交换机名,这里hello模式直接发到队列上,不需要交换机
            //参数2:队列名
            //参数3:属性(待查)
            //参数4:消息内容
            channel.basicPublish("",QUEUE,null,"发送的消息".getBytes());
    
            //关闭连接
            channel.close();
            connect.close();
        }
    
    }
    

    3.1.2 接收消息

    package com.zl.hello;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DeliverCallback;
    import com.zl.ConnectUtil;
    
    public class Receiver {
    
        //队列的名字
        private final static String QUEUE = "testhello";
    
        public static void main(String[] args) throws Exception {
            //获取连接
            Connection connect = ConnectUtil.getConnect();
            //创建通道
            Channel channel = connect.createChannel();
    
            //申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
            //参数1:队列的名字
            //参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
            //参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
            //参数4:是否自动删除
            //参数5:一些其他的参数
            channel.queueDeclare(QUEUE,false,false,false,null);
    
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            //接收消息
            //true:自动应答,应答了,表示消息被消费了
            channel.basicConsume(QUEUE,true,deliverCallback, consumerTag -> { });
        }
    }
    

    3.1.3 运行结果

    3.2 work模式

    还是直接发送到队列里面,当一个队列有多个消费者订阅时,消息会发送给多个消费者,但是一个消息只能给一个消费者,所以结果就是一个消费者收到一部分消息,另一个消费者收到另一部分消息

    3.2.1 发送者

    发送100条

    package com.zl.work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.zl.ConnectUtil;
    
    public class Sender {
    
        //队列的名字
        private final static String QUEUE = "testwork";
    
        public static void main(String[] args) throws Exception{
    
            //获取连接
            Connection connect = ConnectUtil.getConnect();
            //创建通道
            Channel channel = connect.createChannel();
    
            //申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
            //参数1:队列的名字
            //参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
            //参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
            //参数4:是否自动删除
            //参数5:一些其他的参数
            channel.queueDeclare(QUEUE,false,false,false,null);
    
            //发送数据
            //参数1:交换机名,这里hello模式直接发到队列上,不需要交换机
            //参数2:队列名
            //参数3:属性(待查)
            //参数4:消息内容
            for (int i = 0; i < 100; i++) {
                channel.basicPublish("",QUEUE,null,("发送的消息"+i).getBytes());
            }
    
            //关闭连接
            channel.close();
            connect.close();
        }
    
    }
    

    3.2.2 消费者

    消费者1:

    package com.zl.work;
    
    import com.rabbitmq.client.*;
    import com.zl.ConnectUtil;
    
    import java.io.IOException;
    
    public class Receiver1 {
    
        //队列的名字
        private final static String QUEUE = "testwork";
    
        public static void main(String[] args) throws Exception {
            //获取连接
            Connection connect = ConnectUtil.getConnect();
            //创建通道
            Channel channel = connect.createChannel();
    
            //申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
            //参数1:队列的名字
            //参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
            //参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
            //参数4:是否自动删除
            //参数5:一些其他的参数
            channel.queueDeclare(QUEUE,false,false,false,null);
    
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                //收到消息的回调
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1收到的消息:"+new String(body));
                    //确认应答
                    //参数2:false:表示确认收到消息,true:拒绝收到消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            //接收消息
            //false:手动确认,代表收到消息需要手动告诉服务器收到信息了
            channel.basicConsume(QUEUE,false,consumer);
        }
    }
    

    消费者2

    package com.zl.work;
    
    import com.rabbitmq.client.*;
    import com.zl.ConnectUtil;
    
    import java.io.IOException;
    
    public class Receiver2 {
    
        //队列的名字
        private final static String QUEUE = "testwork";
    
        public static void main(String[] args) throws Exception {
            //获取连接
            Connection connect = ConnectUtil.getConnect();
            //创建通道
            Channel channel = connect.createChannel();
    
            //申明队列,如果队列不存在,则创建队列,如果队列存在,则什么都不做,只是把channel和队列绑定
            //参数1:队列的名字
            //参数2:是否持久化队列,队列是默认在内存中的,rabbitmq重启会丢失,如果设置为true,则会保存到erlang自带的数据库中,重启后会重新读取
            //参数3:是否排外,作用一:关闭连接是否会自动删除队列,作用二:是否私有化队列,如果私有了,其他通道不可以访问当前队列,适用于一个队列只适合一个消费者
            //参数4:是否自动删除
            //参数5:一些其他的参数
            channel.queueDeclare(QUEUE,false,false,false,null);
    
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                //收到消息的回调
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2收到的消息:"+new String(body));
                    //确认应答
                    //参数2:false:表示确认收到消息,true:拒绝收到消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            //接收消息
            //false:手动确认,代表收到消息需要手动告诉服务器收到信息了
            channel.basicConsume(QUEUE,false,consumer);
        }
    }
    
    

    3.2.3运行结果

    消费者1部分截图
    消费者2部分截图

    这里分配给两个消费者的消息是一样多的,但是如果这两个消费者消费的速度不一样呢?一个消费的快,一个消费的慢,他两个分配到的消息数量还是一样的吗?

    是的,不管你有没有处理完消息,它都会把消息分配给你,你自己在那里慢慢处理

    如果想要实现消费快的处理多一点数据,消费慢的处理少一点数据,怎么实现呢?

    只需要在消费者代码中添加一句:

    channel.basicQos(1);
    

    告诉服务器,在没有确认当前消息完成之前,不要发送过来新的消息
    这样就实现了按照消费能力分配数据量

    3.3 publish模式/发布订阅模式/广播模式

    发送消息到交换机,多个队列绑定到这个交换机,绑定的队列都会收到消息,绑定对应队列的消费者就收到了消息

    3.3.1 消费者1

    package com.zl.publish;
    
    import com.rabbitmq.client.*;
    import com.zl.ConnectUtil;
    
    import java.io.IOException;
    
    public class Receiver1 {
    
        //交换机的名字
        private static final String EXCHANGE_NAME = "testexchange";
        private static final String QUEUE = "testqueue1";
    
        public static void main(String[] args) throws Exception {
            Connection connect = ConnectUtil.getConnect();
            Channel channel = connect.createChannel();
    
            channel.queueDeclare(QUEUE,false,false,false,null);
    
            //绑定队列到交换机
            //参数:队列名,交换机名,路由键
            channel.queueBind(QUEUE,EXCHANGE_NAME,"");
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1收到的消息:"+new String(body));
                    //确认应答
                    //参数2:false:表示确认收到消息,true:拒绝收到消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE,false,consumer);
        }
    }
    
    

    3.3.2 消费者2

    package com.zl.publish;
    
    import com.rabbitmq.client.*;
    import com.zl.ConnectUtil;
    
    import java.io.IOException;
    
    public class Receiver2 {
    
        //交换机的名字
        private static final String EXCHANGE_NAME = "testexchange";
        private static final String QUEUE = "testqueue2";
    
        public static void main(String[] args) throws Exception {
            Connection connect = ConnectUtil.getConnect();
            Channel channel = connect.createChannel();
    
            channel.queueDeclare(QUEUE,false,false,false,null);
    
            //绑定队列到交换机
            //参数:队列名,交换机名,路由键
            channel.queueBind(QUEUE,EXCHANGE_NAME,"");
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2收到的消息:"+new String(body));
                    //确认应答
                    //参数2:false:表示确认收到消息,true:拒绝收到消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE,false,consumer);
        }
    }
    
    

    3.3.3 发送者

    package com.zl.publish;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.zl.ConnectUtil;
    
    public class Sender {
    
        //交换机的名字
        private static final String EXCHANGE_NAME = "testexchange";
    
        public static void main(String[] args) throws Exception {
            Connection connect = ConnectUtil.getConnect();
            Channel channel = connect.createChannel();
    
            //申明交换机,类型是fanout(fanout是发布订阅模式,也就是广播)
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    
    
            //如果发布消息时,没有消费者,数据会丢失
            //参数1:交换机的名字
            //参数2:路由键,不需要
            //参数3:附加的属性
            //参数4:消息
            channel.basicPublish(EXCHANGE_NAME,"",null,"发布订阅模式的消息".getBytes());
    
            channel.close();
            connect.close();
        }
    }
    

    3.3.4 运行结果

    都收到了


    image.png
    image.png

    3.4 routing模式

    发送消息时,指定路由键,队列绑定到交换机时也指定路由键,只有路由键匹配了才会收到消息(一个消费者可以指定多个路由键)

    3.4.1 发送者

    package com.zl.routing;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.zl.ConnectUtil;
    
    public class Sender {
    
        //交换机的名字
        private static final String EXCHANGE_NAME = "testrouting";
    
        public static void main(String[] args) throws Exception {
            Connection connect = ConnectUtil.getConnect();
            Channel channel = connect.createChannel();
    
            //路由格式的交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");
    
            channel.basicPublish(EXCHANGE_NAME,"key1",null,"routing模式的消息".getBytes());
    
            channel.close();
            connect.close();
        }
    }
    

    3.4.2 消费者1

    package com.zl.routing;
    
    import com.rabbitmq.client.*;
    import com.zl.ConnectUtil;
    
    import java.io.IOException;
    
    public class Receiver1 {
    
        //交换机的名字
        private static final String EXCHANGE_NAME = "testrouting";
        private static final String QUEUE = "testroutqueue1";
    
        public static void main(String[] args) throws Exception {
            Connection connect = ConnectUtil.getConnect();
            Channel channel = connect.createChannel();
    
            channel.queueDeclare(QUEUE,false,false,false,null);
    
            //绑定队列到交换机
            //参数:队列名,交换机名,路由键
            //只有对应了key1,才会收到消息
            channel.queueBind(QUEUE,EXCHANGE_NAME,"key1");
            //可以绑定多个路由键
            channel.queueBind(QUEUE,EXCHANGE_NAME,"key2");
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1收到的消息:"+new String(body));
                    //确认应答
                    //参数2:false:表示确认收到消息,true:拒绝收到消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE,false,consumer);
        }
    }
    

    3.4.3 消费者2

    package com.zl.routing;
    
    import com.rabbitmq.client.*;
    import com.zl.ConnectUtil;
    
    import java.io.IOException;
    
    public class Receiver2 {
    
        //交换机的名字
        private static final String EXCHANGE_NAME = "testrouting";
        private static final String QUEUE = "testroutqueue1";
    
        public static void main(String[] args) throws Exception {
            Connection connect = ConnectUtil.getConnect();
            Channel channel = connect.createChannel();
    
            channel.queueDeclare(QUEUE,false,false,false,null);
    
            //绑定队列到交换机
            //参数:队列名,交换机名,路由键
            //只有对应了key1,才会收到消息
            channel.queueBind(QUEUE,EXCHANGE_NAME,"key1");
            //可以绑定多个路由键
            channel.queueBind(QUEUE,EXCHANGE_NAME,"key3");
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1收到的消息:"+new String(body));
                    //确认应答
                    //参数2:false:表示确认收到消息,true:拒绝收到消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE,false,consumer);
        }
    }
    

    运行结果不看了

    3.5 topic模式

    和routing模式差不多,只是路由键不是精确匹配,而是用通配符

    # 匹配一个或多个单词,* 匹配一个单词

    3.5.1 生产者

    package com.zl.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.zl.ConnectUtil;
    
    public class Sender {
    
        //交换机的名字
        private static final String EXCHANGE_NAME = "testtopic";
    
        public static void main(String[] args) throws Exception {
            Connection connect = ConnectUtil.getConnect();
            Channel channel = connect.createChannel();
    
    
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
    
            channel.basicPublish(EXCHANGE_NAME,"key.aaa.bbb",null,"topic模式的key.aaa.bbb消息".getBytes());
            channel.basicPublish(EXCHANGE_NAME,"key.ccc",null,"topic模式的key.ccc消息".getBytes());
    
            channel.close();
            connect.close();
        }
    }
    

    3.5.2 消费者1

    package com.zl.topic;
    
    import com.rabbitmq.client.*;
    import com.zl.ConnectUtil;
    
    import java.io.IOException;
    
    public class Receiver1 {
    
        //交换机的名字
        private static final String EXCHANGE_NAME = "testtopic";
        private static final String QUEUE = "testtopicqueue1";
    
        public static void main(String[] args) throws Exception {
            Connection connect = ConnectUtil.getConnect();
            Channel channel = connect.createChannel();
    
            channel.queueDeclare(QUEUE,false,false,false,null);
    
            channel.queueBind(QUEUE,EXCHANGE_NAME,"key.#");
    
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("当前消费者的路由键是key.#,收到的消息:"+new String(body));
                    //确认应答
                    //参数2:false:表示确认收到消息,true:拒绝收到消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE,false,consumer);
        }
    }
    

    3.5.3 消费者2

    package com.zl.topic;
    
    import com.rabbitmq.client.*;
    import com.zl.ConnectUtil;
    
    import java.io.IOException;
    
    public class Receiver2 {
    
        //交换机的名字
        private static final String EXCHANGE_NAME = "testtopic";
        private static final String QUEUE = "testtopicqueue2";
    
        public static void main(String[] args) throws Exception {
            Connection connect = ConnectUtil.getConnect();
            Channel channel = connect.createChannel();
    
            channel.queueDeclare(QUEUE,false,false,false,null);
    
            channel.queueBind(QUEUE,EXCHANGE_NAME,"key.*");
    
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("当前消费者的路由键是key.*,收到的消息:"+new String(body));
                    //确认应答
                    //参数2:false:表示确认收到消息,true:拒绝收到消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE,false,consumer);
        }
    }
    

    3.5.4 运行结果

    # 和 * 的区别就是匹配一个还是多个单词


    image.png
    image.png

    3.6 持久化消息

    首先搞清楚几个问题,消息是存在队列里面的,如果一个交换机没有绑定的队列,那么往这个交换机里面发送消息会丢失,当绑定了队列,那么消息在队列里面,这样即使没有消费者,这个队列里面的数据还在,但是如果服务重启了,这个队列里面的数据也就丢失了,那么想要服务重启,队列里面的数据还在的话,可以做如下操作

    3.6.1 生产者

            //看这里的第二个参数
            channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
    
            //看这里的第三个参数
            channel.basicPublish(EXCHANGE_NAME,"abc",MessageProperties.PERSISTENT_TEXT_PLAIN,"持久化的消息".getBytes());
    

    3.6.2 消费者

    package com.zl.persisit;
    
    import com.rabbitmq.client.*;
    import com.zl.ConnectUtil;
    
    import java.io.IOException;
    
    public class Receiver {
    
        private static final String EXCHANGE_NAME = "testpersisit";
        private static final String QUEUE = "testpersisitqueue";
    
        public static void main(String[] args) throws Exception {
            Connection connect = ConnectUtil.getConnect();
            Channel channel = connect.createChannel();
    
            //这一这里的第二个参数true
            channel.queueDeclare(QUEUE,true,false,false,null);
            
            channel.queueBind(QUEUE,EXCHANGE_NAME,"abc");
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者收到的消息:"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE,false,consumer);
        }
    }
    

    3.6.3 结果

    生产者生产一个消息以后,重启rabbitmq,然后再启动消费者,以后可以获取到之前生产者生产的消息

    4 Spring boot使用rabbitmq

    4.1 导包

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
    缺少log的maven,这里忽略

    4.2 配置

    spring.rabbitmq.host=118.24.44.169
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    

    4.3 使用

    发送时,直接发给交换机Exchange,具体交换机将数据给绑定的哪个队列,由交换机自己判断

        /**
         * 1、单播(点对点)
         */
        @Test
        public void contextLoads() {
            //Message需要自己构造一个;定义消息体内容和消息头
            //rabbitTemplate.send(exchage,routeKey,message);
    
            //object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;
            //rabbitTemplate.convertAndSend(exchage,routeKey,object);
            Map<String,Object> map = new HashMap<>();
            map.put("msg","这是第一个消息");
            map.put("data", Arrays.asList("helloworld",123,true));
            rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",map);
            //对象被默认序列化以后发送出去
            rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",new Book("西游记","吴承恩"));
    
        }
    
        /**
         * 2.广播,广播不需要路由键
         */
        @Test
        public void sendMsg(){
            rabbitTemplate.convertAndSend("exchange.fanout","",new Book("红楼梦","曹雪芹"));
        }
    

    接收,订阅的是队列

        @Test
        public void receive(){
            Object o = rabbitTemplate.receiveAndConvert("atguigu.news");
            System.out.println(o.getClass());
            System.out.println(o);
        }
    

    3.4 存json数据

    默认存储数据是序列化后的数据,如果想将存储的内容改成json内容,需要更改MessageConverter

    @Configuration
    public class MyAMQPConfig {
    
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    }
    

    3.5 @RabbitListener和@EnableRabbit

    • 在运行主方法上加上
    @EnableRabbit  //开启基于注解的RabbitMQ模式
    @SpringBootApplication
    public class Springboot02AmqpApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(Springboot02AmqpApplication.class, args);
        }
    }
    
    • 接收数据
        @RabbitListener(queues = "atguigu.news")
        public void receive(Book book){
            System.out.println("收到消息:"+book);
        }
    
        @RabbitListener(queues = "atguigu")
        public void receive02(Message message){
            System.out.println(message.getBody());
            System.out.println(message.getMessageProperties());
        }
    

    发送数据还是像上文那么发

    3.6 AmqpAdmin 创建和删除Queue、Exchange、Binding

    不管是发送还是接收都需要rabbitmq里面有对应和Queue、Exchange、Binding,可以手动在rabbitmq的管理页面创建,也可以在代码中创建

        @Autowired
        AmqpAdmin amqpAdmin;
    
        @Test
        public void createExchange(){
    
            amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
            amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
            amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.routingkey",null));
            System.out.println("创建完成");
    

    相关文章

      网友评论

          本文标题:Spring Boot 消息

          本文链接:https://www.haomeiwen.com/subject/elopuqtx.html