美文网首页
3.入门案例,MQ标准API详解

3.入门案例,MQ标准API详解

作者: 21号新秀_邓肯 | 来源:发表于2021-05-02 16:08 被阅读0次

    1. pom.xml导入依赖

    <dependencies>
      <!--  activemq  所需要的jar 包-->
      <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.9</version>
      </dependency>
      <!--  activemq 和 spring 整合的基础包 -->
      <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>3.16</version>
      </dependency>
    </dependencies>
    

    2. JMS编码总体规范

    image.png

    3.Destination 简介

    Destination是目的地。下面拿jvm和mq,做个对比。目的地,我们可以理解为是数据存储的地方。

    image.png

    Destination分为两种:队列和主题

    image.png

    4. 队列消息生产者的入门案例

    package com.at.activemq.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class JmsProduce {
        //  linux 上部署的activemq 的 IP 地址 + activemq 的端口号
        public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
        // 目的地的名称
        public static final String QUEUE_NAME = "jdbc01";
    
    
        public static void main(String[] args) throws  Exception{
            // 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            // 2 通过连接工厂,获得连接 connection 并启动访问。
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            // 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            // 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
            Queue queue = session.createQueue(QUEUE_NAME);
            // 5 创建消息的生产者
            MessageProducer messageProducer = session.createProducer(queue);
            // 6 通过messageProducer 生产 3 条 消息发送到消息队列中
            for (int i = 1; i < 4 ; i++) {
                // 7  创建消息
                TextMessage textMessage = session.createTextMessage("msg--" + i);
                // 8  通过messageProducer发送给mq
                messageProducer.send(textMessage);
            }
            // 9 关闭资源
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("  **** 消息发送到MQ完成 ****");
        }
    }
    

    5. ActiveMQ控制台之队列

    image.png

    Number Of Pending Messages:

    等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。

    Number Of Consumers:

    消费者数量,消费者端的消费者数量。

    Messages Enqueued:

    进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。

    Messages Dequeued:

    出队消息数,可以理解为是消费者消费掉的数量。

    总结:

    当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。

    当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。

    当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。

    6.队列消息消费者的入门案例

    package  com.at.activemq.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    // 消息的消费者
    public class JmsConsumer {
    
        public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
        public static final String QUEUE_NAME = "jdbc01";
    
        public static void main(String[] args) throws Exception{
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            // 5 创建消息的消费者
            MessageConsumer messageConsumer = session.createConsumer(queue);
            while(true){
                // reveive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和socket的accept方法类似的。
    // reveive(Long time) : 等待n毫秒之后还没有收到消息,就是结束阻塞。
                // 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
                TextMessage message = (TextMessage)messageConsumer.receive(); 
                if (null != message){
                    System.out.println("****消费者的消息:"+message.getText());
                }else {
                    break;
                }
            }
            messageConsumer.close();
            session.close();
            connection.close();
        }
    }
    

    控制台展示:

    image.png

    7. 异步监听式消费者

    package  com.at.activemq.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    // 消息的消费者  也就是回答消息的系统
    public class JmsConsumer {
    
        public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    
        public static final String QUEUE_NAME = "jdbc01";
    
        public static void main(String[] args) throws Exception{
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            MessageConsumer messageConsumer = session.createConsumer(queue);
    
            /* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。
               通过messageConsumer 的setMessageListener 注册一个监听器,当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息
             */
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message)  {
    //  instanceof 判断是否A对象是否是B类的子类
                        if (null != message  && message instanceof TextMessage){
                            TextMessage textMessage = (TextMessage)message;
                            try {
                                System.out.println("****消费者的消息:"+textMessage.getText());
                            }catch (JMSException e) {
                                e.printStackTrace();
                            }
                    }
                }
            });
            // 让主线程不要结束。因为一旦主线程结束了,其他的线程(如此处的监听消息的线程)也都会被迫结束。
            // 实际开发中,我们的程序会一直运行,这句代码都会省略。
            System.in.read();
            messageConsumer.close();
            session.close();
            connection.close();
        }
    }
    

    8. 队列消息(Queue) 总结

    8.1 两种消费方式

    同步阻塞方式(receive)

    订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。

    异步非阻塞方式(监听器onMessage())

    订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。

    8.2 队列的特点

    image.png

    8.3 消息消费情况

    image.png

    情况1:只启动消费者1。

    结果:消费者1会消费所有的数据。

    情况2:先启动消费者1,再启动消费者2。

    结果:消费者1消费所有的数据。消费者2不会消费到消息。

    情况3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2。

    结果:消费者1和消费者2平摊了消息。各自消费3条消息。

    疑问:怎么去将消费者1和消费者2不平均分摊呢?而是按照各自的消费能力去消费。我觉得,现在activemq就是这样的机制。

    9. Topic 介绍, 入门案例, 控制台

    9.1 topic 介绍

    在发布订阅消息传递域中,目的地被称为主题(topic)

    发布/订阅消息传递域的特点如下:

    (1)生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;

    (2)生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。

    (3)生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。

    默认情况下如上所述,但是JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅

    image.png

    9.2 生产者案例

    package  com.at.activemq.topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class JmsProduce_topic {
    
        public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
        public static final String TOPIC_NAME = "topic01";
    
        public static void main(String[] args) throws  Exception{
                 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //-------   关键字   ---------------
            Topic topic = session.createTopic(TOPIC_NAME);
    
            MessageProducer messageProducer = session.createProducer(topic);
            for (int i = 1; i < 4 ; i++) {
                TextMessage textMessage = session.createTextMessage("topic_name--" + i);
                messageProducer.send(textMessage);
                MapMessage mapMessage = session.createMapMessage();
            }
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
        }
    }
    

    9.3 消费者入门案例

    package  com.at.activemq.topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class JmsConsummer_topic {
        public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
        public static final String TOPIC_NAME = "topic01";
    
        public static void main(String[] args) throws Exception{
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            // ------------------------ 关键 ---------------------
            // 4 创建目的地 (两种 : 队列/主题   这里用主题)
            Topic topic = session.createTopic(TOPIC_NAME);
    
            MessageConsumer messageConsumer = session.createConsumer(topic);
    // MessageListener接口只有一个方法,可以使用lambda表达式
            messageConsumer.setMessageListener( (message) -> {
                if (null != message  && message instanceof TextMessage){
                         TextMessage textMessage = (TextMessage)message;
                        try {
                          System.out.println("****消费者text的消息:"+textMessage.getText());
                        }catch (JMSException e) {
                        }
                    }
            });
    
            System.in.read();
            messageConsumer.close();
            session.close();
            connection.close();
        }
    }
    

    存在多个消费者,每个消费者都能收到,自从自己启动后所有生产的消息。

    9.4 ActiveMQ 控制台

    topic有多个消费者时,消费消息的数量 ≈ 在线消费者数量*生产消息的数量。

    下图展示了:我们先启动了3个消费者,再启动一个生产者,并生产了3条消息。

    image.png

    10 topic 和 queue 对比

    比较项目 Topic模式队列 Queue模式队列
    工作模式 "订阅-发布"模式, 如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都会收到消息 "负载均衡"模式,如果当前没有消费者, 消息也不会丢弃;如果有多个消费者, 那么一条消息也只会发送给一个消费者, 并且要求消费者ack信息
    有无状态 无状态 Queue数据默认会在mq服务器上以文件形式保存, 比如Active MQ一般保存在AMQ_MOME\datakr-store\data下面.也可以
    传递完整性 如果没有订阅者,消息会被丢弃 消息不会丢弃
    处理效率 由于消息按照订阅者的数量进行复制, 所以处理性能会随着订阅者的增加而明显降低, 并且还要结合不同信息协议自身的性能差异 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低,当然不同消息协议的具体性能也是有差异的

    相关文章

      网友评论

          本文标题:3.入门案例,MQ标准API详解

          本文链接:https://www.haomeiwen.com/subject/upitdltx.html