美文网首页
Spring多数据源开启事务

Spring多数据源开启事务

作者: 机方尼 | 来源:发表于2021-05-01 11:36 被阅读0次

背景

项目中一个service中需要更新两个数据源中的数据,并且业务逻辑比较复杂。如果不加事务的话,一旦程序报错容易产生脏数据,处理起来比较麻烦。
考虑到此项目为单体应用,不涉及到分布式。所以没有采用分布式事务来解决此次问题。此次采用的方案是使用AOP对需要开启多数据源的方法前后进行加强。

多数据源开启事务参考了 https://www.cnblogs.com/shuaiandjun/p/8667815.html

spring + mybatis开启事务的方法

单个数据库开启事务,我们只需要在方法上方加@Transactional注解即可,但是多数据源这个注解是不好使的,只能管理一个数据库的事务。下面来分析一下spring 加 mybatis 是如何管理事务的
根据配置文件中配置的

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
</bean>

我们从org.springframework.jdbc.datasource.DataSourceTransactionManager 这个类入手,首先看下这个类中有哪些方法

图片.png
根据以往看源码的经验,一般do...的方法是真正处理逻辑的。直接上源码吧,
@Override
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

我们看到这个方法中只是设置了一些属性,并没有开启事务,

图片.png
通过idea我们找到调用此方法的地方,是DataSourceTransactionManager的父类方法org.springframework.transaction.support.AbstractPlatformTransactionManager
而这个方法是spring容器提供的抽象方法,里面的getTransaction(TransactionDefinition definition)调用了上述方法
@Override
// TransactionDefinition 这个参数为事务的属性信息 其中包括 PROPAGATION:事务传播等级 ISOLATION 事务隔离级别
    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        // 这里是调用了我们配置文件中配置的DataSourceTransactionManager 中的doGetTransaction();
        Object transaction = doGetTransaction();

        // Cache debug flag to avoid repeated checks.
        boolean debugEnabled = logger.isDebugEnabled();

        if (definition == null) {
            // Use defaults if no transaction definition given.
     // 这里创建了默认的事务属性信息
            definition = new DefaultTransactionDefinition();
        }

        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
     // 校验如果事务传播级别如果是 
//REQUIRED:支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。 
//REQUIRES_NEW:新建事务,如果当前存在事务,把当前事务挂起。 
//NESTED:支持当前事务,如果当前事务存在,则执行一个嵌套事务,如果当前没有事务,就新建一个事务。
// 这三种的话就需要开启事务了
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }

这个方法中调用了 doBegin(transaction, definition)方法,而此方法在DataSourceTransactionManager这个方法中重写了,也就是刚刚我们分析到的方法。根据我们的经验应该是在此方法中打开事务的,那就直接上源码吧

    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = this.dataSource.getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();

            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);

            // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
            // so we don't want to do it unnecessarily (for example if we've explicitly
            // configured the connection pool to set it already).
    // 我们终于看到了在这里将 autocommit设置为了 false 也就是开启了事务
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false);
            }

            prepareTransactionalConnection(con, definition);
            txObject.getConnectionHolder().setTransactionActive(true);

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
                        // 将连接绑定到当前线程
            // Bind the connection holder to the thread.
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
            }
        }

        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.dataSource);
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

通过对源码的分析,得出想要手动设置开启事务,需要获取到spring容器中 DataSourceTransactionManager对象,调用它的getTransaction方法即可。
mybatis会将事务绑定到当前线程,也就是开启事务后当前线程会一直持有数据库连接,当前线程对数据库的操作都是在一个事务中。直到事务回滚或者提交。


编写切面方法

了解到如何开启事务,就可以自己动手编写多数据源开启事务了。

  1. 首先我们需要获取spring容器中的DataSourceTransactionManager对象。
  2. 创建注解类
  3. 编写切面方法
// 注解类
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransactionAnno {
}
/**
 * @author :wang.j.f
 * @description:TODO
 * @date :Created in 2021/4/28 15:53
 * @modified By:
 * @version: 1.0$
 */
@Aspect
@Component
public class MultiTransactionalAspect  {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private static final String[] transactionManagerNames = {"transactionManager", "transactionManager2"};

    @Pointcut("@annotation(com.tianma.service.aop.MultiTransactionAnno)")
    public void transactionAnno(){}

    @Around(value = "transactionAnno()")
    public Object menageTransaction(ProceedingJoinPoint joinPoint){
        Object result = null;
        Stack<DataSourceTransactionManager> dataSourceTransactionManagers = new Stack<>();
        Stack<TransactionStatus> transactionStatuses = new Stack<>();
        openTransaction(dataSourceTransactionManagers, transactionStatuses);
        try {
            result = joinPoint.proceed();
            commit(dataSourceTransactionManagers, transactionStatuses);
        }catch (Throwable e){
            logger.error("异常", e);
            rollback(dataSourceTransactionManagers, transactionStatuses);
        }

        return result;
    }

    /**
     * 开启事务
     * @param dataSourceTransactionManagers
     * @param transactionStatuses
     * @return
     */
    private void openTransaction(Stack<DataSourceTransactionManager> dataSourceTransactionManagers,
                                    Stack<TransactionStatus> transactionStatuses){

        // 开启事务并将transactionManager 和 transactionStatus 入栈
        for (String transactionManagerName : transactionManagerNames) {
            DataSourceTransactionManager dataSourceTransactionManager = (DataSourceTransactionManager) SpringContextUtil.getBean(transactionManagerName);
            TransactionStatus status = dataSourceTransactionManager.getTransaction(new DefaultTransactionDefinition());
            transactionStatuses.push(status);
            dataSourceTransactionManagers.push(dataSourceTransactionManager);
            logger.info("事务开启成功:{}", transactionManagerName);
        }
    }

    /**
     * 提交栈中事务
     * @param dataSourceTransactionManagerStack
     * @param transactionStatuStack
     */
    private void commit(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
                        Stack<TransactionStatus> transactionStatuStack){
        while (!dataSourceTransactionManagerStack.isEmpty()){
            dataSourceTransactionManagerStack.pop().commit(transactionStatuStack.pop());
            logger.info("事务提交成功!");
        }
    }

    /**
     * 回滚栈中事务
     * @param dataSourceTransactionManagerStack
     * @param transactionStatuStack
     */
    private void rollback(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
                          Stack<TransactionStatus> transactionStatuStack){
        while (!dataSourceTransactionManagerStack.isEmpty()){
            dataSourceTransactionManagerStack.pop().rollback(transactionStatuStack.pop());
            logger.error("事务回滚!");
        }
    }
}

遗留问题

利用此方法是可以解决,两个事务一同提交和一同回滚的问题。但是由于多个事务提交和回滚是有先后顺序的,当首先执行的事务提交或者回滚成功,而后面事务失败时,同样会产生两个数据库数据不一致的情况。
所以,当对数据一致性要求很高的业务场景,还是尽量使用现有的分布式事务解决方案来管理事务比较稳妥。

相关文章

网友评论

      本文标题:Spring多数据源开启事务

      本文链接:https://www.haomeiwen.com/subject/gmfbrltx.html