第一章 说明
Spring Boot Messaging
Spring框架提供了与消息传递系统集成的广泛支持,从使用JmsTemplate简化JMS API到异步接收消息的完整基础结构。Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTemplate和RabbitMQ提供了自动配置选项。Spring WebSocket本身就支持STOMP消息传递,Spring Boot通过启动器和少量的自动配置支持这一点。Spring Boot也支持Apache Kafka。
AMQP
高级消息队列协议(Advanced Message queue Protocol, AMQP)是面向消息中间件的平台无关的、线级协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot为通过RabbitMQ使用AMQP提供了一些便利,包括Spring-Boot-Starter-AMQP“Starter”。
RabbitMQ 扩展
RabbitMQ是一个基于AMQP协议的轻量级、可靠、可伸缩和可移植的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。
第二章 编写代码
2.1 启动 RabbitMQ
安装 RabbitMQ 服务,以 Mac 为例
brew install rabbitmq
等待安装完成后,进入安装目录 '/usr/local/Cellar/rabbitmq/3.7.4/sbin',启动
cd /usr/local/Cellar/rabbitmq/3.7.4/sbin
./rabbitmq-server
如果启动成功,将会看到
## ##
## ## RabbitMQ 3.7.4. Copyright (C) 2007-2018 Pivotal Software, Inc.
########## Licensed under the MPL. See http://www.rabbitmq.com/
###### ##
########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
/usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log
Starting broker...
completed with 6 plugins.
2.2 代码
2.2.1 文件结构
src/
+- main/
+- java/
+- com/
+- lee/
+- springbootdemo/
+- config
+- RabbitMQConfig.java
+- pojo/
+- Receiver.java
+- Runner.java
+- SpringBootDemoApplication.java
+- resources/
+- <other resource>
2.2.2 创建 RabbitMQ 消息接受
src/main/java/com/lee/springbootdemo/pojo/Receiver.java
package com.lee.springbootdemo.pojo;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
@Component
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(String message) {
System.out.println("Received <" + message + ">");
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
2.2.3 注册监听和发送消息
src/main/java/com/lee/springbootdemo/config/RabbitMQConfig.java
package com.lee.springbootdemo.config;
import com.lee.springbootdemo.pojo.Receiver;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String topicExchangeName = "spring-boot-exchange";
public static final String queueName = "spring-boot";
@Bean
public Queue queue() {
return new Queue(queueName, false);
}
@Bean
public DirectExchange exchange() {
return new DirectExchange(topicExchangeName);
}
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
-
queue()
方法创建一个 AMQP 队列 -
exchange()
方法创建一个 direct 交换器 -
binding()
方法把上面两个内容绑定在一起,定义 RabbitTemplate 发布消息的行为Spring AMQP要求将队列、TopicExchange和绑定声明为顶级Spring bean,以便正确设置。
-
在这里,我们把 direct exchange 和 queue 绑定的路由键(routing key)是
foo.bar.#
2.2.4 测试发布消息
src/main/java/com/lee/springbootdemo/pojo/Runner.java
package com.lee.springbootdemo.pojo;
import com.lee.springbootdemo.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class Runner implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.receiver = receiver;
}
@Override
public void run(String... args) throws Exception {
System.out.println("Sending message ...");
rabbitTemplate.convertAndSend(RabbitMQConfig.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
}
}
这里我们定义的路由键(routing key)是foo.bar.baz
2.2.5 运行
运行 SpringBootDemoApplication.main()
方法,如果运行成功,将会看到
Sending message ...
Received <Hello from RabbitMQ!>
附录 A 参考
上一篇:4. Spring Boot Caching Redis
下一篇:
网友评论