ActiveMQ入门
异步处理
应用解耦
流量削峰
异步处理
场景说明:用户注册,需要执行三个业务逻辑,分别为写入用户表,发注册邮件以及注册短信。
串行方式
串行方式.png将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
并行方式
并行处理.png将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个以上三个任务完成后,返回给客户端,与串行的差别是,并行的方式可以提高处理的效率。
异步处理
引入消息中间件,将部分的业务逻辑,进行异步处理。改造后的架构如下:
引入消息中间件.png按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高啦,比串行提高了3倍,比并行提高了两倍。
应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。
传统的做法是,订单系统调用库存系统的接口。如下图:
传统做法.png传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合。如何解决以上问题呢?引入应用消息队列后的方案,如下图:
消息队列.png订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
流量消峰
秒杀业务.png流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。通过加入消息队列完成如下功能:
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理
常见的消息中间件产品对比
特性 | ActiveMQ | RabbitMq | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | Java | Erlang | Java | Scala |
单击吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
实效性 | 毫秒级 | 微秒级 | 毫秒级 | 毫秒级 |
可用性 | 高(支持主从架构) | 高(支持主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
功能性 | 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持的较好 | 基于erlang开发,所以并发能力很强,性能及其好,延时很低,管理界面丰富 | MQ功能比较完备,扩展性佳 | 像一些消息查询,消息回溯等功能没有提供,在大数据领域应用广泛 |
什么是ActiveMQ?
官网: http://activemq.apache.org/
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
什么是JMS?
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似于JDBC(java DatabaseConnectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。
JMS消息模式
消息中间件一般有两种传递模式:点对点模式(P2P)和发布订阅模式(Pub/Sub)。
(1)P2P(Point to Point)点对点模型(Queue队列模型)
(2)Publish/Subscribe(PUB/SUB)发布、定于模型(Topic主题模型)
点对点模型(Pointer-to-Pointer):即生产者和消费者之间的消息往来。每个消息都被发送到特定的消息队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
点对点模式.png点对点模型的特点:
- 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。
- 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有。
- 正在运行,它不会影响到消息被发送到队列。
- 接收者在成功接收消息之后需向队列应答成功。
发布/订阅(Publish-Subscribe)
包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发送到topic,系统将这些消息投递到订阅此topic的订阅者。
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。
发布/订阅模型的特点:
- 每个消息可以有多个消费者。
- 发布者和订阅者之间有时间上的依赖性(先订阅主题,再来发送消息)。
- 订阅者必须保持运行的状态,才能接受发布者发布的消息。
原生JMS API操作ActiveMQ
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
</dependencies>
步骤
1.创建连接工厂 2.创建连接 3.打开连接 4.创建session 5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息) 6.创建消息生产者 7.创建消息 8.发送消息 9.释放资源
消息的发送者 点对点
package com.czy.producer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import javax.management.Query;
/**
* @ProjectName: jms-producer
* @Package: com.czy.producer
* @ClassName: PTPT_Producer
* @Author: 曹振远
* @Description: 点对点模式--消息生产者
* @Date: 2021/6/10 16:50
* @Version: 1.0
*/
public class PTPT_Producer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建Session 参数1:是否开启事务 参数二:消息确认机制
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue/Topic)
Queue queue = session.createQueue("queue01");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
TextMessage testMessage = session.createTextMessage("test Message");
//8.发送消息
producer.send(testMessage);
System.out.println("发送消息成功");
//9.释放资源
session.close();
connection.close();
}
}
消息消费者 点对点
package com.czy.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: jms-producer
* @Package: com.czy.consumer
* @ClassName: PTP_Consumer
* @Author: 曹振远
* @Description: 点对点模式--消息消费者
* @Date: 2021/6/11 10:39
* @Version: 1.0
*/
public class PTP_Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("queue01");
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
Message message = consumer.receive();
if (message == null) {
break;
}else{
//如果还有消息,判断是什么类型
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
System.out.println("接受到的消息:"+textMessage);
}
}
}
}
}
消费者2 --监听器
package com.czy.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: jms-producer
* @Package: com.czy.consumer
* @ClassName: PTP_Consumer
* @Author: 曹振远
* @Description: 点对点模式--消息消费者监听模式
* @Date: 2021/6/11 10:39
* @Version: 1.0
*/
public class PTP_Consumer2 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("queue01");
MessageConsumer consumer = session.createConsumer(queue);
//设置消息监听器
consumer.setMessageListener(new MessageListener() {
//处理逻辑
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage);
}
}
});
}
}
发布订阅消息生产者
package com.czy.producer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: jms-producer
* @Package: com.czy.producer
* @ClassName: PTPT_Producer
* @Author: 曹振远
* @Description: 发布订阅模式--消息生产者
* @Date: 2021/6/10 16:50
* @Version: 1.0
*/
public class PS_Producer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建Session 参数1:是否开启事务 参数二:消息确认机制
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue/Topic)
Topic topic = session.createTopic("topic01");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage testMessage = session.createTextMessage("test Message-topic");
//8.发送消息
producer.send(testMessage);
System.out.println("发送消息成功");
//9.释放资源
session.close();
connection.close();
}
}
发布订阅消息消费者
package com.czy.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: jms-producer
* @Package: com.czy.consumer
* @ClassName: PTP_Consumer
* @Author: 曹振远
* @Description: 发布订阅模式--消息消费者监听模式
* @Date: 2021/6/11 10:39
* @Version: 1.0
*/
public class PS_Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic01");
MessageConsumer consumer = session.createConsumer(topic);
//设置消息监听器
consumer.setMessageListener(new MessageListener() {
//处理逻辑
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage);
}
}
});
}
}
spring整合ActiveMq
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId>
<artifactId>spring_producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
</project>
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amp="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!--1.创建连接工厂对象-->
<amp:connectionFactory
id="connetionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin"
/>
<!--2.创建缓存连接工厂-->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!--注入连接工厂-->
<property name="targetConnectionFactory" ref="connetionFactory"/>
<!--缓存消息数据-->
<property name="sessionCacheSize" value="5"/>
</bean>
<!--3.创建用于点对点发送的JmsTemplate-->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!--注入缓存连接工厂-->
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<!--指定是否为发布订阅模式-->
<property name="pubSubDomain" value="false"/>
</bean>
<!--4.创建用于发布订阅发送的JmsTemplate-->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!--注入缓存连接工厂-->
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<!--指定是否为发布订阅模式-->
<property name="pubSubDomain" value="true"/>
</bean>
</beans>
package com.czy.producer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
/**
* @ProjectName: jms-producer
* @Package: com.czy.producer
* @ClassName: SpringProducer
* @Author: 曹振远
* @Description:
* @Date: 2021/6/11 14:11
* @Version: 1.0
*/
@RunWith(SpringJUnit4ClassRunner.class) // junit与spring整合
@ContextConfiguration("classpath:applicationContext.xml") // 加载spring配置文件
public class SpringProducer {
//点对点模式
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;
//发布订阅模式
@Autowired
@Qualifier("jmsTopicTemplate")
private JmsTemplate jmsTopicTemplate;
/**
* 点对点的发送
* 第一个参数是:指定队列名称
* 第二个参数是:MessageCreator接口
*/
@Test
public void ptpSends(){
jmsQueueTemplate.send("spring_Queue", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("spring text Message");
}
});
System.out.println("发送消息成功");
}
@Test
public void psSends(){
jmsTopicTemplate.send("topic01", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("spring text Message");
}
});
}
}
消费者
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amp="http://activemq.apache.org/schema/core"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
<!--1.创建连接工厂对象-->
<amp:connectionFactory
id="connetionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin"
/>
<!--2.创建缓存连接工厂-->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!--注入连接工厂-->
<property name="targetConnectionFactory" ref="connetionFactory"/>
<!--缓存消息数据-->
<property name="sessionCacheSize" value="5"/>
</bean>
<!--3.配置监听扫描-->
<context:component-scan base-package="com.czy.listener"/>
<!--4.配置监听器(点对点)-->
<jms:listener-container connection-factory="cachingConnectionFactory"
destination-type="queue">
<jms:listener destination="spring_Queue" ref="queueListener"/>
</jms:listener-container>
<!--5.配置监听器(发布订阅)-->
<jms:listener-container connection-factory="cachingConnectionFactory"
destination-type="topic">
<jms:listener destination="spring_Topic01" ref="topicListener"/>
</jms:listener-container>
</beans>
package com.czy.listener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* @ProjectName: jms-producer
* @Package: com.czy.listener
* @ClassName: QueueListener
* @Author: 曹振远
* @Description:
* @Date: 2021/6/11 15:09
* @Version: 1.0
*/
@Component
public class QueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("Queue:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
package com.czy.listener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* @ProjectName: jms-producer
* @Package: com.czy.listener
* @ClassName: TopicListener
* @Author: 曹振远
* @Description:
* @Date: 2021/6/11 15:14
* @Version: 1.0
*/
@Component
public class TopicListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("topic:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
启动类
package com.czy.consumer;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import java.io.IOException;
/**
* @ProjectName: jms-producer
* @Package: com.czy.consumer
* @ClassName: SpringConsumer
* @Author: 曹振远
* @Description:
* @Date: 2021/6/11 15:16
* @Version: 1.0
*/
public class SpringConsumer {
public static void main(String[] args) throws IOException {
ClassPathXmlApplicationContext cxt = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
cxt.start();
System.in.read();
}
}
springboot整合activeMQ
生产者
<!--springboot父工程:锁定springboot的版本及其整合框架的版本-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/>
</parent>
<!--导入所需依赖-->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--springboot与ActiveMQ的整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.yml
server:
port: 9001
spring:
application:
name: activeMQ-producer
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
#指定发布模式,false是点对点,true是发布订阅
jms:
pub-sub-domain: false
test
package com.czy.producer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @ProjectName: jms-producer
* @Package: com.czy.producer
* @ClassName: SpringBootProducer
* @Author: 曹振远
* @Description:
* @Date: 2021/6/11 15:45
* @Version: 1.0
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Test
public void ptpSends(){
jmsMessagingTemplate.convertAndSend("springboot-queue","你好mq");
System.out.println("发送消息成功");
}
}
消费者
server:
port: 9002
spring:
application:
name: activeMq-Consumer
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
jms:
pub-sub-domain: false
package com.czy.listener;
import org.apache.activemq.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
* @ProjectName: jms-producer
* @Package: com.czy.listener
* @ClassName: QueueListener
* @Author: 曹振远
* @Description:
* @Date: 2021/6/11 15:56
* @Version: 1.0
*/
@Component
public class QueueListener {
@JmsListener(destination = "springboot-queue")
public void receiveMessage(Message message) throws JMSException {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());
}
}
}
ActiveMQ高级
JMS消息组成
结构 | 描述 |
---|---|
JMS Provider | 消息中间件、消息服务器 |
JMS Producer | 消息生产者 |
JMS Consumer | 消息消费者 |
JMS Message | 消息(重要) |
JMS Message消息由三部分组成
- 消息头
- 消息体
- 消息属性
JMS消息头
JMS消息头预定了若干字段用户用于客户端与JMS提供者之间识别和发送消息,预编译头如下:
- <span style="color:red">红色</span>为重要的消息头
名称 | 描述 |
---|---|
<span style="color:red">JMSDestination</span> | 消息发送的Destination,在发送过程中由提供者设置,发送到哪里(队列) |
<span style="color:red">JMSMessageID</span> | 唯一标识提供者发送的每一个消息,这个字段实在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的MessageID |
<span style="color:red">JMSDeliveryMode</span> | 消息持久化。包含DeliveryMode.PERSISTENT或者DeliveryMode.NON_PERSISTENT |
JMSTimestamp | 提供者发送消息的时间,由提供者在发送过程中设置 |
<span style="color:red">JMSExpiration</span> | 消息失效时间,毫秒,值0表明消息不会过期,默认是0 |
<span style="color:red">JMSPriority</span> | 消息的优先级,由提供者在发送过程中设置。优先级0的优先级最低,优先级9的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证优先级高就一定先发送,只保证了加急消息必须先于普通消息发送。默认是4 |
<span style="color:red">JMSCorrelationID</span> | 通常用来链接响应消息与请求消息,由消息的JJMS程序设置 |
JMSSReplyTo | 请求程序用它来指出回复消息应发送的地方,由发送消息的JMS程序设置 |
JMSType | JMS程序用来指出消息的类型 |
JMSRedelivered | 消息的重发标志,false,代表该消息时第一次发生,true,代表消息 |
不过需要注意的是,在传送消息时,消息头的值是由JMS提供者来设置的,因此开发者使用以上setJMSxxx()方法分配的值就被忽略了,只有以下几个值是可以由开发者设置的:
JMSCorrelationID、JMSRepltTo、JMSType
JMS消息体
在消息体中,JMS API定义了五种类型的消息格式
TextMessage--一个字符串对象
MapMessage --键值对
ObjectMessage --一个序列化的Java对象,注意对象必须序列化,5.12后需要添加信任列表
BytesMessage --一个字节的数据流
StreamMessage --Java原始值的数据流
spring:
application:
name: activeMq-Consumer
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
packages:
trust-all: true #让ActiveMQ信任全部的自定义对象,实现对象的序列化
JMS消息属性
我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的,对于实现消息过滤功能和标记功能,消息属性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择的提供部分标准属性。
message.setStringPropertie("Property",property);//自定义属性
消息持久化
消息持久化是保证消息不丢失的重要方法!!!
ActiveMQ提供了以下三种的消息存储模式:
- Memory消息存储-基于内存的存储方式。
- 基于日志消息存储方式,KahaDB是ActiveMQ的默认日志存储方式,它是提供了容量的提升和恢复能力。
- 基于JDBC的消息存储方式--数据存储于数据库(例如MySQL中)。
为什么要移除呢?
因为持久化不移除的话,有可能会重复发送消息。
不持久化
server:
port: 9001
spring:
application:
name: activeMQ-producer
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
#指定发布模式,false是点对点,true是发布订阅
jms:
pub-sub-domain: false
template:
delivery-mode: non_persistent #非持久化(把消息存储在内存里面)
kahaDB日志存储持久化
server:
port: 9001
spring:
application:
name: activeMQ-producer
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
#指定发布模式,false是点对点,true是发布订阅
jms:
pub-sub-domain: false
template:
delivery-mode: persistent #存储在日志文件里面
JDBC持久化
server:
port: 9001
spring:
application:
name: activeMQ-producer
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
#指定发布模式,false是点对点,true是发布订阅
jms:
pub-sub-domain: false
template:
delivery-mode: persistent #存储在日志文件里面
修改activemq.xml
<!--配置数据库连接池-->
<bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://192.168.66.133:3306/db_activemq" /> <property name="username" value="root" />
<property name="password" value="123456"/>
</bean>
<!--JDBC Jdbc用于master/slave模式的数据库分享 -->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
- 拷贝mysql及durid数据源的jar包到activemq的lib目录下。
4)重启activemq。
消息事务
消息事务,是保证消息传递原子性的一个重要的特征,和JDBC的事务特征类似
一个事务性发送,其中一组消息要么能够全部成功保证到达服务器,要么都不到达服务器,
生产者,消费者与消息服务器直接都支持事务性
ActiveMQ的事务主要偏向于生产者的应用
ActiveMQ消息事务流程图:
ActiveMQ消息事务流程图.png实现:生产者
package com.czy.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
/**
* @ProjectName: jms-producer
* @Package: com.czy.config
* @ClassName: ActiveMQConfig
* @Author: 曹振远
* @Description:
* @Date: 2021/6/15 14:45
* @Version: 1.0
*/
@Configuration
public class ActiveMQConfig {
/**
* @Author: 曹振远
* @Description: 添加JMS事务管理器
* @Date: 14:47 2021/6/15
*/
@Bean
public PlatformTransactionManager createTransactionManager(ConnectionFactory connectionFactory){
return new JmsTransactionManager(connectionFactory);
}
}
@Service
@Transactional //这个注解不但可以实现消息的事务,也可以解决数据库的事务操作
public class MessageService {
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage() {
for (int i = 0; i <= 10; i++) {
int a = 10 / 0;
}
jmsTemplate.convertAndSend("spring-demo","testMQ");
}
}
消费者实现
package com.czy.listener;
import org.apache.activemq.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* @ProjectName: jms-producer
* @Package: com.czy.listener
* @ClassName: QueueListener
* @Author: 曹振远
* @Description:
* @Date: 2021/6/11 15:56
* @Version: 1.0
*/
@Component
public class QueueListener {
@JmsListener(destination = "springboot-queue")
public void receiveMessage(Message message, Session session){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
session.commit();//提交
} catch (JMSException e) {
e.printStackTrace();
try {
session.rollback();//一旦事务回滚,MQ会重发消息,一共重发6次
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
消息确认机制
JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包括三个阶段:客户接收消息,客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生,在非事务性会话中,消息何时被确认取决于创建会话的应答模式。该参数有以下三个可能:
值 | 描述 |
---|---|
Session.AUTO_ACKNOWLEDGE | 当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法返回的时候,会话自动确认客户收到的消息 |
Session.CLENT_ACKNOWLEDGE | 客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确定是在会话层上进行,确认一个被消费的消息将自动确认所有已被会话消费的消息,例如,如果一个消费者消费了10个消息,然后确认第五个消息,那么所有10个消息都被确认 |
Session.DUPS_ACKNOWLEDGE | 该选择只是会话迟钝消息的提交,如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS proovider必须把消息头的JMSRedelivered设置为true |
注意:消息确认机制和事务机制是冲突的,只能选其中一个
ActiveMQ消费方的消费确认机制.png
消费方配置类
package com.czy.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
/**
* @ProjectName: jms-producer
* @Package: com.czy.config
* @ClassName: AriveMQConfig
* @Author: 曹振远
* @Description:
* @Date: 2021/6/15 16:53
* @Version: 1.0
*/
@Configuration
public class AriveMQConfig {
@Bean("jmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//关闭事务(重要)
factory.setSessionTransacted(false);
//修盖消息确认机制,springboot整合ActiveMQ后,手动确认是4
factory.setSessionAcknowledgeMode(4);
return factory;
}
}
package com.czy.listener;
import org.apache.activemq.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* @ProjectName: jms-producer
* @Package: com.czy.listener
* @ClassName: QueueListener
* @Author: 曹振远
* @Description:
* @Date: 2021/6/11 15:56
* @Version: 1.0
*/
@Component
public class QueueListener {
@JmsListener(destination = "springboot-queue",containerFactory = "jmsListenerContainerFactory")
public void receiveMessage(Message message){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
message.acknowledge();//手动确认接收消息
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
消息的投递方式
1.异步投递vs同步投递
同步:消息生产者使用持久传递模式发送消息的时候,Producer.send()方法会被阻塞,直到broker发送一个确认消息给生产者ProducerAck,这个确认消息暗示Broker已经成功接收到消息并把消息保存到二级存储中。
异步:如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送,异步发送不会在收到broker的确认之前一直阻塞Producer.send方法想要使用异步,在brokerURL中增加jms.alwaysSycSend=false&jms.useAsyncSend=true属性。
- 如果设置了alwaysSycSend=true,系统将会忽略useAsyncSend设置的值都采用同步。
- 当alwaysSycSend=false时,“non_persistent”(非持久化)、事务中的消息将使用“异步发送”。
- 当alwaysSycSend=false时,如果指定了useAsyncSend=true,“persistent”类型的消息使用异步发送,如果useAsyncSend=false,“persistent”类型的消息使用的同步发送。
总计:默认情况jms.alwaysSycSend=false&jms.useAsyncSend=false,非持久化消息、事务内的消息均采用异步发送:对持久化的消息采用同步发送。
2.配置异步投递
//1.在连接上配置
new ActiveMQConnertionFactory("tcp://localhost:61616?jms.useAsyncSend=true")
//2.通过ConnectionFactory
(ActiveMQConnertionFactory)connertionFactory.setUserAsyncSend(true);
//3.通过connection
(ActiveMQConnertionFactory)connection.setUserAsyncSend(true);
注意:Spring和SpringBoot项目,通过修改JmsTemplate的默认参数实现异步或同步投递。
/**
* 异步发送非持久化JmsTemplate
* @param connectionFactory
* @return
*/
@Autowired
@Bean
public JmsTemplate asynJmsTemplate(ConnectionFactory connectionFactory) {
JmsTemplate template = new JmsTemplate(connectionFactory);
template.setExplicitQosEnabled(true);
template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return template;
}
/**
* 同步发送非持久化JmsTemplate
* @param connectionFactory
* @return
*/
@Autowired
@Bean
public JmsTemplate synJmsTemplate(ConnectionFactory connectionFactory) {
JmsTemplate template = new JmsTemplate(connectionFactory);
return template;
}
3.异步投递如何确认发送成功
异步投递丢失消息的场景:生产者设置UserAsyncSend=true,使用producer.send(message)持续发送消息,由于消息不阻塞,生产者会认为所有的send消息均被成功发送到MQ。如果MQ突然宕机,此时生产者端内存中尚未发送至MQ的消息都会丢失。
这时,可以给异步投递方式接收回调,以确认消息是否发送成功!
producer.send(textMessage,new AsyncCallback() {
@Override public void onSuccess ()
{ // 使用msgid标识来进行消息发送成功的处理
System.out.println(msgid + " 消息发送成功");
}
@Override public void onException (JMSException exception)
{ // 使用msgid表示进行消息发送失败的处理
System.out.println(msgid + " 消息发送失败");
exception.printStackTrace();
}
});
4.延时投递
生产者提供两个发送消息的方法,一个是即时发送,一个是延时发送。
1.修改activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core" ... schedulerSupport="true" > ...... </broker>
重点:schedulerSupport="true"
2.在代码中设置延时
/**
* 延时投递
*/
@Test
public void sendMessage() {
Connection connection = null;
Session session = null;
ActiveMQMessageProducer producer = null;
// 获取连接工厂
ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
try {
connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("springboot-queue");
int count = 10;
producer = (ActiveMQMessageProducer) session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//创建需要发送的消息
TextMessage textMessage = session.createTextMessage("Hello");
//设置延时时长(延时10秒)
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);
producer.send(textMessage);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
5.定时投递
1.启动类添加定时注解
package com.czy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* @ProjectName: jms-producer
* @Package: com.czy
* @ClassName: AppProducer
* @Author: 曹振远
* @Description:
* @Date: 2021/6/11 15:37
* @Version: 1.0
*/
@SpringBootApplication
@EnableScheduling//开启定时功能
public class AppProducer {
public static void main(String[] args) {
SpringApplication.run(AppProducer.class, args);
}
}
@Scheduled(fixedDelay = 3000)//定时任务注解
public void sendQueue() {
jmsMessagingTemplate.convertAndSend("springboot-queue", "消息ID:" + UUID.randomUUID().toString().substring(0, 6));
System.out.println("消息发送成功...");
}
死信队列
DLQ-Dead Letter Queue,死信队列,用来保存处理失败或者过期的消息。
以下情况,消息会被重发:
- 事务 rollback()。
- 事务没有调用 commit()。
- 没有开启事务,使用手动确认,session.recover()时。
当一个消息被重发6次(缺省为6次),会给broker发送一个“Poison ack",这个消息被认为是a poison pill,这个时候broker会将这个消息发送到死信队列,以便后续处理。
注意两点:
- 缺省持久消息过期,会被送到DLQ,非持久消息不会送到DLQ。
- 缺省的死信队列是ActiveMQ DLQ,如果没有特别指定,死信都会被发送到这个队列。
可以通过配置文件(activemq.xml)来调整死信的发送策略。
1.修改activemq.xml,配置每个队列自己的死信队列
<destinationPolicy>
<policyMap>
<policyEntries>
<!--配置每个队列自己的死信队列-->
<policyEntry queue=">">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ."
useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
2.RedeliveryPolicy重发策略,consumer
package com.czy.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
/**
* @ProjectName: jms-producer
* @Package: com.czy.config
* @ClassName: AriveMQConfig
* @Author: 曹振远
* @Description:
* @Date: 2021/6/15 16:53
* @Version: 1.0
*/
@Configuration
public class ActiveMQConfig {
//RedeliveryPolicy重发策略设置
@Bean
public RedeliveryPolicy redeliveryPolicy(){
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
//是否在每次尝试重新发送失败后,增长这个等待时间
redeliveryPolicy.setUseExponentialBackOff(true);
//重发次数,默认为6次 这里设置为10次
redeliveryPolicy.setMaximumRedeliveries(10);
//重发时间间隔,默认为1秒
redeliveryPolicy.setInitialRedeliveryDelay(2);
//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
redeliveryPolicy.setBackOffMultiplier(2);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(false);
//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
return redeliveryPolicy;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${spring.activemq.broker-url}")String url, RedeliveryPolicy redeliveryPolicy){
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory(
"admin",
"admin",
url);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
@Bean(name="jmsQueryListenerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,PlatformTransactionManager transactionManager){
DefaultJmsListenerContainerFactory factory=new DefaultJmsListenerContainerFactory ();
factory.setTransactionManager(transactionManager);
factory.setConnectionFactory(connectionFactory);
factory.setSessionTransacted(true); // 开启事务
factory.setSessionAcknowledgeMode(1);
return factory;
}
}
ActiveMQ企业面试经典问题
问题1:ActiveMQ宕机了怎么办?
ActiveMQ主从集群:Zookeeper集群+Replocated LevelDB+ActiveMQ集群
ActiveMQ集群.png问题2:如何防止消费方消息重复消费?(消息幂等)
如果因为网络延迟等原因,MQ无法及时接收到消费方的应答,导致MQ重试,在重试过程中造成重复消费的问题。
解决思路:
- 如果消费方是做数据库操作的,那么可以把消息的ID作为唯一的主键,这样在重试的情况下,会触发主键冲突,从而避免数据出现脏数据。
- 如果消费方不是做数据库操作的,那么可以借助第三方的应用,例如Redis,来记录消费记录,每次消息被消费完成的时候,把当前消息的ID作为key存到Redis中,每次消费前,先到Redis查询有没有该条消费记录。
问题3:如何防止消息丢失?
- 在消息生产者和消费者之间使用事务。
- 在消费方采用手动确认消息(ACK)。
- 消息持久化,例如jdbc和kahaDB日志。
问题4:什么是死信队列?
MQ消息处理失败或者过期,消息不会丢失的一种机制。
网友评论