美文网首页
Spring集成ActiveMQ

Spring集成ActiveMQ

作者: 木有_ | 来源:发表于2018-01-16 15:03 被阅读0次

    ActiveMQ基础

    JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

    基本概念

    JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。

    消息模型

    ○ Point-to-Point(P2P)

    ○ Publish/Subscribe(Pub/Sub)

    即点对点和发布订阅模型

    JMS编程模型

    (1) ConnectionFactory

    创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

    (2) Destination

    Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

    所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。

    (3) Connection

    Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

    (4) Session

    Session是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

    (5) 消息的生产者

    消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

    (6) 消息消费者

    消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

    (7) MessageListener

    消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

    ActiveMQ下载及安装

    这个网上例子比较多,不同操作系统方式不同,这里不做说明。
    Mac可以使用brew命令来进行安装,也比较简单

    brew install activemq
    
    image.png

    下载安装成功以后,启动activemq

    activemq start
    

    然后本地浏览器中输入http://localhost:8161/查看是否启动成功,登录用户及密码为admin/admin

    image.png

    Spring集成

    1.使用maven引入activemq包

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.2</version>
    </dependency>
    

    2.配置spring文件

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
        <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"></property>
        </bean>
    
    <!-- Spring Caching连接工厂 -->
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="cachingConnectionFactory"
              class="org.springframework.jms.connection.CachingConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
            <!-- Session缓存数量 -->
            <property name="sessionCacheSize" value="10"></property>
        </bean>
    

    消息提供者/发布者配置

    jmsTemplate配置

    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
            <constructor-arg ref="cachingConnectionFactory"/>
        </bean>
    

    消息模型配置

       <!-- 点对点队列 -->   
        <bean id="mqQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg index="0" value="MessageQueue" />
        </bean>
       <!-- 发布/消费 -->
        <bean id="mqTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="MessageTopic" />
        </bean>
    

    Java代码

    消息提供者/发布者

    package com.permission.core.activemq;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Component;
    
    import javax.jms.*;
    
    /**
     * @Description: 消息提供者/发布者
     * @Author: 
     * @Date: Created in 下午5:44 2018/1/11
     */
    @Component
    public class MessageQueueSender {
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @Autowired
        private Destination mqQueue;
    
        @Autowired
        private Destination mqTopic;
    
    
        public void sendMessage(String message) {
            jmsTemplate.send(mqTopic, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage(message);
                    return textMessage;
                }
            });
        }
    }
    
    

    如果是点对点模式,jmsTemplate.send(mqQueue,****)
    如果是发布/订阅模式,jmsTemplate.send(mqTopic,****)

    消息接收者/消费者配置

    如果跟消息提供者不在一个工程下,也需要配置connectionFactory

        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
        <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"></property>
        </bean>
    
        <!-- Spring Caching连接工厂 -->
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="cachingConnectionFactory"
              class="org.springframework.jms.connection.CachingConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
            <!-- Session缓存数量 -->
            <property name="sessionCacheSize" value="10"></property>
        </bean>
    

    配置消息监听

        <!-- 消息监听器 -->
        <bean id="consumerMessageListener" class="com.bigdata.core.activemq.ConsumerMessageListener" />
    
        <jms:listener-container container-type="default" destination-type="topic" connection-factory="cachingConnectionFactory" acknowledge="auto">
            <jms:listener destination="MessageTopic" ref="consumerMessageListener"/>
        </jms:listener-container>
    

    这里需要注意,如果是点对点模式 jms:listener-container标签属性destination-type需要配置为queue,默认情况为queue;如果是发布/订阅模式jms:listener-container标签属性destination-type需要配置为topic

    消息接收者/订阅者 java代码

    package com.bigdata.core.activemq;
    
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * @Description: TODO
     * @Author: 
     * @Date: Created in 下午4:28 2018/1/11
     */
    public class ConsumerMessageListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try{
                System.out.println("接收到的消息内容是:" + textMessage.getText());
            }catch (Exception e){
                e.printStackTrace();
            }
    
        }
    }
    

    java代码监听需要实现接口MessageListener

    此外可以在amqConnectionFactory中配置消息传输监听器,用以处理网络异常及服务器异常,配置信息如下:

       <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
       <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
           <property name="brokerURL" value="tcp://localhost:61616"></property>
           <!-- 消息传输监听器 处理网络及服务器异常 -->
           <property name="transportListener">
               <bean class="com.permission.core.activemq.ActiveMQTransportListener" />
           </property>
       </bean>
    

    ActiveMQTransportListener代码

    package com.permission.core.activemq;
    
    import org.apache.activemq.transport.TransportListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    /**
     * @Description: TODO
     * @Author: 
     * @Date: Created in 下午4:31 2018/1/11
     */
    public class ActiveMQTransportListener implements TransportListener {
    
        private final static Logger log = LoggerFactory.getLogger(ActiveMQTransportListener.class);
    
        @Override
        public void onCommand(Object o) {
            log.info("onCommand -> 对消息传输命令进行监控  ...");
        }
    
        @Override
        public void onException(IOException e) {
            log.error("onException -> 消息服务器连接错误......", e);
        }
    
        @Override
        public void transportInterupted() {
            log.warn("transportInterupted -> 消息服务器连接发生中断...");
        }
    
        @Override
        public void transportResumed() {
            log.info("transportResumed -> 消息服务器连接已恢复...");
        }
    }
    

    订阅着/接收者消息传送监听器会对所有数据进行监控,如下:

    接收到的消息内容是:发布消息:0
    [2018-01-16 15:06:35.202] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
    ] >>> onCommand -> 对消息传输命令进行监控  ...
    接收到的消息内容是:发布消息:1
    [2018-01-16 15:06:35.206] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
    ] >>> onCommand -> 对消息传输命令进行监控  ...
    接收到的消息内容是:发布消息:2
    [2018-01-16 15:06:35.209] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
    ] >>> onCommand -> 对消息传输命令进行监控  ...
    接收到的消息内容是:发布消息:3
    [2018-01-16 15:06:35.214] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
    ] >>> onCommand -> 对消息传输命令进行监控  ...
    接收到的消息内容是:发布消息:4
    [2018-01-16 15:06:35.216] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
    ] >>> onCommand -> 对消息传输命令进行监控  ...
    接收到的消息内容是:发布消息:5
    [2018-01-16 15:06:35.220] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
    ] >>> onCommand -> 对消息传输命令进行监控  ...
    接收到的消息内容是:发布消息:6
    [2018-01-16 15:06:35.222] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
    ] >>> onCommand -> 对消息传输命令进行监控  ...
    接收到的消息内容是:发布消息:7
    [2018-01-16 15:06:35.223] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
    ] >>> onCommand -> 对消息传输命令进行监控  ...
    接收到的消息内容是:发布消息:8
    [2018-01-16 15:06:35.225] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
    ] >>> onCommand -> 对消息传输命令进行监控  ...
    接收到的消息内容是:发布消息:9
    [2018-01-16 15:06:35.226] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
    ] >>> onCommand -> 对消息传输命令进行监控  ...
    

    如果activemq服务器down掉,消息传输会输出

    2018-01-16 15:09:26,467 WARN [ActiveMQ Connection Executor: tcp://localhost/127.0.0.1:61616@56068] org.springframework.jms.connection.CachingConnectionFactory#onException[SingleConnectionFactory.java:322] Encountered a JMSException - resetting the underlying JMS Connection
    javax.jms.JMSException: java.io.EOFException
        at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
        at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1952)
        at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1971)
        at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
        at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)
        at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
        at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
        at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:173)
        at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:345)
        at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:219)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.EOFException: null
        at java.io.DataInputStream.readInt(DataInputStream.java:392)
        at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
        at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
        ... 1 common frames omitted
    2018-01-16 15:09:26,479 ERROR [ActiveMQ Connection Executor: tcp://localhost/127.0.0.1:61616@56068] com.permission.core.activemq.ActiveMQTransportListener#onException[ActiveMQTransportListener.java:26] onException -> 消息服务器连接错误......
    java.io.EOFException: null
        at java.io.DataInputStream.readInt(DataInputStream.java:392)
        at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
        at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
        at java.lang.Thread.run(Thread.java:748)
    

    相关文章

      网友评论

          本文标题:Spring集成ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/fjusoxtx.html