一、简单说明
消息队列需要有生产者生产消息、消费者消费消息,下面生产者和消费者代码处理流程是类似的:
- 创建连接工厂
- 创建连接对象
- 创建会话
- 创建点对点接收/发送目标
- 创建生产者/消费者
- 发送/接收消息
默认的 ActiveMQ 连接的服务端口是61616
,使用 tcp 连接,IP 或域名根据服务器实际情况修改。
二、程序
1、依赖引入
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.8</version>
</dependency>
2、生产者
package com.study.mq.a1_example.helloworld.queue;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Producer {
public static void main(String[] args) {
new ProducerThread("tcp://192.168.27.133:61616", "queue").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn = null;
Session session = null;
try {
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
// 2、创建连接对象
conn = connectionFactory.createConnection();
conn.start();
/**
* 3、创建会话
* 第一个参数: 是否支持事务,如果为true,则忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
* 第一个参数为false时,第二个参数的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE、Session.DUPS_ACKNOWLEDGE
* Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常
* Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端收到消息后,必须调用javax.jms.Message的acknowledge
* Session.DUPS_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建点对点发送的目标
Destination destination = session.createQueue(destinationUrl);
// 5、创建生产者消息
MessageProducer producer = session.createProducer(destination);
// 设置生产者的模式,有两种可选 持久化/不持久化
// DeliveryMode.PERSISTENT: 当activemq关闭时,队列数据将会被保存
// DeliveryMode.NON_PERSISTENT: 当activemq关闭饿时候,队列里面的数据将会被清空
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
/**
* 6、创建一条文本消息
* 有6种消息类型:
* BytesMessage 用来传递字节
* MapMessage 用来传递Map
* ObjectMessage 用来传递序列化对象
* StreamMessage 用来传递文件等
* TextMessage 用来传递字符串
*/
String text = "Hello World";
TextMessage message = session.createTextMessage(text);
// 7、发送消息
for (int i = 0; i < 1; i++) {
producer.send(message);
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (session != null) session.close();
if (conn != null) conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
3、消费者
package com.study.mq.a1_example.helloworld.queue;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
public static void main(String[] args) {
new ConsumerThread("tcp://192.168.27.133:61616", "queue").start();
}
}
class ConsumerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ConsumerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn = null;
Session session = null;
MessageConsumer consumer = null;
try {
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
// 2、创建连接对象
conn = connectionFactory.createConnection();
conn.start();
// 3、创建会话
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建点对点接收的目标
Destination destination = session.createQueue(destinationUrl);
// 5、创建消费者消息
consumer = session.createConsumer(destination);
// 6、接收消息(没有消息就持续等待)
Message message = consumer.receive();
if (message instanceof TextMessage) {
System.out.println("收到文本消息:" + ((TextMessage) message).getText());
} else {
System.out.println(message);
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (consumer != null) consumer.close();
if (session != null) session.close();
if (conn != null) conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
三、运行
1、启动生产者
打开 WEB 控制台,发现新增了一个名字叫queue
的队列,待消费消息有一条。
2、启动消费者
接收打印队列queue
中的消息
消息已出队
网友评论