美文网首页
Spring+ActiveMQ 事务

Spring+ActiveMQ 事务

作者: 超人也害羞 | 来源:发表于2019-08-26 23:30 被阅读0次

    一、思路

    1. 背景介绍
    2. spring 调用链路 + activemq事务链路介绍

    二、背景介绍

    写这篇文章背景是什么呢?或者说作者碰到了什么问题呢?是这样的,有个需求要在service层执行一些业务逻辑,如果失败了,则会抛出了一个RunTimeException(至于为什么会抛出RunTimeException就不纠结了),与此同时还需要向MQ发送一个Msg便于后续容错处理。而作者碰到的问题就出现了,jdbc的事务随着RunTimeException异常的抛出回滚了,JmsTemplate的事务也回滚了,导致事务发送失败。之前也没有研究过Spring是如何管理MQ的事务,以为它们是独立的,没想到在这里踩了一个坑。没办法,那就debug源码看看。
    下面是spring-jms的配置,注意其中的sessionTransacted属性配置成了true,表示jms的事务由spring托管。

         <bean id="jmsTemplateQueue" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="pooledConnectionFactory"/>
            <property name="pubSubDomain" value="false"/>
            <property name="deliveryPersistent" value="true"/>
            <property name="sessionTransacted" value="true"/>
        </bean>
    

    下面是Service层的方法,由spring管理jdbc的事务,其中的ToleranceProducer使用JmsTemplate发送消息。

    @Resource
    private ToleranceProducer toleranceProducer;
    // 某个service的方法,这个方法由spring管理jdbc的事务。
    public Object xxx(){
            try {
                // .... 一大坨业务          
                transactionManager.commit(transactionForOrder);
            } catch (Exception e) {
                if (e instanceof CustomFailedException) {
                    Object failedObject = e.getFailedObject();
                    toleranceProducer.sendMessage(failedObject);
                    throw e;
                }
                return something;
            }
    }
    

    三、spring 调用链路 + activemq事务链路介绍

    我们知道spring的事务管理机制也是依赖于AOP,就先从这出发看看到底做了啥。
    可以看到TransactionInteracptor也是采用了代理模式,执行真正的业务逻辑之前先开启事务。继续debug看看invokeWithinTransaction中做了什么。

    //来自 org.springframework.transaction.interceptor.TransactionInterceptor
        @Nullable
        public Object invoke(MethodInvocation invocation) throws Throwable {
            Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
            Method var10001 = invocation.getMethod();
            invocation.getClass();
            // 开始失误并执行业务逻辑
            return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
        }
    

    ok,这里可以看到spring开启一个新的事务,并调用代码对象执行业务逻辑代码,如果捕获到异常就回滚事务。看看completeTransactionAfterThrowing中发生了什么有趣的事。

        @Nullable
        protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
            // 开启新事务或者从当前线程获取事务
            TransactionAttributeSource tas = this.getTransactionAttributeSource();
            TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
            PlatformTransactionManager tm = this.determineTransactionManager(txAttr);
            String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
            Object result;
            if (txAttr != null && tm instanceof CallbackPreferringPlatformTransactionManager) {
                // 忽略,这里不关心
            } else {
                // 事务开启
                TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
                result = null;
    
                try {
                    // 执行业务逻辑
                    result = invocation.proceedWithInvocation();
                } catch (Throwable var17) {
                    // 如果执行业务代码过程中抛出了异常那么就就行回滚
                    this.completeTransactionAfterThrowing(txInfo, var17);
                    throw var17;
                } finally {
                    this.cleanupTransactionInfo(txInfo);
                }
    
                this.commitTransactionAfterReturning(txInfo);
                return result;
            }
            // 省略其他else了,这里不关心
        }
    

    哈哈,其实也没啥,调用TransactionManager进行事务回滚。继续debug。

        protected void completeTransactionAfterThrowing(@Nullable TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {
            if (txInfo != null && txInfo.getTransactionStatus() != null) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex);
                }
    
                if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                    try {
                        // 调用TransactionManager回滚事务
                        txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                    } catch (TransactionSystemException var6) {
                        this.logger.error("Application exception overridden by rollback exception", ex);
                        var6.initApplicationException(ex);
                        throw var6;
                    } catch (Error | RuntimeException var7) {
                        this.logger.error("Application exception overridden by rollback exception", ex);
                        throw var7;
                    }
                } else {
                    // don't care
                }
            }
        }
    

    调用this.processRollback继续回滚。。。说的我都烦了,贴个栈信息自行观赏。


    Spring回滚调用栈

    ok,省略一大堆回滚的调用栈了,到关键的方法。

        public void afterCompletion(int status) {
            if (this.shouldUnbindAtCompletion()) {
                boolean releaseNecessary = false;
                if (this.holderActive) {
                    this.holderActive = false;
                    TransactionSynchronizationManager.unbindResourceIfPossible(this.resourceKey);
                    this.resourceHolder.unbound();
                    releaseNecessary = true;
                } else {
                    releaseNecessary = this.shouldReleaseAfterCompletion(this.resourceHolder);
                }
    
                if (releaseNecessary) {
                    // 释放事务同步器中的资源
                    this.releaseResource(this.resourceHolder, this.resourceKey);
                }
            } else {
                this.cleanupResource(this.resourceHolder, this.resourceKey, status == 0);
            }
    
            this.resourceHolder.reset();
        }
    

    看到resource中赫然就有我们要找的MQ连接信息。继续看在releaseResource中发生了啥?


    Resource信息

    一系列方法调用后,我们发现最终调用了Spring Jms包中的ConnectionFactory方法,对JMS的事务进行了回滚。

    // from org.springframework.jms.connection.CachingConnectionFactory
            private void logicalClose(Session proxy) throws JMSException {
                // Preserve rollback-on-close semantics.
                if (this.transactionOpen && this.target.getTransacted()) {
                    this.transactionOpen = false;
                    this.target.rollback();
                }
                // Physically close durable subscribers at time of Session close call.
                for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) {
                    Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next();
                    if (entry.getKey().subscription != null) {
                        entry.getValue().close();
                        it.remove();
                    }
                }
                // Allow for multiple close calls...
                boolean returned = false;
                synchronized (this.sessionList) {
                    if (!this.sessionList.contains(proxy)) {
                        this.sessionList.addLast(proxy);
                        returned = true;
                    }
                }
                if (returned && logger.isTraceEnabled()) {
                    logger.trace("Returned cached Session: " + this.target);
                }
            }
    

    到这里为什么JMS事务会随着JDBC的事务回滚了就一目了然了。但是还有一个问题,TransactionSynchronizationManager事务管理器中resource的MQ connection信息是哪儿来的?
    又经过一番debug后,在JmsTransactionManager中终于找到了MQ是如何注册事务到TransactionSynchronizationManager中的。

        @Override
        protected void doBegin(Object transaction, TransactionDefinition definition) {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                throw new InvalidIsolationLevelException("JMS does not support an isolation level concept");
            }
    
            ConnectionFactory connectionFactory = obtainConnectionFactory();
            JmsTransactionObject txObject = (JmsTransactionObject) transaction;
            Connection con = null;
            Session session = null;
            try {
                JmsResourceHolder resourceHolder;
                if (this.lazyResourceRetrieval) {
                    resourceHolder = new LazyJmsResourceHolder(connectionFactory);
                }
                else {
                    con = createConnection();
                    session = createSession(con);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]");
                    }
                    resourceHolder = new JmsResourceHolder(connectionFactory, con, session);
                }
                resourceHolder.setSynchronizedWithTransaction(true);
                int timeout = determineTimeout(definition);
                if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                    resourceHolder.setTimeoutInSeconds(timeout);
                }
                txObject.setResourceHolder(resourceHolder);
                            // 注册当前MQ事务到当前线程的事务管理器中。
                TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder);
            }
            catch (Throwable ex) {
                if (session != null) {
                    try {
                        session.close();
                    }
                    catch (Throwable ex2) {
                        // ignore
                    }
                }
                if (con != null) {
                    try {
                        con.close();
                    }
                    catch (Throwable ex2) {
                        // ignore
                    }
                }
                throw new CannotCreateTransactionException("Could not create JMS transaction", ex);
            }
        }
    

    OK,到此为止,作者之前碰到问题就有了答案了。但是debug过程中对spring事务倒是有研究的兴趣了,下次我们来说Spring事务的细节。

    相关文章

      网友评论

          本文标题:Spring+ActiveMQ 事务

          本文链接:https://www.haomeiwen.com/subject/jkyjectx.html