美文网首页
ActiveMQ系列(一)——ActiveMQ的基础和快速入门

ActiveMQ系列(一)——ActiveMQ的基础和快速入门

作者: moutory | 来源:发表于2021-11-26 15:02 被阅读0次

前言

我们在项目开发中,难免需要和外部系统进行通信,甚至本身项目拆分成细颗粒度的子模块后,每个子模块之间可能也会涉及到数据的交互,Java EE基于这种场景提出了JMS(Java Message Service)规范,让我们可以利用异步、解耦的方式来处理这个场景的问题。ActiveMQ是基于JMS规范实现的产品,也是中小型公司应用较多的MQ中间件,本篇文章将对ActiveMQ的推送机制、签收模式、持久化等基础知识进行讲解,让各位读者可以实现快速入门。

一、知识储备

在学习ActiveMQ之前,我们不妨先来了解什么是MQ和JMS。

**MQ: **MQ的全称是Message Queue,即消息队列。本质上消息队列就是一个简单的数据结构——队列,该队列中存放着各种各样的信息,信息出入遵循着先进先出的FIFO原则。

MQ的结构

JMS:JMS即Java消息服务(Java Message Service的简称),是Java EE 的标准/规范之一。这种规范(标准)指出:消息的发送应该是异步的、非阻塞的。也就是说消息的发送者发送完消息后就直接返回了,不需要等待接收者返回后才能返回,发送者和接收者可以说是互不影响。所以这种规范(标准)能够减轻或消除系统瓶颈,实现系统之间去除耦合,提高系统的整体可伸缩性和灵活性。JMS只是Java EE中定义的一组标准API,它自身并不是一个消息服务系统,它是消息传送服务的一个抽象,也就是说它定义了消息传送的接口而并没有具体实现。

ActiveMQ:正如上面对JMS的介绍一样,JMS虽然定义了了如何处理信息的接口但是并不能直接使用,而ActiveMQ就是遵循JMS规范下的一款产品,它是Apache下的一个项目,采用Java语言开发;是一款非常流行的开源消息服务器。(当然了,现在的MQ产品比较丰富,有RocketMQ、RabbitMQ、Kafka等多种选择)

二、下载和安装ActiveMQ

(一)下载ActiveMQ

我们通过访问ActiveMQ官方网站:http://activemq.apache.org/ ,就可以很便捷地下载最新版本的ActiveMQ,如果说想要下载指定版本的ActiveMQ,则可以通过下面这个链接来进行访问:https://activemq.apache.org/download-archives。需要注意的是,不同版本的ActiveMQ对应的特性可能有所区别,本篇文章将采用5.14.0这个版本来进行演示。

image.png
下载对应的压缩包
(二)安装ActiveMQ

安装ActiveMQ这个过程十分简单,我们只需要在自定义的目录下解压zip压缩包即可。需要注意的是,ActiveMQ依赖了JDK环境,所以如果本地没有JDK环境(JDK1.8以上)的话,建议先自己准备一下环境。
压缩完成后,我们运行/bin/win64/activemq.bat文件,就可以启动我们的项目了

image.png
启动完成后,我们可以访问localhost:8161(8161为ActiveMQ的默认控制台端口)来进行访问,默认的账号和密码均为admin
访问可视化控制台
输入用户名密码
成功输入后,即可查看activemq各个实例的情况
三、ActiveMQ的入门案例

我们知道,ActiveMQ本质上就是一个存储队列,其中信息的提供者和消费者都是我们外部来定义的,一般我们将信息的提供者称为生产者,将信息的接收者称为消费者。基于这个模型,我们来简单创建一个ActiveMQ的入门案例吧

步骤一:引入坐标依赖
  <properties>
        <activemq.version>5.16.1</activemq.version>
    </properties>
    <dependency>
       <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-client</artifactId>
       <version>${activemq.version}</version>
   </dependency>
步骤二:创建生产者代码

创建生产者的代码其实挺简单的,基本上就是通过connection创建各类资源来发送信息的过程。

public class Producer {

    public static final String URL = "failover:(tcp://localhost:61616)";

    public static final String QUEUE_NAME = "TEST_CUSTOMER_QUEUE";

    public static void main(String[] args) throws JMSException {
        // 创建connection工厂对象,需要传入用户名,密码以及BrokerURL(也就是ActiveMQ实例对应协议的访问路径)
        ConnectionFactory cf = new ActiveMQConnectionFactory("admin","admin",URL);
        Connection conn = null;
        Session session = null;
        MessageProducer producer = null;
        try{
            conn = cf.createConnection();
           // 创建session,第一个参数表示是否开启事务,第二个参数表示信息的接收模式
            session = conn.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
          // 创建队列,传入队列名称
            Queue queue = session.createQueue(QUEUE_NAME);
            producer = session.createProducer(queue);
            for (int i = 1; i <= 10 ; i++){
                TextMessage msg = session.createTextMessage("this is no " + i + " message");
                producer.send(msg);
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
           发送完信息后,要记得关闭掉对应的资源
            if(null != conn){
                conn.close();
            }
            if(null != session){
                session.close();
            }
            if(null != producer){
                session.close();
            }
        }

    }
}
步骤三:定义消费者端代码

消费者端的代码其实和生产端类似,主要的区别在于消费端一般可以有两种方式来消费信息,一种是同步消费,一种是异步消费。一般我们会采用传入自定义的监听器让消费者对信息进行异步消费。监听器会监听每一条消费端接收到的信息,从而进行信息解析以及其他业务操作。

PS:需要注意的是,在编写消费端的代码时,不需要手动关闭session等资源,不然的话消费端就直接停掉了。

消费端代码
public class Consumer {
    public static final String URL = "failover:(tcp://localhost:61616)";

    public static final String QUEUE_NAME = "TEST_CUSTOMER_QUEUE";

    public static void main(String[] args)  {
        ConnectionFactory cf = new ActiveMQConnectionFactory("admin","admin",URL);
        Connection conn = null;
        Session session = null;
        MessageConsumer consumer = null;
        try {
            conn = cf.createConnection();
            conn.start();
            session = conn.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            consumer = session.createConsumer(queue);
            consumer.setMessageListener(new PointMessageListener());
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
监听器代码
public class PointMessageListener implements MessageListener {
    public void onMessage(Message message) {
        if(message instanceof TextMessage){
            TextMessage msg = (TextMessage) message;
            try {
                System.out.println("receive message : "+msg.getText());
                // 表明进行手动的信息签收
                message.acknowledge();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
(三)验证结果

我们分别启动消费端,生产端的代码,然后查看控制台结果:
我们可以看到,控制台顺利地输出了生产端发送的信息,同时我们还可以在activemq的控制台上面看到具体的消费情况。

image.png

下图的信息表示,当前队列有1个消费者,队列累积入队10条信息,出队10条信息。


在activemq控制台上查看结果

四、ActiveMQ的消息推送模式

ActiveMQ的信息推送模式一共有两种,分别是点对点推送(Point To Point)以及发布订阅推送(Pub Sub)
这两种消息推送方式的区别在于消费者对于消息是否具有独占性,我们可以用一个小例子来进行说明:现在有生产者A往队列中发送了1条消息,而此时正有2个队列(B和C)监听着MQ的信息。如果MQ的推送模式为点对点推送,那么这条信息一旦被消费者B/C任意一方消费,那么另外一方就无法再进行消费了。而发布订阅模式则是:MQ会将该条信息推送给所有订阅了某个Topic的消费者,这其实就比较类似于B站上面的UP主发布视频了,关注他的粉丝就能够收到推送。
我们在入门案例中就是使用点对点推送来接收信息,所有下面我们只对发布订阅模式来进行演示

生产端代码
我们可以发现,其实和前面入门案例中的点对点方式相比较,发布订阅这种方式只是改了一个api而已,我们原先是创建queue资源,现在是创建topic主题资源。

...
try {
    conn = cf.createConnection();
    session = conn.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic("TEST_TOPIC_QUEUE");
    MessageProducer producer = session.createProducer(topic);
    for(int i=0;i<10;i++){
        Message msg = session.createTextMessage("this is the"+i+"message");
        producer.send(msg);
    }
消费端代码

消费端代码也是基本和之前一样,只需要将原先的consumer改为topic就行,需要注意的是定义topic时传入的topic name需要和生产端定义的一致。

public class Client {
    ...
    public static void main(String[] args) throws JMSException {
        ConnectionFactory cf = new ActiveMQConnectionFactory("admin","admin",URL);
        Connection conn = null;
        Session session = null;

        try {
            conn = cf.createConnection();
            session = conn.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("TEST_TOPIC_QUEUE");
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(new MyMessageListener());
            conn.start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
监听器代码
public class MyMessageListener implements MessageListener {
    public void onMessage(Message message) {
        if(message instanceof TextMessage){
            TextMessage tm = (TextMessage) message;
            try {
                System.out.println("接收到的信息是:"+tm.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

分别启动2个消费者和1个生产者,然后我们分别看2个消费者是不是都有收到对应的信息:
我们从结果中可以验证,两个消费者都接收到了所有的信息。需要注意的是,对于发布订阅模式,我们应用启动顺序是有要求的,要先启动消费端,在启动生产端。因为当MQ接收到信息后,他只会将信息推送给当前正在监听的消费者,如果没有则丢弃该条信息。所以要想让消费者可以收到消息,就需要先启动开始监听(订阅)队列才行。

验证topic模式的结果

五、ActiveMQ的消息签收机制

消息只有在被确认之后,才认为已经被成功消费,然后消息才会从队列或主题中删除。消息的成功消费通常包含三个阶段:

  1. 生产者推送消息给MQ
  2. MQ推送消息给消费者
  3. 生产者消费成功后,发送消费成功的信息给MQ
消息的处理阶段

ActiveMQ提供了4种消息签收机制给我们使用

签收机制 含义
Session.AUTO_ACKNOWLEDGE 客户(消费者)成功从receive方法返回时,或者从MessageListener.onMessage方法成功返回时,会话自动确认消息,然后自动删除消息.
Session.CLIENT_ACKNOWLEDGE 客户通过显式调用消息的acknowledge方法确认消息,。 即在接收端调用message.acknowledge();方法,否则,消息是不会被删除的.
Session.DUPS_OK_ACKNOWLEDGE 不是必须确认,是一种“懒散的”消息确认,消息可能会重复发送,在第二次重新传送消息时,消息头的JMSRedelivered会被置为true标识当前消息已经传送过一次,客户端需要进行消息的重复处理控制
Session.SESSION_TRANSACTED 事务提交并确认

上面这四种签收机制,其实我们常用的就2种,对于不太重要、不需要严格保障消息可靠性的消息我们一般使用自动签收机制,即Session.AUTO_ACKNOWLEDGE即可。选择该模式后,只要成功接收到信息就会发送消费成功的反馈给ActiveMQ服务器,而服务器此时就会将该条信息进行删除。而对于比较重要,对于可靠性有高要求的信息,我们用的比较多的就是Session.CLIENT_ACKNOWLEDGE机制,在这个模式下,只有当消费者手动ACK签收后,才会给MQ服务器发送已消费的信号,这样就可以避免出现消费失败,但是MQ服务器已经将消息删除的问题出现。
我们可以简单来做一个小测试:

生产端代码不变,我们将消费端的代码改一下签收模式(不开启事务):
public class Consumer {
    public static final String URL = "failover:(tcp://localhost:61616)";

    public static final String QUEUE_NAME = "TEST_ACK_QUEUE";

    public static void main(String[] args)  {
        ConnectionFactory cf = new ActiveMQConnectionFactory("admin","admin",URL);
        Connection conn = null;
        Session session = null;
        MessageConsumer consumer = null;
        try {
            conn = cf.createConnection();
            conn.start();
            session = conn.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            consumer = session.createConsumer(queue);
            consumer.setMessageListener(new PointMessageListener());
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
在监听器中,我们只对接收到的信息进行打印处理
public class PointMessageListener implements MessageListener {
    public void onMessage(Message message) {
        if(message instanceof TextMessage){
            TextMessage msg = (TextMessage) message;
            try {
                System.out.println("receive message : "+msg.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

代码编写完成后,我们分别启动消费者和生产者的代码,可以看到消费者已经成功接收并消费生产者的代码了

image.png
但是我们去ActiveMQ的控制台进行查看后会发现,实际上这些信息并没有出队,还是在MQ服务器上。这就验证了我们之前的理论:只要消费者没有手动进行ACK确认,消息就不会被MQ服务器删除。这样一旦消费端进行消息消费时出现发生异常时,我们就可以保证信息不丢失。需要注意的是,此时虽然Broker不会删除没被确认删除的消息,但是也不会重新发送!!!要想触发重发机制,就还是要配合session.recover()方法来使用。重发机制我们会在下一章讲到。
activemq控制台截图

六、ActiveMQ的事务和非事务信息

其实事务和非事务信息是和签收模式一样,都是用于解决消息的可靠性问题的,但官方更加推荐我们使用消息的事务性特性来应对这种场景的问题。事务机制和签收有点类似,后者需要我们手动调用message.acknowledge()表示消息已经成功消费,事务机制也是需要我们显式地调用session.commit()来表示该条信息被成功消费了。
我们下面用一个小例子来进行演示:
我们同样是使用入门案例中生产者的代码,只需要在消费者端开启事务

public class Consumer {
    public static final String URL = "failover:(tcp://localhost:61616)";

    public static final String QUEUE_NAME = "TEST_TRANSACTION_QUEUE";

    public static void main(String[] args)  {
        ConnectionFactory cf = new ActiveMQConnectionFactory("admin","admin",URL);
        Connection conn = null;
        Session session = null;
        MessageConsumer consumer = null;
        try {
            conn = cf.createConnection();
            conn.start();
            session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            consumer = session.createConsumer(queue);
            consumer.setMessageListener(new TransactionMessageListener(session));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
监听器代码:
public class TransactionMessageListener implements MessageListener {

    private Session session;

    public TransactionMessageListener(Session session){
        this.session = session;
    }

    public void onMessage(Message message) {
        if(message instanceof TextMessage){
            TextMessage msg = (TextMessage) message;
            try {
                System.out.println("receive message : "+msg.getText());
                session.commit();
            } catch (Exception e) {
                try {
                    session.rollback();
                } catch (JMSException ex) {
                    ex.printStackTrace();
                }
                e.printStackTrace();
            }
        }
    }
}

分别运行消费者和生产者的代码后,我们可以发现消息都被正常的消费了(不要忘记调用session.commit()方法

image.png

同时,如果在监听器方法中手动抛出异常的话,那么就会执行session.rollback()方法,触发消息的重发,我们可以在控制台上面看到,消息会被退回到broker后重新发送给消费者进行消费。

1637907839(1).png

七、ActiveMQ的持久化

MQ服务器上的信息是存放在内存中的,假如服务器出现宕机的情况,那么未被消费的信息可能就会面临丢失的风险。为了避免宕机后出现信息丢失的情况出现,ActiveMQ提供了AMQ、KahaDB、JDBC等若干持久化的方案供我们选择,且在ActiveMQ5.4之前,ActiveMQ服务器默认使用的是AMQ持久化机制是AMQ,而5.4版本之后则是使用KahaDB作为默认的持久化机制,原因是后者提供了更为优秀的性能。下面我们介绍也主要针对KahaDB进行详细的介绍,其余的持久化机制我们不会做更加深入的探讨。

下面我们来对这几种机制来做一个简单的介绍:

AMQ

AMQ是早期版本的默认持久化存储方式,基于文件的事务存储,对于消息的存储进行了调优,速度还是非常快的。默认大小32M。当消息被成功使用时,就会被标记为清理或者存档,这个操作将在下个清理时发送。基本配置如下(其他参数详见官网):

<broker persistent="true" xmlns="http://activemq.apache.org/schema/core">
...
<persistenceAdapter>
<amqPersistenceAdapter/>
</persistenceAdapter>
...
</broker>

具体的官网介绍:http://activemq.apache.org/amq-message-store.html

KahaDB

5.4以后默认的持久化存储方式,也是基于文件的,与AMQ不同的是,KahaDB采用了B-Tree存储的布局。拥有高性能和可扩展性等特点。基本配置如下:

<broker brokerName="broker" persistent="true" useShutdownHook="false">
...
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="32mb"/>
</persistenceAdapter>
...
</broker>

上面的配置表明,对应的持久化日志文件将存放在activemq的\data\kahadb目录下,且每个日志文件最大的存储空间为32mb。我们可以打开这个目录来观察一下里面的文件结构:

image.png

想要了解更多可配置项,可以参考官方文档:http://activemq.apache.org/kahadb

JDBC

jdbc模式其实就是借助第三方的数据库来做持久化,我们需要手动在activemq目录中引入数据库驱动包,同时对一些必要的数据库连接参数进行配置。

具体步骤可以看这篇推文:https://blog.csdn.net/fyj13925475957/article/details/105708505

LevelDB (了解)
  1. 这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。

  2. 但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引

基本配置如下:

  <broker brokerName="broker" ... >
    ...
    <persistenceAdapter>
      <levelDB directory="activemq-data"/>
    </persistenceAdapter>
    ...
  </broker>
不使用持久化

对于项目有特别的场景,不需要对message进行持久化操作,我们可以通过activemq.xml设置broker实例不启用持久化,给broker配置persistent属性即可。

<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker" persistent="false"
    xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors>
            ...
        </transportConnectors>
    </broker>
</beans>

个人感觉,activemq本身提供的默认持久化机制已经可以满足我们项目的大部分需要,所以对于不同持久化机制之前的具体区别和性能上的差异,我们只要做到了解即可,有兴趣或者项目有实际需要的话再去进一步深究。同时,个人觉得ActiveMQ虽然开源免费,但是它的文档做的一般般...而且部分功能感觉不如RabbitMQ,所以如果是大型项目的话,建议可以使用其他MQ产品。

参考文章:
ActiveMQ---知识点整理: http://www.uml.org.cn/zjjs/201802111.asp
ActiveMQ官方文档介绍:https://activemq.apache.org/features

相关文章

网友评论

      本文标题:ActiveMQ系列(一)——ActiveMQ的基础和快速入门

      本文链接:https://www.haomeiwen.com/subject/cqqztrtx.html