主题一对多
- 生产者将消息发布到topic上,每个消息都可以有多个消费者,属于1对多的关系。
- 生产者和消费者和之间有时间上的相关性,订阅某一个主题的消费者只能消费自他订阅之后发布的消息。
- 生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息。所以,先有消费者才有生产者。
image.png
主题生产者
package com.yd.telnet.modular.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author
* @Date 2020/3/19
*/
public class JmsProduce {
//为什么是tcp,看源码!
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话session
// 两个参数,事务、签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建接收的对象(队列还是主题)
// destination目的地(queue队列、topic主题)
//Destination destination = session.createQueue(TOPIC_NAME );
Topic topic = session.createTopic(TOPIC_NAME );
// 6、创建消息生产者
MessageProducer producer = session.createProducer(topic);
// 7、通过使用producer产生三条消息发送到队列里面
for(int i = 0;i<3;i++){
// 逐一创建消息
TextMessage textMessage = session.createTextMessage("msg------------" + i);
// 通过producer发送给mq
producer.send(textMessage);
}
// 8、关闭资源
producer.close();
session.close();
connection.close();
System.out.println("主题生产完成!");
}
}
主题消费者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* @author
* @Date 2020/3/19
*/
public class JmsConsumer {
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException, IOException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话session
// 两个参数,事务、签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建接收的对象(队列还是主题)
// destination目的地(queue队列、topic主题)
//Destination destination = session.createQueue(QUEUE_NAME);
Topic topic = session.createTopic(TOPIC_NAME);
// 6、创建消息消费者
MessageConsumer consumer = session.createConsumer(topic);
// 7、通过监听的方式来消费消息
consumer.setMessageListener(message -> {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("监听消费者消费消息" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 8、保持控制台不灭
System.in.read();
// 9、关闭资源
consumer.close();
session.close();
connection.close();
}
}
可以尝试先启动生产者再启动消费者
image.png清空记录,先启动消费者再启动生产者
image.png对比一下,看看先有消费者才有生产者是不是符合第二种启动方式。
网友评论