美文网首页
JMS(三)---Spring集成JMS连接ActiveMQ

JMS(三)---Spring集成JMS连接ActiveMQ

作者: shallwego_ | 来源:发表于2018-10-12 11:16 被阅读0次

    一、Spring JMS理论

    Spring 提供的接口

    • ConnectionFacory 用于管理连接的连接工厂
    • JmsTemplate 用于发送和接收消息的模板类
    • MessageListerner 消息监听器
    1、ConnectionFacory
    • 这是一个Spring为我们提供的连接池,与ActiveMQ中定义的ConnectionFactory不同
    • Spring中提供了SingleConnectionFactory和CachingConnectionFactory。
      • SingleConnectionFactory 对于建立JMS服务器连接的请求只会返回一个同一个Connection,也就是说在整个应用中只会使用一个连接进行操作。
      • CachingConnectionFactory 继承了 SingleConnectionFactory 所以它拥有SingleConnectionFactory 它的所有的功能,同时还新增了缓存功能,缓存 producer和consumer
    2、JmsTemplate
    • 是 Spring 提供的,只需要向Spring容器内注册这个类就可以使用JmsTemplate 方便操作 jms ;
    • JMSTemplate每次发消息都会重新创建connection,session和producer
    • JmsTemplate 是线程安全的,可以在整个应用范围使用,但是并不代表整个应用中只能使用一个,我们可以创建多个。
    3、MessageListerner
    • 实现一个onMessage方法即可,该方法只接收一个Message参数。

    二、Spring JMS代码实现

    项目结构:

    1、在pom.xml文件中引入依赖
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.test.jms</groupId>
        <artifactId>jms-spring</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <spring.version>4.2.5.RELEASE</spring.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>${spring.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>${spring.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>${spring.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
                <exclusions>
                    <exclusion>
                        <artifactId>spring-context</artifactId>
                        <groupId>org.springframework</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>
    
    </project>
    
    2、创建Spring配置文件
    (1)common.xml

    配置producer.xml 和 consumer.xml公用的bean。

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    
        <context:annotation-config/>
        <!-- ActiveMQ为我们提供的ConnectionFactory-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
        </bean>
        <!--Spring JMS为我们提供的连接池-->
        <bean id="connectionFacory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
    
        <!--一个队列目的地,点对点模式-->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queue"/>
        </bean>
    
        <!--一个主题目的地,发布/订阅模式-->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="topic"/>
        </bean>
    
    </beans>
    
    (2)producer.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    
        <import resource="common.xml"/>
        <!--配置JmsTemplate,用于发送消息-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFacory"/>
        </bean>
    
        <bean class="com.test.jms.producer.ProducerServiceImpl"/>
    </beans>
    

    (3)consumer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <import resource="common.xml"/>
    
        <!--配置消息监听器-->
        <bean id="consumerMessageListener" class="com.test.jms.consumer.ConsumerMessageListener"></bean>
    
        <!--配置消息监听容器-->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFacory"/>
            <property name="destination" ref="queueDestination"/>
            <!--<property name="destination" ref="topicDestination"/>-->
            <property name="messageListener" ref="consumerMessageListener"/>
        </bean>
    </beans>
    
    3、队列模式
    (1)创建生产者服务

    接口:ProducerService

    package com.test.jms.producer;
    
    public interface ProducerService {
        void sendMessage(String message);
    }
    
    

    实现:ProducerServiceImpl

    package com.test.jms.producer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    
    import javax.annotation.Resource;
    import javax.jms.*;
    
    
    public class ProducerServiceImpl implements  ProducerService {
    
        @Autowired
        JmsTemplate jmsTemplate;
    
        @Resource(name="queueDestination")
        Destination destination;
      
        public void sendMessage(final String message) {
            //使用JmsTemplate发送消息
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    //创建一个消息
                    TextMessage textMessage = session.createTextMessage(message);
                    return textMessage;
                }
            });
            System.out.println("发送消息:"+ message);
        }
    }
    
    

    (2)创建生产者

    package com.test.jms.producer;
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class AppProducer {
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
            ProducerService producerService=context.getBean(ProducerService.class);
            for (int i = 0; i <100; i++) {
                producerService.sendMessage("test"+i);
            }
            context.close();
        }
    
    }
    
    

    (3)创建消费者监听器

    package com.test.jms.consumer;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    import javax.xml.soap.Text;
    
    public class ConsumerMessageListener implements MessageListener {
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接收消息:"+textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    (5)创建消费者

    package com.test.jms.consumer;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class AppConsumer {
        public static void main(String[] args) {
            ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
        }
    }
    
    

    测试

    1. 打开ActiveMQ的后台:http://localhost:8161/
    2. 启动生产者中的”main”方法,即可看到正常的发送消息输出了。
    3. 启动”main”方法即可看到消息被消费了,查看ActiveMQ后台可看到。
    4. 如果启动生产者之前,启动两个消费者,则发送的消息被两个消费者平均消费。A消费者接收偶数消息,B消费者接收奇数消息。




    4、主题模式

    只需要三步就可以将 队列模式 改成 主题模式 了。

    (1)在common.xml中加入:
    <!-- 一个主题目的地,发布订阅模式 -->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="topic"/>
        </bean>
    
    (2)修改 ProducerServiceImpl :
        @Resource(name = "queueDestination")
        Destination destination;
    

    改为:

        @Resource(name = "topicDestination")
        Destination destination;
    
    (3)修改consumer.xml:
           <!--配置消息监听容器-->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFacory"/>
            <property name="destination" ref="queueDestination"/>
            <property name="messageListener" ref="consumerMessageListener"/>
        </bean>
    

    将消息队列目的地:

    <property name="destination" ref="queueDestination"/>
    

    改为:

    <property name="destination" ref="topicDestination"/>
    

    测试

    启动两个消费者 AppConsumer, 然后启动一个生产者 Producer 可以看到输出了。A、B消费者可接收到生产者发送的全部消息。




    相关文章

      网友评论

          本文标题:JMS(三)---Spring集成JMS连接ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/mtibaftx.html