美文网首页
Spring4.x 手动事务,监听处理未关闭事务的几点思路,文末

Spring4.x 手动事务,监听处理未关闭事务的几点思路,文末

作者: 慢黑八 | 来源:发表于2019-05-22 11:20 被阅读0次

    简书 慢黑八
    转载请注明原创出处,谢谢!
    如果读完觉得有收获的话,欢迎点赞加关注

    背景

    由于某项目独特的特色需要手动开启事务。然而,在手动开启事务后,事务能否正常结束 commit or rollback 就出现了各式各样的不确定情况。如果commit or rollback未执行或执行失败,将会导致该事务持有的数据库连接无法正常归还到连接池中。高并发场景下的现象就是连接池中的可用连接越来越少,最后导致获取连接超时的异常。

    以下为手动事务工具类
    @Service
    public class TransactionTool {
    
        //spring注入事务管理对象
        @Resource(name = "transactionManager")
        private PlatformTransactionManager transManager ;
          
        public TransactionStatus getTransSatus(int propagate) {
    
            // TransactionStatus.
            // TransactionDefinition
            // 事务定义
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            // 传播范围
            def.setPropagationBehavior(propagate);
    
            TransactionStatus transactionStatus = transManager.getTransaction(def);
            return transactionStatus;
        }
    }
    
    
    下面是开启事务的业务处理逻辑
    @Service
    public class BizService{
        @Autowired
        TransactionTool transactionTool;
    
        public void bizMethod(){
            //以下代码手动开启事务
            TransactionStatus transactionStatus = null;
    
            try{
                transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
                // ..业务逻辑
                transactionManager.commit(transactionStatus);
            }catch(Exception e){
                transactionManager.rollback(transactionStatus);
            }finally{
                //略掉一些分库分表的特殊处理
            }
        }
    }
    
    主要导致事务没有正常结束的三种场景
    • 场景 1、处理业务逻辑时,抛出的是Error而不是Exceptioncatch接不住,导致rollback不能正常执行,这也意味着事务无法正常回滚,造成连接泄露。
    • 场景 2、处理业务逻辑时,未执行到commitreturn了,这样也会导致了该事务没有正常结束,connection没有正常归还连接池,造成泄露。
    • 场景3、同一个方法中事务双开,双关,按照以下顺序执行
      开启事务1(requires_new)-> 然后开事务2(requires_new) -> 之后提交事务1(commit) -> 在提交事务2(commit)
      事务上下文状态切换如下:
      TS=TransactionStatus      TE=TransactionEvent       T=Transaction
    步骤 事务操作 TransactionSynchronizationManager 挂起\执行
    1 TS1=getTransaction(REQUIRES_NEW)
    publish TE1
    T1(con1)、TE1 挂起 NULL
    2 TS2=getTransaction(REQUIRES_NEW)
    publish TE2
    T2(con2)、TE2 挂起T1,TE1
    3 commit(TS1) TE2执行,同步器清理T2
    解挂步骤1挂起的null事务资源
    执行T1.commit成功
    con1归还连接池
    4 commit(TS2) 当前事务资源为null导致同步器
    事件处理出现异常,导致con2
    不能正常归还到连接池,造成
    连接泄露
    执行 T2.commit失败
    con2泄露

    在开启事务1的时候挂起的事务资源为空,在commit事务1的之后,会解挂当前线程的事务资源为:null,提交事务2时候,如果当前线程的事务资源为null,会抛空指针异常,最后在解绑资源unbindResource()的时候抛出以下代码块中的IllegalStateException异常(遗憾的是,该异常被spring框架捕获后没有打印出来)。最终导致事务2持有的连接不能正常释放。TransactionEvent 会在事务结束的时候执行当前TransactionSynchronizationManager线程本地变量中的synchronizations事件。

    public static Object unbindResource(Object key) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Object value = doUnbindResource(actualKey);
        if (value == null) {
            throw new IllegalStateException(
                    "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
        }
        return value;
    }
    

    以上3中情况,在自动事务@Transactional的处理逻辑中都不会出现。首先spring-tx都进行了统一封装充分考虑了非正常的可以。其次,在嵌套事务双开的时候,都是先开的事务后关。所以,手动事务一定要遵循先开的事务后关这个原则

    监控解决未关闭事务的几个思路
    • 思路1:采用spring的ApplicationEventPublisher的事件发布监听机制
      订阅@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)事务完成阶段的监听,对“一定时间内”未关闭的事件进行预警,发现后整改。
    • 思路2:在finally中对事务进行统一关闭。
      调整catch的范围,从Exception修改为Throwable捕捉到所有Exception 或者Error的情况,把commit移动到finally中。commit的前置条件是transactionStatus!=null&&transactionStatus.isNewTransaction() && !transactionStatus.isCompleted(),这样会对所有 新建的且未完成的 事务进行commit。如果小伙伴觉得思路2改动方式比较激进,想暂时先观察一下那些服务存在 事务未正常结束 的情况,可以参考思路3
    @Service
    public class BizService{
        @Autowired
        TransactionTool transactionTool;
    
        public void bizMethod(){
            //以下代码手动开启事务
            TransactionStatus transactionStatus = null;
    
            try{
                transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
                // ..业务逻辑
            }catch(Throwable t){
                transactionManager.rollback(transactionStatus);
            }finally{
                //try..catch内容可提炼成公共方法
                try {
                    if (transactionStatus != null && transactionStatus.isNewTransaction() 
                            && !transactionStatus.isCompleted()) {
                        //TODO: arms日志输出 堆栈相关信息
                        transactionManager.commit(transactionStatus);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                //略掉一些分库分表的特殊处理
            }
        }
    }
    
    • 思路3: 在finally中检查未完成的事物并进行预警
      预警的前提条件是transactionStatus!=null&&transactionStatus.isNewTransaction() && !transactionStatus.isCompleted(),这样会对所有 新建的且未完成的 事务进行预警日志信息输出。该思路在finally中增加try..catch块进行检查,对应用程序改动影响较小。
      需要注意的是:这种方式仍然监控不到上文中场景3连接泄露的问题,如果想解决场景3的问题,需要从TransactionStatus中获取事务对象,抽取ConnectionHolder中的数据库Connection,用conn.isClosed()来判断连接是否已经关闭。另外还需要修改DataSourceTransactionManager源码,把内部类DataSourceTransactionObject的访问修饰符从private修改为public
      参考如下代码:
    @Service
    public class BizService{
        @Autowired
        TransactionTool transactionTool;
    
        public void bizMethod(){
            //以下代码手动开启事务
            TransactionStatus transactionStatus = null;
    
            try{
                transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
                // ..业务逻辑
                transactionManager.commit(transactionStatus);
            }catch(Throwable t){
                transactionManager.rollback(transactionStatus);
            }finally{
                //try..catch内容可提炼成公共方法
                try {
                    if (transactionStatus != null && transactionStatus.isNewTransaction()) {
                        if(!transactionStatus.isCompleted()) {
                            // arms日志输出 堆栈相关信息
                            System.out.println("事务未结束原因[事务-未完成]");
                            printStackTrace(Thread.currentThread().getStackTrace());
                        }else {
                            Connection conn = null;
                            DefaultTransactionStatus defaultTransactionStatus = (DefaultTransactionStatus)transactionStatus;
                            if(defaultTransactionStatus.getTransaction().getClass().getClassLoader() == DataSourceTransactionObject.class.getClassLoader()) {
                                conn = ((DataSourceTransactionObject)defaultTransactionStatus.getTransaction()).getConnectionHolder().getConnection();  
                                if(conn != null && conn.isClosed()==false) {
                                    System.out.println("事务未结束原因[连接-未关闭]");
                                    printStackTrace(Thread.currentThread().getStackTrace());
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                //略掉一些分库分表的特殊处理
            }
        }
    }
    
    接下来说下上面三种思路的可行性

    [X ] 思路1,不可行
    [ok] 思路2,可行
    [ok] 思路3为过渡监控性的解决方案,可行
    [ok] 思路2+思路3为最终解决方案,可行

    思路1中,基于spring事件的发布订阅模式会存在什么问题?

    使用spring的ApplicationEventPublisher的事件发布监听机制
    订阅@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
    事务完成阶段**的监听,对“一定时间内”未关闭的事件进行预警,发现后整改。
    1、改造TransactionTool在执行getTransSatus方法时调用publishTransactionEvent(transactionStatus , propagate)发布包含transactionId 的 "新事务事件" ,然后把需要监控的事务事件存放在aliveTransactionMap中 。

    @Service
    public class TransactionTool {
        private AtomicLong transactionId = new AtomicLong(0);
        // transcatioId,BizTransactionEvent 存储存活的事务事件
        public static ConcurrentHashMap<String, BizTransactionEvent> aliveTransactionMap = 
            new ConcurrentHashMap<String, BizTransactionEvent>();
    
        //spring注入事务管理对象
        @Resource(name = "transactionManager")
        private PlatformTransactionManager transManager ;
    
        @Autowired
        private ApplicationEventPublisher publisher;
        public TransactionStatus getTransSatus(int propagate) {
    
            // TransactionStatus.
            // TransactionDefinition
            // 事务定义
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            // 传播范围
            def.setPropagationBehavior(propagate);
    
            TransactionStatus transactionStatus = transManager.getTransaction(def);
            // 增加事务监听
            publishTransactionEvent(transactionStatus , propagate);
            return transactionStatus;
        }
    
        public void publishEvent(long tid,int propagate) {
            long temp = tid;
            StackTraceElement[] stackTraceElementArray = Thread.currentThread().getStackTrace();
            if(stackTraceElementArray.length>2) {
                if(transactionId.longValue() == Long.MAX_VALUE) {
                    transactionId.compareAndSet(Long.MAX_VALUE, 0);
                }
                BizTransactionEvent bizTransactionEvent = new BizTransactionEvent();
                bizTransactionEvent.setTransactionId(""+temp);
                bizTransactionEvent.setTransactionName(stackTraceElementArray[3].getClassName()+":"
                        +stackTraceElementArray[3].getMethodName()+":"+stackTraceElementArray[3].getLineNumber());
                bizTransactionEvent.setCurrentTimeMillis(System.currentTimeMillis());
                bizTransactionEvent.setStackTraceElement(stackTraceElementArray);
                bizTransactionEvent.setPropagate(propagate);
                System.out.println("[NEWTX"+bizTransactionEvent.getTransactionId()+"]"+bizTransactionEvent.toString());
                publisher.publishEvent(bizTransactionEvent);
                //在这里处理新建的事务操作,可以放入一个map中
                TransactionTool.aliveTransactionMap.put(bizTransactionEvent.getTransactionId(), bizTransactionEvent);
            }
        }
    }
    

    2、增加事物事件类1BizTransactionEvent ,事务监听类BizTransactionEventListener,通过事务commit时候,同步调用标有注解@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)afterCompletion方法把aliveTransactionMap中transactionId对应的事务事件删掉。

    事务事件监听类

    @Component
    public class BizTransactionEventListener {
        @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
        public void afterCompletion(PayloadApplicationEvent<BizTransactionEvent> event) {
            System.out.println("[NEWTX" + event.getPayload().getTransactionId() + "-REMOVE]  " + event.toString()
                    + "Duration:" + (System.currentTimeMillis() - event.getPayload().getCurrentTimeMillis()) + "ms");
            TransactionTool.aliveTransactionMap.remove(event.getPayload().getTransactionId());
        }
    }
    

    事务事件类

    public class BizTransactionEvent {
        
        private static final int STACK_TRACE_ELEMENT_DEEP = 4;
        private String transactionId;
        private String transactionName;
        private StackTraceElement[] stackTraceElement;
        private long currentTimeMillis;
        private int propagate;
        
        
        public String getTransactionName() {
            return transactionName;
        }
        public void setTransactionName(String transactionName) {
            this.transactionName = transactionName;
        }
    
        
        public int getPropagate() {
            return propagate;
        }
        public void setPropagate(int propagate) {
            this.propagate = propagate;
        }
        public String getTransactionId() {
            return transactionId;
        }
        public void setTransactionId(String transactionId) {
            this.transactionId = transactionId;
        }
    
        public long getCurrentTimeMillis() {
            return currentTimeMillis;
        }
        public void setCurrentTimeMillis(long currentTimeMillis) {
            this.currentTimeMillis = currentTimeMillis;
        }
    
        public StackTraceElement[] getStackTraceElement() {
            return stackTraceElement;
        }
        public void setStackTraceElement(StackTraceElement[] stackTraceElement) {
            this.stackTraceElement = stackTraceElement;
        }
        @Override
        public String toString() {
            return "BizTransactionEvent [transactionId=" + transactionId + ", transactionName=" + transactionName
                    + ", stackTraceElement=" + Arrays.toString(Arrays.copyOf(stackTraceElement, STACK_TRACE_ELEMENT_DEEP))
                    + ", currentTimeMillis=" + currentTimeMillis + ", propagate=" + propagate + "]";
        }
    }
    

    3、我们可以通过监控aliveTransactionMap中的事务事件存活时间来寻找发现事务未关闭的业务代码。

    代码略...
    

    4、我们看下以下逻辑中问题出在哪:

    @Service
    public class BizService{
        @Autowired
        TransactionTool transactionTool;
    
        @Transactional 
        public void bizMethod(){
            //以下代码手动开启事务
            TransactionStatus transactionStatus1 = null;
            TransactionStatus transactionStatus2 = null;
            try{
                transactionStatus1 = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
                // ..业务逻辑
                //transactionManager.commit(transactionStatus1);
            }catch(Exception){
                transactionManager.rollback(transactionStatus1);
            }finally{
                //略掉一些分库分表的特殊处理
            }
    
            try{
                transactionStatus2 = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
                // ..业务逻辑
                transactionManager.commit(transactionStatus2);
            }catch(Exception){
                transactionManager.rollback(transactionStatus2);
            }finally{
                //略掉一些分库分表的特殊处理
            }
        }
    }
    

    事务上下文状态切换如下:
    TS=TransactionStatus      TE=TransactionEvent       T=Transaction

    步骤 事务操作 TransactionSynchronizationManager 挂起\执行
    1 @Transactional TS0=getTransaction(REQUIRESD) T0(con0) 挂起 NULL
    2 TS1=getTransaction(REQUIRES_NEW)
    publish TE1
    T1(con1)、TE1 挂起T0
    3 commit(TS1)被注掉了,不执行 . con1连接泄露
    4 TS2=getTransaction(REQUIRES_NEW)
    publish TE2
    T2(con1)、TE2 挂起T1、TE1
    5 commit(TS2) TE2执行,同步器清理T2
    解挂步骤4的T1、TE1
    执行T2.commit成功
    con2归还连接池
    6 commit(TS0) TE1执行,同步器清理T1
    解挂步骤2的T0
    执行 T0.commit成功
    con0归还连接池

    这种方式的最大问题在于,程序执行完成后,当前线程在事务同步器中仍存在解挂的事务资源(T0),并且事务commit(TS1)没有执行,TE1却被正常执行了,同时aliveTransactionMap中的TE1被移除了,失去了后续的监控基础。
    所以对于手动事务来说,思路1比较失败

    文末彩蛋:简述手动Spring事务处理逻辑

    spring-tx、spring-jdbc中比较重要的四个关键处理类:

    • AbstractPlatformTransactionManager:事务核心处理类,开启事务,挂起/恢复,释放资源等功能
    • DataSourceTransactionManager:数据库操作都有这个类来完成,例如:setAutoCommit,commit,rollback
    • TransactionSynchronizationManager:这里的TransactionSynchronizationManager都是以线程为单位来记录相关的资源息。resources中记录了,key为datasource,value为ConnectionHolder的map结构信息。上文中publisher.publishEvent(bizTransactionEvent)会把事务事件到synchronizations中,后续事务在提交的时候会执行synchronizations中的事件。
    • DefaultTransactionStatus:存放当前事务,挂起的事务资源,事务定义等内容。

    自动事务cglib代理可参考TransactionAspectSupport

    spring事务

    在事务处理的过程中参考如下步骤,偷个懒不画时序图了,大家按照序号,脑补一下

    [package:spring-tx]AbstractPlatformTransactionManager
    1、首先调用getTransaction()方法,获取连接,获取当前事务状态
    4、调用handleExistingTransaction()处理已存在的事务

    • 如果是REQUIRES_NEW就要挂起当前存在事务、创建新事务把挂起的事务资源放入新事务中,并且切换TransactionSynchronizationManager的本地线程变量为新事务相关内容,解绑当前事务资源。
    • 如果是NESTED则需要创建保存点
    • 如果是REQUIRED,创建新把newTransaction设定为false。

    5、挂起资源SuspendedResourcesHolder结构与TransactionSynchronizationManager相同,用于解挂时恢复TransactionSynchronizationManager中的本地线程变量。
    7、调用prepareSynchronization方法,初始化当前线程的事务同步管理器,设置Threadlocal相关内容,并反回新的TransactionStatus对象。


    以下为事务提交后的操作
    8、调用commit方法提交事务。这里会调用processCommit方法,在这个方法中会调用事务事件监听逻辑。通过ApplicationListenerMethodTransactionalAdapter处理各个不同阶段的transactionEvent,需要注意的是待处理的transactionEvent是从TransactionSynchronizationManager.getSynchronizations()当前的本地线程变量中获取的。
    9、cleanupAfterCompletion设置事务状态为完成,清理当前线程TransactionSynchronizationManager资源,解绑connection资源,设置autocommit=true。还原connection属性,回并且把连接归还给连接池。
    10、调用resume()方法还原挂起的资源,继续执行。

    [package:spring-jdbc]DataSourceTransactionManager
    2、调用doGetTransaction() 获取事务对象DataSourceTransactionObject
    3、检索绑定到当前线程(TransactionSynchronizationManager)的资源(ConnectionHolder),把ConnectionHolder放入DataSourceTransactionObject中
    6、调用dobegin开启事务con.setAutoCommit(false);并且修改transactionActive为true。如果连接资源为空则获取新的连接,并且在TransactionSynchronizationManager进行资源绑定。
    8.1、调用doCommit提交事务

    相关文章

      网友评论

          本文标题:Spring4.x 手动事务,监听处理未关闭事务的几点思路,文末

          本文链接:https://www.haomeiwen.com/subject/fqpdzqtx.html