美文网首页
spring activemq 队列,广播,延迟队列,jacks

spring activemq 队列,广播,延迟队列,jacks

作者: cuber2simple | 来源:发表于2020-12-29 17:50 被阅读0次

前言

 经过一段说不清楚的历史原因,我们决定使用activemq, 当然我的观点是,不论你当前的需求是什么,公司是小而美的公司,还是厚积薄发,处于中等规模,尽量使用rabbitmq. 因为能使用自定义系列延迟真的是无敌的特性。而activemq的不可靠性,  不要去触碰,这毕竟是一个连我手动测试都能丢失的MQ服务器

好啦 开始吧

  1. 为我们的开始使用引入必要的依赖(我们假定我们都是spring boot 的无知群众)
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <!--配置activemq pool需要-->
        <dependency>
            <groupId>org.messaginghub</groupId>
            <artifactId>pooled-jms</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!--配置activemq pool需要-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>

        <!--配置activemq pool需要-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.8.0</version>
        </dependency>
  1. 增加配置
spring:
  application:
    name: spring-jms-example
  activemq:
    broker-url: failover:(tcp://127.0.0.1:61616)?initialReconnectDelay=100&timeout=3000
    user: admin
    password: admin
    pool:
      enabled: true
      max-connections: 3
      max-sessions-per-connection: 15
    packages:
      trust-all: true
  jms:
    cache:
      enabled: true
  1. 开始编写我们的代码
    为了实现我们错误可以接着再次发送, 所以我们得找找原生框架的错误处理类(可以附带原生消息的handler), jms 并没有把这个handler给我们, 所以我们必须代理messageListener.
    以下为代理messageListener的代码:
    private ResendHandler resendHandler;

    private MessagingMessageListenerAdapter delegate;

    public ProxyMessageListener(ResendHandler resendHandler, MessagingMessageListenerAdapter messagingMessageListenerAdapter) {
        this.resendHandler = resendHandler;
        this.delegate = messagingMessageListenerAdapter;
    }

    @Override
    public void onMessage(Message jmsMessage, Session session) throws JMSException {
        try {
            delegate.onMessage(jmsMessage, session);
        } catch (Throwable e) {
            session.commit();
            //这里必须在原来的session提交后再次触发, 而且保证resendHandler的代码在异步的方法里
            //@EnableAsync
            resendHandler.doResend(jmsMessage);
            throw e;
        }
    }

在这里说下sleuth 也proxy了代码只不过他proxy了JmsListenerEndpointRegistry,为了不影响sleuth的proxy
所以我们proxy了messageListener, 注释的代码是为了正确拿到JmsListenerEndpointRegistry附加说一句
jms发送的mq header里面附带了 b3信息, 可以传输 trace信息

    @EventListener
    public void applicationStart(ApplicationReadyEvent applicationReadyEvent) throws Exception {
        JmsListenerEndpointRegistry jmsListenerEndpointRegistry = applicationContext.getBean(JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, JmsListenerEndpointRegistry.class);
//        if (jmsListenerEndpointRegistry instanceof TracingJmsListenerEndpointRegistry) {
//            Field delegate = FieldUtils.getField(TracingJmsListenerEndpointRegistry.class, "delegate", true);
//            jmsListenerEndpointRegistry = (JmsListenerEndpointRegistry) delegate.get(jmsListenerEndpointRegistry);
//        }
        Field listenerContainers = FieldUtils.getField(JmsListenerEndpointRegistry.class, "listenerContainers", true);
        Map<String, MessageListenerContainer> listenerContainerMap = (Map<String, MessageListenerContainer>) listenerContainers.get(jmsListenerEndpointRegistry);
        if (MapUtils.isNotEmpty(listenerContainerMap)) {
            listenerContainerMap.forEach((id, value) -> {
                if (value instanceof AbstractMessageListenerContainer) {
                    AbstractMessageListenerContainer abstractMessageListenerContainer = (AbstractMessageListenerContainer) value;
                    Object messageListener = abstractMessageListenerContainer.getMessageListener();
                    if (messageListener instanceof MessagingMessageListenerAdapter && !(
                            messageListener instanceof ProxyMessageListener)) {
                        abstractMessageListenerContainer.setMessageListener(new ProxyMessageListener(resendHandler, (MessagingMessageListenerAdapter) messageListener));
                    }
                }
            });
        }
    }

在JmsConf 需要配置topicConnectionFactory 还有发送topic的jmsTemplate
这点和其他的通用不一样:
完整代码

相关文章