美文网首页
JMS-ActiveMQ-Demo

JMS-ActiveMQ-Demo

作者: 8813d76fee36 | 来源:发表于2018-02-06 14:15 被阅读161次

    慕课网Java消息中间件笔记

    笔记代码
    https://gitee.com/oooh2016/JMS-DEMO

    安装ActiveMQ

    下载ActiveMQ并解压至任意目录,如/home/apache-activemq/

    运行ActiveMQ

    进入到ActiveMQ主目录下的bin目录。
    如:/home/apache-activemq/bin
    执行如下命令运行:

    $ ./activemq start

    如图执行成功

    尝试进入ActiveMQ主页

    默认端口:8161


    ActiveMQ主页

    点击Manage ActiveMQ broker进入管理页面,默认用户名和密码都是admin


    管理页面

    至此安装成功

    Java使用ActiveMQ

    新建Maven项目,并引入如下依赖

            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.3</version>
            </dependency>
    

    队列模式

    • 项目结构


      项目结构
    • 消息提供者
      消息提供者向消息中间件发送消息,需要配置消息服务器的地址和队列名称。
    package queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @Author: WJ
     * @Description: 消息提供者 向消息中间件发送消息
     * 61616是activemq默认端口
     * @Date: Created in 上午10:45 2018/2/6
     */
    public class AppProducer {
    
        private static final String URL = "tcp://192.168.58.3:61616";
        private static final String queueName = "queue-test";
    
        public static void main(String[] args) throws JMSException {
            //创建ConnectionFactory
            ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
    
            //创建Connection
            Connection connection = factory.createConnection();
    
            //建立连接
            connection.start();
    
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //创建一个目标
            Destination destination = session.createQueue(queueName);
    
            //创建一个生产者
            MessageProducer producer = session.createProducer(destination);
    
            for (int i = 0; i < 100; i ++) {
                //创建消息
                TextMessage message = session.createTextMessage("test" + i);
                producer.send(message);
    
                //打印发送的消息
                System.out.println("发送消息:" + message.getText());
            }
    
            //关闭连接
            connection.close();
        }
    }
    
    

    运行消息提供者。

    进入ActiveMQ后台的Queue选项查看刚才发送的消息。此时看到名为queue-test的队列中有100条消息。
    接下来创建消费者消费队列中的消息。


    查看消息
    • 消息消费者
      同样需要指定消息服务器的地址,以及需要消费的队列名称。
    package queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @Author: WJ
     * @Description: 消费者
     * @Date: Created in 上午11:19 2018/2/6
     */
    public class AppConsumer {
    
        private static final String URL = "tcp://192.168.58.3:61616";
        private static final String queueName = "queue-test";
    
        public static void main(String[] args) throws JMSException {
            //创建ConnectionFactory
            ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
    
            //创建Connection
            Connection connection = factory.createConnection();
    
            //建立连接
            connection.start();
    
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //创建一个目标
            Destination destination = session.createQueue(queueName);
    
            //创建一个消费者
            MessageConsumer consumer = session.createConsumer(destination);
    
            //创建监听器
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        //获取消息并打印
                        String text = textMessage.getText();
                        System.out.println("接收到的消息:" + text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            /**
             * 消息异步接收
             * 若在此处关闭连接
             * 可能会只接收到部分消息后
             * 因连接关闭而不能接收消息队列中的全部消息
             */
    //        connection.close();
        }
    }
    
    

    运行消费者,并观察运行结果。


    接收到的消息

    可以看到队列中的100条消息已经全部接收到了。
    ActiveMQ后台也显示queue-test这个队列中已经没有消息了,并且有一个消费者在线,100个消息出队列。


    后台情况
    • 运行多个消费者
      这次先启动3个消费者,看看三个消费者接收到的消息结果是什么样的。


      启动三个消费者

      运行提供者,发送100条消息。

    消费者1


    消费者1

    消费者2


    消费者2
    消费者3
    消费者3

    可以发现100条消息被依次发送给了三个消费者。


    队列模型

    主题模式

    • 项目结构


      项目结构
    • 消息发布者
      该代码与之前队列模式的发布者十分相似,只是在创建目标的环节由创建队列(createQueue)改为了创建主题(createTopic)。

    //创建一个目标
    Destination destination = session.createTopic(topicName);

    package topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @Author: WJ
     * @Description: 消息提供者 向消息中间件发送消息
     * 61616是activemq默认端口
     * @Date: Created in 上午10:45 2018/2/6
     */
    public class AppProducer {
    
        private static final String URL = "tcp://192.168.58.3:61616";
        private static final String topicName = "topic-test";
    
        public static void main(String[] args) throws JMSException {
            //创建ConnectionFactory
            ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
    
            //创建Connection
            Connection connection = factory.createConnection();
    
            //建立连接
            connection.start();
    
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //创建一个目标
            Destination destination = session.createTopic(topicName);
    
            //创建一个生产者
            MessageProducer producer = session.createProducer(destination);
    
            for (int i = 0; i < 100; i ++) {
                //创建消息
                TextMessage message = session.createTextMessage("test" + i);
                producer.send(message);
    
                //打印发送的消息
                System.out.println("发送消息:" + message.getText());
            }
    
            //关闭连接
            connection.close();
        }
    }
    
    
    • 创建消费者
      同样在创建目标时使用createTopic

    Destination destination = session.createTopic(topicName);

    package topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @Author: WJ
     * @Description: 消费者
     * @Date: Created in 上午11:19 2018/2/6
     */
    public class AppConsumer {
    
        private static final String URL = "tcp://192.168.58.3:61616";
        private static final String topicName = "topic-test";
    
        public static void main(String[] args) throws JMSException {
            //创建ConnectionFactory
            ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
    
            //创建Connection
            Connection connection = factory.createConnection();
    
            //建立连接
            connection.start();
    
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //创建一个目标
            Destination destination = session.createTopic(topicName);
    
            //创建一个消费者
            MessageConsumer consumer = session.createConsumer(destination);
    
            //创建监听器
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        //获取消息并打印
                        String text = textMessage.getText();
                        System.out.println("接收到的消息:" + text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            /**
             * 消息异步接收
             * 若在此处关闭连接
             * 可能会只接收到部分消息后
             * 因连接关闭而不能接收消息队列中的全部消息
             */
    //        connection.close();
        }
    }
    
    
    • 测试
      我们仿照之前的测试步骤,先启动消息提供者。
      进入到ActiveMQ后台,进入Topic页面,发现名为topic-test的主题中已经有了100条消息。


      ActiveMQ后台

      再启动消费者。
      此时看到后台中已经显示有一名消费者在线,但并没有消息被消费。


      ActiveMQ后台
      同时在消费者的控制台中也没有任何信息打印出来,说明确实没有获取到消息。
      消费者后台
    • 问题原因
      该现象产生的原因就是主题模式下,消费者无法接收到在它订阅该主题时刻之前的主题中的消息,只能接收到订阅时刻后主题中的消息。

    • 再次测试
      先启动消费者订阅topic-test主题,再让生产者提供新的消息。发现成功接收所有消息。


      成功接收消息
    • 启动多个消费者
      这次我们依然启动三个消费者。发现三个消费者都接收到了同样的100条消息。

    消费者1


    消费者1

    消费者2


    消费者2
    消费者3
    消费者3
    • 主题模式示意


      主题模型

    SpringBoot整合ActiveMQ

    新建项目

    勾选JMS(ActiveMQ)


    新建项目

    配置ActiveMQ连接

    spring:
      activemq:
        broker-url: tcp://192.168.58.3:61616
        close-timeout: 5000
        in-memory: false
        pool:
          max-connections: 100
    #      enabled: true
        send-timeout: 3000
    

    broker-url即访问远程消息服务器的地址,同时需要关闭内存消息服务器(in-memory=false)
    注释掉的配置(spring.activemq.pool.enabled)如果配置为true,则需要额外引入如下依赖:

            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.15.3</version>
            </dependency>
    

    启动类开启JMS支持

    在启动类上添加@EnableJMS注解

    @EnableJMS

    队列模式

    • 配置队列名称


      队列名称
    • 创建生产者

    这里引入了Spring提供的JmsMessagingTemplate和刚才创建的执行队列消息的对象。并使用jmsMessagingTemplate.convertAndSend()发送消息到消息服务器。

    package dev.wj.springbootjmsdemo.queue;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.jms.Queue;
    
    /**
     * @Author: WJ
     * @Description:
     * @Date: Created in 下午2:44 2018/2/6
     */
    @Component
    public class QueueProducer {
    
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        @Autowired
        private Queue queue;
    
        public void send(String message) {
            System.out.println("发送消息:" + message);
            jmsMessagingTemplate.convertAndSend(this.queue, message);
        }
    }
    
    • 创建消费者

    此处使用@JmsListener(destination = "")注解监听我们的队列。destination属性即要监听的队列名。

    package dev.wj.springbootjmsdemo.queue;
    
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @Author: WJ
     * @Description:
     * @Date: Created in 下午2:47 2018/2/6
     */
    @Component
    public class QueueConsumer {
    
        @JmsListener(destination = "spring-queue")
        public void receive(String text) {
            System.out.println("接收到消息:" + text);
        }
    }
    
    
    • 测试
    package dev.wj.springbootjmsdemo.queue;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * @Author: WJ
     * @Description:
     * @Date: Created in 下午2:49 2018/2/6
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class QueueTest {
    
        @Autowired
        private QueueProducer queueProducer;
    
        @Test
        public void testQueue() {
            queueProducer.send("This is SpringBoot JMS Queue");
        }
    }
    
    

    控制台结果:


    控制台结果

    ActiveMQ后台结果


    ActiveMQ后台结果
    可以看到刚才创建的队列消息已经出现了。

    订阅模式

    • 配置主题


      配置主题
    • 创建发布者

    package dev.wj.springbootjmsdemo.topic;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.jms.Topic;
    
    /**
     * @Author: WJ
     * @Description:
     * @Date: Created in 下午2:52 2018/2/6
     */
    @Component
    public class TopicProducer {
    
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        @Autowired
        private Topic topic;
    
        public void send(String text) {
            System.out.println("topic发送消息:" + text);
            jmsMessagingTemplate.convertAndSend(this.topic, text);
        }
    }
    
    • 创建订阅者

    这里需要注意的就是需要为订阅者的监听指定containerFactory才能正确地接收主题中的消息。

    package dev.wj.springbootjmsdemo.topic;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.config.JmsListenerContainerFactory;
    import org.springframework.stereotype.Component;
    
    import javax.jms.ConnectionFactory;
    
    /**
     * @Author: WJ
     * @Description:
     * @Date: Created in 下午2:53 2018/2/6
     */
    @Component
    public class TopicConsumer {
    
        /**
         * 为主题订阅者Listener指定containerFactory
         * @param connectionFactory
         * @return
         */
        @Bean
        public JmsListenerContainerFactory jmsListenerContainerTopic(ConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setPubSubDomain(true);
            bean.setConnectionFactory(connectionFactory);
            return bean;
        }
    
        @JmsListener(destination = "spring-topic", containerFactory = "jmsListenerContainerTopic")
        public void receive(String text) {
            System.out.println("topic接收到消息:" + text);
        }
    }
    
    
    • 测试
    package dev.wj.springbootjmsdemo.topic;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * @Author: WJ
     * @Description:
     * @Date: Created in 下午2:55 2018/2/6
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class TopicTest {
    
        @Autowired
        private TopicProducer topicProducer;
    
        @Test
        public void testTopic() {
            topicProducer.send("This is SpringBoot JMS Topic");
        }
    }
    
    
    • 执行结果
      控制台:


      控制台结果

      ActiveMQ后台结果:


      ActiveMQ后台结果

    相关文章

      网友评论

          本文标题:JMS-ActiveMQ-Demo

          本文链接:https://www.haomeiwen.com/subject/axzwzxtx.html