美文网首页
JMS-ActiveMQ-Demo

JMS-ActiveMQ-Demo

作者: 8813d76fee36 | 来源:发表于2018-02-06 14:15 被阅读161次

慕课网Java消息中间件笔记

笔记代码
https://gitee.com/oooh2016/JMS-DEMO

安装ActiveMQ

下载ActiveMQ并解压至任意目录,如/home/apache-activemq/

运行ActiveMQ

进入到ActiveMQ主目录下的bin目录。
如:/home/apache-activemq/bin
执行如下命令运行:

$ ./activemq start

如图执行成功

尝试进入ActiveMQ主页

默认端口:8161


ActiveMQ主页

点击Manage ActiveMQ broker进入管理页面,默认用户名和密码都是admin


管理页面

至此安装成功

Java使用ActiveMQ

新建Maven项目,并引入如下依赖

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.3</version>
        </dependency>

队列模式

  • 项目结构


    项目结构
  • 消息提供者
    消息提供者向消息中间件发送消息,需要配置消息服务器的地址和队列名称。
package queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Author: WJ
 * @Description: 消息提供者 向消息中间件发送消息
 * 61616是activemq默认端口
 * @Date: Created in 上午10:45 2018/2/6
 */
public class AppProducer {

    private static final String URL = "tcp://192.168.58.3:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        //创建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(URL);

        //创建Connection
        Connection connection = factory.createConnection();

        //建立连接
        connection.start();

        //创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建一个目标
        Destination destination = session.createQueue(queueName);

        //创建一个生产者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i ++) {
            //创建消息
            TextMessage message = session.createTextMessage("test" + i);
            producer.send(message);

            //打印发送的消息
            System.out.println("发送消息:" + message.getText());
        }

        //关闭连接
        connection.close();
    }
}

运行消息提供者。

进入ActiveMQ后台的Queue选项查看刚才发送的消息。此时看到名为queue-test的队列中有100条消息。
接下来创建消费者消费队列中的消息。


查看消息
  • 消息消费者
    同样需要指定消息服务器的地址,以及需要消费的队列名称。
package queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Author: WJ
 * @Description: 消费者
 * @Date: Created in 上午11:19 2018/2/6
 */
public class AppConsumer {

    private static final String URL = "tcp://192.168.58.3:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        //创建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(URL);

        //创建Connection
        Connection connection = factory.createConnection();

        //建立连接
        connection.start();

        //创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建一个目标
        Destination destination = session.createQueue(queueName);

        //创建一个消费者
        MessageConsumer consumer = session.createConsumer(destination);

        //创建监听器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    //获取消息并打印
                    String text = textMessage.getText();
                    System.out.println("接收到的消息:" + text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        /**
         * 消息异步接收
         * 若在此处关闭连接
         * 可能会只接收到部分消息后
         * 因连接关闭而不能接收消息队列中的全部消息
         */
//        connection.close();
    }
}

运行消费者,并观察运行结果。


接收到的消息

可以看到队列中的100条消息已经全部接收到了。
ActiveMQ后台也显示queue-test这个队列中已经没有消息了,并且有一个消费者在线,100个消息出队列。


后台情况
  • 运行多个消费者
    这次先启动3个消费者,看看三个消费者接收到的消息结果是什么样的。


    启动三个消费者

    运行提供者,发送100条消息。

消费者1


消费者1

消费者2


消费者2
消费者3
消费者3

可以发现100条消息被依次发送给了三个消费者。


队列模型

主题模式

  • 项目结构


    项目结构
  • 消息发布者
    该代码与之前队列模式的发布者十分相似,只是在创建目标的环节由创建队列(createQueue)改为了创建主题(createTopic)。

//创建一个目标
Destination destination = session.createTopic(topicName);

package topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Author: WJ
 * @Description: 消息提供者 向消息中间件发送消息
 * 61616是activemq默认端口
 * @Date: Created in 上午10:45 2018/2/6
 */
public class AppProducer {

    private static final String URL = "tcp://192.168.58.3:61616";
    private static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        //创建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(URL);

        //创建Connection
        Connection connection = factory.createConnection();

        //建立连接
        connection.start();

        //创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建一个目标
        Destination destination = session.createTopic(topicName);

        //创建一个生产者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i ++) {
            //创建消息
            TextMessage message = session.createTextMessage("test" + i);
            producer.send(message);

            //打印发送的消息
            System.out.println("发送消息:" + message.getText());
        }

        //关闭连接
        connection.close();
    }
}

  • 创建消费者
    同样在创建目标时使用createTopic

Destination destination = session.createTopic(topicName);

package topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Author: WJ
 * @Description: 消费者
 * @Date: Created in 上午11:19 2018/2/6
 */
public class AppConsumer {

    private static final String URL = "tcp://192.168.58.3:61616";
    private static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        //创建ConnectionFactory
        ConnectionFactory factory = new ActiveMQConnectionFactory(URL);

        //创建Connection
        Connection connection = factory.createConnection();

        //建立连接
        connection.start();

        //创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建一个目标
        Destination destination = session.createTopic(topicName);

        //创建一个消费者
        MessageConsumer consumer = session.createConsumer(destination);

        //创建监听器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    //获取消息并打印
                    String text = textMessage.getText();
                    System.out.println("接收到的消息:" + text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        /**
         * 消息异步接收
         * 若在此处关闭连接
         * 可能会只接收到部分消息后
         * 因连接关闭而不能接收消息队列中的全部消息
         */
//        connection.close();
    }
}

  • 测试
    我们仿照之前的测试步骤,先启动消息提供者。
    进入到ActiveMQ后台,进入Topic页面,发现名为topic-test的主题中已经有了100条消息。


    ActiveMQ后台

    再启动消费者。
    此时看到后台中已经显示有一名消费者在线,但并没有消息被消费。


    ActiveMQ后台
    同时在消费者的控制台中也没有任何信息打印出来,说明确实没有获取到消息。
    消费者后台
  • 问题原因
    该现象产生的原因就是主题模式下,消费者无法接收到在它订阅该主题时刻之前的主题中的消息,只能接收到订阅时刻后主题中的消息。

  • 再次测试
    先启动消费者订阅topic-test主题,再让生产者提供新的消息。发现成功接收所有消息。


    成功接收消息
  • 启动多个消费者
    这次我们依然启动三个消费者。发现三个消费者都接收到了同样的100条消息。

消费者1


消费者1

消费者2


消费者2
消费者3
消费者3
  • 主题模式示意


    主题模型

SpringBoot整合ActiveMQ

新建项目

勾选JMS(ActiveMQ)


新建项目

配置ActiveMQ连接

spring:
  activemq:
    broker-url: tcp://192.168.58.3:61616
    close-timeout: 5000
    in-memory: false
    pool:
      max-connections: 100
#      enabled: true
    send-timeout: 3000

broker-url即访问远程消息服务器的地址,同时需要关闭内存消息服务器(in-memory=false)
注释掉的配置(spring.activemq.pool.enabled)如果配置为true,则需要额外引入如下依赖:

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.3</version>
        </dependency>

启动类开启JMS支持

在启动类上添加@EnableJMS注解

@EnableJMS

队列模式

  • 配置队列名称


    队列名称
  • 创建生产者

这里引入了Spring提供的JmsMessagingTemplate和刚才创建的执行队列消息的对象。并使用jmsMessagingTemplate.convertAndSend()发送消息到消息服务器。

package dev.wj.springbootjmsdemo.queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Queue;

/**
 * @Author: WJ
 * @Description:
 * @Date: Created in 下午2:44 2018/2/6
 */
@Component
public class QueueProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    public void send(String message) {
        System.out.println("发送消息:" + message);
        jmsMessagingTemplate.convertAndSend(this.queue, message);
    }
}
  • 创建消费者

此处使用@JmsListener(destination = "")注解监听我们的队列。destination属性即要监听的队列名。

package dev.wj.springbootjmsdemo.queue;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * @Author: WJ
 * @Description:
 * @Date: Created in 下午2:47 2018/2/6
 */
@Component
public class QueueConsumer {

    @JmsListener(destination = "spring-queue")
    public void receive(String text) {
        System.out.println("接收到消息:" + text);
    }
}

  • 测试
package dev.wj.springbootjmsdemo.queue;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @Author: WJ
 * @Description:
 * @Date: Created in 下午2:49 2018/2/6
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class QueueTest {

    @Autowired
    private QueueProducer queueProducer;

    @Test
    public void testQueue() {
        queueProducer.send("This is SpringBoot JMS Queue");
    }
}

控制台结果:


控制台结果

ActiveMQ后台结果


ActiveMQ后台结果
可以看到刚才创建的队列消息已经出现了。

订阅模式

  • 配置主题


    配置主题
  • 创建发布者

package dev.wj.springbootjmsdemo.topic;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Topic;

/**
 * @Author: WJ
 * @Description:
 * @Date: Created in 下午2:52 2018/2/6
 */
@Component
public class TopicProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Topic topic;

    public void send(String text) {
        System.out.println("topic发送消息:" + text);
        jmsMessagingTemplate.convertAndSend(this.topic, text);
    }
}
  • 创建订阅者

这里需要注意的就是需要为订阅者的监听指定containerFactory才能正确地接收主题中的消息。

package dev.wj.springbootjmsdemo.topic;

import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.stereotype.Component;

import javax.jms.ConnectionFactory;

/**
 * @Author: WJ
 * @Description:
 * @Date: Created in 下午2:53 2018/2/6
 */
@Component
public class TopicConsumer {

    /**
     * 为主题订阅者Listener指定containerFactory
     * @param connectionFactory
     * @return
     */
    @Bean
    public JmsListenerContainerFactory jmsListenerContainerTopic(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    @JmsListener(destination = "spring-topic", containerFactory = "jmsListenerContainerTopic")
    public void receive(String text) {
        System.out.println("topic接收到消息:" + text);
    }
}

  • 测试
package dev.wj.springbootjmsdemo.topic;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @Author: WJ
 * @Description:
 * @Date: Created in 下午2:55 2018/2/6
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicTest {

    @Autowired
    private TopicProducer topicProducer;

    @Test
    public void testTopic() {
        topicProducer.send("This is SpringBoot JMS Topic");
    }
}

  • 执行结果
    控制台:


    控制台结果

    ActiveMQ后台结果:


    ActiveMQ后台结果

相关文章

  • JMS-ActiveMQ-Demo

    慕课网Java消息中间件笔记 笔记代码https://gitee.com/oooh2016/JMS-DEMO 安装...

网友评论

      本文标题:JMS-ActiveMQ-Demo

      本文链接:https://www.haomeiwen.com/subject/axzwzxtx.html