activeMQ消息的持久化:分为两部分说,queue队列部分、topic主题部分;
- queue队列
首先需要设计生产者消息持久化,其他地方无需修改,而且queue模式默认消息持久化。
//创建生产者
messageProducer = session.createProducer(queue);
//设置生产者持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
特点:activeMQ在点对点模式下,生产者开启消息持久化,在服务机后,重启服务,消息仍然存在,仍可消费。
- topic主题
关于activeMQ在发布/订阅模式下的消息持久化,则会有较大的不同。
特点:[1]、先运行消费订阅者,订阅主题topic;[2]、然后生产者发布消息;[3]、无论生产者是否在线,都会接收到消息,不在线的,下次连接时会把没有收到的消息接收下来。
上代码:
1)生产者:
package com.jjclub.activeMQ_01;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
//生产者01
public class Producer_topic01 {
//服务器地址
private static String url = "tcp://localhost:61616";
//主题名称
private static String topicName="queue1";
public static void main(String[] args) {
//创建activemq连接工场
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = null;
Session session =null;
MessageProducer messageProducer =null;
try {
//创建连接connection
connection = activeMQConnectionFactory.createConnection();
//创建连接session;第一个参数为事务,第二个参数为签发机制。暂时选择默认,后续说明;
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息目的地topic(主题名称)
Topic topic = session.createTopic(topicName);
//创建生产者
messageProducer = session.createProducer(topic);
//需要在connection.start();前设置消息持久化;
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
//启动连接
connection.start();
//创建消息体(文本消息内容)
TextMessage textMessage = session.createTextMessage("hello");
//发送消息到队列中
messageProducer.send(textMessage);
} catch (JMSException e) {
e.printStackTrace();
}finally {
try {
//关闭消息
messageProducer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
2)消费者需要注意:此时需要将之前的消费者,修改为订阅消费者。
package com.jjclub.activeMQ_01;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import org.apache.activemq.ActiveMQConnectionFactory;
//消费者
public class Consumer_topic01 {
//服务器地址
private static String url = "tcp://localhost:61616";
//主题名称
private static String topicName="queue1";
public static void main(String[] args) {
//创建activemq连接工场
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = null;
Session session =null;
TopicPublisher topicPublisher= null;
try {
//创建连接connection
connection = activeMQConnectionFactory.createConnection();
//创建连接session;第一个参数为事务,第二个参数为签发机制。暂时选择默认,后续说明;
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息目的地topic(主题名称)
Topic topic = session.createTopic(topicName);
//设置创建对主题topic的订阅者,第二个参数为标识字段;
topicPublisher = (TopicPublisher) session.createDurableSubscriber(topic, "remark");
//启动连接,需要放置在订阅者之后;
connection.start();
while(true) {
// messageConsumer.receive();此方法会一直等待消息,不会中止进程
// messageConsumer.receive(4000L);等待4s后,若无消息,则中止进程,不再等待
Message message = ((MessageConsumer) topicPublisher).receive();
if(message!=null) {
TextMessage textMessage = (TextMessage) message;
System.out.println("消费的消息是"+textMessage);
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}finally {
try {
//关闭消息
topicPublisher.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
网友评论