回顾下,activeMQ的发布/订阅模式(topic主题)
特点:
1)每一个消息可以有多个消费者,即一对多的关系;
2)生产者与消费者有时间上的相关性,消费者只能消费订阅之后发布的消息;
3)生产者topic不保存消息,当没有消费者时,则视为废消息,所以一般先启动消费者,再启动生产者。
-
简易图如下:
activemq.png
- 代码实现
生产者:
package com.jjclub.activeMQ_01;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
//生产者01
public class Producer_topic01 {
//服务器地址
private static String url = "tcp://localhost:61616";
//主题名称
private static String topicName="queue1";
public static void main(String[] args) {
//创建activemq连接工场
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = null;
Session session =null;
MessageProducer messageProducer =null;
try {
//创建连接connection
connection = activeMQConnectionFactory.createConnection();
//启动连接
connection.start();
//创建连接session;第一个参数为事务,第二个参数为签发机制。暂时选择默认,后续说明;
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息目的地topic(主题名称)
Topic topic = session.createTopic(topicName);
//创建生产者
messageProducer = session.createProducer(topic);
//创建消息体(文本消息内容)
TextMessage textMessage = session.createTextMessage("hello");
//发送消息到队列中
messageProducer.send(textMessage);
} catch (JMSException e) {
e.printStackTrace();
}finally {
try {
//关闭消息
messageProducer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
消费者:
同样有两种方式,此处就举例一种:
package com.jjclub.activeMQ_01;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
//消费者
public class Consumer_topic01 {
//服务器地址
private static String url = "tcp://localhost:61616";
//主题名称
private static String topicName="queue1";
public static void main(String[] args) {
//创建activemq连接工场
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = null;
Session session =null;
MessageConsumer messageConsumer =null;
try {
//创建连接connection
connection = activeMQConnectionFactory.createConnection();
//启动连接
connection.start();
//创建连接session;第一个参数为事务,第二个参数为签发机制。暂时选择默认,后续说明;
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息目的地topic(主题名称)
Topic topic = session.createTopic(topicName);
//创建消费者
messageConsumer = session.createConsumer(topic);
while(true) {
// messageConsumer.receive();此方法会一直等待消息,不会中止进程
// messageConsumer.receive(4000L);等待4s后,若无消息,则中止进程,不再等待
Message message = messageConsumer.receive();
if(message!=null) {
TextMessage textMessage = (TextMessage) message;
System.out.println("消费的消息是"+textMessage);
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}finally {
try {
//关闭消息
messageConsumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
网友评论