一. 导包 ( pooled-jms 包用最新的即可, 当前 springboot 版本为2.2.1)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
<version>1.0.6</version>
</dependency>
二。 配置项
spring:
activemq:
broker-url: tcp://x.x.x.x:61616
user: admin
password: admin
packages:
trust-all: true
pool:
enabled: true
max-connections: 50
jms:
pub-sub-domain: true
listener:
auto-startup: true
concurrency: 2
max-concurrency: 2
template:
delivery-mode: persistent
priority: 100
qos-enabled: true
receive-timeout: 1000
time-to-live: 36000
三。代码添加配置
@EnableJms
@Configuration
public class ActiveMQConfig {
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
// bean.setSessionAcknowledgeMode(4); // 需要手动ack的时候,需要这个配置,否则默认是客户端自动ack的。
return bean;
}
}
四。 发送消息
/**
* 发送消息底层方法
* @param serializable 消息对象,需要实现 serializable 接口
* @param header 传递头,可以为null
* @param destination 队列名字, Destination destination = new ActiveMQQueue("队列名");
*/
private void sendMessage(final Serializable serializable, Map<String, Object> header, Destination destination){
jmsMessagingTemplate.getJmsTemplate().setExplicitQosEnabled(false);
jmsMessagingTemplate.convertAndSend(destination, serializable, header);
}
五。接收消息
/**
* 当需要手动ack的时候,这种监听方法非常有效
* 加上这个注解,配合这个containerFactory能增大服务的吞吐量。
* @param message
* @throws JMSException
*/
@JmsListener(destination = "test.queue?consumer.prefetchSize=0", containerFactory = "jmsListenerContainerQueue")
public void listenHello(Message message) throws JMSException { // 这个 message 类型是javax.jms.Message
log.info("收到消息 : {}", message);
if(message instanceof ActiveMQTextMessage){
// 字符串消息走这个实现
ActiveMQTextMessage newMessage = (ActiveMQTextMessage) message;
}else if(message instanceof ActiveMQObjectMessage){
// 当消息类型为 Object 的时候,走这个读取消息接口
ActiveMQObjectMessage newMessage = (ActiveMQObjectMessage) message;
// testbean为这个队列的消息类型
TestBean testBean = (TestBean) newMessage.getObject();
}
// 手动ack消息,需配合 配置类里的设置才能生效
message.acknowledge();
}
/**
* 当不需要手动ack的时候,这样写能最大化利用服务器资源,加大吞吐量和消息消费效率
* 加上这个注解,配合这个containerFactory能增大服务的吞吐量。
* @param testBean
* @throws JMSException
*/
@JmsListener(destination = "test.queue", containerFactory = "jmsListenerContainerQueue")
public void listenHello(@Payload TestBean testBean) throws JMSException {
// do some service ....
}
/**
* 当异步任务执行时间特别长的时候,需要客户端主动去activemq上主动拉取消息的时候,就得用这个接收方式,
* 这个会阻塞直到activemq有任务。然后才会消费,由于消费的时间会特别长,所以配置自动 ack 即可(默认)。
*/
public void testQueue(){
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
while (true) {
try {
Destination destination = new ActiveMQQueue("test.queue?consumer.prefetchSize=0");
jmsMessagingTemplate.getJmsTemplate().setReceiveTimeout(JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT);
// 会阻塞直到队列中有消息
Message<?> message = jmsMessagingTemplate.receive(destination); // 这里的 Message 是 org.springframework.messaging.Message 类。上面的Message是javax.jms.Message
String text = (String) message.getPayload();
log.warn("收到消息 : {}, 休眠一分钟,查看ack状态", text);
Thread.sleep(20000);
}catch (Exception e){
log.warn("[Hello] 任务运行过程中出现异常,10秒后重试");
Thread.sleep(10000);
}
}
});
}
网友评论