代码在运行的时候需要先运行一下消费者,相当于你订阅了这个topic,不然接收不到消息。
生产者:
package activemq.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.jms.*;
@SpringBootApplication
public class Producer{
//连接账号
private static String userName = "admin";
//连接密码
private static String password = "password";
//连接地址
private static String brokerURL = "tcp://localhost:61616";
public static void main(String []args) throws JMSException {
//初始化连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName,password,brokerURL);
//获得连接
Connection conn = connectionFactory.createConnection();
//创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建队列
Destination dest = session.createTopic("first_topic");
//通过session可以创建消息的生产者
MessageProducer producer = session.createProducer(dest);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久化
//启动连接 ,连接的启动位置和queue略有不同,需要把配置配完,在启动连接
conn.start();
for (int i=0;i<3;i++) {
//初始化一个mq消息
TextMessage message = session.createTextMessage("发送消息id:" + i);
System.out.println("发送消息id:" + i);
//发送消息
producer.send(message);
}
session.commit();
session.close();
//关闭mq连接
conn.close();
}
}
消费者:
package activemq.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.jms.*;
@SpringBootApplication
public class Consumer{
//连接账号
private static String userName = "admin";
//连接密码
private static String password = "password";
//连接地址
private static String brokerURL = "tcp://localhost:61616";
public static void main(String []args) throws JMSException {
//初始化ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName,password,brokerURL);
//创建mq连接
Connection conn = connectionFactory.createConnection();
//参数名称随便起
conn.setClientID("client_id_1");
//创建会话
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//通过会话创建目标
Topic topic = session.createTopic("first_topic");
//第二个参数随便起
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "name_1");//持久化消息接收
//启动连接
conn.start();
//创建mq消息的消费者
Message msg = subscriber.receive();
while(msg!=null){
onMessage(msg);
msg = subscriber.receive(1000L);
}
session.commit();
session.close();
subscriber.close();
conn.close();
}
public static void onMessage(Message message) throws JMSException {
TextMessage txtMessage = (TextMessage)message;
System.out.println("接受消息id:" + txtMessage.getText());
}
}
可以在后台管理页面查看:

因为我们创建的是topic,所以点击topics,这个对应于生产者的一些信息

点击Subscribers查看消费者的信息:

网友评论