美文网首页MQ
找到组织——spring整合mq

找到组织——spring整合mq

作者: getthrough | 来源:发表于2017-11-05 22:12 被阅读6次
    • setp 1:添加依赖
        <!--集中定义依赖版本-->
        <properties>
            <activemq.version>5.11.2</activemq.version>
            <spring.version>4.2.4.RELEASE</spring.version>
            <junit.version>4.12</junit.version>
        </properties>
    
        <dependencies>
            <!--activemq-->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>${activemq.version}</version>
            </dependency>
            <!--jms-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <!--整合支持-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context-support</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <!--junit-->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>${junit.version}</version>
            </dependency>
        </dependencies>
    
    • step 2:创建applicationContext.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context"
    
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd">
    
        <!--开启注解扫描-->
        <context:annotation-config/>
    
        <!--JMS服务厂商提供的连接工厂-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
        </bean>
    
        <!--spring对厂商提供的连接工厂的包装,方便更换不同的JMS实现-->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <!--引用了实际的连接工厂-->
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
    
        <!--配置生产者,使用spring提供的jms模板-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--将spring包装后的连接工厂注入-->
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
    
        <!--点对点消息队列-->
        <bean class="org.apache.activemq.command.ActiveMQQueue" id="activeMQQueue">
            <constructor-arg>
                <!--指定队列名称-->
                <value>spring_queue</value>
            </constructor-arg>
        </bean>
    
        <!--发布/订阅消息队列-->
        <bean class="org.apache.activemq.command.ActiveMQTopic" id="activeMQTopic">
            <constructor-arg>
                <value>spring_topic</value>
            </constructor-arg>
        </bean>
    

    spring整合后的连接工厂是包装后的连接工厂,今后若是项目想要替换消息中间件技术如RabbitMqRocketMq,只需要在配置文件中更改连接工厂的实现即可(还有注解信息...)。

    • step 3:创建队列消息生产者
    /**
     * 使用spring整合Junit4方式测试代码
     * 避免了测试方法前使用@Before注解手动初始化spring容器
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:applicationContext.xml")
    public class Queue_Producer {
    
        @Resource(name = "jmsTemplate")
        private JmsTemplate jmsTemplate;
    
        @Resource(name = "activeMQQueue")
        private Queue queue;
    
        @Test
        public void sendQueueMessage() {
            jmsTemplate.send(queue, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("text five!");
                }
            });
        }
    }
    
    • step 4:创建消息监听类
    /**
     * 消息监听器
     */
    public class MyMessageListener implements MessageListener {
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    • step 5:在applicationContext.xml中配置监听器bean和监听容器
    -------------------在applicationContext.xml中追加以下内容---------------------
        <!--消息监听器-->
        <bean id="myMessageListener" class="listener.MyMessageListener"/>
    
        <!--消息监听容器-->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="messageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="messageListener" ref="myMessageListener"/>
            <property name="destination" ref="activeMQQueue"/>
        </bean>
    

    运行生产者类,消息即可被消费。

    • topic 消息生产与消费
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:applicationContext.xml")
    public class Topic_Producer {
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @Autowired
        @Qualifier("activeMQTopic")
        private Topic topic;
    
        @Test
        public void sendMessage() {
            jmsTemplate.send(topic, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("its a topic message");
                }
            });
        }
    }
    

    applicationContext.xml中创建监听容器,监听类为上文的 listener

        <!--广播消息监听容器-->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="topicMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="messageListener" ref="myMessageListener"/>
            <property name="destination" ref="activeMQTopic"/>
        </bean>
    

    运行生产者类,消息即可被消费。

    spring 整合了 ActiveMq 之后,当 spring 容器初始化时,监听器就会监听连接工厂中定义的 ip 和 端口,一旦生产者将消息发送至服务器,监听器就会获取消息进行消费。

    注意:本案例的MyMessageListener类不能定义在Test目录下,否则无法解析该类。

    若有疑惑,可能阅读 http://www.jianshu.com/p/1439340d2388 有所帮助。

    本文完。

    代码地址:https://github.com/Getthrough/ActiveMq_Spring_DemoCode

    相关文章

      网友评论

        本文标题:找到组织——spring整合mq

        本文链接:https://www.haomeiwen.com/subject/kgohmxtx.html