美文网首页
记录一次ActiveMQ的学习记录

记录一次ActiveMQ的学习记录

作者: 曹大大 | 来源:发表于2022-04-22 13:18 被阅读0次

    ActiveMQ入门

    异步处理
    应用解耦
    流量削峰
    

    异步处理

    场景说明:用户注册,需要执行三个业务逻辑,分别为写入用户表,发注册邮件以及注册短信。


    串行方式

    将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

    串行方式.png

    并行方式

    将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个以上三个任务完成后,返回给客户端,与串行的差别是,并行的方式可以提高处理的效率。

    并行处理.png

    异步处理

    引入消息中间件,将部分的业务逻辑,进行异步处理。改造后的架构如下:

    引入消息中间件.png

    按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高啦,比串行提高了3倍,比并行提高了两倍。

    应用解耦

    场景说明:用户下单后,订单系统需要通知库存系统。

    传统的做法是,订单系统调用库存系统的接口。如下图:

    传统做法.png

    传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合。如何解决以上问题呢?引入应用消息队列后的方案,如下图:

    消息队列.png

    订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

    流量消峰

    流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。通过加入消息队列完成如下功能:
    a、可以控制活动的人数
    b、可以缓解短时间内高流量压垮应用

    秒杀业务.png

    用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理

    常见的消息中间件产品对比

    特性 ActiveMQ RabbitMq RocketMQ Kafka
    开发语言 Java Erlang Java Scala
    单击吞吐量 万级 万级 10万级 10万级
    实效性 毫秒级 微秒级 毫秒级 毫秒级
    可用性 高(支持主从架构) 高(支持主从架构) 非常高(分布式架构) 非常高(分布式架构)
    功能性 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持的较好 基于erlang开发,所以并发能力很强,性能及其好,延时很低,管理界面丰富 MQ功能比较完备,扩展性佳 像一些消息查询,消息回溯等功能没有提供,在大数据领域应用广泛

    什么是ActiveMQ?

    官网: http://activemq.apache.org/

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

    什么是JMS?

    消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。

    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

    JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似于JDBC(java DatabaseConnectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。

    JMS消息模式

    消息中间件一般有两种传递模式:点对点模式(P2P)和发布订阅模式(Pub/Sub)。

    (1)P2P(Point to Point)点对点模型(Queue队列模型)

    (2)Publish/Subscribe(PUB/SUB)发布、定于模型(Topic主题模型)

    点对点模型(Pointer-to-Pointer):即生产者和消费者之间的消息往来。每个消息都被发送到特定的消息队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

    点对点模式.png

    点对点模型的特点:

    • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。
    • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有。
    • 正在运行,它不会影响到消息被发送到队列。
    • 接收者在成功接收消息之后需向队列应答成功。

    发布/订阅(Publish-Subscribe)

    包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发送到topic,系统将这些消息投递到订阅此topic的订阅者。
    发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。

    发布和订阅.png

    发布/订阅模型的特点:

    • 每个消息可以有多个消费者。
    • 发布者和订阅者之间有时间上的依赖性(先订阅主题,再来发送消息)。
    • 订阅者必须保持运行的状态,才能接受发布者发布的消息。

    原生JMS API操作ActiveMQ

        <dependencies>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.11.2</version>
            </dependency>
        </dependencies>
    

    步骤

    1.创建连接工厂 2.创建连接 3.打开连接 4.创建session 5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息) 6.创建消息生产者 7.创建消息 8.发送消息 9.释放资源
    

    消息的发送者 点对点

    package com.czy.producer;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import javax.management.Query;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.producer
     * @ClassName: PTPT_Producer
     * @Author: 曹振远
     * @Description: 点对点模式--消息生产者
     * @Date: 2021/6/10 16:50
     * @Version: 1.0
     */
    public class PTPT_Producer {
    
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2.创建连接
            Connection connection = factory.createConnection();
            //3.打开连接
            connection.start();
            //4.创建Session 参数1:是否开启事务  参数二:消息确认机制
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建目标地址(Queue/Topic)
            Queue queue = session.createQueue("queue01");
            //6.创建消息生产者
            MessageProducer producer = session.createProducer(queue);
            //7.创建消息
            TextMessage testMessage = session.createTextMessage("test Message");
            //8.发送消息
            producer.send(testMessage);
            System.out.println("发送消息成功");
            //9.释放资源
            session.close();
            connection.close();
        }
    }
    
    

    消息消费者 点对点

    package com.czy.consumer;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.consumer
     * @ClassName: PTP_Consumer
     * @Author: 曹振远
     * @Description: 点对点模式--消息消费者
     * @Date: 2021/6/11 10:39
     * @Version: 1.0
     */
    public class PTP_Consumer {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("queue01");
    
            MessageConsumer consumer = session.createConsumer(queue);
    
            while (true) {
                Message message = consumer.receive();
                if (message == null) {
                    break;
                }else{
                    //如果还有消息,判断是什么类型
                    if(message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("接受到的消息:"+textMessage);
                    }
                }
            }
    
        }
    }
    
    

    消费者2 --监听器

    package com.czy.consumer;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.consumer
     * @ClassName: PTP_Consumer
     * @Author: 曹振远
     * @Description: 点对点模式--消息消费者监听模式
     * @Date: 2021/6/11 10:39
     * @Version: 1.0
     */
    public class PTP_Consumer2 {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("queue01");
    
            MessageConsumer consumer = session.createConsumer(queue);
    
            //设置消息监听器
            consumer.setMessageListener(new MessageListener() {
                //处理逻辑
                @Override
                public void onMessage(Message message) {
                    if(message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println(textMessage);
                    }
                }
            });
    
        }
    }
    
    

    发布订阅消息生产者

    package com.czy.producer;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.producer
     * @ClassName: PTPT_Producer
     * @Author: 曹振远
     * @Description: 发布订阅模式--消息生产者
     * @Date: 2021/6/10 16:50
     * @Version: 1.0
     */
    public class PS_Producer {
    
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2.创建连接
            Connection connection = factory.createConnection();
            //3.打开连接
            connection.start();
            //4.创建Session 参数1:是否开启事务  参数二:消息确认机制
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建目标地址(Queue/Topic)
            Topic topic = session.createTopic("topic01");
            //6.创建消息生产者
            MessageProducer producer = session.createProducer(topic);
            //7.创建消息
            TextMessage testMessage = session.createTextMessage("test Message-topic");
            //8.发送消息
            producer.send(testMessage);
            System.out.println("发送消息成功");
            //9.释放资源
            session.close();
            connection.close();
        }
    }
    
    

    发布订阅消息消费者

    package com.czy.consumer;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.consumer
     * @ClassName: PTP_Consumer
     * @Author: 曹振远
     * @Description: 发布订阅模式--消息消费者监听模式
     * @Date: 2021/6/11 10:39
     * @Version: 1.0
     */
    public class PS_Consumer {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("topic01");
    
            MessageConsumer consumer = session.createConsumer(topic);
    
            //设置消息监听器
            consumer.setMessageListener(new MessageListener() {
                //处理逻辑
                @Override
                public void onMessage(Message message) {
                    if(message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println(textMessage);
                    }
                }
            });
    
        }
    }
    

    spring整合ActiveMq

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.itheima</groupId>
        <artifactId>spring_producer</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.11.2</version>
            </dependency>
    
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-web</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-oxm</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jdbc</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-aop</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context-support</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>javax.jms</groupId>
                <artifactId>javax.jms-api</artifactId>
                <version>2.0.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>3.7</version>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
        </dependencies>
    </project>
    

    applicationContext.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amp="http://activemq.apache.org/schema/core"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        <!--1.创建连接工厂对象-->
        <amp:connectionFactory
                id="connetionFactory"
                brokerURL="tcp://127.0.0.1:61616"
                userName="admin"
                password="admin"
        />
    
        <!--2.创建缓存连接工厂-->
        <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <!--注入连接工厂-->
            <property name="targetConnectionFactory" ref="connetionFactory"/>
            <!--缓存消息数据-->
            <property name="sessionCacheSize" value="5"/>
        </bean>
    
        <!--3.创建用于点对点发送的JmsTemplate-->
        <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--注入缓存连接工厂-->
            <property name="connectionFactory" ref="cachingConnectionFactory"/>
            <!--指定是否为发布订阅模式-->
            <property name="pubSubDomain" value="false"/>
        </bean>
    
        <!--4.创建用于发布订阅发送的JmsTemplate-->
        <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--注入缓存连接工厂-->
            <property name="connectionFactory" ref="cachingConnectionFactory"/>
            <!--指定是否为发布订阅模式-->
            <property name="pubSubDomain" value="true"/>
        </bean>
    </beans>
    
    package com.czy.producer;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.producer
     * @ClassName: SpringProducer
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/11 14:11
     * @Version: 1.0
     */
    @RunWith(SpringJUnit4ClassRunner.class) // junit与spring整合
    @ContextConfiguration("classpath:applicationContext.xml") // 加载spring配置文件
    public class SpringProducer {
    
        //点对点模式
        @Autowired
        @Qualifier("jmsQueueTemplate")
        private JmsTemplate jmsQueueTemplate;
    
        //发布订阅模式
        @Autowired
        @Qualifier("jmsTopicTemplate")
        private JmsTemplate jmsTopicTemplate;
    
        /**
         * 点对点的发送
         * 第一个参数是:指定队列名称
         * 第二个参数是:MessageCreator接口
         */
        @Test
        public void ptpSends(){
            jmsQueueTemplate.send("spring_Queue", new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("spring text Message");
                }
            });
            System.out.println("发送消息成功");
        }
    
        @Test
        public void psSends(){
            jmsTopicTemplate.send("topic01", new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("spring text Message");
                }
            });
        }
    }
    
    

    消费者

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amp="http://activemq.apache.org/schema/core"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:jms="http://www.springframework.org/schema/jms"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
    
    <!--1.创建连接工厂对象-->
        <amp:connectionFactory
                id="connetionFactory"
                brokerURL="tcp://127.0.0.1:61616"
                userName="admin"
                password="admin"
        />
    
        <!--2.创建缓存连接工厂-->
        <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <!--注入连接工厂-->
            <property name="targetConnectionFactory" ref="connetionFactory"/>
            <!--缓存消息数据-->
            <property name="sessionCacheSize" value="5"/>
        </bean>
    
        <!--3.配置监听扫描-->
        <context:component-scan base-package="com.czy.listener"/>
    
        <!--4.配置监听器(点对点)-->
        <jms:listener-container connection-factory="cachingConnectionFactory"
        destination-type="queue">
            <jms:listener destination="spring_Queue" ref="queueListener"/>
        </jms:listener-container>
    
        <!--5.配置监听器(发布订阅)-->
        <jms:listener-container connection-factory="cachingConnectionFactory"
                                destination-type="topic">
            <jms:listener destination="spring_Topic01" ref="topicListener"/>
        </jms:listener-container>
    </beans>
    
    package com.czy.listener;
    
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.listener
     * @ClassName: QueueListener
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/11 15:09
     * @Version: 1.0
     */
    @Component
    public class QueueListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("Queue:"+textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    package com.czy.listener;
    
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.listener
     * @ClassName: TopicListener
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/11 15:14
     * @Version: 1.0
     */
    @Component
    public class TopicListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("topic:"+textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    启动类

    package com.czy.consumer;
    
    import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
    
    import java.io.IOException;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.consumer
     * @ClassName: SpringConsumer
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/11 15:16
     * @Version: 1.0
     */
    public class SpringConsumer {
        public static void main(String[] args) throws IOException {
            ClassPathXmlApplicationContext cxt = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
    
            cxt.start();
    
            System.in.read();
        }
    }
    
    

    springboot整合activeMQ

    生产者

     <!--springboot父工程:锁定springboot的版本及其整合框架的版本-->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.1.RELEASE</version>
            <relativePath/>
        </parent>
    
        <!--导入所需依赖-->
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!--springboot与ActiveMQ的整合依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    

    application.yml

    server:
      port: 9001
    
    spring:
      application:
        name: activeMQ-producer
    
      activemq:
        broker-url: tcp://127.0.0.1:61616
        user: admin
        password: admin
    
       #指定发布模式,false是点对点,true是发布订阅
      jms:
        pub-sub-domain: false
    

    test

    package com.czy.producer;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.producer
     * @ClassName: SpringBootProducer
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/11 15:45
     * @Version: 1.0
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringBootProducer {
    
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        @Test
        public void ptpSends(){
            jmsMessagingTemplate.convertAndSend("springboot-queue","你好mq");
            System.out.println("发送消息成功");
        }
    }
    

    消费者

    server:
      port: 9002
    
    spring:
      application:
        name: activeMq-Consumer
    
      activemq:
        broker-url: tcp://127.0.0.1:61616
        user: admin
        password: admin
    
      jms:
        pub-sub-domain: false
    
    package com.czy.listener;
    
    import org.apache.activemq.Message;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    
    import javax.jms.JMSException;
    import javax.jms.TextMessage;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.listener
     * @ClassName: QueueListener
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/11 15:56
     * @Version: 1.0
     */
    @Component
    public class QueueListener {
    
        @JmsListener(destination = "springboot-queue")
        public void receiveMessage(Message message) throws JMSException {
            if(message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
            }
        }
    }
    
    

    ActiveMQ高级

    JMS消息组成

    结构 描述
    JMS Provider 消息中间件、消息服务器
    JMS Producer 消息生产者
    JMS Consumer 消息消费者
    JMS Message 消息(重要)

    JMS Message消息由三部分组成

    • 消息头
    • 消息体
    • 消息属性

    JMS消息头

    JMS消息头预定了若干字段用户用于客户端与JMS提供者之间识别和发送消息,预编译头如下:

    • <span style="color:red">红色</span>为重要的消息头
    名称 描述
    <span style="color:red">JMSDestination</span> 消息发送的Destination,在发送过程中由提供者设置,发送到哪里(队列)
    <span style="color:red">JMSMessageID</span> 唯一标识提供者发送的每一个消息,这个字段实在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的MessageID
    <span style="color:red">JMSDeliveryMode</span> 消息持久化。包含DeliveryMode.PERSISTENT或者DeliveryMode.NON_PERSISTENT
    JMSTimestamp 提供者发送消息的时间,由提供者在发送过程中设置
    <span style="color:red">JMSExpiration</span> 消息失效时间,毫秒,值0表明消息不会过期,默认是0
    <span style="color:red">JMSPriority</span> 消息的优先级,由提供者在发送过程中设置。优先级0的优先级最低,优先级9的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证优先级高就一定先发送,只保证了加急消息必须先于普通消息发送。默认是4
    <span style="color:red">JMSCorrelationID</span> 通常用来链接响应消息与请求消息,由消息的JJMS程序设置
    JMSSReplyTo 请求程序用它来指出回复消息应发送的地方,由发送消息的JMS程序设置
    JMSType JMS程序用来指出消息的类型
    JMSRedelivered 消息的重发标志,false,代表该消息时第一次发生,true,代表消息

    不过需要注意的是,在传送消息时,消息头的值是由JMS提供者来设置的,因此开发者使用以上setJMSxxx()方法分配的值就被忽略了,只有以下几个值是可以由开发者设置的:

    JMSCorrelationID、JMSRepltTo、JMSType


    JMS消息体

    在消息体中,JMS API定义了五种类型的消息格式

    TextMessage--一个字符串对象
    MapMessage --键值对
    ObjectMessage --一个序列化的Java对象,注意对象必须序列化,5.12后需要添加信任列表
    BytesMessage --一个字节的数据流
    StreamMessage --Java原始值的数据流
    
    spring:
      application:
        name: activeMq-Consumer
    
      activemq:
        broker-url: tcp://127.0.0.1:61616
        user: admin
        password: admin
        packages:
          trust-all: true #让ActiveMQ信任全部的自定义对象,实现对象的序列化
    

    JMS消息属性

    我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的,对于实现消息过滤功能和标记功能,消息属性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择的提供部分标准属性。

    message.setStringPropertie("Property",property);//自定义属性
    

    消息持久化

    消息持久化是保证消息不丢失的重要方法!!!

    ActiveMQ提供了以下三种的消息存储模式:

    1. Memory消息存储-基于内存的存储方式。
    2. 基于日志消息存储方式,KahaDB是ActiveMQ的默认日志存储方式,它是提供了容量的提升和恢复能力。
    3. 基于JDBC的消息存储方式--数据存储于数据库(例如MySQL中)。
    ActiveMQ持久化.png ActiveMQ持久化-消费者.png

    为什么要移除呢?

    因为持久化不移除的话,有可能会重复发送消息。

    不持久化

    server:
      port: 9001
    
    spring:
      application:
        name: activeMQ-producer
    
      activemq:
        broker-url: tcp://127.0.0.1:61616
        user: admin
        password: admin
    
       #指定发布模式,false是点对点,true是发布订阅
      jms:
        pub-sub-domain: false
        template:
          delivery-mode: non_persistent #非持久化(把消息存储在内存里面)
    

    kahaDB日志存储持久化

    server:
      port: 9001
    
    spring:
      application:
        name: activeMQ-producer
    
      activemq:
        broker-url: tcp://127.0.0.1:61616
        user: admin
        password: admin
    
       #指定发布模式,false是点对点,true是发布订阅
      jms:
        pub-sub-domain: false
        template:
          delivery-mode: persistent #存储在日志文件里面
    

    JDBC持久化

    server:
      port: 9001
    
    spring:
      application:
        name: activeMQ-producer
    
      activemq:
        broker-url: tcp://127.0.0.1:61616
        user: admin
        password: admin
    
       #指定发布模式,false是点对点,true是发布订阅
      jms:
        pub-sub-domain: false
        template:
          delivery-mode: persistent #存储在日志文件里面
    

    修改activemq.xml

    <!--配置数据库连接池--> 
    <bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="jdbc:mysql://192.168.66.133:3306/db_activemq" /> <property name="username" value="root" />
    <property name="password" value="123456"/>
    </bean>
    <!--JDBC Jdbc用于master/slave模式的数据库分享 -->
    <persistenceAdapter> 
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
    </persistenceAdapter>
    
    1. 拷贝mysql及durid数据源的jar包到activemq的lib目录下。

    4)重启activemq。

    消息事务

    消息事务,是保证消息传递原子性的一个重要的特征,和JDBC的事务特征类似

    一个事务性发送,其中一组消息要么能够全部成功保证到达服务器,要么都不到达服务器,

    生产者,消费者与消息服务器直接都支持事务性

    ActiveMQ的事务主要偏向于生产者的应用

    ActiveMQ消息事务流程图:

    ActiveMQ消息事务流程图.png

    实现:生产者

    package com.czy.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.connection.JmsTransactionManager;
    import org.springframework.transaction.PlatformTransactionManager;
    
    import javax.jms.ConnectionFactory;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.config
     * @ClassName: ActiveMQConfig
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/15 14:45
     * @Version: 1.0
     */
    @Configuration
    public class ActiveMQConfig {
    
        /**
         * @Author: 曹振远
         * @Description: 添加JMS事务管理器
         * @Date: 14:47 2021/6/15
         */
        @Bean
        public PlatformTransactionManager createTransactionManager(ConnectionFactory connectionFactory){
            return new JmsTransactionManager(connectionFactory);
        }
    }
    
    
    @Service
    @Transactional //这个注解不但可以实现消息的事务,也可以解决数据库的事务操作
    public class MessageService {
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        public void sendMessage() {
            for (int i = 0; i <= 10; i++) {
                int a = 10 / 0;
            }
            jmsTemplate.convertAndSend("spring-demo","testMQ");
        }
    }
    

    消费者实现

    package com.czy.listener;
    
    import org.apache.activemq.Message;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    
    import javax.jms.JMSException;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.listener
     * @ClassName: QueueListener
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/11 15:56
     * @Version: 1.0
     */
    @Component
    public class QueueListener {
    
        @JmsListener(destination = "springboot-queue")
        public void receiveMessage(Message message, Session session){
            if(message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println(textMessage.getText());
                    session.commit();//提交
                } catch (JMSException e) {
                    e.printStackTrace();
                    try {
                        session.rollback();//一旦事务回滚,MQ会重发消息,一共重发6次
                    } catch (JMSException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    }
    
    

    消息确认机制

    JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包括三个阶段:客户接收消息,客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生,在非事务性会话中,消息何时被确认取决于创建会话的应答模式。该参数有以下三个可能:

    描述
    Session.AUTO_ACKNOWLEDGE 当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法返回的时候,会话自动确认客户收到的消息
    Session.CLENT_ACKNOWLEDGE 客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确定是在会话层上进行,确认一个被消费的消息将自动确认所有已被会话消费的消息,例如,如果一个消费者消费了10个消息,然后确认第五个消息,那么所有10个消息都被确认
    Session.DUPS_ACKNOWLEDGE 该选择只是会话迟钝消息的提交,如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS proovider必须把消息头的JMSRedelivered设置为true

    注意:消息确认机制和事务机制是冲突的,只能选其中一个


    ActiveMQ消费方的消费确认机制.png

    消费方配置类

    package com.czy.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    
    import javax.jms.ConnectionFactory;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.config
     * @ClassName: AriveMQConfig
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/15 16:53
     * @Version: 1.0
     */
    @Configuration
    public class AriveMQConfig {
    
        @Bean("jmsListenerContainerFactory")
        public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory(ConnectionFactory connectionFactory){
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            //关闭事务(重要)
            factory.setSessionTransacted(false);
            //修盖消息确认机制,springboot整合ActiveMQ后,手动确认是4
            factory.setSessionAcknowledgeMode(4);
            return factory;
        }
    }
    
    
    package com.czy.listener;
    
    import org.apache.activemq.Message;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    
    import javax.jms.JMSException;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.listener
     * @ClassName: QueueListener
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/11 15:56
     * @Version: 1.0
     */
    @Component
    public class QueueListener {
    
        @JmsListener(destination = "springboot-queue",containerFactory = "jmsListenerContainerFactory")
        public void receiveMessage(Message message){
            if(message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println(textMessage.getText());
                    message.acknowledge();//手动确认接收消息
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    消息的投递方式

    1.异步投递vs同步投递

    同步:消息生产者使用持久传递模式发送消息的时候,Producer.send()方法会被阻塞,直到broker发送一个确认消息给生产者ProducerAck,这个确认消息暗示Broker已经成功接收到消息并把消息保存到二级存储中。

    异步:如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送,异步发送不会在收到broker的确认之前一直阻塞Producer.send方法想要使用异步,在brokerURL中增加jms.alwaysSycSend=false&jms.useAsyncSend=true属性。

    1. 如果设置了alwaysSycSend=true,系统将会忽略useAsyncSend设置的值都采用同步。
    2. 当alwaysSycSend=false时,“non_persistent”(非持久化)、事务中的消息将使用“异步发送”。
    3. 当alwaysSycSend=false时,如果指定了useAsyncSend=true,“persistent”类型的消息使用异步发送,如果useAsyncSend=false,“persistent”类型的消息使用的同步发送。

    总计:默认情况jms.alwaysSycSend=false&jms.useAsyncSend=false,非持久化消息、事务内的消息均采用异步发送:对持久化的消息采用同步发送。

    2.配置异步投递
    //1.在连接上配置
    new ActiveMQConnertionFactory("tcp://localhost:61616?jms.useAsyncSend=true")
        
    //2.通过ConnectionFactory
    (ActiveMQConnertionFactory)connertionFactory.setUserAsyncSend(true);
    
    //3.通过connection
    (ActiveMQConnertionFactory)connection.setUserAsyncSend(true);
    

    注意:Spring和SpringBoot项目,通过修改JmsTemplate的默认参数实现异步或同步投递。

     /**
         *  异步发送非持久化JmsTemplate
         * @param connectionFactory
         * @return
         */
        @Autowired
        @Bean
        public JmsTemplate asynJmsTemplate(ConnectionFactory connectionFactory) {
            JmsTemplate template = new JmsTemplate(connectionFactory);
            template.setExplicitQosEnabled(true);
            template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            return template;
        }
    
        /**
         * 同步发送非持久化JmsTemplate
         * @param connectionFactory
         * @return
         */
        @Autowired
        @Bean
        public JmsTemplate synJmsTemplate(ConnectionFactory connectionFactory) {
            JmsTemplate template = new JmsTemplate(connectionFactory);
            return template;
        }
    
    3.异步投递如何确认发送成功

    异步投递丢失消息的场景:生产者设置UserAsyncSend=true,使用producer.send(message)持续发送消息,由于消息不阻塞,生产者会认为所有的send消息均被成功发送到MQ。如果MQ突然宕机,此时生产者端内存中尚未发送至MQ的消息都会丢失。

    这时,可以给异步投递方式接收回调,以确认消息是否发送成功!

    producer.send(textMessage,new AsyncCallback() {
            @Override public void onSuccess ()
            { // 使用msgid标识来进行消息发送成功的处理 
                System.out.println(msgid + " 消息发送成功");
            }
            @Override public void onException (JMSException exception)
            { // 使用msgid表示进行消息发送失败的处理
                System.out.println(msgid + " 消息发送失败");
                exception.printStackTrace();
            }
        });
    
    4.延时投递

    生产者提供两个发送消息的方法,一个是即时发送,一个是延时发送。

    1.修改activemq.xml

    <broker xmlns="http://activemq.apache.org/schema/core" ... schedulerSupport="true" > ...... </broker>
    

    重点:schedulerSupport="true"

    2.在代码中设置延时

        /**
         * 延时投递
         */
        @Test
        public void sendMessage() {
            Connection connection = null;
            Session session = null;
            ActiveMQMessageProducer producer = null;
            // 获取连接工厂
            ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
            try {
                connection = connectionFactory.createConnection();
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                Queue queue = session.createQueue("springboot-queue");
                int count = 10;
                producer = (ActiveMQMessageProducer) session.createProducer(queue);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                //创建需要发送的消息
                TextMessage textMessage = session.createTextMessage("Hello");
                //设置延时时长(延时10秒)
                textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);
                producer.send(textMessage);
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    5.定时投递

    1.启动类添加定时注解

    package com.czy;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy
     * @ClassName: AppProducer
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/11 15:37
     * @Version: 1.0
     */
    @SpringBootApplication
    @EnableScheduling//开启定时功能
    public class AppProducer {
        public static void main(String[] args) {
            SpringApplication.run(AppProducer.class, args);
        }
    }
    
    
    @Scheduled(fixedDelay = 3000)//定时任务注解
        public void sendQueue() {
            jmsMessagingTemplate.convertAndSend("springboot-queue", "消息ID:" + UUID.randomUUID().toString().substring(0, 6));
            System.out.println("消息发送成功...");
        }
    

    死信队列

    DLQ-Dead Letter Queue,死信队列,用来保存处理失败或者过期的消息。

    以下情况,消息会被重发:

    1. 事务 rollback()。
    2. 事务没有调用 commit()。
    3. 没有开启事务,使用手动确认,session.recover()时。

    当一个消息被重发6次(缺省为6次),会给broker发送一个“Poison ack",这个消息被认为是a poison pill,这个时候broker会将这个消息发送到死信队列,以便后续处理。

    注意两点:

    • 缺省持久消息过期,会被送到DLQ,非持久消息不会送到DLQ。
    • 缺省的死信队列是ActiveMQ DLQ,如果没有特别指定,死信都会被发送到这个队列。

    可以通过配置文件(activemq.xml)来调整死信的发送策略。

    1.修改activemq.xml,配置每个队列自己的死信队列

    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <!--配置每个队列自己的死信队列-->
                <policyEntry queue=">">
                    <deadLetterStrategy>
                        <individualDeadLetterStrategy queuePrefix="DLQ."
                         useQueueForQueueMessages="true" />
                    </deadLetterStrategy>
                </policyEntry>
                <policyEntry topic=">" >
                    <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                    </pendingMessageLimitStrategy>
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>
    

    2.RedeliveryPolicy重发策略,consumer

    package com.czy.config;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.RedeliveryPolicy;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.connection.JmsTransactionManager;
    import org.springframework.transaction.PlatformTransactionManager;
    
    import javax.jms.ConnectionFactory;
    
    /**
     * @ProjectName: jms-producer
     * @Package: com.czy.config
     * @ClassName: AriveMQConfig
     * @Author: 曹振远
     * @Description:
     * @Date: 2021/6/15 16:53
     * @Version: 1.0
     */
    @Configuration
    public class ActiveMQConfig {
    
        //RedeliveryPolicy重发策略设置
        @Bean
        public RedeliveryPolicy redeliveryPolicy(){
            RedeliveryPolicy  redeliveryPolicy=   new RedeliveryPolicy();
            //是否在每次尝试重新发送失败后,增长这个等待时间
            redeliveryPolicy.setUseExponentialBackOff(true);
            //重发次数,默认为6次   这里设置为10次
            redeliveryPolicy.setMaximumRedeliveries(10);
            //重发时间间隔,默认为1秒
            redeliveryPolicy.setInitialRedeliveryDelay(2);
            //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
            redeliveryPolicy.setBackOffMultiplier(2);
            //是否避免消息碰撞
            redeliveryPolicy.setUseCollisionAvoidance(false);
            //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
            redeliveryPolicy.setMaximumRedeliveryDelay(-1);
    
            return redeliveryPolicy;
        }
    
        @Bean
        public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${spring.activemq.broker-url}")String url, RedeliveryPolicy redeliveryPolicy){
            ActiveMQConnectionFactory activeMQConnectionFactory =
                    new ActiveMQConnectionFactory(
                            "admin",
                            "admin",
                            url);
            activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
            return activeMQConnectionFactory;
        }
    
        @Bean
        public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
            return new JmsTransactionManager(connectionFactory);
        }
    
        @Bean(name="jmsQueryListenerFactory")
        public DefaultJmsListenerContainerFactory   jmsListenerContainerFactory(ConnectionFactory connectionFactory,PlatformTransactionManager transactionManager){
            DefaultJmsListenerContainerFactory  factory=new DefaultJmsListenerContainerFactory ();
            factory.setTransactionManager(transactionManager);
            factory.setConnectionFactory(connectionFactory);
            factory.setSessionTransacted(true); // 开启事务
            factory.setSessionAcknowledgeMode(1);
            return factory;
        }
    }
    
    

    ActiveMQ企业面试经典问题

    问题1:ActiveMQ宕机了怎么办?

    ActiveMQ主从集群:Zookeeper集群+Replocated LevelDB+ActiveMQ集群

    ActiveMQ集群.png

    问题2:如何防止消费方消息重复消费?(消息幂等)

    如果因为网络延迟等原因,MQ无法及时接收到消费方的应答,导致MQ重试,在重试过程中造成重复消费的问题。

    解决思路:

    • 如果消费方是做数据库操作的,那么可以把消息的ID作为唯一的主键,这样在重试的情况下,会触发主键冲突,从而避免数据出现脏数据。
    • 如果消费方不是做数据库操作的,那么可以借助第三方的应用,例如Redis,来记录消费记录,每次消息被消费完成的时候,把当前消息的ID作为key存到Redis中,每次消费前,先到Redis查询有没有该条消费记录。

    问题3:如何防止消息丢失?

    • 在消息生产者和消费者之间使用事务。
    • 在消费方采用手动确认消息(ACK)。
    • 消息持久化,例如jdbc和kahaDB日志。

    问题4:什么是死信队列?

    MQ消息处理失败或者过期,消息不会丢失的一种机制。

    相关文章

      网友评论

          本文标题:记录一次ActiveMQ的学习记录

          本文链接:https://www.haomeiwen.com/subject/hufnertx.html