美文网首页
spring Transactional 深入分析

spring Transactional 深入分析

作者: sschrodinger | 来源:发表于2020-08-06 19:41 被阅读0次

    spring Transactional 深入分析

    sschrodinger

    2020年8月6日


    引用


    JDBC 官方文档 - Using Transactions

    spring AOP源码深度解析 - 掘金 - 智能后端小头

    SpringBoot中@Transactional事务控制实现原理及事务无效问题排查 - CSDN - hanchao5272

    Spring AOP: Spring之面向方面编程 拦截器 MethodInterceptor - CSDN - 洪文识途

    spring boot 版本 - 2.3.1.RELEASE

    连接池版本 - c3p0 - 0.9.5.5

    mysql 版本 - mysql communication - 8.0.21


    开始


    对于 JDBC 来说,事务的创建是通过设置 sql 的自动提交为 false 实现的

    以一个最小化的 Spring boot 文件验证。

    验证代码如下:

    /*
     * 需要 junit 与 spring-jdbc 组件
     */
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class JdbcSimpleDatasourceApplicationTests {
    
        @Test
        public void springDataSourceTest(){
            //输出为true
            System.out.println(dataSource instanceof HikariDataSource);
            try{
                Connection connection = dataSource.getConnection();
                System.out.println(connection.getTransactionIsolation());
                // 设置自动提交为 false
                // 当进行一个新 sql 时,会自动开启一个事务
                connection.setAutoCommit(false);
                System.out.println(connection.getTransactionIsolation());
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery("select * from student");
                while (resultSet.next()) {
                    System.out.println(resultSet.getString("name"));
                }
                statement.execute("insert student values (\"name\", 90, 25)");
                Thread.sleep(1000000);
                statement.close();
                connection.close();
            }catch (Exception exception){
                exception.printStackTrace();
            }
        }
        
    }
    

    在测试代码进入 sleep 后,查看 mysql 的 information_schema.innodb_trx 表(可以查看正在运行的事务),如下:

    mysql> select * from information_schema.innodb_trx\G;
    *************************** 1. row ***************************
                        trx_id: 2568
                     trx_state: RUNNING
                   trx_started: 2020-08-06 14:23:41
         trx_requested_lock_id: NULL
              trx_wait_started: NULL
                    trx_weight: 2
           trx_mysql_thread_id: 43
                     trx_query: NULL
           trx_operation_state: NULL
             trx_tables_in_use: 0
             trx_tables_locked: 1
              trx_lock_structs: 1
         trx_lock_memory_bytes: 1136
               trx_rows_locked: 0
             trx_rows_modified: 1
       trx_concurrency_tickets: 0
           trx_isolation_level: REPEATABLE READ
             trx_unique_checks: 1
        trx_foreign_key_checks: 1
    trx_last_foreign_key_error: NULL
     trx_adaptive_hash_latched: 0
     trx_adaptive_hash_timeout: 0
              trx_is_read_only: 0
    trx_autocommit_non_locking: 0
           trx_schedule_weight: NULL
    1 row in set (0.00 sec)
    
    ERROR:
    No query specified
    

    可以看到,这时已经产生了一个事务。

    接下来,验证事务只与连接有关,即只要一个连接设置了自动提交为 false 的标志位,则不管连接在什么线程中,都只会有一个事务。

    测试代码如下:

    /*
     * 注意测试时需要注意 datasource 的选择,最好选择原生 database,以防止两个线程拿到同一个 connection
     *
     */
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class JdbcSimpleDatasourceApplicationTests {
        @Test
        public void springDataThreadSourceTest() throws InterruptedException, SQLException {
            /*
             * 两个线程共用一个连接
             */
            Connection connection = dataSource.getConnection();
            Thread thread_one = new Thread(()->{
                try {
                    System.out.println(connection.getTransactionIsolation());
                    connection.setAutoCommit(false);
                    System.out.println(connection.getTransactionIsolation());
                    Statement statement = connection.createStatement();
                    ResultSet resultSet = statement.executeQuery("select * from student");
                    //...
                    Thread.sleep(1000000);
                } catch (Exception exception){
                    exception.printStackTrace();
                }
            });
            Thread thread_two = new Thread(()->{
                try {
                    System.out.println(connection.getTransactionIsolation());
                    connection.setAutoCommit(false);
                    System.out.println(connection.getTransactionIsolation());
                    Statement statement = connection.createStatement();
                    ResultSet resultSet = statement.executeQuery("select * from student");
                    //...
                    Thread.sleep(1000000);
                } catch (Exception exception){
                    exception.printStackTrace();
                }
            });
            thread_one.start();
            thread_two.start();
            thread_one.join();
            thread_two.join();
        }
        
        @Test
        public void springDataCOnnectSourceTest() throws InterruptedException {
            /*
             * 两个线程分别用两个连接
             */
            Thread thread_one = new Thread(()->{
                try (Connection connection = dataSource.getConnection();){
                    System.out.println(connection.getTransactionIsolation());
                    connection.setAutoCommit(false);
                    System.out.println(connection.getTransactionIsolation());
                    Statement statement = connection.createStatement();
                    ResultSet resultSet = statement.executeQuery("select * from student");
                    //...
                    Thread.sleep(1000000);
                    connection.commit();
                } catch (Exception exception){
                    exception.printStackTrace();
                }
            });
            Thread thread_two = new Thread(()->{
                try (Connection connection = dataSource.getConnection();){
                    System.out.println(connection.getTransactionIsolation());
                    connection.setAutoCommit(false);
                    System.out.println(connection.getTransactionIsolation());
                    Statement statement = connection.createStatement();
                    ResultSet resultSet = statement.executeQuery("select * from student");
                    //...
                    Thread.sleep(1000000);
                    connection.commit();
                } catch (Exception exception){
                    exception.printStackTrace();
                }
            });
            thread_one.start();
            thread_two.start();
            thread_one.join();
            thread_two.join();
        }
    }
    

    查看 mysql 的 information_schema.innodb_trx 表,可以看到共用同一个连接的只有一个事务,两个连接的有两个事务。

    有两个结论:

    结论

    • 设置自动提交为 false 之后,会开启一个事务(隐式),同时,一个连接只允许存在一个事务,多个连接的事务不共享。
    • 除了设置自动提交外,JDBC 没有其他的手段去创建事务。

    根据这两个结论,提出一些猜测:

    推测

    • spring 在实现动态代理时,会在目标函数前加上取消自动提交的语句,并在目标函数完成之后,进行提交或者处理

    代码形式如下:

    // 目标方法
    public void targetMethod() {
        // do some db op
    }
    
    // 代理方法
    public void proxyMethod() {
        Connection connection = getConnection();
        // 首先设置自动提交为 false
        connection.setAutoCommit(false);
        try {
            targetMethod();
            // 提交
            connection.commit();
        } catch(Exception e) {
            // 回滚
            connection.rollback();
        } finally {
            // 最后需要设置自动提交为 true
            connection.setAutoCommit(true);
        }
        
    }
    

    那接下来看 spring 如何包装这些阶段。


    TransactionManager


    TransactionManager 是一个空接口,只标志其是一个事务管理器,我们直接看他的子接口 PlatformTransactionManager,形式如下:

    public interface PlatformTransactionManager extends TransactionManager {
        
        // 用于开启一个事务
        TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
                throws TransactionException;
        
        // 用于事务提交
        void commit(TransactionStatus status) throws TransactionException;
        
        // 用于事务回滚
        void rollback(TransactionStatus status) throws TransactionException;
    

    以最常见的 DataSourceTransactionManager 举例,我们分析他的实现。

    如下是他的一个 UML 类图,我们分析他的主要流程 PlatformTransactionManager -> AbstractPlatformTransactionManager -> DataSourceTransactionManager

    image.png

    AbstractPlatformTransactionManager 实现了事务的基本模板,首先看 getTransaction() 函数:

    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {
    
        // 1. 设置传播级别等事务信息定义
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    
        // 2. 获得当前事务
        Object transaction = doGetTransaction();
        boolean debugEnabled = logger.isDebugEnabled();
    
        if (isExistingTransaction(transaction)) {
            // 存在事务时按照传播级别处理
            return handleExistingTransaction(def, transaction, debugEnabled);
        }
    
        // Check definition settings for new transaction.
        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
        }
    
        // 3. 如果不存在事务,TransactionDefinition.PROPAGATION_MANDATORY 需要抛出异常
        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            // 4. 创建事务挂起记录
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
            }
            try {
                // 5. 挂起事务并执行
                return startTransaction(def, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error ex) {
                // 6. 挂起事务继续执行
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + def);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
        }
    }
    

    Note

    • step 1:获得事务的定义 def,即定义的 @Transactional 注解中的信息,包括传播级别等
    • step 2:获得当前事务 transaction
    • step 3:如果不存在当前事务,那么遇到传播级别 PROPAGATION_MANDATORY 直接抛出异常
    • step 4:创建事务挂起记录
    • step 5:根据 def 的传播级别挂起事务,并开始执行
    • step 6:如果异常,则取消挂起

    看如何获得当前事务,doGetTransaction()DataSourceTransactionManager 实现,如下:

    @Override
    protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }
    

    DataSourceTransactionObject 持有了一个 ConnectionHolderConnectionHolder 又持有了一个 connect,简单的说, DataSourceTransactionObject 持有了一个唯一的连接,因为一个连接可以代表一个事务,因此 doGetTransaction() 就是得到一个 connection,用 connection 代表事务。

    重点看一下 TransactionSynchronizationManager

    TransactionSynchronizationManager 管理着多个ThreadLocal,用于存储 ConnectionHolder 等对象,因此,Spring 中每个线程所获得的 Connection 是独立的。并且每个线程只持有一个 Connection

    image

    通过 ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); 函数可以返回一个该线程所对应的唯一一个 connectionThreadLocal 保存的 Map 键为 datasource)。

    然后看创建事务挂起记录,主要两个步骤,第一个步骤是移除 ThreadLocal 中的记录,第二个是将 ThreadLocal 中的记录包装成 suspendedResources,记录 Connection 等信息。

    最后看开始事务的过程,startTransaction() 开始新事务,并返回被挂起的事务信息和当前事务信息。

    private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
            boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        // 
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        doBegin(transaction, definition);
        prepareSynchronization(status, definition);
        return status;
    }
    
    // in DataSourceTransactionManager
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;
    
        try {
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                // 新建一个连接
                Connection newCon = obtainDataSource().getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
    
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            txObject.setReadOnly(definition.isReadOnly());
    
            // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
            // so we don't want to do it unnecessarily (for example if we've explicitly
            // configured the connection pool to set it already).
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                // 设置自动提交为 false,开启一个事务
                con.setAutoCommit(false);
            }
    
            prepareTransactionalConnection(con, definition);
            txObject.getConnectionHolder().setTransactionActive(true);
    
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
    
            // Bind the connection holder to the thread.
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
            }
        }
    
        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, obtainDataSource());
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }
    

    commit()rollback() 过程与其相似,略。

    总结

    • TransactionManager 使用 ThreadLocal 存储唯一对应的连接
    • 事务被挂起,等效于使用新的连接创建事务,并将旧事务数据保存在返回的新事务数据中
    • 事务执行结束,查看有无旧事务,有则将旧事务的信息存储回 ThreadLocal

    TransactionInterceptor


    spring 动态代理时,会顺序执行拦截器(拦截器实现了around通知),只需要调用 invoke 函数就可以完成所有的代理工作,代理函数伪代码如下:

    public XXInterceptor implements MethodInterceptor {
        // invocation 提供一个方法 getMethod 获得被代理方法
        Object invoke(MethodInvocation invocation) throws Throwable {
            try {
                doSomethingBefore();
                invocation.getMethod().invoke();
                doSomethingAfter();
            } catch (Exception e) {
                doSomethingError();
            } finally {
                doSomethingFinal();
            }
        }
        
        public void doSomethingBefore(){};
        public void doSomethingAfter(){};
        public void doSomethingError(){};
        public void doSomethingFinal(){};
    }
    

    事务使用拦截器 TransactionInterceptor,看部分代码,如下:

    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Work out the target class: may be {@code null}.
        // The TransactionAttributeSource should be passed the target class
        // as well as the method, which may be from an interface.
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    
        // Adapt to TransactionAspectSupport's invokeWithinTransaction...
        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    }
    
    
    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {
            //...
        // ptm is instance of TransactionManager
        if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            // 新建事务
            TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
    
            Object retVal;
            try {
                // 执行其他代理函数与被代理函数
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 执行错误回滚(定义的 exception )或提交(没有在该 exception 定义 rollback)
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }
    
            if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                // Set rollback-only in case of Vavr failure matching our rollback rules...
                TransactionStatus status = txInfo.getTransactionStatus();
                if (status != null && txAttr != null) {
                    retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                }
            }
            // 调用 commit 提交
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
        //...
    }
    

    因此,spring 事务是扩展了代理类的功能,并且使用 TransactionManager 提供了事务的功能。


    事务注解的使用场景


    由以上的分析,可得结论。

    结论

    • jdbc 的事务又由 setAutoCommmit 控制,数据库连接设置值为 false 之后,隐式的变成一个事务
    • spring 使用 TransctionManager 封装事务
    • spring 每个线程持有一个唯一的数据库连接,调用 getTransaction 方法后,会生成一个事务(置 AutoCommmit 为 false
    • 只要在同一个线程中,连接就相同,就是一个事务

    因此,注解写法总结如下:

    1. 同一个类函数相互调用,被调用的函数注解无效。

    举例如下:

    public class TestService {
        @Transactional
        public void a() {
            // do some database op
        }
        
        @Transactional
        public void b() {
            a();
            // do some database op
        }
    }
    

    b() 调用 a()a() 的注解会失效(不能在同一个类,原因是Spring aop 原理)

    2. 不同类函数相互调用,如果都是 Required 注解,没有必要在被调用函数增加注解。

    举例如下:

    @Component
    public class TestService_1 {
        @Transactional(propagation = Propagation.REQUIRED)
        public void a() {
            // do some database op
        }
    
    }
    
    public class TestService_2 {
    @Autowired private TestService_1 service;
        @Transactional(propagation = Propagation.REQUIRED)
        public void b() {
            // do some database op
            service.a();
        }
    
    }
    

    b() 调用 a()a() 处于同一个线程,使用相同连接,则本来就存在事务,如果 a() 的传播级别为 Propagation.REQUIRED,则不需要在 a() 增加注解(提高效率)

    相关文章

      网友评论

          本文标题:spring Transactional 深入分析

          本文链接:https://www.haomeiwen.com/subject/dlptdktx.html