美文网首页
ActiveMQ结合Spring

ActiveMQ结合Spring

作者: 米刀灵 | 来源:发表于2016-07-23 18:35 被阅读251次

    先看application_mq.xml
    1.配置ConnectionFactory

    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>
    

    Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正产生到JMS服务器链接的ConnectionFactory还是由ActiveMQ提供,所以完整的ConnectionFactory配置如下,我们以SingleConnectionFactory为例:

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://localhost:61616"/>  
    </bean> 
    
    <!-- Spring用于管理的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
    </bean>  
    

    ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory可以用来将ConnectionSession和MessageProducer池化,这样可以大大的减少我们的资源消耗。当使用PooledConnectionFactory时,我们定义一个ConnectionFactory时如下:

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://localhost:61616"/>  
    </bean>  
      
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory"/>
    </bean>
      
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>  
    </bean>  
    

    2.配置Template和Destination

    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory" />
        <property name="messageConverter" ref="messageConverter"></property>
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>
    
    <!--这个是队列目的地-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息队列的名字 -->
        <constructor-arg index="0" value="HelloWorldSpringQueue" />
    </bean>
    
    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <property name="messageConverter" ref="messageConverter"></property>
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
    </bean>
    
    <!--这个是订阅目的地-->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 设置消息队列的名字 -->
        <constructor-arg index="0" value="HelloWorldSpringTopic" />
    </bean>
    

    3.配置生产者和消费者
    我们在配置一个MessageListenerContainer的时候有三个属性必须指定,一个是表示从哪里监听的ConnectionFactory;一个是表示监听什么的Destination;一个是接收到消息以后进行消息处理的MessageListener。

    <!--send-->
    <bean id="queueProducer" class="com.zzhblh.activemq.producer.QueueProducer">
        <property name="jmsQueueTemplate" ref="jmsQueueTemplate"></property>
        <property name="queueDestination" ref="queueDestination"></property>
    </bean>
    
    <bean id="topicProducer" class="com.zzhblh.activemq.producer.TopicProducer">
        <property name="jmsTopicTemplate" ref="jmsTopicTemplate"></property>
        <property name="topicDestination" ref="topicDestination"></property>
    </bean>
    
    <!--receive-->
    <!-- 注入我们自己写的继承了javax.jms.MessageListener的消息监听器-->
    <bean id="consumerMessageListener" class="com.zzhblh.activemq.consumer.ConsumerMessageListener"/>
    
    <!-- 消息监听容器 -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
        <property name="sessionAcknowledgeMode" value="1"/>//1代表AUTO_ACKNOWLEDGE 
    </bean>
    <!-- 消息监听容器2 -->
    <bean id="jmsContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
        <property name="sessionAcknowledgeMode" value="1"/>//1代表AUTO_ACKNOWLEDGE 
    </bean>
    

    4.发送消息

    @Service
    public class QueueProducer {
        @Autowired
        JmsTemplate jmsQueueTemplate;
        @Autowired
        ActiveMQQueue queueDestination;
    
        public void sendMessages(final String text, final int i) throws JMSException {
                jmsQueueTemplate.send(queueDestination,new MessageCreator() {
                    public Message createMessage(Session session) throws JMSException {
                        TextMessage message = session.createTextMessage(text);
                        message.setIntProperty("messageCount", i);
                        return message;
                    }
                });
        }
    
        public JmsTemplate getJmsQueueTemplate() {
            return jmsQueueTemplate;
        }
    
        public void setJmsQueueTemplate(JmsTemplate jmsQueueTemplate) {
            this.jmsQueueTemplate = jmsQueueTemplate;
        }
    
        public ActiveMQQueue getQueueDestination() {
            return queueDestination;
        }
    
        public void setQueueDestination(ActiveMQQueue queueDestination) {
            this.queueDestination = queueDestination;
        }
    }
    

    5.接收消息

    public class ConsumerMessageListener implements MessageListener {
        private NotifyMessageConverter messageConverter;
    
        @Override
        public void onMessage(Message message) {
            if (message instanceof TextMessage) {
                //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage
                TextMessage textMsg = (TextMessage) message;
                System.out.println("接收到一个纯文本消息。");
                try {
                    System.out.println("消息内容是:" + textMsg.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            } else if (message instanceof MapMessage) {
                MapMessage mapMessage = (MapMessage) message;
            } else if (message instanceof ObjectMessage) {
                ObjectMessage objMessage = (ObjectMessage) message;
                try {
                    /*Object obj = objMessage.getObject();
                    Email email = (Email) obj;
                    Email email = (Email) messageConverter.fromMessage(objMessage);
                    System.out.println("接收到一个ObjectMessage,包含Email对象。");
                    System.out.println(email);*/
                    messageConverter.fromMessage(message);
                    System.out.println("接收到一个ObjectMessage");
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    6.消息转换器

    public class NotifyMessageConverter implements MessageConverter {
        @Override
        public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
            return session.createObjectMessage((Serializable) object);
        }
    
        @Override
        public Object fromMessage(Message message) throws JMSException, MessageConversionException {
            ObjectMessage objMessage = (ObjectMessage) message;
            return objMessage.getObject();
        }
    }
    

    要点:
    1.用activemq的connection构造spring用于管理的SingleConnectionFactory。用于发送/接收消息。
    2.创建Destination,ActiveMQQueue或ActiveMQTopic。用于发送/接收消息。

    3.用SingleConnectionFactory构造JmsTemplate。用于发送消息。
    4.用JmsTemplate和Destination(ActiveMQQueue/ActiveMQTopic)构造QueueProducer/TopicProducer。用于发送消息。

    5.创建MessageListener。用于接收消息。
    6.构造ListenerContainer,用到ConnectionFactory,Destination,MessageListener。用于接收消息。


    参考:
    http://my.oschina.net/xiaoxishan/blog/381209
    http://shengwangi.blogspot.com/2015/06/spring-jms-with-activemq-hello-world.html

    相关文章

      网友评论

          本文标题:ActiveMQ结合Spring

          本文链接:https://www.haomeiwen.com/subject/gktijttx.html