美文网首页Spring Boot
SpringBoot2.x集成RabbitMQ

SpringBoot2.x集成RabbitMQ

作者: 小胖学编程 | 来源:发表于2020-03-24 22:17 被阅读0次

    【*** window10安装RabbitMQ资源及教程 ***】

    环境:
    JDK8
    SpringBoot 2.1.3.RELEASE
    

    依赖:

    <dependency> 
       <groupId>org.springframework.boot</groupId>  
       <artifactId>spring-boot-starter-amqp</artifactId> 
    </dependency>
    

    1. 提供者

    配置文件:

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        publisher-confirms: true  # 确保消息不丢失
        publisher-returns: true # 确保消息不丢失
    

    注册组件:

    因为RabbitMQ中Exchange有三种模式:订阅、路由、通配符。

    1. fanout(订阅模式):每个和交换机绑定的队列都会收到消息。
    2. direct(路由模式):根据routingKey值选择对应的Binding。
    3. topic(通配符模式):支持routingKey的模糊匹配选择Binding。
    direct模式.png fanout模式.png topic模式.png

    用户可以在项目启动时,向MQ注册一些Exchange和Queue。用户可以自由组合注册Binding关系。它们使用routingKey进行区别。

    生产者发送消息需要指定ExchangeroutingKey。请求到达Exchange后,根据Exchange的模式和routingKey找到一个或一组Binding关系。并将消息发送Binding对应到Queue中。

    消费者监听对应的Queue,来处理消息。


    1. direct也称为路由模式,消息到达Exchange后,会根据指定的routingKey路由到指定routingKey的Binding上。
      所以需要在注册Binding时,指定各个Binding的路由键。
    @Configuration
    public class DirectConfig {
        @Bean("directMessage1")
        public Queue directQueue1() {
            //name才是queue的名字,消费者实际监听的是dirQueue-1队列
            return new Queue("dirQueue-1");
        }
        @Bean("directMessage2")
        public Queue directQueue2() {
            return new Queue("dirQueue-2");
        }
        //注册交换机
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("directExchange");
        }
        @Bean
        public Binding bindingDirectExchange1(@Qualifier("directMessage1") Queue queue,
                                              DirectExchange directExchange) {
            String routingKey = "directExchange.message-1";
            return BindingBuilder.bind(queue).to(directExchange).with(routingKey);
        }
        @Bean
        public Binding bindingDirectExchange2(@Qualifier("directMessage2") Queue queue,
                                              DirectExchange directExchange) {
            return BindingBuilder.bind(queue).to(directExchange).with("directExchange.message-2");
        }
    }
    
    1. fanout模式不需要指定routingKey关系,消息发送到Exchange后,将分发到与交换机绑定的各个Queue中。也称为订阅-发布模式。
    @Configuration
    public class FanooutConfig {
        @Bean(name = "AMessage")
        public Queue fanAMessage() {
            return new Queue("fanout.A");
        }
        @Bean(name = "BMessage")
        public Queue fanBMessage() {
            return new Queue("fanout.B");
        }
        @Bean(name = "CMessage")
        public Queue fanCMessage() {
            return new Queue("fanout.C");
        }
        //广播模式
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
        @Bean
        Binding bindingExchangeA(@Qualifier("AMessage") Queue message, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(message).to(fanoutExchange);
        }
        @Bean
        Binding bindingExchangeB(@Qualifier("BMessage") Queue message, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(message).to(fanoutExchange);
        }
        @Bean
        Binding bindingExchangeC(@Qualifier("CMessage") Queue message, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(message).to(fanoutExchange);
        }
    }
    
    1. topic通配符模式,注册的Binding的routingKey可以使用#或者*来进行模糊匹配。

    消息到达Exchange后,使用指定的routingKey模糊匹配到注册的routingKey

    • * 代表的是一个单词。
    • # 代表的是一个或多个单词。
    @Configuration
    public class TopicConfig {
        @Bean("message")
        public Queue queueMessage() {
            return new Queue("topic.message");
        }
        @Bean("messages")
        public Queue queueMessages() {
            return new Queue("topic.messages");
        }
        @Bean
        public TopicExchange exchange(){
            return new TopicExchange("topicExchange");
        }
        //普通绑定
        @Bean
        Binding bindingExchangeMessage(@Qualifier("message") Queue message, TopicExchange exchange){
            return BindingBuilder.bind(message).to(exchange).with("topic.message");
        }
        //通配符绑定
        @Bean("topicBindingExchangeMessages")
        Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessage,TopicExchange exchange){
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.#");
        }
    }
    

    生产者发送消息:

    @RestController
    public class RabbitMQController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //直接向队列中发送数据
        @GetMapping("send")
        public String send() {
            String content = "Date:" + System.currentTimeMillis();
            rabbitTemplate.convertAndSend("kinson", content);
            return content;
        }
    
    
        @GetMapping("sendDirect")
        public Book sendDirect() {
            Book book = new Book();
            book.setId("001");
            book.setName("JAVA编思想");
            book.setPrice(100);
            book.setInfo("学习JAVA必备");
            String id = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(id);
            rabbitTemplate.convertAndSend("directExchange",
                    "directExchange.message", book, correlationData);
            return book;
        }
    
        @GetMapping("sendFanout")
        public Book sendFanout() {
            Book book = new Book();
            book.setId("005");
            book.setName("深入理解JVM虚拟机");
            String id = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(id);
            rabbitTemplate.convertAndSend("fanoutExchange", ""
                    , book, correlationData);
            return book;
        }
    
        @GetMapping("sendTopic")
        public Book sendTopic() {
            Book book = new Book();
            book.setId("003");
            book.setName("mysql高性能优化");
            String id = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(id);
            rabbitTemplate.convertAndSend("topicExchange", "topic.message"
                    , book, correlationData);
            return book;
        }
    
        /**
         * * 可以代替一个单词。
         * # 可以替代零个或多个单词。
         */
        @GetMapping("sendTopic2")
        public Book sendTopic2() {
            Book book = new Book();
            book.setId("004");
            book.setName("高并发实战");
            String id = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(id);
            rabbitTemplate.convertAndSend("topicExchange", "topic.xxx"
                    , book, correlationData);
            return book;
        }
    }
    

    2. 消费者

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        listener:
          simple:
    #        acknowledge-mode: manual  # 手动确定(默认自动确认)
            concurrency: 5 # 消费端的监听个数
            max-concurrency: 10 # 消费端的监听最大个数
        connection-timeout: 15000   # 超时时间
    
    @Component
    @Slf4j
    public class MyReceiver1 {
        @RabbitListener(queues = {"kinson"})
        public void receiver(Message msg, Channel channel) {
    
            byte[] messageBytes = msg.getBody();
    
            if (messageBytes != null && messageBytes.length > 0) {
                //打印数据
                String message = new String(msg.getBody(), StandardCharsets.UTF_8);
                log.info("开始消费:{}\n channel:{}", message, channel);
            }
        }
        //监听的Queue
        //没有找到监听的Queue启动时会出现的异常:(reply-code=404, reply-text=NOT_FOUND - no queue 'directMessage' in vhost '/', class-id=50, method-id=10)
        @RabbitListener(queues = "dirQueue-1")
        public void receiverDirect(Message msg, Channel channel) throws IOException, ClassNotFoundException {
            log.info("【DirectExchange 绑定的队列】");
            byte[] messageBytes = msg.getBody();
            Book book = (Book) deserializable(messageBytes);
            log.info("开始消费:[{}]", JSON.toJSONString(book));
        }
    
    
        public static Object deserializable(byte[] bytes) throws IOException, ClassNotFoundException {
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return in.readObject();
        }
    }
    

    推荐阅读

    RabbitMQ中 exchange、route、queue的关系

    SpringBoot 2.1.3.RELEASE整合amqp官方文档

    RabbitMQ官网—MQ的消息模型

    相关文章

      网友评论

        本文标题:SpringBoot2.x集成RabbitMQ

        本文链接:https://www.haomeiwen.com/subject/vsnayhtx.html