Spring JMS + ActiveMQ

Author: Joryun | Published 2017-05-21 11:45

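    Preface

    Overview

    Development environment: IDEA, ActiveMQ

    Build tool: Maven

    Software stack: Spring, Spring JMS, ActiveMQ

    Project description: Spring JMS communicating with ActiveMQ

    Goal

    Get started with Spring JMS and ActiveMQ by using Spring JMS to send messages to, and read messages from, an ActiveMQ message queue.

    PS:
    Every step of this demo was tested by hand; following the steps in order produces a working project.
    The project code is commented to clarify the role of each class, method, and property.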
    

    Implementation

    1. Start the ActiveMQ service
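
    Before moving on, it can help to confirm that the broker is actually listening. The following is a minimal sketch, not part of the original project: it assumes the broker runs on localhost and uses port 61616, the port that appears in jms.broker.url later in this article. The class name BrokerPortCheck is hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the broker listens on localhost:61616, as in jms.broker.url.
        boolean up = isReachable("localhost", 61616, 1000);
        System.out.println(up ? "Broker port is open" : "Broker port is not reachable");
    }
}
```

    An open port only shows that something is listening there; it does not prove that the listener is ActiveMQ.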

    2. Import the dependencies with Maven

    pom.xml

    <dependencies>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.9.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>4.0.2.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
                <version>4.0.2.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>4.0.2.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>3.16</version>
            </dependency>
    
        </dependencies>
    
    

    3. Create application.properties to hold the message queue settings

    application.properties

    jms.broker.url=tcp://localhost:61616
    jms.queue.name=bar
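
    These two keys are referenced from the Spring XML files as ${jms.broker.url} and ${jms.queue.name}. To illustrate the idea of placeholder substitution, here is a simplified sketch written with the JDK only. It is not Spring's implementation, and the class name PlaceholderSketch and its methods are hypothetical. Unlike this sketch, which leaves unknown keys untouched, Spring's PropertyPlaceholderConfigurer by default fails when a placeholder cannot be resolved.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderSketch {

    // Matches ${key} and captures the key.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${key} in text with its value from props; unknown keys are left as-is. */
    public static String resolve(String text, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = props.getProperty(matcher.group(1), matcher.group(0));
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    /** Loads the two settings from this step out of an in-memory string. */
    public static Properties sampleProperties() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(
                    "jms.broker.url=tcp://localhost:61616\njms.queue.name=bar\n"));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = sampleProperties();
        System.out.println(resolve("${jms.broker.url}", props));
        System.out.println(resolve("${jms.queue.name}", props));
    }
}
```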
    

    4. Global JMS configuration: set up the connection to ActiveMQ

    JMSConfiguration.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:amq="http://activemq.apache.org/schema/core"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://activemq.apache.org/schema/core
           http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    
        <!-- Configure the connection to ActiveMQ -->
    
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="location">
                <value>application.properties</value>
            </property>
        </bean>
    
        <!-- Activemq connection factory -->
        <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <constructor-arg index="0" value="${jms.broker.url}" />
            <property name="useAsyncSend" value="true" />
        </bean>
    
        <!-- ConnectionFactory Definition -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <constructor-arg ref="amqConnectionFactory" />
        </bean>
    
        <!--  Default Destination Queue Definition-->
        <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg index="0" value="${jms.queue.name}" />
        </bean>
    
        <!-- JmsTemplate Definition -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="defaultDestination" ref="defaultDestination"/>
        </bean>
    
        <!-- Message Sender Definition -->
        <bean id="messageSender" class="com.net.jms.MessageSender">
            <constructor-arg index="0" ref="jmsTemplate" />
        </bean>
    
    </beans>
    
    

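    Analysis:

    (1) Configure the ActiveMQConnectionFactory provided by ActiveMQ.

    (2) Wrap it in the CachingConnectionFactory provided by Spring JMS.

    (3) Define an ActiveMQ queue as the destination that receives the messages.

    (4) Create a JmsTemplate.

    (5) Define a custom MessageSender that uses this JmsTemplate to send messages.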
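
    The same wiring can also be written by hand in Java, which may make it easier to see how the pieces depend on each other. This is only a sketch, not the article's configuration: it assumes the Maven dependencies from step 2 are on the classpath and a broker is running, it hard-codes the values from application.properties, and the class name ManualJmsWiring is hypothetical.

```java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

public class ManualJmsWiring {

    public static void main(String[] args) {
        // (1) ActiveMQ's own connection factory, pointed at the broker URL.
        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("tcp://localhost:61616");
        amq.setUseAsyncSend(true);

        // (2) Spring's caching wrapper around the ActiveMQ factory.
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(amq);
        try {
            // (3) + (4) A JmsTemplate whose default destination is the queue "bar".
            JmsTemplate jmsTemplate = new JmsTemplate();
            jmsTemplate.setConnectionFactory(connectionFactory);
            jmsTemplate.setDefaultDestination(new ActiveMQQueue("bar"));

            // (5) What MessageSender does: send a string to the default destination.
            jmsTemplate.convertAndSend("hello from manual wiring");
        } finally {
            // Release the cached connection so the JVM can exit.
            connectionFactory.destroy();
        }
    }
}
```

    The XML version keeps these values out of the code, which is why the article routes them through application.properties.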

    5. Create MessageSender, which provides the message-sending service

    MessageSender.java

    package com.net.jms;
    
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * Message-sending service: sends string messages
     */
    
    @Component
    public class MessageSender {
    
        private final JmsTemplate jmsTemplate;
    
        public MessageSender(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
    
        public void send(final String text) {
            jmsTemplate.convertAndSend(text);
            System.out.println("send: " + text);
        }
    
    }
    
    

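    Purpose: send a string message to the default destination through the JmsTemplate. With JmsTemplate's default message converter, a String payload is sent as a JMS TextMessage, which is the type the receiver checks for in step 7.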

    6. Configure a listener to monitor and process the message queue

    JMSReceiver.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!-- Configure a listener to monitor and process the message queue -->
    
        <!-- Message Receiver Definition -->
        <bean id="messageReceiver" class="com.net.jms.MessageReceiver" />
    
        <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destinationName" value="${jms.queue.name}"/>
            <property name="messageListener" ref="messageReceiver"/>
        </bean>
    
    </beans>
    

    This registers a custom MessageListener and uses Spring's SimpleMessageListenerContainer as its container.
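
    The key idea is that the application only supplies a callback, while the container decides when to invoke it. The toy model below illustrates that callback pattern with an in-memory queue and the JDK only. It is an assumption-laden sketch, not how Spring or ActiveMQ implement listener containers, and the class ToyListenerContainer is hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class ToyListenerContainer {

    // Unique sentinel object, compared by identity, that tells the worker to stop.
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> queue;
    private final Consumer<String> listener;
    private final Thread worker;

    public ToyListenerContainer(BlockingQueue<String> queue, Consumer<String> listener) {
        this.queue = queue;
        this.listener = listener;
        this.worker = new Thread(this::dispatchLoop, "toy-listener");
    }

    public void start() {
        worker.start();
    }

    /** Stops the worker after it has handled all earlier messages, then waits for it. */
    public void stop() throws InterruptedException {
        queue.put(STOP);
        worker.join();
    }

    // Takes messages one by one and hands each to the listener callback.
    private void dispatchLoop() {
        try {
            while (true) {
                String message = queue.take();
                if (message == STOP) {
                    return;
                }
                listener.accept(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Sends two messages through the toy container and returns what the listener saw. */
    public static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        ToyListenerContainer container = new ToyListenerContainer(queue, received::add);
        container.start();
        try {
            queue.put("hello");
            queue.put("world");
            container.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    In the real setup, MessageReceiver plays the role of the listener callback and the ActiveMQ queue replaces the in-memory queue.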

    7. Implement the MessageListener

    MessageReceiver.java

    package com.net.jms;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * Message listener: listens on the message queue,
     * reads messages from it, and prints them to the console
     */
    public class MessageReceiver implements MessageListener {
    
        public void onMessage(Message message) {
    
            if (message instanceof TextMessage){
    
                TextMessage textMessage = (TextMessage) message;
                try {
    
                    String text = textMessage.getText();
                    System.out.println(String.format("Received: %s",text));
                    try {
    
                        Thread.sleep(100);
                    } catch (InterruptedException e){
                        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
                    }
                } catch (JMSException e){
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    Purpose: read messages from the queue and print them to the console.

    8. Create two test classes: one sends messages to the ActiveMQ message queue, the other reads messages from it

    SenderApp.java

    package com.net;
    
    import com.net.jms.MessageSender;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.util.StringUtils;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    
    /**
     * Sends messages to the ActiveMQ message queue
     */
    public class SenderApp {
    
        public static void main(String[] args) throws IOException {
    
            start("JMSConfiguration.xml");
        }
    
        public static void start(String configLocation) throws IOException {
    
            MessageSender sender = getMessageSender(configLocation);
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
    
            System.out.println("Please input your message:");
            String text = br.readLine();
            while (!StringUtils.isEmpty(text)) {
    
                System.out.println(String.format("send message: %s", text));
                sender.send(text);
                text = br.readLine();
            }
        }
    
        public static MessageSender getMessageSender(String configLocation) {
    
            ApplicationContext context = new ClassPathXmlApplicationContext(configLocation);
            return (MessageSender) context.getBean("messageSender");
        }
    
    }
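
    The loop in SenderApp.start keeps sending until readLine returns an empty line or reaches end of input, because StringUtils.isEmpty treats both null and "" as empty. The sketch below reproduces only that loop condition with the JDK so its behavior can be checked without a broker. The class ReadLoopSketch and its methods are hypothetical and not part of the project.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class ReadLoopSketch {

    /** Collects lines until the first empty line or end of input. */
    public static List<String> readUntilBlank(BufferedReader reader) throws IOException {
        List<String> messages = new ArrayList<>();
        String text = reader.readLine();
        // Same condition as !StringUtils.isEmpty(text): stop on null or "".
        while (text != null && !text.isEmpty()) {
            messages.add(text);
            text = reader.readLine();
        }
        return messages;
    }

    /** Convenience overload that reads from an in-memory string. */
    public static List<String> readUntilBlank(String input) {
        try {
            return readUntilBlank(new BufferedReader(new StringReader(input)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The line "c" comes after the blank line, so it is never read.
        System.out.println(readUntilBlank("a\nb\n\nc\n"));
    }
}
```

    In practice this means pressing Enter on an empty line ends the input loop in SenderApp.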
    
    

    ReceiverApp.java

    package com.net;
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    /**
     * Reads messages from the message queue
     */
    public class ReceiverApp {
    
        public static void main(String[] args) {
    
            new ClassPathXmlApplicationContext("JMSConfiguration.xml", "JMSReceiver.xml");
        }
    
    }
    
    

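    9. Test: run SenderApp.java and type messages into the console, then run ReceiverApp.java. The receiver prints each queued message to its console as "Received: <text>".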

    Summary

    That completes the integration of Spring JMS with ActiveMQ.
