项目需要用到消息队列,就想先了解一下activeMQ这个东西
关于activeMQ
ActiveMQ是Apache出品的,非常流行的消息中间件。
安装
上官网下载对应系统的版本。http://activemq.apache.org/.
启动
windows 直接运行bin目录下activemq.bat
linux 在bin目录下执行./activemq start
目录介绍
-
conf里面是配置文件,重点关注的是activemq.xml(链接端口灯)、jetty.xml(登录地址,端口信息)、jetty-realm.properties(Web控制台需要用户名、密码信息)。。
-
data目录下是ActiveMQ进行消息持久化存放的地方,默认采用的是kahadb,当然我们可以采用leveldb,或者采用JDBC存储到MySQL,或者干脆不使用持久化机制。
-
webapps,注意ActiveMQ自带Jetty提供Web管控台
使用activeMQ
配置maven
根据activeMQ版本增加对应依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.0</version>
</dependency>
消息队列P2P模式
创建生产者队列并发送消息
package active_mq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TestActiveMQ {
ConnectionFactory getConnectionFactory(){
return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
}
void testMQ() throws JMSException {
//创建Connection
Connection conn = getConnectionFactory().createConnection();
conn.start();
//创建Session
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//创建Destination 目标队列
Destination destination = session.createQueue("firstQueue");
//创建MessageProducer 生产者
MessageProducer messageProducer = session.createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//定义消息对象
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Hello,ActiveMQ"+System.currentTimeMillis());
//发送
messageProducer.send(textMessage);
//关闭链接
if (conn != null) {
conn.close();
}
}
public static void main(String[] args) {
TestActiveMQ activeMQ = new TestActiveMQ();
try {
activeMQ.testMQ();
activeMQ.testMQ();
activeMQ.testMQ();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
发送了三条消息
创建消费者并接收消息
package active_mq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TestActiveMQConsumer {
ConnectionFactory getConnectionFactory(){
return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
}
void testMQ(String name) throws JMSException {
Connection conn = getConnectionFactory().createConnection();
conn.start();
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("firstQueue");
MessageConsumer messageConsumer = session.createConsumer(destination);
//设置listener,封装好了消息轮询
messageConsumer.setMessageListener(message -> {
try {
System.out.println("消息来啦:"+name);
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
}
public static void main(String[] args) {
TestActiveMQConsumer activeMQ = new TestActiveMQConsumer();
try {
activeMQ.testMQ("xiaohong");
activeMQ.testMQ("xiaolv");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
运行结果:结果每个人可能会不一样。
消息来啦:xiaohong
Hello,ActiveMQ1529400563732
消息来啦:xiaolv
Hello,ActiveMQ1529400563763
消息来啦:xiaohong
Hello,ActiveMQ1529400563788
订阅模式Pub/Sub
一对多通信,发送一条消息,所有订阅了该目标的消费者都会收到消息。
P2P、Pub/Sub在代码上的区别点仅仅在于,目标类型的创建是createQueue or createTopic,其他一切照旧
创建生产者队列并发送消息
代码跟队列基本没差别。只是创建的消息对象时topic
package active_mq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TestActiveMQ {
ConnectionFactory getConnectionFactory(){
return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
}
void testMQ() throws JMSException {
Connection conn = getConnectionFactory().createConnection();
conn.setClientID("clientFirst");
conn.start();
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//创建消息订阅对象
Destination destination = session.createTopic("topic");
MessageProducer messageProducer = session.createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Hello,ActiveMQ"+System.currentTimeMillis());
messageProducer.send(textMessage);
if (conn != null) {
conn.close();
}
}
public static void main(String[] args) {
TestActiveMQ activeMQ = new TestActiveMQ();
try {
activeMQ.testMQ();
activeMQ.testMQ();
activeMQ.testMQ();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
订阅消息
package active_mq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TestActiveMQConsumer {
ConnectionFactory getConnectionFactory(){
return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
}
void testMQ(String name) throws JMSException {
Connection conn = getConnectionFactory().createConnection();
//设置id后,不在线也不会丢失订阅消息,下次上线的时候就可以获取到消息
conn.setClientID(name);
conn.start();
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("topic");
//订阅方式的差别在这里
MessageConsumer messageConsumer = session.createDurableSubscriber((Topic)destination,name);
messageConsumer.setMessageListener(message -> {
try {
System.out.println("消息来啦:"+name);
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
}
public static void main(String[] args) {
TestActiveMQConsumer activeMQ = new TestActiveMQConsumer();
try {
activeMQ.testMQ("xiaohong");
activeMQ.testMQ("xiaolv");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
执行结果
消息来啦:xiaolv
Hello,ActiveMQ1529403218938
消息来啦:xiaohong
Hello,ActiveMQ1529403218938
消息来啦:xiaohong
Hello,ActiveMQ1529403218975
消息来啦:xiaolv
Hello,ActiveMQ1529403218975
消息来啦:xiaolv
Hello,ActiveMQ1529403219001
消息来啦:xiaohong
Hello,ActiveMQ1529403219001
网友评论