美文网首页
SpringBoot整合ActiveMQ

SpringBoot整合ActiveMQ

作者: Radom7 | 来源:发表于2019-02-19 15:03 被阅读0次

ActiveMQ简介
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

ActiveMQ特性

  1. 多语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议:
    OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
  2. 完全支持JMS1.1和J2EE 1.4规范(持久化、XA消息,事物)。
  3. 对Spring的支持,ActiveMQ可以很容易嵌套到使用Spring的系统里面去,而且也支持Spring2.0的特征。
  4. 通过了常见的J2EE服务器(如Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5。
    resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。
  5. 支持多种传送协议: in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。
  6. 支持通过JDBC和journal提供高速的消息持久化。
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点。
  8. 支持Ajax。
  9. 支持和Axis的整合。
  10. 可以很容易的调用内嵌JMS provider,进行测试。

什么情况下使用ActiveMQ?

  • 多个项目之间集成
    跨平台
    多语言
    多项目
  • 降低系统间模块的耦合度,解耦
    软件扩展性
  • 系统前后端隔离
    前后端隔离,屏蔽高安全区

ActiveMQ安装
在官方下载ActiveMQ: http://activemq.apache.org/download.html
这次选择的是Unix版本,解压安装包,启动ActiveMQ:

root@ubuntu:~/apache-activemq-5.15.3/bin# activemq start

访问ActiveMQ监控界面:


这里写图片描述

SpringBoot整合ActiveMQ

  • pom文件添加依赖:
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
             <version>5.7.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
  • 在application.yml中加入activemq的配置:
spring:
  activemq:
    broker-url: tcp://192.168.0.197:61616
    user: admin
    password: admin
    pool:
      enabled: false
  • 创建一个消息生产者:
@Service
public class JMSProducer {

    @Autowired
    private JmsTemplate jmsTemplate;

    // 发送消息,destination是发送到的队列,message是待发送的消息
    public void sendMessage(Destination destination, final String message){
        jmsTemplate.convertAndSend(destination, message);
    }

   
}
  • 创建一个消息消费者:
@Component
public class JMSConsumer {
    private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);

    @JmsListener(destination = "springboot.queue.test")
    public void receiveQueue(String msg) {
        logger.info("接收到消息:{}",msg);
    }
}

  • 测试:
@Autowired
    private JMSProducer jmsProducer;

    @Test
    public void testJms() {
        Destination destination = new ActiveMQQueue("springboot.queue.test");
        for (int i=0;i<10;i++) {
            jmsProducer.sendMessage(destination,"hello,world!" + i);
        }
    }
这里写图片描述

注:后面多加了两个消费者。

这里写图片描述

可以看到,在ActiveMQ监控界面上,已经存在前面定义的队列“springboot.queue.test”。

支持同时发送和接收queue/topic

  • 新建一个JMS的配置类:
@Configuration
public class JmsConfig {
    public final static String TOPIC = "springboot.topic.test";
    public final static String QUEUE = "springboot.queue.test";
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(QUEUE);
    }

    @Bean
    public Topic topic() {
        return new ActiveMQTopic(TOPIC);
    }

    // topic模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }
    // queue模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }
}

  • 新增消息消费者JMSConsumer3 ,指定ConnectionFactory:
@Component
public class JMSConsumer3 {
    private final static Logger logger = LoggerFactory.getLogger(JMSConsumer3.class);

    @JmsListener(destination = JmsConfig.TOPIC,containerFactory = "jmsListenerContainerTopic")
    public void onTopicMessage(String msg) {
        logger.info("接收到topic消息:{}",msg);
    }

    @JmsListener(destination = JmsConfig.QUEUE,containerFactory = "jmsListenerContainerQueue")
    public void onQueueMessage(String msg) {
        logger.info("接收到queue消息:{}",msg);
    }
}
  • 测试:
    @Autowired
    private Topic topic;
    @Autowired
    private Queue queue;

    @Test
    public void testJms2() {
        for (int i=0;i<10;i++) {
            jmsProducer.sendMessage(queue,"queue,world!" + i);
            jmsProducer.sendMessage(topic, "topic,world!" + i);
        }
    }

相关文章

网友评论

      本文标题:SpringBoot整合ActiveMQ

      本文链接:https://www.haomeiwen.com/subject/ovffyqtx.html