前言
经过一段说不清楚的历史原因,我们决定使用activemq, 当然我的观点是,不论你当前的需求是什么,公司是小而美的公司,还是厚积薄发,处于中等规模,尽量使用rabbitmq. 因为能使用自定义系列延迟真的是无敌的特性。而activemq的不可靠性, 不要去触碰,这毕竟是一个连我手动测试都能丢失的MQ服务器
好啦 开始吧
- 为我们的开始使用引入必要的依赖(我们假定我们都是spring boot 的无知群众)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--配置activemq pool需要-->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
<version>1.2.1</version>
</dependency>
<!--配置activemq pool需要-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<!--配置activemq pool需要-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.8.0</version>
</dependency>
- 增加配置
spring:
application:
name: spring-jms-example
activemq:
broker-url: failover:(tcp://127.0.0.1:61616)?initialReconnectDelay=100&timeout=3000
user: admin
password: admin
pool:
enabled: true
max-connections: 3
max-sessions-per-connection: 15
packages:
trust-all: true
jms:
cache:
enabled: true
- 开始编写我们的代码
为了实现我们错误可以接着再次发送, 所以我们得找找原生框架的错误处理类(可以附带原生消息的handler), jms 并没有把这个handler给我们, 所以我们必须代理messageListener.
以下为代理messageListener的代码:
private ResendHandler resendHandler;
private MessagingMessageListenerAdapter delegate;
public ProxyMessageListener(ResendHandler resendHandler, MessagingMessageListenerAdapter messagingMessageListenerAdapter) {
this.resendHandler = resendHandler;
this.delegate = messagingMessageListenerAdapter;
}
@Override
public void onMessage(Message jmsMessage, Session session) throws JMSException {
try {
delegate.onMessage(jmsMessage, session);
} catch (Throwable e) {
session.commit();
//这里必须在原来的session提交后再次触发, 而且保证resendHandler的代码在异步的方法里
//@EnableAsync
resendHandler.doResend(jmsMessage);
throw e;
}
}
在这里说下sleuth 也proxy了代码只不过他proxy了JmsListenerEndpointRegistry,为了不影响sleuth的proxy
所以我们proxy了messageListener, 注释的代码是为了正确拿到JmsListenerEndpointRegistry附加说一句
jms发送的mq header里面附带了 b3信息, 可以传输 trace信息
@EventListener
public void applicationStart(ApplicationReadyEvent applicationReadyEvent) throws Exception {
JmsListenerEndpointRegistry jmsListenerEndpointRegistry = applicationContext.getBean(JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, JmsListenerEndpointRegistry.class);
// if (jmsListenerEndpointRegistry instanceof TracingJmsListenerEndpointRegistry) {
// Field delegate = FieldUtils.getField(TracingJmsListenerEndpointRegistry.class, "delegate", true);
// jmsListenerEndpointRegistry = (JmsListenerEndpointRegistry) delegate.get(jmsListenerEndpointRegistry);
// }
Field listenerContainers = FieldUtils.getField(JmsListenerEndpointRegistry.class, "listenerContainers", true);
Map<String, MessageListenerContainer> listenerContainerMap = (Map<String, MessageListenerContainer>) listenerContainers.get(jmsListenerEndpointRegistry);
if (MapUtils.isNotEmpty(listenerContainerMap)) {
listenerContainerMap.forEach((id, value) -> {
if (value instanceof AbstractMessageListenerContainer) {
AbstractMessageListenerContainer abstractMessageListenerContainer = (AbstractMessageListenerContainer) value;
Object messageListener = abstractMessageListenerContainer.getMessageListener();
if (messageListener instanceof MessagingMessageListenerAdapter && !(
messageListener instanceof ProxyMessageListener)) {
abstractMessageListenerContainer.setMessageListener(new ProxyMessageListener(resendHandler, (MessagingMessageListenerAdapter) messageListener));
}
}
});
}
}
在JmsConf 需要配置topicConnectionFactory 还有发送topic的jmsTemplate
这点和其他的通用不一样:
完整代码