ActiveMQ简介
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
ActiveMQ特性
- 多语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议:
OpenWire,Stomp REST,WS Notification,XMPP,AMQP。 - 完全支持JMS1.1和J2EE 1.4规范(持久化、XA消息,事物)。
- 对Spring的支持,ActiveMQ可以很容易嵌套到使用Spring的系统里面去,而且也支持Spring2.0的特征。
- 通过了常见的J2EE服务器(如Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5。
resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。 - 支持多种传送协议: in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。
- 支持通过JDBC和journal提供高速的消息持久化。
- 从设计上保证了高性能的集群,客户端-服务器,点对点。
- 支持Ajax。
- 支持和Axis的整合。
- 可以很容易的调用内嵌JMS provider,进行测试。
什么情况下使用ActiveMQ?
- 多个项目之间集成
跨平台
多语言
多项目 - 降低系统间模块的耦合度,解耦
软件扩展性 - 系统前后端隔离
前后端隔离,屏蔽高安全区
ActiveMQ安装
在官方下载ActiveMQ: http://activemq.apache.org/download.html
这次选择的是Unix版本,解压安装包,启动ActiveMQ:
root@ubuntu:~/apache-activemq-5.15.3/bin# activemq start
访问ActiveMQ监控界面:
这里写图片描述
SpringBoot整合ActiveMQ
- pom文件添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- 在application.yml中加入activemq的配置:
spring:
activemq:
broker-url: tcp://192.168.0.197:61616
user: admin
password: admin
pool:
enabled: false
- 创建一个消息生产者:
@Service
public class JMSProducer {
@Autowired
private JmsTemplate jmsTemplate;
// 发送消息,destination是发送到的队列,message是待发送的消息
public void sendMessage(Destination destination, final String message){
jmsTemplate.convertAndSend(destination, message);
}
}
- 创建一个消息消费者:
@Component
public class JMSConsumer {
private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);
@JmsListener(destination = "springboot.queue.test")
public void receiveQueue(String msg) {
logger.info("接收到消息:{}",msg);
}
}
- 测试:
@Autowired
private JMSProducer jmsProducer;
@Test
public void testJms() {
Destination destination = new ActiveMQQueue("springboot.queue.test");
for (int i=0;i<10;i++) {
jmsProducer.sendMessage(destination,"hello,world!" + i);
}
}
这里写图片描述
注:后面多加了两个消费者。
这里写图片描述可以看到,在ActiveMQ监控界面上,已经存在前面定义的队列“springboot.queue.test”。
支持同时发送和接收queue/topic
- 新建一个JMS的配置类:
@Configuration
public class JmsConfig {
public final static String TOPIC = "springboot.topic.test";
public final static String QUEUE = "springboot.queue.test";
@Bean
public Queue queue() {
return new ActiveMQQueue(QUEUE);
}
@Bean
public Topic topic() {
return new ActiveMQTopic(TOPIC);
}
// topic模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
// queue模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}
- 新增消息消费者JMSConsumer3 ,指定ConnectionFactory:
@Component
public class JMSConsumer3 {
private final static Logger logger = LoggerFactory.getLogger(JMSConsumer3.class);
@JmsListener(destination = JmsConfig.TOPIC,containerFactory = "jmsListenerContainerTopic")
public void onTopicMessage(String msg) {
logger.info("接收到topic消息:{}",msg);
}
@JmsListener(destination = JmsConfig.QUEUE,containerFactory = "jmsListenerContainerQueue")
public void onQueueMessage(String msg) {
logger.info("接收到queue消息:{}",msg);
}
}
- 测试:
@Autowired
private Topic topic;
@Autowired
private Queue queue;
@Test
public void testJms2() {
for (int i=0;i<10;i++) {
jmsProducer.sendMessage(queue,"queue,world!" + i);
jmsProducer.sendMessage(topic, "topic,world!" + i);
}
}
网友评论