ActiveMQ入门教程

作者: 龙圣贤 | 来源:发表于2017-10-02 01:13 被阅读1882次

    ActiveMQ入门教程

    本博客内容皆为网络搜集而来,不保证任何版权问题,不保证长期有效性(即具有时效性),如有侵权或内容有违相关法律法规,请联系本人邮箱移除

    博客永久更新地址: ActiveMQ入门教程

    概述与介绍

    ActiveMQ 是Apache出品,最流行的. 功能强大的即时通讯和集成模式的开源服务器。

    ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

    提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能。

    特性

    1. 多种语言和协议编写客户端。语言: Java. C. C++. C#. Ruby. Perl. Python. PHP。应用协议:OpenWire. Stomp REST. WS Notification. XMPP. AMQP

    2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

    3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

    4. 通过了常见J2EE服务器(如 Geronimo. JBoss 4. GlassFish. WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

    5. 支持多种传送协议:in-VM. TCP. SSL. NIO. UDP. JGroups. JXTA

    6. 支持通过JDBC和journal提供高速的消息持久化

    1. 从设计上保证了高性能的集群,客户端-服务器,点对点
    2. 支持Ajax
    3. 支持与Axis的整合
    4. 可以很容易得调用内嵌JMS provider,进行测试

    本文基于最新版的ActiveMQ 5.15.0进行讲解, 期间遇到的问题也是这个版本出现的, 解决方法也会在文中记录


    1.下载

    ActiveMQ 5.15.0下载地址

    2.安装

    2.1解压apache-activemq-5.15.0-bin.zip

    本文解压在D盘根目录

    activemq-binactivemq-bin

    2.2启动activemq

    activemq-start01activemq-start01

    2.2.1普通启动命令

    按下: win+r, 输入cmd, 进入dos界面:

    open-cmdopen-cmd
    //3条命令的意思分别是:
    //1.进入activemq的bin目录
    //2.进入d盘
    //3.启动activemq
    
    cd D:\apache-activemq-5.15.0\bin
    d:
    activemq start
    

    如果你启动看到了异常信息:

    Caused by: java.io.IOException: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.SocketException: Unrecognized Windows Sockets error: 0: **JVM_Bind**
    
    

    原因是:

    你的端口被占用了。找到是哪个程序占用了你的端口, 并kill掉该进程或服务。或者尝试修改ActiveMQ的默认端口61616(ActiveMQ使用的默认端口是61616), 在大多数情况下,占用61616端口的是Internet Connection Sharing (ICS) 这个Windows服务,你只需停止它就可以启动ActiveMQ了。

    启动成功:

    启动完成访问http://127.0.0.1:8161/启动完成访问http://127.0.0.1:8161/

    1.访问: http://127.0.0.1:8161/,再点击Manage ActiveMQ broker

    web管理页面web管理页面

    进入管理页面, 帐号密码都是: admin


    输入帐号密码输入帐号密码

    进入admin页面, 也可以直接进入http://127.0.0.1:8161/admin/, 输入帐号密码: admin即可

    进入admin页面进入admin页面

    导航菜单中

    • Queues是队列方式消息
    • Topics是主题方式消息
    • Subscribers消息订阅监控查询
    • Connections可以查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接
    • Network是网络链接数监控
    • Send可以发送消息数据。

    如果你想修改用户名和密码的话,在conf/jetty-realm.properties中修改即可。

    ## ---------------------------------------------------------------------------
    ## Licensed to the Apache Software Foundation (ASF) under one or more
    ## contributor license agreements.  See the NOTICE file distributed with
    ## this work for additional information regarding copyright ownership.
    ## The ASF licenses this file to You under the Apache License, Version 2.0
    ## (the "License"); you may not use this file except in compliance with
    ## the License.  You may obtain a copy of the License at
    ## 
    ## http://www.apache.org/licenses/LICENSE-2.0
    ## 
    ## Unless required by applicable law or agreed to in writing, software
    ## distributed under the License is distributed on an "AS IS" BASIS,
    ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    ## See the License for the specific language governing permissions and
    ## limitations under the License.
    ## ---------------------------------------------------------------------------
    
    # Defines users that can access the web (console, demo, etc.)
    # username: password [,rolename ...]
    admin: admin, admin
    user: user, user
    
    

    2.2.2运行demo示例

    遇到的问题:

    使用官方文档提供的启动方式启动ActiveMQ v5.15,
    bin/activemq console xbean:examples/conf/activemq-demo.xml
    不能启动,activemq中没有console 参数。
    解决办法:

    不同版本启动命令不一样!

    1、V5.8使用下面的命令行启动
    D:\apache-activemq-5.8.0\bin>activemq xbean:../conf/activemq-demo.xml
    2、V5.9使用下面的命令行启动
    D:\apache-activemq-5.9.1\bin>activemq xbean:../examples/conf/activemq-demo.xml
    3、V5.15.0使用下面的命令行启动
    D:\apache-activemq-5.15.0\bin>activemq start xbean:file:../examples/conf/activemq-demo.xml

    当然你还可以用绝对的文件目录方式:activemq start xbean:file:D:\apache-activemq-5.15.0\conf/activemq-demo.xml

    输入命令启动

    activemq start xbean:file:../examples/conf/activemq-demo.xml
    
    运行demo示例运行demo示例

    访问: http://localhost:8161/demo/ , 点击Websockets

    demo示例页面demo示例页面

    下面还有个发消息和收消息

    发消息和收消息发消息和收消息

    发消息

    发消息发消息

    收消息

    收消息收消息

    进入页面

    连接页连接页

    3.使用

    3.1消息示例

    3.1.1ActiviteMQ消息有3种形式

    JMS 公共 点对点域 发布/订阅域
    ConnectionFactory QueueConnectionFactory TopicConnectionFactory
    Connection QueueConnection TopicConnection
    Destination Queue Topic
    Session QueueSession TopicSession
    MessageProducer QueueSender TopicPublisher
    MessageConsumer QueueReceiver TopicSubscriber
    1. 点对点方式(point-to-point)

    点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行

    1. 发布/订阅 方式(publish/subscriber Messaging)

    发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

    3.1.2ActiviteMQ接收和发送消息基本流程

    基本流程基本流程

    发送消息的基本步骤:

    (1)、创建连接使用的工厂类JMS ConnectionFactory

    (2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

    (3)、使用连接Connection 建立会话Session

    (4)、使用会话Session和管理对象Destination创建消息生产者MessageSender

    (5)、使用消息生产者MessageSender发送消息

    消息接收者从JMS接受消息的步骤

    (1)、创建连接使用的工厂类JMS ConnectionFactory

    (2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

    (3)、使用连接Connection 建立会话Session

    (4)、使用会话Session和管理对象Destination创建消息接收者MessageReceiver

    (5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。

    3.1.3使用java代码实现收发消息

    1.使用JMS方式发送接收消息
    发送方代码
    package com.dragon.activemq.demo;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.MessageSender
     * @project ActiveMQ-5.15.0
     * @Description: 使用JMS方式发送接收消息 - 消息发送者
     * @date 2017/10/01 21:16
     */
    public class MessageSender {
    
        // 发送次数
        public static final int SEND_NUM = 5;
        // tcp 地址
        public static final String BROKER_URL = "tcp://localhost:61616";
        // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
        public static final String DESTINATION = "sagedragon.mq.queue";
    
        /**
         * <b>function:</b> 发送消息
         *
         * @param session
         * @param producer
         * @throws Exception
         */
        public static void sendMessage(Session session, MessageProducer producer) throws Exception {
            for (int i = 0; i < SEND_NUM; i++) {
                String message = "发送消息第" + (i + 1) + "条";
                TextMessage text = session.createTextMessage(message);
    
                System.out.println(message);
                producer.send(text);
            }
        }
    
        public static void run() throws Exception {
    
            Connection connection = null;
            Session session = null;
            try {
                // 创建链接工厂
                ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
                // 通过工厂创建一个连接
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Destination destination = session.createQueue(DESTINATION);
                // 创建消息制作者
                MessageProducer producer = session.createProducer(destination);
                // 设置持久化模式
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                sendMessage(session, producer);
                // 提交会话
                session.commit();
    
            } catch (Exception e) {
                throw e;
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            MessageSender.run();
        }
    
    }
    

    输出

    发送消息第1条
    21:20:28.651 [main] DEBUG org.apache.activemq.TransactionContext - Begin:TX:ID:xxx-49962-1506864009010-1:1:1
    发送消息第2条
    发送消息第3条
    发送消息第4条
    发送消息第5条
    
    接收方
    package com.dragon.activemq.demo;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.MessageSender
     * @project ActiveMQ-5.15.0
     * @Description: 使用JMS方式发送接收消息 - 消息接收者
     * @date 2017/10/01 21:16
     */
    public class MessageReceiver {
    
        // tcp 地址
        public static final String BROKER_URL = "tcp://localhost:61616";
        // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
        public static final String DESTINATION = "sagedragon.mq.queue";
    
        public static void run() throws Exception {
    
            Connection connection = null;
            Session session = null;
            try {
                // 创建链接工厂
                ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection
                        .DEFAULT_PASSWORD, BROKER_URL);
                // 通过工厂创建一个连接
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Destination destination = session.createQueue(DESTINATION);
                // 创建消息制作者
                MessageConsumer consumer = session.createConsumer(destination);
    
                while (true) {
                    // 接收数据的时间(等待) 100 ms
                    Message message = consumer.receive(1000 * 100);
    
                    TextMessage text = (TextMessage) message;
                    if (text != null) {
                        System.out.println("接收:" + text.getText());
                    } else {
                        break;
                    }
                }
    
                // 提交会话
                session.commit();
    
            } catch (Exception e) {
                throw e;
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            MessageReceiver.run();
        }
    
    }
    

    输出

    21:25:08.718 [main] DEBUG org.apache.activemq.TransactionContext - Begin:TX:ID:xxx-49992-1506864308500-1:1:1
    接收:发送消息第1条
    接收:发送消息第2条
    接收:发送消息第3条
    接收:发送消息第4条
    接收:发送消息第5条
    
    2.Queue队列方式发送点对点消息数据
    发送方
    package com.dragon.activemq.demo;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.DeliveryMode;
    import javax.jms.MapMessage;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.QueueSender
     * @project ActiveMQ-5.15.0
     * @Description: Queue队列方式发送点对点消息数据 - 消息发送者
     * @date 2017/10/01 21:16
     */
    public class QueueSender {
    
        // 发送次数
        public static final int SEND_NUM = 5;
        // tcp 地址
        public static final String BROKER_URL = "tcp://localhost:61616";
        // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
        public static final String DESTINATION = "sagedragon.mq.queue";
    
        /**
         * <b>function:</b> 发送消息
         *
         * @param session
         * @param sender
         * @throws Exception
         */
        public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
            for (int i = 0; i < SEND_NUM; i++) {
                String message = "发送消息第" + (i + 1) + "条";
    
                MapMessage map = session.createMapMessage();
                map.setString("text", message);
                map.setLong("time", System.currentTimeMillis());
                System.out.println(map);
    
                sender.send(map);
            }
        }
    
        public static void run() throws Exception {
    
            QueueConnection connection = null;
            QueueSession session = null;
            try {
                // 创建链接工厂
                QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
                // 通过工厂创建一个连接
                connection = factory.createQueueConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Queue queue = session.createQueue(DESTINATION);
                // 创建消息发送者
                javax.jms.QueueSender sender = session.createSender(queue);
                // 设置持久化模式
                sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                sendMessage(session, sender);
                // 提交会话
                session.commit();
    
            } catch (Exception e) {
                throw e;
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            QueueSender.run();
        }
    }
    

    输出

    21:27:31.827 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_77, 25.77-b03, Oracle Corporation, OS: Windows 10, 10.0, amd64, CacheEnabled=true, Host=localhost, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.0}, magic=[A,c,t,i,v,e,M,Q]}
    21:27:31.834 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@50006] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_77, 25.77-b03, Oracle Corporation, OS: Windows 10, 10.0, amd64, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.0}, magic=[A,c,t,i,v,e,M,Q]}
    21:27:31.835 [main] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_77, 25.77-b03, Oracle Corporation, OS: Windows 10, 10.0, amd64, CacheEnabled=true, Host=localhost, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.0}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_77, 25.77-b03, Oracle Corporation, OS: Windows 10, 10.0, amd64, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.0}, magic=[A,c,t,i,v,e,M,Q]}
    21:27:31.836 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@50006] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@50006 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
    21:27:31.841 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@50006] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@50006 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
    ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第1条, time=1506864451941} }
    21:27:31.966 [main] DEBUG org.apache.activemq.TransactionContext - Begin:TX:ID:Mr-Dragon-50004-1506864451704-1:1:1
    ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第2条, time=1506864451971} }
    ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第3条, time=1506864451973} }
    ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第4条, time=1506864451974} }
    ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第5条, time=1506864451975} }
    21:27:31.976 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:Mr-Dragon-50004-1506864451704-1:1:1 Transaction Commit :TX:ID:Mr-Dragon-50004-1506864451704-1:1:1
    
    接收方
    package com.dragon.activemq.demo;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.QueueReceiver
     * @project ActiveMQ-5.15.0
     * @Description: Queue队列方式发送点对点消息数据 - 消息发送者
     * @date 2017/10/01 21:16
     */
    public class QueueReceiver {
    
        // tcp 地址
        public static final String BROKER_URL = "tcp://localhost:61616";
        // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
        public static final String TARGET = "sagedragon.mq.queue";
    
    
        public static void run() throws Exception {
    
            QueueConnection connection = null;
            QueueSession session = null;
            try {
                // 创建链接工厂
                QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection
                        .DEFAULT_PASSWORD, BROKER_URL);
                // 通过工厂创建一个连接
                connection = factory.createQueueConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Queue queue = session.createQueue(TARGET);
                // 创建消息制作者
                javax.jms.QueueReceiver receiver = session.createReceiver(queue);
    
                receiver.setMessageListener(new MessageListener() {
                    public void onMessage(Message msg) {
                        if (msg != null) {
                            MapMessage map = (MapMessage) msg;
                            try {
                                System.out.println(map.getLong("time") + "接收#" + map.getString("text"));
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
                // 休眠100ms再关闭
                Thread.sleep(1000 * 100);
    
                // 提交会话
                session.commit();
    
            } catch (Exception e) {
                throw e;
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            QueueReceiver.run();
        }
    }
    

    输出 , 爆了个类型转换错误

    21:31:36.409 [ActiveMQ Session Task-1] ERROR org.apache.activemq.ActiveMQMessageConsumer - ID:Mr-Dragon-50055-1506864696129-1:1:1:1 Exception while processing message: ID:Mr-Dragon-49962-1506864009010-1:1:1:1:5
    java.lang.ClassCastException: org.apache.activemq.command.ActiveMQTextMessage cannot be cast to javax.jms.MapMessage
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    1506864451941接收#发送消息第1条
    1506864451971接收#发送消息第2条
    1506864451973接收#发送消息第3条
    1506864451974接收#发送消息第4条
    1506864451975接收#发送消息第5条
    
    3.Topic主题发布和订阅消息
    发送方
    package com.dragon.activemq.demo;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.DeliveryMode;
    import javax.jms.MapMessage;
    import javax.jms.Session;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicPublisher;
    import javax.jms.TopicSession;
    
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.QueueSender
     * @project ActiveMQ-5.15.0
     * @Description: Topic主题发布和订阅消息 - 消息发送者
     * @date 2017/10/01 21:16
     */
    public class TopicSender {
    
        // 发送次数
        public static final int SEND_NUM = 5;
        // tcp 地址
        public static final String BROKER_URL = "tcp://localhost:61616";
        // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
        public static final String DESTINATION = "sagedragon.mq.queue";
    
        /**
         * <b>function:</b> 发送消息
         *
         * @param session   会话
         * @param publisher 发布者
         * @throws Exception
         */
        public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
            for (int i = 0; i < SEND_NUM; i++) {
                String message = "发送消息第" + (i + 1) + "条";
    
                MapMessage map = session.createMapMessage();
                map.setString("text", message);
                map.setLong("time", System.currentTimeMillis());
                System.out.println(map);
    
                publisher.send(map);
            }
        }
    
        public static void run() throws Exception {
    
            TopicConnection connection = null;
            TopicSession session = null;
            try {
                // 创建链接工厂
                TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
                // 通过工厂创建一个连接
                connection = factory.createTopicConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Topic topic = session.createTopic(DESTINATION);
                // 创建消息发送者
                TopicPublisher publisher = session.createPublisher(topic);
                // 设置持久化模式
                publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                sendMessage(session, publisher);
                // 提交会话
                session.commit();
    
            } catch (Exception e) {
                throw e;
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            TopicSender.run();
        }
    }
    
    

    输出

    ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第1条, time=1506865178417} }
    21:39:38.425 [main] DEBUG org.apache.activemq.TransactionContext - Begin:TX:ID:Mr-Dragon-50129-1506865178168-1:1:1
    ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第2条, time=1506865178429} }
    ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第3条, time=1506865178430} }
    ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第4条, time=1506865178432} }
    ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第5条, time=1506865178433} }
    
    接收方
    package com.dragon.activemq.demo;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicSession;
    import javax.jms.TopicSubscriber;
    
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.QueueSender
     * @project ActiveMQ-5.15.0
     * @Description: Topic主题发布和订阅消息 - 消息接收者,依赖hawtbuf-1.9.jar
     * @date 2017/10/01 21:16
     */
    public class TopicReceiver {
    
        // tcp 地址
        public static final String BROKER_URL = "tcp://localhost:61616";
        // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
        public static final String TARGET = "sagedragon.mq.queue";
    
        public static void run() throws Exception {
    
            TopicConnection connection = null;
            TopicSession session = null;
            try {
                // 创建链接工厂
                TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection
                        .DEFAULT_PASSWORD, BROKER_URL);
                // 通过工厂创建一个连接
                connection = factory.createTopicConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Topic topic = session.createTopic(TARGET);
                // 创建消息制作者
                TopicSubscriber subscriber = session.createSubscriber(topic);
    
                subscriber.setMessageListener(new MessageListener() {
                    public void onMessage(Message msg) {
                        if (msg != null) {
                            MapMessage map = (MapMessage) msg;
                            try {
                                System.out.println(map.getLong("time") + "接收#" + map.getString("text"));
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
                // 休眠100ms再关闭
                Thread.sleep(1000 * 100);
    
                // 提交会话
                session.commit();
    
            } catch (Exception e) {
                throw e;
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            TopicReceiver.run();
        }
    }
    
    

    输出

    1506865454228接收#发送消息第1条
    21:44:36.318 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10881ms elapsed since last write check.
    21:44:36.373 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
    1506865454236接收#发送消息第2条
    1506865454237接收#发送消息第3条
    1506865454237接收#发送消息第4条
    1506865454238接收#发送消息第5条
    
    4.在spring中使用ActiveMQ

    xml配置: applicationContext-beans.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd">
    
        <!-- 连接池  -->
        <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://localhost:61616"/>
                </bean>
            </property>
        </bean>
    
        <!-- 连接工厂 -->
        <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>
    
        <!-- 配置消息目标 -->
        <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
            <!-- 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp -->
            <constructor-arg index="0" value="sagedragon.mq.queue"/>
        </bean>
    
        <!-- 消息模板 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="activeMQConnectionFactory"/>
            <property name="defaultDestination" ref="destination"/>
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
        </bean>
    </beans>
    

    pom依赖

     <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-all</artifactId>
       <version>5.15.0</version>
    </dependency>
    
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-spring</artifactId>
      <version>5.15.0</version>
    </dependency>
    
    发送方
    package com.dragon.activemq.demo;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.FileSystemXmlApplicationContext;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.Session;
    import java.util.Date;
    
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.SpringSender
     * @Description: Spring JMSTemplate 消息发送者
     * @project ActiveMQ-5.15.0
     * @date 2017/10/01 21:47
     */
    public class SpringSender {
    
        public static void main(String[] args) {
            ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext-*.xml");
            JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
    
            jmsTemplate.send(new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    MapMessage message = session.createMapMessage();
                    message.setString("message", "current system time: " + new Date().getTime());
    
                    return message;
                }
            });
        }
    }
    
    

    输出

    21:59:36.021 [main] DEBUG org.springframework.jms.core.JmsTemplate - Sending created message: ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {message=current system time: 1506866376016} }
    
    接收方
    package com.dragon.activemq.demo;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.FileSystemXmlApplicationContext;
    import org.springframework.jms.core.JmsTemplate;
    
    import java.util.Map;
    
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.SpringSender
     * @Description: Spring JMSTemplate 消息接收者
     * @project ActiveMQ-5.15.0
     * @date 2017/10/01 21:47
     */
    public class SpringReceiver {
    
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext-*.xml");
    
            JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
            while (true) {
                Map<String, Object> map = (Map<String, Object>) jmsTemplate.receiveAndConvert();
    
                System.out.println("收到消息:" + map.get("message"));
            }
        }
    }
    
    

    输出

    收到消息:current system time: 1506866100187
    
    5.在springboot中使用ActiveMQ

    pom.xml

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    

    application.yml

    spring:
      application:
        name: activemq-demo
    server:
      port: 9090
    
    发送方
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.springboot.demo.MessageProduction
     * @Description: 消息生产者
     * @date 2017/10/01 23:14
     */
    //注册为一个bean
    @Component
    //开启定时器
    @EnableScheduling
    public class MessageProduction {
    
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;//使用JmsMessagingTemplate将消息放入队列
    
        @Autowired
        private Queue queue;
    
        @Scheduled(fixedDelay = 3000)//每3s执行1次,将消息放入队列内
        public void send() {
            this.jmsMessagingTemplate.convertAndSend(this.queue,
                "测试消息队列" + System.currentTimeMillis() / 1000);
        }
    }
    
    

    输出

    
    
    消息队列
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.springboot.demo.MessageQueue
     * @Description: 队列消息发送者
     * @date 2017/10/01 23:16
     */
    @Component
    public class MessageQueue {
    
        //返回一个名为my-message的队列,并且注册为bean
        @Bean
        public Queue queue() {
            return new ActiveMQQueue("my-message");
        }
    
    }
    
    接收方
    package com.dragon.activemq.demo.springboot.demo;
    
    import com.dragon.activemq.demo.springboot.Constants;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSException;
    import java.text.MessageFormat;
    import java.util.Date;
    
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.springboot.demo.MessageListener
     * @Description: 消息监听者
     * @date 2017/10/01 23:16
     */
    @Component
    public class MessageListener {
    
        /**
         * 使用@JmsListener注解来监听指定的某个队列内的消息,是否有新增,有的话则取出队列内消息
         * 进行处理
         **/
        @JmsListener(destination = "my-message")
        public void removeMessage(String msg) {
        //public void removeMessage(Email email) {
            System.out.println("监听接收到的消息是:" + msg);//打印队列内的消息
        }
      
        /*@JmsListener(destination = "mailbox", containerFactory = "myFactory")
        public void receiveMessage(Email email) {
            System.out.println("Received <" + email + ">");
        }*/
    
    }
    
    

    启动ActiveMQApplication后的输出

    监听接收到的消息是:测试消息队列1506875404
    监听接收到的消息是:测试消息队列1506875407
    监听接收到的消息是:测试消息队列1506875410
    监听接收到的消息是:测试消息队列1506875413
    

    ActiveMQApplication.java

    package com.dragon.activemq.demo;
    
    import com.dragon.activemq.demo.springboot.demo.Email;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.jms.annotation.EnableJms;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.config.JmsListenerContainerFactory;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
    import org.springframework.jms.support.converter.MessageConverter;
    import org.springframework.jms.support.converter.MessageType;
    
    import javax.jms.ConnectionFactory;
    
    /**
     * @author Dragon
     * @version V0.1
     * @Title: com.dragon.activemq.demo.ActiveMQApplication
     * @Description: ActiveMQ 启动类
     * @date 2017/10/01 22:20
     */
    @SpringBootApplication(scanBasePackages = {"com.dragon"})////扫描com.dragon包的注解类为bean
    @EnableJms//开启jms
    public class ActiveMQApplication {
    
        /**
         * 将springboot里面的消息加到jms监听工厂
         * @param connectionFactory
         * @param configurer
         * @return
         */
        @Bean
        public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                        DefaultJmsListenerContainerFactoryConfigurer configurer) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            // This provides all boot's default to this factory, including the message converter
            configurer.configure(factory, connectionFactory);
            // You could still override some of Boot's default if necessary.
            return factory;
        }
    
        /**
         * 转换消息格式, 没有这个会爆类型转换错误:
         * Caused by: org.springframework.messaging.converter.MessageConversionException:
         * Cannot convert from [org.apache.activemq.command .ActiveMQTextMessage]
         * to
         * [com.dragon.activemq.demo.springboot.demo.Email]  / [java.lang.String]
         * for org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@3189b1ff
         * @return
         */
        @Bean // Serialize message content to json using TextMessage
        public MessageConverter jacksonJmsMessageConverter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("_type");
            return converter;
        }
    
        public static void main(String[] args) {
            // Launch the application
            ConfigurableApplicationContext context = SpringApplication.run(ActiveMQApplication.class, args);
    
            //JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
    
            // Send a message with a POJO - the template reuse the message converter
            //System.out.println("Sending an email message.");
            //jmsTemplate.convertAndSend("mailbox", new Email("info@example.com", "Hello"));
    
        }
    
    }
    

    4.遇到的坑

    4.1消息类型转换错误

    Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.activemq.command.ActiveMQTextMessage] to [com.dragon.activemq.demo.springboot.demo.Email] for org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@3189b1ff
    
    解决方法:

    在application类里加个bean

        @Bean // Serialize message content to json using TextMessage
        public MessageConverter jacksonJmsMessageConverter() {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("_type");
            return converter;
        }
    

    4.2发送消息之后消费不了

    解决方法:

    将springboot里的消息加到jms工厂

        @Bean
        public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                        DefaultJmsListenerContainerFactoryConfigurer configurer) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            // This provides all boot's default to this factory, including the message converter
            configurer.configure(factory, connectionFactory);
            // You could still override some of Boot's default if necessary.
            return factory;
        }
    
    

    4.3实体类序列化后报错

    Caused by: org.springframework.jms.support.converter.MessageConversionException: Could not convert JMS message; nested exception is javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.dragon.activemq.demo.springboot.demo.Email! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
    

    报错的详细讲解
    http://activemq.apache.org/objectmessage.html

    解决方法:

    设置可以序列化的包列表, 加到vm启动参数里面!

    #全部包
    #org.apache.activemq.SERIALIZABLE_PACKAGES=*
    org.apache.activemq.SERIALIZABLE_PACKAGES=com.dragon.activemq,com.dragon.demo
    
    加到vm启动参数里面加到vm启动参数里面

    5.参考资料

    ActiveMQ 4.x / 5.x Getting Started

    深入浅出 消息队列 ActiveMQ

    ActiveMQ In Action读书笔记

    Spring Boot学习笔记(十五) - 消息

    spring-boot集成activeMQ(一)-使用默认的ActiveMQ

    Messaging with JMS

    http://activemq.apache.org/objectmessage.html


    最后感谢观看本教程!

    @ 作者:龙圣贤
    @ 写作日期:2017年10月02日 01:05:14
    @ 转载请简信联系,并且带上文章出处。

    此致,今日分享完毕。

    相关文章

      网友评论

      • java人:写的很好,可以转载吗?
      • 不满_cc14:楼主,问一个问题就是点对点传输,我在QueueReceiver获得消息后,关闭它,然后再次启动且只启动它,结果发现,它又能输出消息了,我关了再启动还是这样,一直执行几次才能不再输出或得到消息,这是什么情况?
      • 呼嘎嘎_MrX:很好,很全

      本文标题:ActiveMQ入门教程

      本文链接:https://www.haomeiwen.com/subject/ynlzextx.html