如果服务端没有安装activemq可以参考文:https://www.jianshu.com/p/41fe776151e9
如果不是很了解jms的规范可以参考文章:https://www.jianshu.com/p/c6355fafdd6f
1.queue的事务发送和非事务发送
事务queue发送
/**
* @Project: activemq
* @description: 队列的事务发送
* @author: sunkang
* @create: 2018-12-08 22:28
* @ModificationHistory who when What
**/
public class JMSQueueTransactionProducer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
//当createSession的第一参数为true,表示事务发送,为false表示非事务发送,事务发送必须要commit,
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//创建目的地
Destination destination = session.createQueue("muqueue");
//创建发送者
MessageProducer producer = session.createProducer(destination);
//发送信息
for(int i=0;i<10;i++){
//创建文本信息
TextMessage message = session.createTextMessage("helloworld");
//创建map信息
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("123","123");
mapMessage.setString("456","456");
//创建流对象信息
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("i am stramMessage");
producer.send(message);
producer.send(mapMessage);
producer.send(streamMessage);
}
//事务发送,必须要commit才能发送成功
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if( connection !=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
执行完之后发现控制台的myque多了30条的数据
非事务的queue发送
/**
* @Project: activemq
* @description: 非事务的queue发送
* @author: sunkang
* @create: 2018-12-14 15:23
**/
public class JMSQueueNoTransactionProducer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
//当createSession的第一参数为true,表示事务发送,为false表示非事务发送
//当非事务发送的时候,第二个参数有三个选择:
// Session.AUTO_ACKNOWLEDGE 表示回话自动确认
// Session.CLIENT_ACKNOWLEDGE 表示需要消费者调用消息的 acknowledge方法进行确认消息,会确认之前的所有的消息,这个对消息提供者无效
// Session.DUPS_OK_ACKNOWLEDGE 表示消费者的消息延迟确认,这个对消息提供者无效
Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
//创建目的地
Destination destination = session.createQueue("muqueue");
//创建发送者
MessageProducer producer = session.createProducer(destination);
//发送信息
for(int i=0;i<10;i++){
//创建文本信息
TextMessage message = session.createTextMessage("helloworld");
//创建map信息
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("123","123");
mapMessage.setString("456","456");
//创建流对象信息
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("i am stramMessage");
producer.send(message);
producer.send(mapMessage);
producer.send(streamMessage);
}
} catch (JMSException e) {
e.printStackTrace();
}finally {
if( connection !=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.queue的消息接收
在这里演示通过消费者receive的方式事务接收
和通过消费者设置MessageListener的监听器的方式主动接收消息
消费者receive的方式事务确认消息
/**
* @Project: activemq
* @description: 消费者receive的方式事务接收
* @author: sunkang
* @create: 2018-12-08 22:56
* @ModificationHistory who when What
**/
public class JMSQueueClientReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
//客户端的事务型
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//创建目的地
Destination destination = session.createQueue("muqueue");
//创建发送者
MessageConsumer consumer = session.createConsumer(destination);
//接收消息
while(true){
Message message = consumer.receive();
parseMessageTypeAndPrint(message);
//消息确认
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();
}finally {
if( connection !=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
//解析消息的类型,并打印输入消息
public static void parseMessageTypeAndPrint(Message message){
//支持六种对象,接收端需要根据六种对象来判断
if(message instanceof BytesMessage ){
}else if(message instanceof StreamMessage){
StreamMessage streamMessage = (StreamMessage) message;
try {
String str = streamMessage.readString();
System.out.println("streamMessage:"+ str);
} catch (JMSException e) {
e.printStackTrace();
}
}else if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}else if(message instanceof ObjectMessage){
}else if (message instanceof MapMessage){
MapMessage mapMessage = (MapMessage) message;
Enumeration<String> enumerations = null;
try {
enumerations = mapMessage.getMapNames();
while(enumerations.hasMoreElements()){
String key = enumerations.nextElement();
Object value = mapMessage.getObject(key);
System.out.println(key+ ":"+value);
}
} catch (JMSException e) {
e.printStackTrace();
}
}else if( message instanceof Message){
}
}
}
消费者设置监听器的方式手动确认消息
/**
* @Project: activemq
* @description: 消费者设置监听器的方式手动确认消息
* @author: sunkang
* @create: 2018-12-08 23:07
* @ModificationHistory who when What
**/
public class JMSQueueListenerReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
//设置非事务的手动确认模式
final Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);
//创建目的地
Destination destination = session.createQueue("muqueue");
//创建发送者
MessageConsumer consumer = session.createConsumer(destination);
//创建监听的方式接受消息
MessageListener listener =new MessageListener() {
@Override
public void onMessage(Message message) {
parseMessageTypeAndPrint(message);
//消息手动确认接收
try {
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
};
consumer.setMessageListener(listener);
//让线程阻塞
System.in.read();
} catch (JMSException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if( connection !=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
//解析消息的类型,并打印输入消息
public static void parseMessageTypeAndPrint(Message message){
//支持六种对象,接收端需要根据六种对象来判断
if(message instanceof BytesMessage ){
}else if(message instanceof StreamMessage){
StreamMessage streamMessage = (StreamMessage) message;
try {
String str = streamMessage.readString();
System.out.println("streamMessage:"+ str);
} catch (JMSException e) {
e.printStackTrace();
}
}else if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}else if(message instanceof ObjectMessage){
}else if (message instanceof MapMessage){
MapMessage mapMessage = (MapMessage) message;
Enumeration<String> enumerations = null;
try {
enumerations = mapMessage.getMapNames();
while(enumerations.hasMoreElements()){
String key = enumerations.nextElement();
Object value = mapMessage.getObject(key);
System.out.println(key+ ":"+value);
}
} catch (JMSException e) {
e.printStackTrace();
}
}else if( message instanceof Message){
}
}
}
3.topic的消息发送和消息的topic的消息接收
下面演示了简单的topic的创建的过程,设置了消息的持久化为非持久化,并且消息发送的过程开启了事务
topic的消息发送
/**
* @Project: activemq
* @description: topic的消息发送
* @author: sunkang
* @create: 2018-12-08 22:28
* @ModificationHistory who when What
**/
public class JMSTopicProducer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//创建目的地
Destination destination = session.createTopic("myTopic");
//创建发送者
MessageProducer producer = session.createProducer(destination);
//设置持久化模型,消息不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//创建需要发送的信息
TextMessage message = session.createTextMessage("Hello myfriend");
//发送信息
producer.send(message);
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if( connection !=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
-
看下控制台的topic输出,发现多了个myTopic的主题,并且message的进队列为的数量为1
-
消息的topic的消息接收
由于topic上的生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息,这会让后来订阅的消费者发送消息的丢失,于是就有了离线的持久化订阅
在这里显示两种消费者的订阅方式 ,一种为非持久化订阅方式,一种为持久化订阅方式,持久化订阅方式只能对pub/sub模型起作用,对p2p模型不起作用,因为p2p模型不存在时间的相关性。
(1)消费者的非持久化订阅接收消息
非持久化订阅方式需要先启动消费者来监听消息,然后在启动消息提供者发送消息,才可以接收到消息
/**
* @Project: activemq
* @description: topic非持久化订阅方式
* @author: sunkang
* @create: 2018-12-08 22:56
* @ModificationHistory who when What
**/
public class JMSTopicReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.setClientID("topicId");
connection.start();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//创建目的地
Destination destination = session.createTopic("myTopic");
//创建发送者
MessageConsumer consumer = session.createConsumer(destination);
TextMessage textMessagemessage = (TextMessage) consumer.receive();
System.out.println(textMessagemessage.getText());
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if( connection !=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
(2)消费者的持久化订阅接收消息
持久订阅时,客户端向 JMS 服务器注册一个自己身份的 ID,当这个客户端处于离线时,JMS Provider 会为这个 ID 保存所有发送到主题的消息,当客户再次连接到 JMS Provider 时,会根据自己的 ID 得到所有当自己处于离线时发送到主题的消息
需要先启动消费端先往JMS 服务器注册身份id,然后关闭消费端,然后启动消息提供者,然后在启动消费端,这个时候消费者就可以消费到离线的消息了
当启动消息提供者发送消息的时候,看监控中心的变化,此时topicId标识与myTopic进行了关联了
然后启动消费者消费者时,就可以消费离线的消息了
代码入下
/**
* @Project: activemq
* @description: 持久化订阅的接收离线消息方式
* @author: sunkang
* @create: 2018-12-08 22:56
* @ModificationHistory who when What
**/
public class JMSPersistentTopicReceiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.44.129:61616");
Connection connection = null;
try {
connection = connectionFactory.createConnection();
//设置自己的身份id
connection.setClientID("topicId");
connection.start();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//创建目的地
Topic destination = session.createTopic("myTopic");
//创建持久化订阅
MessageConsumer consumer = session.createDurableSubscriber(destination,"topicId");
TextMessage textMessagemessage = (TextMessage) consumer.receive();
System.out.println(textMessagemessage.getText());
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if( connection !=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
网友评论