ActiveMQ: 本质的好处就是解决异步处理消息的问题。
- toptic模式
消息为广播模式,每一个监听到的队列都可以收到消息。
@JmsListener(destination = "topic.receivemsg", containerFactory = "jmsListenerContainerMsgTopic", concurrency = "5")
- queue模式
消息为竞争消费模式,只有竞争到消息的队列可以消费,即使多个监听者监听同一个队列,最终也只有一个消费者可以消费。
// 配置
@JmsListener(destination = "${lly.queue.sendMsgInfo}" + "${spring.profiles.active}", containerFactory = "jmsListenerContainerQueue", concurrency = "5")
package com.xes.ops.insight.plus.mp;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.TimeUnit;
public class Sender {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"system",
"manager",
"tcp://203.195.176.21:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("first");
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 0; i < 100; ++i) {
TextMessage textMessage = session.createTextMessage("这是消息:" + i);
messageProducer.send(destination, textMessage);
TimeUnit.SECONDS.sleep(1);
}
System.out.println("ok");
}
}
import javax.jms.*;
import java.util.concurrent.TimeUnit;
public class Consumer {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"system",
"manager",
"tcp://203.195.176.21:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("first");
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 0; i < 100; ++i) {
Message message = consumer.receive();
System.out.println(message);
TimeUnit.SECONDS.sleep(1);
}
}
}
- 消息的消费者接收消息可以采用两种方式
- consumer.receive() 或 consumer.receive(int timeout);
- 注册一个MessageListener.
采用第一种方式,消息的接收者会一直等待下去,知道有消息到达或者超时。后一种方式会注册一个监听器,
当有消息到达的时候,会调用它的onMessage()方法。以下举例说明:
MessageConsumer consumer=session.createConsumer(queue);
consumer.setMessageListener(new MessageListener(){
public void onMessage(Message msg)
{
System.out.println("接收到的消息为+"((TextMessage)msg).getText());
}
})
- Java jms连接ActiveMQ密码问题
参考
1 JAVA JMS 连接 ActiveMQ,帐号密码错误都可以登录的原因以及解决方法
2 Configuring advanced settings for ActiveMQ
3 ActiveMQ权限配置
网友评论