美文网首页待整理菜鸟要飞
ActiveMQ学习(二)安装与HelloWorld

ActiveMQ学习(二)安装与HelloWorld

作者: 万总有点菜 | 来源:发表于2017-05-28 16:49 被阅读54次

    准备

    ActiveMQ下载 ActiveMQ 5.14.5 Release 下载Windows Distribution
    注:运行环境需要装jdk

    代码

    demo代码

    运行

    解压下载文件,根据本地系统选择运行win32/win64文件夹的 activemq.bat

    文件路径
    双击运行如下
    运行界面
    浏览器访问 http://localhost:8161/ ,用户名和密码均为 admin ActiveMQ主界面

    Hello World (点对点的消息模型)

    • 使用maven来构建项目,pom配置
    <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
          <version>5.14.5</version>
    </dependency>
    
    • 项目目录
    项目目录
    • 编写消息生产者
    package com.sima.queues;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    /**
     * Created by Maple on 2017-05-28.
     */
    public class JMSProducer {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    //    private static final String BROKEURL= "tcp://localhost:8161"; // 默认的连接地址
        private static final int SENDNUM = 10; // 发送的消息数量
        public static void  main(String[] args){
    
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageProducer messageProducer; // 消息生产者
            connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,
                    JMSProducer.PASSWORD, JMSProducer.BROKEURL);
            try {
                connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
                destination = session.createQueue("TestQueueFirst"); // 创建消息队列
                messageProducer = session.createProducer(destination); // 创建消息生产者
                //设置不持久化
    //            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                sendMessage(session, messageProducer); // 发送消息
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                if(connection!=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        private static void sendMessage(Session session, MessageProducer messageProducer) {
            for(int i=0;i<JMSProducer.SENDNUM;i++){
                TextMessage message= null;
                try {
                    message = session.createTextMessage("ActiveMQ 发送的消息-"+i);
                    System.out.println("发送消息:"+"ActiveMQ 发送的消息-"+i);
                    messageProducer.send(message);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    • 编写消息消费者
    package com.sima.queues;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    /**
     * Created by Maple on 2017-05-28.
     */
    public class JMSConsumerFirst {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    //    private static final String BROKEURL= "tcp://localhost:8161"; // 默认的连接地址
        public static void main(String[] args){
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageConsumer messageConsumer; // 消息的消费者
            // 实例化连接工厂
            connectionFactory=new ActiveMQConnectionFactory(JMSConsumerFirst.USERNAME, JMSConsumerFirst.PASSWORD, JMSConsumerFirst.BROKEURL);
            try {
                connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
                destination=session.createQueue("TestQueueFirst");  // 创建连接的消息队列
                messageConsumer=session.createConsumer(destination); // 创建消息消费者
                while(true){
                    TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);
                    if(textMessage!=null){
                        System.out.println("收到的消息:"+textMessage.getText());
                    }else{
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    消息队列
    • 消费消息
      运行 JMSConsumerFirst
    控制台

    查看 http://localhost:8161/admin/queues.jsp

    消息队列
    • 此时,再运行 JMSProducer生产消息,会被消费者消费
    消费者数量

    监听方式

    • 创建监听类
    package com.sima.queues;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    /**
     * Created by Maple on 2017-05-28.
     */
    public class MyListener implements MessageListener {
        public void onMessage(Message message) {
            try {
                System.out.println("通过MyListener收到的消息:"+((TextMessage)message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 修改消息消费者
      该行代码
      messageConsumer.setMessageListener(new MyListener());// 注册消息监听
    

    替换

     while(true){
          TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);
           if(textMessage!=null){
                System.out.println("收到的消息:"+textMessage.getText());
            }else{
                break;
            }                
    }
    

    运行结果一致,这种方式有利于代码的管理,建议采用该方式。

    相关文章

      网友评论

        本文标题:ActiveMQ学习(二)安装与HelloWorld

        本文链接:https://www.haomeiwen.com/subject/qzrhfxtx.html