美文网首页开源项目
ActiveMQ——基础知识与模拟体验

ActiveMQ——基础知识与模拟体验

作者: 莫问以 | 来源:发表于2018-09-04 15:09 被阅读35次

    最近在学习一个Java的开源项目,git地址:https://gitee.com/shuzheng/zheng
    该开源项目开发环境需具备ActiveMQ,所以特地学习一下ActiveMQ。

    一、ActiveMQ是什么?

    1)JMS概述
    JMS即Java消息服务(Java Message Service的简称),是Java EE 的标准/规范之一。这种规范(标准)指出:消息的发送应该是异步的、非阻塞的。也就是说消息的发送者发送完消息后就直接返回了,不需要等待接收者返回后才能返回,发送者和接收者可以说是互不影响。所以这种规范(标准)能够减轻或消除系统瓶颈,实现系统之间去除耦合,提高系统的整体可伸缩性和灵活性。JMS只是Java EE中定义的一组标准API,它自身并不是一个消息服务系统,它是消息传送服务的一个抽象,也就是说它定义了消息传送的接口而并没有具体实现。

    2)ActiveMQ概述
    ActiveMQ就是JMS规范的具体实现;它是Apache下的一个项目,采用Java语言开发,是一款非常流行的开源消息服务器(消息队列中间件)

    3)ActiveMQ与JMS关系


    二者关系.png

    JMS只是定义了一组有关消息传送的规范和标准,并没有真正实现,也就说JMS只是定义了一组接口;其具体实现由不同的消息中间件厂商提供,比如Apache ActiveMQ就是JMS规范的具体实现,Apache ActiveMQ才是一个消息服务系统,而JMS不是。

    消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

    二、应用场景——异步处理,应用解耦,流量削锋和消息通讯

    2.1异步处理
    场景说明:用户注册后,需要发注册邮件和注册短信。
    传统的做法:1.串行的方式; 2.并行方式。
    1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

    2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。
    与串行的差别是,并行的方式可以提高处理的时间。引入消息队列,就可以将不是必须的业务逻辑改为异步处理。

    2.2应用解耦
    场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。这种模式存在如下缺点:
    1) 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败;

    2) 订单系统与库存系统耦合;

    引入应用消息队列后的方案:
    1)订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

    2)库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。

    假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

    2.3流量削锋
    流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
    应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
    可以控制活动的人数;
    可以缓解短时间内高流量压垮应用;

    用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;
    秒杀业务根据消息队列中的请求信息,再做后续处理。

    2.4日志处理
    日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
    日志采集客户端,负责日志数据采集,定时写受写入Kafka队列;
    Kafka消息队列,负责日志数据的接收,存储和转发;
    日志处理应用:订阅并消费kafka队列中的日志数据;

    二.ActiveMQ的使用

    1、JMS两种消息传送模式
    1)点对点( Point-to-Point):专门用于使用队列Queue传送消息;基于队列Queue的点对点消息只能被一个消费者消费,如多个消费者都注册到同一个消息队列上,当生产者发送一条消息后,而只有其中一个消费者会接收到该消息,而不是所有消费者都能接收到该消息。

    2)发布/订阅(Publish/Subscribe):专门用于使用主题Topic传送消息。基于主题的发布与订阅消息能被多个消费者消费,生产者发送的消息,所有订阅了该topic的消费者都能接收到。

    2、 JMS API可以分为3个主要部分:
    1)公共API:可用于向一个队列或主题发送消息或从其中接收消息;
    2)点对点API:专门用于使用队列Queue传送消息;
    3)发布/订阅API:专门用于使用主题Topic传送消息。

    JMS公共API:
    在JMS公共API内部,和发送与接收消息有关的JMS API接口主要是:ConnectionFactory/Connection/Session/Message/Destination/MessageProducer/MessageConsumer 。它们的关系是:一旦有了ConnectionFactory,就可以创建Connection,一旦有了Connection,就可以创建Session,而一旦有了Session,就可以创建Message、MessageProducer和MessageConsumer。

    JMS点对点API:
    点对点(p2p)消息传送模型API是指JMS API之内基于队列(Queue)的接口:QueueConnectionFactory/QueueConnection/QueueSession/Message/Queue/QueueSender/QueueReceiver. 从接口的命名可以看出,大多数接口名称仅仅是在公共API接口名称之前添加Queue一词。一般来说,使用点对点消息传送模型的应用程序将使用基于队列的API,而不使用公共API 。

    JMS发布/订阅API:
    发布/订阅消息传送模型API是指JMS API之内基于主题(Topic)的接口:TopicConnectionFactory/TopicConnection/TopicSession/Message/Topic/TopicPublisher/TopicSubscriber. 由于基于主题(Topic)的JMS API类似于基于队列(Queue)的API,因此在大多数情况下,Queue这个词会由Topic取代。

    ActiveMQ点对点发送与接收消息示例.png

    3、ActiveMQ的下载与安装
    1)直接去官网(http://activemq.apache.org/)下载最新版本即可,由于这是免安装的,只需要解压就行了。安装完之后进入bin目录,双击 activemq.bat文件(linux下在bin目录下执行 activemq start

    2)访问控制台(检验是否成功)
    在浏览器输入:http://ip:8161/admin/ 会弹出登陆框,账号和密码都是默认admin。

    3)修改端口号
    61616为对外服务端口号
    8161为控制器端口号
    当端口号冲突时,可以修改这两个端口号。cd conf ,修改activemq.xml 修改里面的61616端口。修改jetty.xml,修改里面的8161端口。

    4)删除不活动队列
    一般情况下,ActiveMQ的queue或者topic在不使用之后,可以通过web控制台来删除掉。当然,也可以通过配置,使得broker可以自动探测到无用的队列(一定时间内为空的队列)并删除掉,回收响应资源。
    activemq.xml

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true" schedulePeriodForDestinationPurge="10000">
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="100000" memoryLimit="1mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="100000" memoryLimit="1mb">
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>
    </broker>
    

    schedulePeriodForDestinationPurge:10000 每十秒检查一次,默认为0,此功能关闭
    gcInactiveDestinations: true 删除掉不活动队列,默认为false
    inactiveTimoutBeforeGC:30000 不活动30秒后删除,默认为60秒
    PS:对于topic的不活动队列只是,10秒中之类没有消费者进行注册监听,如果一个用户事先注册了这个监听,但是他一直没有登录,那么这算活动队列。而queue只要有消息没有出队列就表示活动队列。

    附:消息中间件的用途和优点
    1)将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;
    2)负责建立网络通信的通道,进行数据的可靠传送;
    3)保证数据不重发,不丢失;
    4)能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务

    三、模拟HelloWord体验ActiveMQ

    下载MQ以后,要将---bin.zip解压缩后里面的activemq-all-5.11.1.jar包加入到classpath下面,这个包包含了所有jms接口api的实现,项目结构图:

    MQ
    启动MQ,启动成功,打开该链接:http://localhost:8161/admin/
    账号和密码都是admin,启动成功后,图如下:
    image.png
    生产者和消费者代码如下:
    package com.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    /**
     * 生产者(消息发送者)
     * @author admin
     *
     */
    public class JMSProducer {
    
        // 账号(默认连接用户名)
        private static final String user = ActiveMQConnection.DEFAULT_USER;
        // 密码
        private static final String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
        // 地址
        private static final String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    
        private static final int sendnum = 10;
    
        public static void main(String[] args) {
            // 连接工厂
            ConnectionFactory connectionFactory;
            // 连接
            Connection connection = null;
            // 会话
            Session session;
            // 消息目的
            Destination destination;
            // 消息生产者
            MessageProducer messageProducer;
            // 实例化工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSProducer.user, JMSProducer.pwd, JMSProducer.url);
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("HelloWord");
                messageProducer = session.createProducer(destination);
                sendMessage(session, messageProducer);
                session.commit();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        /**
         * 发送消息
         * @param session
         * @param messageProducer  消息生产者
         * @throws Exception
         */
        public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
            for (int i = 0; i < 10; i++) {
                TextMessage message = session.createTextMessage("ActiveMQ" + i);
                System.out.println("发送消息" + i);
                messageProducer.send(message);
            }
        }
    }
    
    package com.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    /**
     * 消费者(消息接收者)
     * @author admin
     *
     */
    public class JMSConsumer {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认连接密码
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认连接地址
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory;// 连接工厂
            Connection connection = null;// 连接
    
            Session session;// 会话 接受或者发送消息的线程
            Destination destination;// 消息的目的地
    
            MessageConsumer messageConsumer;// 消息的消费者
    
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
                    JMSConsumer.BROKEURL);
    
            try {
                // 通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                // 启动连接
                connection.start();
                // 创建session
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 创建一个连接HelloWorld的消息队列
                destination = session.createQueue("HelloWord");
                // 创建消息消费者
                messageConsumer = session.createConsumer(destination);
    
                while (true) {
                    TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                    if (textMessage != null) {
                        System.out.println("收到的消息:" + textMessage.getText());
                    } else {
                        break;
                    }
                }
    
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    启动生产者,输出如下:


    image.png

    然后看一下ActiveMQ服务器,Queues内容如下:


    MQ服务器说明
    我们可以看到创建了一个名称为HelloWorld的消息队列,队列中有10条消息未被消费,我们也可以通过Browse查看是哪些消息:
    image.png

    这些队列中的消息,被删除,消费者则无法消费,接下来,运行一下消费者:


    消费者输出
    现在我们再看一下ActiveMQ服务器,Queues内容如下:
    1536301523(1).jpg

    我们可以看到HelloWorld的消息队列发生变化,多一个消息者,队列中的10条消息被消费了,点击Browse查看,已经为空了。
    点击Active Consumers,我们可以看到这个消费者的详细信息。

    相关文章

      网友评论

        本文标题:ActiveMQ——基础知识与模拟体验

        本文链接:https://www.haomeiwen.com/subject/gxcgwftx.html