美文网首页
SpringBoot整合ActiveMQ

SpringBoot整合ActiveMQ

作者: Radom7 | 来源:发表于2019-02-19 15:03 被阅读0次

    ActiveMQ简介
    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

    ActiveMQ特性

    1. 多语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议:
      OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
    2. 完全支持JMS1.1和J2EE 1.4规范(持久化、XA消息,事物)。
    3. 对Spring的支持,ActiveMQ可以很容易嵌套到使用Spring的系统里面去,而且也支持Spring2.0的特征。
    4. 通过了常见的J2EE服务器(如Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5。
      resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。
    5. 支持多种传送协议: in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。
    6. 支持通过JDBC和journal提供高速的消息持久化。
    7. 从设计上保证了高性能的集群,客户端-服务器,点对点。
    8. 支持Ajax。
    9. 支持和Axis的整合。
    10. 可以很容易的调用内嵌JMS provider,进行测试。

    什么情况下使用ActiveMQ?

    • 多个项目之间集成
      跨平台
      多语言
      多项目
    • 降低系统间模块的耦合度,解耦
      软件扩展性
    • 系统前后端隔离
      前后端隔离,屏蔽高安全区

    ActiveMQ安装
    在官方下载ActiveMQ: http://activemq.apache.org/download.html
    这次选择的是Unix版本,解压安装包,启动ActiveMQ:

    root@ubuntu:~/apache-activemq-5.15.3/bin# activemq start
    
    

    访问ActiveMQ监控界面:


    这里写图片描述

    SpringBoot整合ActiveMQ

    • pom文件添加依赖:
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                 <version>5.7.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
    • 在application.yml中加入activemq的配置:
    spring:
      activemq:
        broker-url: tcp://192.168.0.197:61616
        user: admin
        password: admin
        pool:
          enabled: false
    
    • 创建一个消息生产者:
    @Service
    public class JMSProducer {
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        // 发送消息,destination是发送到的队列,message是待发送的消息
        public void sendMessage(Destination destination, final String message){
            jmsTemplate.convertAndSend(destination, message);
        }
    
       
    }
    
    • 创建一个消息消费者:
    @Component
    public class JMSConsumer {
        private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);
    
        @JmsListener(destination = "springboot.queue.test")
        public void receiveQueue(String msg) {
            logger.info("接收到消息:{}",msg);
        }
    }
    
    
    • 测试:
    @Autowired
        private JMSProducer jmsProducer;
    
        @Test
        public void testJms() {
            Destination destination = new ActiveMQQueue("springboot.queue.test");
            for (int i=0;i<10;i++) {
                jmsProducer.sendMessage(destination,"hello,world!" + i);
            }
        }
    
    这里写图片描述

    注:后面多加了两个消费者。

    这里写图片描述

    可以看到,在ActiveMQ监控界面上,已经存在前面定义的队列“springboot.queue.test”。

    支持同时发送和接收queue/topic

    • 新建一个JMS的配置类:
    @Configuration
    public class JmsConfig {
        public final static String TOPIC = "springboot.topic.test";
        public final static String QUEUE = "springboot.queue.test";
        @Bean
        public Queue queue() {
            return new ActiveMQQueue(QUEUE);
        }
    
        @Bean
        public Topic topic() {
            return new ActiveMQTopic(TOPIC);
        }
    
        // topic模式的ListenerContainer
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setPubSubDomain(true);
            bean.setConnectionFactory(activeMQConnectionFactory);
            return bean;
        }
        // queue模式的ListenerContainer
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setConnectionFactory(activeMQConnectionFactory);
            return bean;
        }
    }
    
    
    • 新增消息消费者JMSConsumer3 ,指定ConnectionFactory:
    @Component
    public class JMSConsumer3 {
        private final static Logger logger = LoggerFactory.getLogger(JMSConsumer3.class);
    
        @JmsListener(destination = JmsConfig.TOPIC,containerFactory = "jmsListenerContainerTopic")
        public void onTopicMessage(String msg) {
            logger.info("接收到topic消息:{}",msg);
        }
    
        @JmsListener(destination = JmsConfig.QUEUE,containerFactory = "jmsListenerContainerQueue")
        public void onQueueMessage(String msg) {
            logger.info("接收到queue消息:{}",msg);
        }
    }
    
    • 测试:
        @Autowired
        private Topic topic;
        @Autowired
        private Queue queue;
    
        @Test
        public void testJms2() {
            for (int i=0;i<10;i++) {
                jmsProducer.sendMessage(queue,"queue,world!" + i);
                jmsProducer.sendMessage(topic, "topic,world!" + i);
            }
        }
    

    相关文章

      网友评论

          本文标题:SpringBoot整合ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/ovffyqtx.html