美文网首页架构
ActiveMQ详解以及使用Spring整合ActiveMQ

ActiveMQ详解以及使用Spring整合ActiveMQ

作者: Cehae | 来源:发表于2018-06-25 21:27 被阅读0次

一丶ActiveMQ详解

什么是ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试

ActiveMQ的消息形式

对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
  • StreamMessage -- Java原始值的数据流
  • MapMessage--一套名称-值对
  • TextMessage--一个字符串对象
  • ObjectMessage--一个序列化的 Java对象
  • BytesMessage--一个字节的数据流

ActiveMQ的安装

环境要求

  • VMware Linux CentOS-6.5。
  • JDK 1.7.0_07
  • apache-activemq-5.12.0-bin.tar.gz
  • 上传工具FileZilla Client

第一步把ActiveMQ 的压缩包上传到Linux系统并解压。

图片.png 图片.png

第二步将ActiveMQ的目录移动到合适的目录并进入到bin目录启动ActiveMQ

图片.png

启动ActiveMQ

[root@cehae bin]# ./activemq start

查看ActiveMQ状态

[root@cehae bin]# ./activemq status

关闭ActiveMQ

[root@cehae bin]# ./activemq stop

第三步进入管理后台

在windows浏览器中输入http://192.168.25.200:8161/admin
输入账户和密码 admin后看到管理后台代表成功。

图片.png

ActiveMQ的使用

ActiveMQ两种消息形式结构图


图片.png

使用生产者发布Queue消息

创建maven工程,把jar包添加到工程中。使用5.11.2版本的jar包。

注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2。因为5.11.2版本的ActiveMQ会有Spring的源码,会有冲突。

使用生产者发布Queue消息
    @Test
    public void testQueueProducer() throws Exception {
    // 1创建一个连接工厂,需要指定如果服务的ip和端口
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.200:61616");

        // 2使用工厂创建一个Connection对象,
        Connection connection = factory.createConnection();

        // 3开启连接,调用Connection对象的start方法
        connection.start();

        // 4创建一个Session对象
        // 参数1:是否开启事务,一般不开启事务。如果开启事务自定忽略第二个参数。
        // 参数2:应答模式。自动应答和手动应答,一般使用自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5使用Session对象创建一个Destination(接口)对象, 两种形式Queue和Topic,现在应该使用queue.
        Queue queue = session.createQueue("test-Queue");

        // 6使用Session对象创建一个Producer对象
        MessageProducer producer = session.createProducer(queue);

        // 7创建一个Messaeg对象,可以使用TextMessage。
        TextMessage textMessage = session.createTextMessage("hello,ActiveMQ-Queue");
        // 8发送消息
        producer.send(textMessage);

        // 9关闭资源
        producer.close();
        session.close();
        connection.close();
    }
使用消费者接收Queue消息
@Test
    public void testQueueConsumer() throws Exception {

        // 1创建一个连接工厂连接MQ服务器。
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.200:61616");

        // 2创建一个连接
        Connection connection = factory.createConnection();

        // 3开启连接
        connection.start();

        // 4使用Connection创建一个Session对象
        // 参数1:是否开启事务,一般不开启事务。如果开启事务自定忽略第二个参数。
        // 参数2:应答模式。自动应答和手动应答,一般使用自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5使用Session对象创建一个Destination(接口)对象, 两种形式Queue和Topic,现在应该使用queue.
        Queue queue = session.createQueue("test-Queue"); // 注意要和发布消息的在同一个队列

        // 6使用Session对象创建一个Consumer对象
        MessageConsumer consumer = session.createConsumer(queue);

        // 7使用consumer接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {

                TextMessage txMessage = (TextMessage) message;
                String text = "";
                try {
                    text = txMessage.getText();
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println("message = " + text);
            }
        });

        // 8等待接收消息 ,目的是阻塞当前线程,等待接收消息
        System.in.read();

        // 9关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
测试结果
图片.png

使用生产者发布Topic消息

使用生产者发布Topic消息
@Test
    public void testTopicProducer() throws Exception {

        // 1创建一个连接工厂,需要指定如果服务的ip和端口
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.200:61616");

        // 2使用工厂创建一个Connection对象,
        Connection connection = factory.createConnection();

        // 3开启连接,调用Connection对象的start方法
        connection.start();

        // 4创建一个Session对象
        // 参数1:是否开启事务,一般不开启事务。如果开启事务自定忽略第二个参数。
        // 参数2:应答模式。自动应答和手动应答,一般使用自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5使用Session对象创建一个Destination(接口)对象, 两种形式Queue和Topic,现在应该使用Topic.
        Topic topic = session.createTopic("test-Topic");

        // 6使用Session对象创建一个Producer对象
        MessageProducer producer = session.createProducer(topic);

        // 7创建一个Messaeg对象,可以使用TextMessage。
        TextMessage textMessage = session.createTextMessage("hello,ActiveMQ-Topic");
        // 8发送消息
        producer.send(textMessage);

        // 9关闭资源
        producer.close();
        session.close();
        connection.close();
    }
使用消费者接收Topic消息
    @Test
    public void testTopicConsumer() throws Exception {

        // 1创建一个连接工厂连接MQ服务器。
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.200:61616");

        // 2创建一个连接
        Connection connection = factory.createConnection();

        // 3开启连接
        connection.start();

        // 4使用Connection创建一个Session对象
        // 参数1:是否开启事务,一般不开启事务。如果开启事务自定忽略第二个参数。
        // 参数2:应答模式。自动应答和手动应答,一般使用自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5使用Session对象创建一个Destination(接口)对象, 两种形式Queue和Topic,现在应该使用Topic.
        Topic topic = session.createTopic("test-Topic");

        // 6使用Session对象创建一个Consumer对象
        MessageConsumer consumer = session.createConsumer(topic);

        // 7使用consumer接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {

                TextMessage txMessage = (TextMessage) message;
                String text = "";
                try {
                    text = txMessage.getText();
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println("message = " + text);
            }
        });

        // 8等待接收消息 ,目的是阻塞当前线程
        System.in.read();

        // 9关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
测试结果
图片.png

注意

Queue形式的消息(点到点):发送完消息如果没有消费者接收会保存到(队列中)服务器,自动持久化。

Topic形式的消息(广播):发送完消息如果没有消费者接收会被MQ删除,也就是无法持久化。如果广播形式也想持久化,需要在客户顿启动的时候设置客户端id也就是订阅消息,这样MQ会保存广播的消息。

二丶Spring整合ActiveMQ

2-1丶Queue模式

创建消息生产者工程springjms_producer

引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cehae.demo</groupId>
    <artifactId>springjms_producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <spring.version>4.2.4.RELEASE</spring.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>
    </dependencies>
</project>

添加配置文件applicationContext-jms-producer_queue.xml

图片.png
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd">


    <context:component-scan base-package="com.cehae.demo.queen"></context:component-scan>


    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.200:61616" />
    </bean>

    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    
    <!--这个是队列目的地,点对点的 文本信息 -->
    <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue_text" />
    </bean>

</beans>

编写消息生产者QueueProducer

package com.cehae.demo.queen;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component
public class QueueProducer {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Destination queueTextDestination;

    /**
     * 发送文本消息
     * 
     * @param text
     */
    public void sendTextMessage(final String text) {

        jmsTemplate.send(queueTextDestination, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {

                return session.createTextMessage(text);
            }
        });
    }
}

编写测试类TestQueue.java

package com.cehae.test.queue;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.cehae.demo.queen.QueueProducer;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/applicationContext-jms-producer_queue.xml")
public class TestQueue {

    @Autowired
    private QueueProducer queueProducer;

    @Test
    public void testSend() {
        queueProducer.sendTextMessage("SpringJms-queue");
    }
}

创建消息消费者工程springjms_consumer

引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cehae.demo</groupId>
    <artifactId>springjms_consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <spring.version>4.2.4.RELEASE</spring.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>
    </dependencies>
</project>

添加配置文件applicationContext-jms-consumer-queue.xml


图片.png
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.200:61616" />
    </bean>

    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!--这个是队列目的地,点对点的 文本信息 -->
    <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue_text" />
    </bean>

    <!-- 我的监听类 -->
    <bean id="myQueueMessageListener" class="com.cehae.demo.queue.MyQueueMessageListener"></bean>
    <!-- 消息监听容器 -->
    <bean
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueTextDestination" />
        <property name="messageListener" ref="myQueueMessageListener" />
    </bean>

</beans>

编写queue消息监听者MyQueueMessageListener.java

package com.cehae.demo.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyQueueMessageListener implements MessageListener {
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("接收到消息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

编写测试类TestQueue.java

package com.cehae.test.queue;

import java.io.IOException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/applicationContext-jms-consumer-queue.xml")
public class TestQueue {
    @Test
    public void testQueue() {
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

queue方式测试结果


图片.png

2-2丶Topic模式

在springjms_producer工程添加配置文件applicationContext-jms-producer_topic.xml

图片.png
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd">


    <context:component-scan base-package="com.cehae.demo.topic"></context:component-scan>


    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.200:61616" />
    </bean>

    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>


    <!--这个是订阅模式 文本信息 -->
    <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic_text" />
    </bean>

</beans>

编写topic消息生成者TopicProducer.java

package com.cehae.demo.topic;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component
public class TopicProducer {
    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Destination topicTextDestination;

    /**
     * 发送文本消息
     * 
     * @param text
     */
    public void sendTextMessage(final String text) {
        jmsTemplate.send(topicTextDestination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(text);
            }
        });
    }
}

编写测试类TestTopic.java

package com.cehae.test.topic;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.cehae.demo.topic.TopicProducer;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/applicationContext-jms-producer_topic.xml")
public class TestTopic {
    @Autowired
    private TopicProducer topicProducer;

    @Test
    public void sendTextQueue() {
        topicProducer.sendTextMessage("SpringJms-topic");
    }
}

在springjms_consumer工程添加配置文件applicationContext-jms-consumer-topic.xml


图片.png
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.200:61616" />
    </bean>

    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!--这个是队列目的地,点对点的 文本信息 -->
    <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic_text" />
    </bean>

    <!-- 我的监听类 -->
    <bean id="myTopicMessageListener" class="com.cehae.demo.topic.MyTopicMessageListener"></bean>
    <!-- 消息监听容器 -->

    <bean
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicTextDestination" />
        <property name="messageListener" ref="myTopicMessageListener" />
    </bean>

</beans>

编写topic消息消费者MyTopicMessageListener.java

package com.cehae.demo.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyTopicMessageListener implements MessageListener {
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("接收到消息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

编写测试类TestTopic.java

package com.cehae.test.topic;

import java.io.IOException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/applicationContext-jms-consumer-topic.xml")
public class TestTopic {
    @Test
    public void testQueue() {
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

topic方式测试结果


图片.png

相关文章

网友评论

    本文标题:ActiveMQ详解以及使用Spring整合ActiveMQ

    本文链接:https://www.haomeiwen.com/subject/hnluyftx.html