1. JMS是什么
什么是Java消息服务?
Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
image.png image.png2. 消息头
JMS的消息头属性:
- JMSDestination: 消息目的地
- JMSDeliverymode: 消息持久化模式
- JMSExpiration: 消息过期时间
- JMSPriority: 消息的优先级
- JMSMessageID: 消息的唯一标识符. 后面我们会介绍如何解决幂等性.
消息的生产者可以set这些属性, 消息的消费者可以get 这些属性. 这些属性在 send 方法里面也可以设置
package com.at.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce_topic {
public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
//******************************** 设置属性 ***************
// 这里可以指定每个消息的目的地
textMessage.setJMSDestination(topic);
/*
持久模式和非持久模式。
一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。
一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。
*/
textMessage.setJMSDeliveryMode(0);
/*
可以设置消息在一定时间后过期,默认是永不过期。
消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。
如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。
如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。
*/
textMessage.setJMSExpiration(1000);
/* 消息优先级,从0-9十个级别,0-4是普通消息5-9是加急消息。
JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。
*/
textMessage.setJMSPriority(10);
// 唯一标识每个消息的标识。MQ会给我们默认生成一个,我们也可以自己指定。
textMessage.setJMSMessageID("ABCD");
// 上面有些属性在send方法里也能设置
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息发送到MQ完成 ****");
}
}
3. 消息体
- 封装具体的消息数据
- 5种消息体格式
- 发送和接受的消息体必须一直对应
5种消息体格式:
-
TextMessage
: 普通字符串消息, 包含一个string -
MapMessage
: 一个Map 类型的消息, key 为string 类型 - BytesMessage : 二进制数组消息, 包含一个 byte []
- StreamMessage: Java数据流消息, 用标准操作来 顺序的填充和读取
- ObjectMessage: 对象消息, 包含一个可序列化的Java对象
消息生产者:
package com.at.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce_topic {
public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i < 4 ; i++) {
// 发送TextMessage消息体
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
messageProducer.send(textMessage);
// 发送MapMessage 消息体。set方法: 添加,get方式:获取
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "张三"+i);
mapMessage.setInt("age", 18+i);
messageProducer.send(mapMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息发送到MQ完成 ****");
}
}
消息消费者
package com.at.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsummer_topic {
public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer messageConsumer = session.createConsumer(topic);
messageConsumer.setMessageListener( (message) -> {
// 判断消息是哪种类型之后,再强转。
if (null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("****消费者text的消息:"+textMessage.getText());
}catch (JMSException e) {
}
}
if (null != message && message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
System.out.println("****消费者的map消息:"+mapMessage.getString("name"));
System.out.println("****消费者的map消息:"+mapMessage.getInt("age"));
}catch (JMSException e) {
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
4. 消息属性
如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。
他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。
消息生产者
package com.at.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce_topic {
public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
// 调用Message的set*Property()方法,就能设置消息属性。根据value的数据类型的不同,有相应的API。
textMessage.setStringProperty("From","ZhangSan@qq.com");
textMessage.setByteProperty("Spec", (byte) 1);
textMessage.setBooleanProperty("Invalide",true);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息发送到MQ完成 ****");
}
}
消息消费者
package com.at.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsummer_topic {
public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer messageConsumer = session.createConsumer(topic);
messageConsumer.setMessageListener( (message) -> {
if (null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("消息体:"+textMessage.getText());
System.out.println("消息属性:"+textMessage.getStringProperty("From"));
System.out.println("消息属性:"+textMessage.getByteProperty("Spec"));
System.out.println("消息属性:"+textMessage.getBooleanProperty("Invalide"));
}catch (JMSException e) {
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
网友评论