美文网首页
5. Spring Boot Messaging

5. Spring Boot Messaging

作者: LeeSpringFly | 来源:发表于2018-10-25 15:45 被阅读0次

    第一章 说明

    Spring Boot Messaging
    Spring框架提供了与消息传递系统集成的广泛支持,从使用JmsTemplate简化JMS API到异步接收消息的完整基础结构。Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTemplate和RabbitMQ提供了自动配置选项。Spring WebSocket本身就支持STOMP消息传递,Spring Boot通过启动器和少量的自动配置支持这一点。Spring Boot也支持Apache Kafka。

    AMQP
    高级消息队列协议(Advanced Message queue Protocol, AMQP)是面向消息中间件的平台无关的、线级协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot为通过RabbitMQ使用AMQP提供了一些便利,包括Spring-Boot-Starter-AMQP“Starter”。

    RabbitMQ 扩展
    RabbitMQ是一个基于AMQP协议的轻量级、可靠、可伸缩和可移植的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。

    第二章 编写代码

    2.1 启动 RabbitMQ

    安装 RabbitMQ 服务,以 Mac 为例

    brew install rabbitmq
    

    等待安装完成后,进入安装目录 '/usr/local/Cellar/rabbitmq/3.7.4/sbin',启动

    cd /usr/local/Cellar/rabbitmq/3.7.4/sbin
    ./rabbitmq-server
    

    如果启动成功,将会看到

      ##  ##
      ##  ##      RabbitMQ 3.7.4. Copyright (C) 2007-2018 Pivotal Software, Inc.
      ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
      ######  ##
      ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
                        /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log
    
                  Starting broker...
     completed with 6 plugins.
    

    2.2 代码

    2.2.1 文件结构

    src/
     +- main/
         +- java/
             +- com/
                 +- lee/
                     +- springbootdemo/
                         +- config
                             +- RabbitMQConfig.java
                         +- pojo/
                             +- Receiver.java
                             +- Runner.java
                         +- SpringBootDemoApplication.java
         +- resources/
             +- <other resource>
    

    2.2.2 创建 RabbitMQ 消息接受

    src/main/java/com/lee/springbootdemo/pojo/Receiver.java

    package com.lee.springbootdemo.pojo;
    
    import org.springframework.stereotype.Component;
    import java.util.concurrent.CountDownLatch;
    
    @Component
    public class Receiver {
    
        private CountDownLatch latch = new CountDownLatch(1);
    
        public void receiveMessage(String message) {
            System.out.println("Received <" + message + ">");
            latch.countDown();
        }
    
        public CountDownLatch getLatch() {
            return latch;
        }
    }
    

    2.2.3 注册监听和发送消息

    src/main/java/com/lee/springbootdemo/config/RabbitMQConfig.java

    package com.lee.springbootdemo.config;
    
    import com.lee.springbootdemo.pojo.Receiver;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
    
        public static final String topicExchangeName = "spring-boot-exchange";
        public static final String queueName = "spring-boot";
    
        @Bean
        public Queue queue() {
            return new Queue(queueName, false);
        }
    
        @Bean
        public DirectExchange exchange() {
            return new DirectExchange(topicExchangeName);
        }
    
        @Bean
        public Binding binding(Queue queue, DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
        }
    
        @Bean
        public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queueName);
            container.setMessageListener(listenerAdapter);
    
            return container;
        }
    
        @Bean
        public MessageListenerAdapter listenerAdapter(Receiver receiver) {
            return new MessageListenerAdapter(receiver, "receiveMessage");
        }
    }
    
    • queue() 方法创建一个 AMQP 队列

    • exchange() 方法创建一个 direct 交换器

    • binding() 方法把上面两个内容绑定在一起,定义 RabbitTemplate 发布消息的行为

      Spring AMQP要求将队列、TopicExchange和绑定声明为顶级Spring bean,以便正确设置。

    • 在这里,我们把 direct exchange 和 queue 绑定的路由键(routing key)是 foo.bar.#

    2.2.4 测试发布消息

    src/main/java/com/lee/springbootdemo/pojo/Runner.java

    package com.lee.springbootdemo.pojo;
    
    import com.lee.springbootdemo.config.RabbitMQConfig;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    import java.util.concurrent.TimeUnit;
    
    @Component
    public class Runner implements CommandLineRunner {
    
        private final RabbitTemplate rabbitTemplate;
        private final Receiver receiver;
    
        public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
            this.receiver = receiver;
        }
    
        @Override
        public void run(String... args) throws Exception {
            System.out.println("Sending message ...");
            rabbitTemplate.convertAndSend(RabbitMQConfig.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
            receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        }
    }
    

    这里我们定义的路由键(routing key)是foo.bar.baz

    2.2.5 运行

    运行 SpringBootDemoApplication.main() 方法,如果运行成功,将会看到

    Sending message ...
    Received <Hello from RabbitMQ!>
    

    附录 A 参考

    1. Spring Boot Messaging 官方文档
    2. Spring Messaging 代码案例

    上一篇:4. Spring Boot Caching Redis

    下一篇:

    相关文章

      网友评论

          本文标题:5. Spring Boot Messaging

          本文链接:https://www.haomeiwen.com/subject/kshctqtx.html