一、ActiveMQ简介
1.什么是ActiveMQ
ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1 和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
2.什么是消息
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串,也可以更复杂,可以包含嵌入对象。
3.什么是队列
是一种有序的,先进先出的数据结构,例如:生活中的排队
4.什么是消息队列
“消息队列”是在消息的传输过程中保存消息的容器
5.常见消息服务应用
- ActiveMQ
- RabbitMQ
- RocketMQ
二、消息服务的应用场景
消息队列的特点主要是异步处理,主要作用是减少消息请求和响应的时间以及解耦。所以主要用于比较耗时并且不需要即时(同步)返回结果的操作。
2.1 异步处理
2.1.1 用户注册
用户注册流程:
- 注册处理及写入数据库
- 发送注册成功的手机短信
- 发送注册成功的邮件信息
如果使用消息中间件,则可以创建两个线程来做这些事情,直接发送消息给消息中间件,然后让邮件服务和短信服务去消息中间件中取消息,取到消息后自己再做对应的操作。
2.2 应用的解耦
2.2.1 订单处理
生成订单流程:
- 在购物车中点击结算
- 完成支付
- 创建订单
- 调用库存系统
订单完成后,订单系统不用直接取调用库存系统,而是发送消息到消息中间件,写入一个订单信息。库存系统自己去消息中间件中获取,然后做发货处理,并更新库存。
2.3流量的削峰
2.3.1 秒杀功能
秒杀流程
- 用户点击秒杀
- 发送请求到秒杀应用
- 在请求秒杀应用之前将请求放入到消息队列
- 秒杀应用从消息队列中获取请求并处理
系统举行秒杀活动,流量蜂拥而至100件商品,10万人挤进来怎么办?
将10万秒杀的操作,放入消息队列。秒杀应用将10万个请求中的前100个进行处理,其它的驳回通知失败。这样将流量控制在了消息队列处。秒杀应用不会被怼死。
三、JMS
1.什么是JMS
JMS(Java Message Service)是Java平台上面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且提供标准的产生、发送、接收消息的接口,简化企业应用的开发。
2.JMS模型
2.1 点对点模型(Point To Point)
生产者发送一条消息到queue,只有一个消费者能收到。
image.png
2.2 发布订阅模型(Publish/Subscribe)
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
image.png
四、ActiveMQ安装
1.下载资源
ActiveMQ官网:http://activemq.apache.org
1.1 版本说明
ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。
ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。
2.上传至Linux服务器
3.解压安装文件
tar -zxf apache-activemq-5.9.0-bin.tar.gz
4.检查权限
ls -al apache-activemq-5.9.0/bin
如果权限不足,则无法执行,需要修改文件权限:
chmod 755 activemq
5.复制应用至本地目录
cp apache-activemq-5.9.0 /usr/local/activemq -r
6.启动ActiveMQ
/usr/local/activemq/bin/activemq start
7.测试ActiveMQ
7.1检查进程
ps aux|grep activemq
7.2管理界面
使用浏览器访问ActiveMQ管理应用,地址如下:
http://ip:8161/admin/
用户名:admin
密码:admin
AcitveMQ使用的是Jetty提供的HTTP服务。启动稍慢,建议短暂等待再访问测试。
见到如下界面代表服务启动成功
7.3 修改访问端口(管理应用监听的端口)
修改ActiveMQ配置文件:
/usr/local/activemq/conf/jetty.xml
image.png
7.4 修改用户名和密码
修改conf/users.properties配置文件,内容为:用户名=密码
保存并启动ActiveMQ服务即可。
image.png
8.重启ActiveMQ
/usr/local/activemq/bin/activemq restart
9.关闭ActiveMQ
/usr/local/activemq/bin/activemq stop
10.配置文件activemq.xml
配置文件中,配置的是ActiveMQ的核心配置信息,是提供服务时使用的配置,可以修改启动的访问端口,即Java编程中访问ActiveMQ的访问端口
image.png
默认端口:61616(编程时使用的端口)
使用协议:TCP协议
修改端口后,保存并重启ActiveMQ服务即可
11.ActiveMQ目录介绍
bin:可执行的脚本文件
conf:相关的配置文件
data:存放的是日志文件
docs:存放的是相关文档
examples:存放的是简单的实例
lib:相关的jar包
webapps:用于存放项目的目录
五、ActiveMQ术语
1.Destination
目的地,JMS Provider(消息中间件)负责维护,用于对Message进行管理的对象。MessageProducer需要指定Destination才能发送消息,MessageReceiver需要指定Destination才能接收消息。
2.Producer
消息生成者,负责发送Message到目的地。
3.Consumer|Receiver
消息消费者,负责从目的地中消费(处理/监听/订阅)Message
4.Message
消息,用于封装一次通信的内容
六、ActiveMQ应用
1.ActiveMQ常用API简介
下述API都是接口类型,定义在javax.jms包中
1.1 ConnectionFactory
连接工厂:用于创建连接的工厂类型
1.2 Connection
连接:用于建立访问ActiveMQ连接的类型,由连接工厂创建
1.3 Session
会话:一次持久、有效、有状态的访问,由连接创建
1.4 Destination & Queue
目的地:用于描述本次访问ActiveMQ的消息访问目的地,即ActiveMQ服务中的具体队列,由会话创建
Interface Queue extends Destination
1.5 MessageProducer
消息生产者:在一次有效会话中,用于发送消息给ActiveMQ的服务工具,由会话创建
1.6 MessageConsumer
消息消费者:在一次有效会话中,用于从ActiveMQ中获取消息的工具,由会话创建
1.7 Message
消息:通过消息生产者向ActiveMQ服务发送消息时使用的数据载体对象或消息消费者从ActiveMQ服务中获取消息时使用的数据载体对象,是所有消息(文本消息、对象消息等)具体类型的顶级接口,可以通过会话创建或通过会话从ActiveMQ服务中获取
2.JMS-HelloWorld
2.1 处理文本消息
2.1.1 创建消息生产者
创建工程
mq-producer
添加坐标
<!--activeMQ-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq-all.version}</version>
</dependency>
编写消息生产者
public class HelloWorldProducer {
public void sendHelloWorldActiveMQ(String msgText){
//定义连接工厂
ConnectionFactory connectionFactory = null;
//定义连接对象
Connection connection = null;
//定义会话
Session session = null;
//目的地
Destination destination = null;
//消息生产者
MessageProducer producer = null;
//定义消息
Message message = null;
try {
//传入的用户名和密码可以通过jetty-realm.properties文件修改
//brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接(此时才是真正创建连接)
connection.start();
/**
* 创建会话
* transacted:是否使用事务,可选值为true,false
* true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
* false:不使用事务,设置此变量 则acknowledgeMode必须设置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动确认机制
* Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
*/
session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);
//创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
destination = session.createQueue("helloworld-destination");
//创建消息生产者
producer = session.createProducer(destination);
//创建消息对象
message = session.createTextMessage(msgText);
//发送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
//回收消息发送者资源
if (producer != null){
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.1.2 创建消息消费者
创建工程
mq-consumer
添加坐标
<!--activeMQ-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
创建消息生产者
public class HelloWorldConsumer {
public void receiveHelloWorldActiveMQ() {
//定义连接工厂
ConnectionFactory connectionFactory = null;
//定义连接
Connection connection = null;
//定义会话
Session session = null;
//定义目的地
Destination destination = null;
//定义消息消费者
MessageConsumer consumer = null;
//定义消息
Message message = null;
try {
//创建连接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
//创建目的地
destination = session.createQueue("helloworld-destination");
//创建消息消费者
consumer = session.createConsumer(destination);
//接收消息
message = consumer.receive();
//获取文本消息
String msg = ((TextMessage) message).getText();
System.out.println("从ActiveMQ中获取的文本信息:" + msg);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.1.3 测试
测试生产者
public class Test {
public static void main(String[] args) {
HelloWorldProducer producer = new HelloWorldProducer();
producer.sendHelloWorldActiveMQ("HelloWorld");
}
}
测试消费者
public class Test {
public static void main(String[] args) {
HelloWorldConsumer consumer = new HelloWorldConsumer();
consumer.receiveHelloWorldActiveMQ();
}
}
image.png
2.2 处理对象消息
2.2.1 创建对象
public class User implements Serializable {
private Integer userId;
private String userName;
private Integer userAge;
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public Integer getUserAge() {
return userAge;
}
public void setUserAge(Integer userAge) {
this.userAge = userAge;
}
@Override
public String toString() {
return "User{" +
"userId=" + userId +
", userName='" + userName + '\'' +
", userAge=" + userAge +
'}';
}
}
2.2.2 创建生产者
public class HelloWorldProducer2 {
public void sendHelloWorldActiveMQ(User user){
//定义连接工厂
ConnectionFactory connectionFactory = null;
//定义连接对象
Connection connection = null;
//定义会话
Session session = null;
//目的地
Destination destination = null;
//消息生产者
MessageProducer producer = null;
//定义消息
Message message = null;
try {
//传入的用户名和密码可以通过jetty-realm.properties文件修改
//brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接(此时才是真正创建连接)
connection.start();
/**
* 创建会话
* transacted:是否使用事务,可选值为true,false
* true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
* false:不使用事务,设置此变量 则acknowledgeMode必须设置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动确认机制
* Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
*/
session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);
//创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
destination = session.createQueue("my-user");
//创建消息生产者
producer = session.createProducer(destination);
//创建消息对象
message = session.createObjectMessage(user);
//发送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
//回收消息发送者资源
if (producer != null){
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.2.3 创建消费者
public class HelloWorldConsumer2 {
public void receiveHelloWorldActiveMQ() {
//定义连接工厂
ConnectionFactory connectionFactory = null;
//定义连接
Connection connection = null;
//定义会话
Session session = null;
//定义目的地
Destination destination = null;
//定义消息消费者
MessageConsumer consumer = null;
//定义消息
Message message = null;
try {
//创建连接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
//创建目的地
destination = session.createQueue("my-user");
//创建消息消费者
consumer = session.createConsumer(destination);
//接收消息
message = consumer.receive();
Serializable obj = ((ObjectMessage) message).getObject();
User user = (User) obj;
System.out.println("从ActiveMQ中获取的对象信息:" + user);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.2.4 测试
public class Test {
public static void main(String[] args) {
/*HelloWorldProducer producer = new HelloWorldProducer();
producer.sendHelloWorldActiveMQ("HelloWorld");*/
HelloWorldProducer2 producer2 = new HelloWorldProducer2();
producer2.sendHelloWorldActiveMQ(new User(1, "tom", 21));
}
}
public class Test {
public static void main(String[] args) {
/*HelloWorldConsumer consumer = new HelloWorldConsumer();
consumer.receiveHelloWorldActiveMQ();*/
HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
consumer2.receiveHelloWorldActiveMQ();
}
}
image.png
3.JMS-实现队列服务监听
队列监听使用了观察者模式
3.1 创建消息生产者
public class HelloWorldProducer3 {
public void sendHelloWorldActiveMQ(User user){
//定义连接工厂
ConnectionFactory connectionFactory = null;
//定义连接对象
Connection connection = null;
//定义会话
Session session = null;
//目的地
Destination destination = null;
//消息生产者
MessageProducer producer = null;
//定义消息
Message message = null;
try {
//传入的用户名和密码可以通过jetty-realm.properties文件修改
//brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接(此时才是真正创建连接)
connection.start();
/**
* 创建会话
* transacted:是否使用事务,可选值为true,false
* true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
* false:不使用事务,设置此变量 则acknowledgeMode必须设置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动确认机制
* Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
*/
session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);
//创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
destination = session.createQueue("my-destination");
//创建消息生产者
producer = session.createProducer(destination);
//创建消息对象
message = session.createObjectMessage(user);
//发送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
//回收消息发送者资源
if (producer != null){
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
3.2 创建消息消费者
public class HelloWorldConsumer3 {
public void receiveHelloWorldActiveMQ() {
//定义连接工厂
ConnectionFactory connectionFactory = null;
//定义连接
Connection connection = null;
//定义会话
Session session = null;
//定义目的地
Destination destination = null;
//定义消息消费者
MessageConsumer consumer = null;
//定义消息
Message message = null;
try {
//创建连接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
//创建目的地
destination = session.createQueue("my-destination");
//创建消息消费者
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
//ActiveMQ的回调方法,通过该方法将消息传递到consumer中
@Override
public void onMessage(Message message) {
Serializable obj = null;
try {
obj = ((ObjectMessage) message).getObject();
} catch (JMSException e) {
e.printStackTrace();
}
User user = (User) obj;
System.out.println("从ActiveMQ中获取的对象信息:" + user);
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
4.Topic模型
4.1 Publish/Subscribe 处理模式(Topic)
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到topic的消息会被所有订阅者消费
当生产者发布消息,不管是否有消费者,都不会保存消息
一定要先有消息的消费者,后有消息生产者
4.2 创建消息生产者
public class HelloWorldProducerTopic {
public void sendHelloWorldActiveMQ(String msgText){
//定义连接工厂
ConnectionFactory connectionFactory = null;
//定义连接对象
Connection connection = null;
//定义会话
Session session = null;
//目的地
Destination destination = null;
//消息生产者
MessageProducer producer = null;
//定义消息
Message message = null;
try {
//传入的用户名和密码可以通过jetty-realm.properties文件修改
//brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接(此时才是真正创建连接)
connection.start();
/**
* 创建会话
* transacted:是否使用事务,可选值为true,false
* true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
* false:不使用事务,设置此变量 则acknowledgeMode必须设置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动确认机制
* Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
*/
session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);
//创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
destination = session.createTopic("test-topic");
//创建消息生产者
producer = session.createProducer(destination);
//创建消息对象
message = session.createTextMessage(msgText);
//发送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
//回收消息发送者资源
if (producer != null){
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
4.3 创建消息消费者
创建三份
public class HelloWorldConsumerTopic1 implements Runnable{
public void receiveHelloWorldActiveMQ() {
//定义连接工厂
ConnectionFactory connectionFactory = null;
//定义连接
Connection connection = null;
//定义会话
Session session = null;
//定义目的地
Destination destination = null;
//定义消息消费者
MessageConsumer consumer = null;
//定义消息
Message message = null;
try {
//创建连接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
//创建目的地
destination = session.createTopic("test-topic");
//创建消息消费者
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//获取文本消息
String msg = null;
try {
msg = ((TextMessage) message).getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println("从ActiveMQ中获取的文本信息----topic1:" + msg);
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void run() {
receiveHelloWorldActiveMQ();
}
}
4.4 测试
public class Test {
public static void main(String[] args) {
/*HelloWorldProducer producer = new HelloWorldProducer();
producer.sendHelloWorldActiveMQ("HelloWorld");*/
/*HelloWorldProducer2 producer2 = new HelloWorldProducer2();
producer2.sendHelloWorldActiveMQ(new User(1, "tom", 21));*/
/*HelloWorldProducer3 producer3 = new HelloWorldProducer3();
producer3.sendHelloWorldActiveMQ(new User(2,"alice",19));*/
HelloWorldProducerTopic topic = new HelloWorldProducerTopic();
topic.sendHelloWorldActiveMQ("Hello Topic");
}
}
public class Test {
public static void main(String[] args) {
/*HelloWorldConsumer consumer = new HelloWorldConsumer();
consumer.receiveHelloWorldActiveMQ();*/
/*HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
consumer2.receiveHelloWorldActiveMQ();*/
/*HelloWorldConsumer3 consumer3 = new HelloWorldConsumer3();
consumer3.receiveHelloWorldActiveMQ();*/
HelloWorldConsumerTopic1 topic1 = new HelloWorldConsumerTopic1();
Thread thread1 = new Thread(topic1);
thread1.start();
HelloWorldConsumerTopic2 topic2 = new HelloWorldConsumerTopic2();
Thread thread2 = new Thread(topic2);
thread2.start();
HelloWorldConsumerTopic3 topic3 = new HelloWorldConsumerTopic3();
Thread thread3 = new Thread(topic3);
thread3.start();
}
}
image.png
七、Spring整合ActiveMQ
1.创建项目
创建spring-activemq-producer
1.1 添加坐标
<dependencies>
<!--activeMQ-->
<!--ActiveMQ客户端完整jar包依赖-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-context</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--ActiveMQ和Spring整合配置文件标签处理jar包依赖-->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
</dependency>
<!--Spring JMS插件相关的jar包依赖-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<!--Active Pool-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
</dependency>
<!-- 单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- 日志处理 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>
<!--javaee-->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jsp-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
</dependency>
</dependencies>
1.2 整合ActiveMQ
- web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
version="2.5">
<servlet>
<servlet-name>springmvc</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-*.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>springmvc</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
</web-app>
- spring-mvc.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!--扫包-->
<context:component-scan base-package="com.hxx.web.controller"/>
<!--添加注解驱动-->
<mvc:annotation-driven/>
<!--配置视图解析器-->
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="prefix" value="/WEB-INF/jsp/"/>
<property name="suffix" value=".jsp"/>
</bean>
<!--放行静态资源-->
</beans>
- spring-service
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!--加载资源文件-->
<context:property-placeholder location="classpath:resource.properties"/>
<!--扫描bean对象-->
<context:component-scan base-package="com.hxx.service.impl"/>
</beans>
- spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!--创建一个连接工厂,连接ActiveMQ,ActiveMQConnectionFactory,需要依赖ActiveMQ提供的amq标签-->
<!--amq:connectionFactory是bean的子标签,会在Spring容器中创建一个bean对象,
可以为对象名,类似:<bean id="" class="ActiveMQConnectionFactory"/>-->
<amq:connectionFactory brokerURL="tcp://192.168.254.128:61616"
userName="admin" password="admin" id="amqConnectionFactory"/>
<!--spring管理JMS相关代码的时候,必须依赖jms标签库,Spring-jms提供标签库-->
<!--
定义Spring-jms中的连接工厂对象
CachingConnectionFactory - spring框架提供的连接工厂对象,不能真正访问MOM容器,
类似一个工厂的代理对象,需要提供一个真实工厂,实现MOM容器的连接访问
-->
<bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
<property name="connectionFactory" ref="amqConnectionFactory"/>
<property name="maxConnections" value="10"/>
</bean>
<!--配置有缓存的ConnectionFactory,Session的缓存大小可定制-->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="3"/>
</bean>
<!--jmsTemplate配置-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!--给定连接工厂-->
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<!--默认目的地命名-->
<property name="defaultDestinationName" value="test-spring"/>
</bean>
</beans>
2.创建项目
spring-activemq-consumer
2.1 添加依赖
<dependencies>
<!--activeMQ-->
<!--ActiveMQ客户端完整jar包依赖-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-context</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--ActiveMQ和Spring整合配置文件标签处理jar包依赖-->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
</dependency>
<!--Spring JMS插件相关的jar包依赖-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<!--Active Pool-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
</dependency>
<!-- 单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- 日志处理 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
</dependency>
</dependencies>
2.2 整合ActiveMQ
- spring-service.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!--扫描bean对象-->
<context:component-scan base-package="com.hxx.service,com.hxx.listener"/>
</beans>
- spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
<!--创建一个连接工厂,连接ActiveMQ,ActiveMQConnectionFactory,需要依赖ActiveMQ提供的amq标签-->
<!--amq:connectionFactory是bean的子标签,会在Spring容器中创建一个bean对象,
可以为对象名,类似:<bean id="" class="ActiveMQConnectionFactory"/>-->
<amq:connectionFactory brokerURL="tcp://192.168.254.128:61616"
userName="admin" password="admin" id="amqConnectionFactory"/>
<!--spring管理JMS相关代码的时候,必须依赖jms标签库,Spring-jms提供标签库-->
<!--
定义Spring-jms中的连接工厂对象
CachingConnectionFactory - spring框架提供的连接工厂对象,不能真正访问MOM容器,
类似一个工厂的代理对象,需要提供一个真实工厂,实现MOM容器的连接访问
-->
<!--配置有缓存的ConnectionFactory,Session的缓存大小可定制-->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="3"/>
</bean>
<!--注册监听器-->
<!--
开始注册监听
需要的参数有:
acknowledge:消息确认机制
container-type:simple|default
simple:SimpleMessageListenerContainer最简单的消息监听器容器,只能处理固定数量的JMS会话
default:DefaultMessageListenerContainer是一种用于异步消息监听的管理类,且支持事务
destination-type:目的地类型,使用队列作为目的地,
connection-factory:连接工厂,spring-jms使用的工厂,必须是spring自主创建的
不能使用第三方工具创建工程,如:ActiveMQConnectionFactory
-->
<jms:listener-container acknowledge="auto" container-type="default"
destination-type="queue" connection-factory="cachingConnectionFactory">
<!--
在监听器容器中注册某监听对象,
destination - 设置目的地命名
ref - 指定监听器对象
-->
<jms:listener destination="test-spring" ref="myListener"/>
</jms:listener-container>
</beans>
- 创建MyMessageListener
@Component(value = "myListener")
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
}
}
- 测试
public class TestActiveMQ {
public static void main(String[] args) throws IOException {
ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext(new String[]{"classpath:spring-jms.xml"
,"classpath:spring-service.xml"});
ac.start();
System.out.println("spring容器启动");
System.in.read();
}
}
image.png
3.测试整合
需求:
1.在Producer中创建User类
2.将User对象传递到ActiveMQ中
3.在Consumer中获取User对象并在控制台打印
3.1 Producer发送消息
@Service
public class UserServiceImpl implements UserService {
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void addUser(final User user) {
jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//发送消息
return session.createObjectMessage(user);
}
});
}
}
发送成功
image.png
3.2 Consumer接收消息
- userServiceImpl.java
@Service
public class UserServiceImpl implements UserService {
@Override
public void showUser(User user) {
System.out.println(user);
}
}
- MyMessageListener.java
@Component(value = "myListener")
public class MyMessageListener implements MessageListener {
@Autowired
private UserService userService;
@Override
public void onMessage(Message message) {
Serializable obj = null;
try {
obj = ((ObjectMessage) message).getObject();
} catch (JMSException e) {
e.printStackTrace();
}
User user = (User) obj;
userService.showUser(user);
}
}
image.png
image.png
网友评论