美文网首页
Spring 事务扩展机制 TransactionSynchro

Spring 事务扩展机制 TransactionSynchro

作者: 小波同学 | 来源:发表于2022-11-15 00:00 被阅读0次

    在进行数据库操作的时候,如果需要多个操作要么一起成功,要么一起失败那么就需要使用事务操作了。使用 Spring 框架只需要在方法上添加 @Transactional 注解这个方法就具有事务特性了。而且 Spring 也事务操作给开发者提供了很方便的扩展。

    1、TransactionSynchronizationManager

    操作多个方法 Spring 是如何来进行事务处理的呢?Spring 对于事务的管理都是基于 TransactionSynchronizationManager,下面我们就来简单的分析一下这个类。

    TransactionSynchronizationManager.java

    public abstract class TransactionSynchronizationManager {
        // 线程私有事务资源
        private static final ThreadLocal<Map<Object, Object>> resources =
                new NamedThreadLocal<>("Transactional resources");
    
        // 事务同步
        private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
                new NamedThreadLocal<>("Transaction synchronizations");
    
        // 当前事务的名称
        private static final ThreadLocal<String> currentTransactionName =
                new NamedThreadLocal<>("Current transaction name");
    
        // 当前事务是否只读
        private static final ThreadLocal<Boolean> currentTransactionReadOnly =
                new NamedThreadLocal<>("Current transaction read-only status");
    
        // 当前事务的隔离级别
        private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
                new NamedThreadLocal<>("Current transaction isolation level");
    
        // 实际事务是否激活
        private static final ThreadLocal<Boolean> actualTransactionActive =
                new NamedThreadLocal<>("Actual transaction active");
    }
    

    这个对象里面通过 ThreadLocal 保存了线程需要状态以及资源对象。

    • resources:保存连接资源,因为一个方法里面可能包含两个事务(比如事务传播特性为:TransactionDefinition#PROPAGATION_REQUIRES_NEW),所以就用 Map 来保存资源.
    • synchronizations:在进行数据库操作的时候,如果需要多个操作要么一起成功,要么一起失败那么就需要使用事务操作了。使用 Spring 框架只需要在方法上添加 @Transactional 注解这个方法就具有事务特性了。而且 Spring 也事务操作给开发者提供了很方便的扩展。:线程同步器,这个就是对 Spring 事务的扩展,通过 TransactionSynchronizationManager#registerSynchronization 来注册,我们稍后来分析这个对象在进行数据库操作的时候,如果需要多个操作要么一起成功,要么一起失败那么就需要使用事务操作了。使用 Spring 框架只需要在方法上添加 @Transactional 注解这个方法就具有事务特性了。而且 Spring 也事务操作给开发者提供了很方便的扩展。
    • currentTransactionReadOnly:用于保存当前事务是否只读
    • 在进行数据库操作的时候,如果需要多个操作要么一起成功,要么一起失败那么就需要使用事务操作了。使用 Spring 框架只需要在方法上添加 @Transactional 注解这个方法就具有事务特性了。而且 Spring 也事务操作给开发者提供了很方便的扩展。
    • currentTransactionName:用于保存当前事务名称,默认为空
    • currentTransactionIsolationLevel:用来保存当前事务的隔离级别
    • actualTransactionActive:用于保存当前事务是否还是 Active 状态

    下面我们来分析一下事务操作对于连接资源的处理,也就是事务处理中 TransactionSynchronizationManager 对于资源(resources 属性)的管理。

    2、Spring 事务处理

    TransactionSynchronizationManager#bindResource绑定连接资源到 TransactionSynchronizationManager 中的 resources 属性当中。下面是绑定时序图:


    TransactionInterceptor 是 Spring 对于事务方法处理的代理入口,里面对 JDBC 事务的抽象:

    // 获取连接
    Connection conn = DataSource.getConnection();
    
    // 设置自动提交 false
    conn..setAutoCommit(false);
    
    try {
        // 业务操作
        doSomething();
    } catch (Exception e) {
        // 回滚事务
        conn.rollback();
    }
    // 提交事务
    conn.commit();
    

    TransactionAspectSupport#invokeWithinTransaction

    TransactionAspectSupport#invokeWithinTransaction 是 Spring 对处理的处理。下面我们来大概分析一下它的处理过程:


    上面的代码逻辑如下:

    • TransactionAttributeSource#getTransactionAttribute 获取事务相关的信息(TransactionAttribute),以注解型事务为例,看方法获取类上有没有标注@Transactional注解。
    • 获取到 Spring 容器中配置的事务管理器 (PlatformTransactionManager),后面就是真正的事务处理
    • 创建事务信息(TransactionInfo),里面包含事务管理器(PlatformTransactionManager) 以及事务相关信息(TransactionAttribute)
    • 后面就是 Spring 对于事务的抽象操作,包含设置自动提交 false、业务操作、异常回滚事务和正常就提交事务

    回到正题, Spring 通过创建事务信息(TransactionInfo),把数据库连接通过 TransactionSynchronizationManager#bindResource 绑定到 ThreadLocal 变量当中。然后标注到一个事务当中的其它数据库操作就可以通过TransactionSynchronizationManager#getResource 获取到这个连接。

    数据库的事务是基于连接的,Spring 对于多个数据库操作的事务实现是基于 ThreadLocal。所以在事务操作当中不能使用多线程。

    3、Spring 事务的扩展 – TransactionSynchronization

    在上面的 TransactionSynchronizationManager 类中我们知道,事务操作的时候它的当前线程还保存了 TransactionSynchronization 对象。而这个对象伴随着 Spring 对 事务处理的各个生命周期都会有相应的扩展。

    TransactionSynchronization.java

    public interface TransactionSynchronization extends Flushable {
    
        /** 事务提交状态 */
        int STATUS_COMMITTED = 0;
    
        /** 事务回滚状态 */
        int STATUS_ROLLED_BACK = 1;
    
        /**系统异常状态 */
        int STATUS_UNKNOWN = 2;
    
        void suspend();
    
        void resume();
    
        void flush();
    
        // 事务提交之前
        void beforeCommit(boolean readOnly);
    
        // 事务成功或者事务回滚之前
        void beforeCompletion();
    
        // 事务成功提交之后
        void afterCommit();
    
        // 操作完成之后(包含事务成功或者事务回滚)
        void afterCompletion(int status);
    
    }
    

    事务的事务扩展项目中的应用场景是当订单成功之后,发送一条消息到 MQ 当中去。由于事务是和数据库连接相绑定的,如果把发送消息和数据库操作放在一个事务里面。当发送消息时间过长时会占用数据库连接,所以就要把数据库操作与发送消息到 MQ 解耦开来。可以利用 TransactionSynchronization#afterCommit 的这个方法,当数据成功保存到数据库并且事务提交了就把消息发送到 MQ 里面。

    @Transactional
    public void finishOrder(Order order){
        // 修改订单成功
        updateOrderSuccess(order);
    
        // 发送消息到 MQ
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter(){
           @Override
           public void afterCommit() {
               mqService.send(order);
           }
        });
    }
    

    当事务成功提交之后,就会把消息发送给 MQ,并且不会占用数据库连接资源。

    4、Spring 事务扩展 – @TransactionalEventListener

    在 Spring framework 4.2 之后还可以使用@TransactionalEventListener处理数据库事务提交成功后再执行操作。这种方式比 TransactionSynchronization 更加优雅。它的使用方式如下:

    @Transactional
    public void finishOrder(Order order){
        // 修改订单成功
        updateOrderSuccess(order);
    
        // 发布 Spring Event 事件
        applicationEventPublisher.publishEvent(new MyAfterTransactionEvent(order));
    }
    
    @Slf4j
    @Component
    private static class MyTransactionListener {
        @Autowired
        private MqService mqService;
    
        @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
        private void onHelloEvent(MyAfterTransactionEvent event) {
            Order order = event.getOrder();
            mqService.send(order);
        }
    }
    
    // 定一个事件,继承自ApplicationEvent 
    private static class MyAfterTransactionEvent extends ApplicationEvent {
    
        private Order order;
    
        public MyAfterTransactionEvent(Object source, Order order) {
            super(source);
            this.order = order;
        }
    
        public Order getOrder() {
            return order;
        }
    }
    

    它的实现原理是当 Spring Bean 的方法标注了通过 TransactionalEventListenerFactory#createApplicationListener创建 ApplicationListenerMethodTransactionalAdapter 然后在事件回调当中创建 TransactionSynchronization的实现类TransactionSynchronizationEventAdapter。并且通过 TransactionSynchronizationManager.registerSynchronization
    把 TransactionSynchronizationEventAdapter 注册到当前线程当中。

    TransactionSynchronizationEventAdapter

    private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {
    
        private final ApplicationListenerMethodAdapter listener;
    
        private final ApplicationEvent event;
    
        private final TransactionPhase phase;
    
        public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
                ApplicationEvent event, TransactionPhase phase) {
    
            this.listener = listener;
            this.event = event;
            this.phase = phase;
        }
    
        @Override
        public int getOrder() {
            return this.listener.getOrder();
        }
    
        @Override
        public void beforeCommit(boolean readOnly) {
            if (this.phase == TransactionPhase.BEFORE_COMMIT) {
                processEvent();
            }
        }
    
        @Override
        public void afterCompletion(int status) {
            if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
                processEvent();
            }
            else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
                processEvent();
            }
            else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
                processEvent();
            }
        }
    
        protected void processEvent() {
            this.listener.processEvent(this.event);
        }
    }
    

    上面就是使用@TransactionalEventListener处理数据库事务提交成功后再执行操作的原理。

    参考:
    https://www.cnblogs.com/kkkfff/p/13778692.html

    https://blog.csdn.net/guntun8987/article/details/125022167

    相关文章

      网友评论

          本文标题:Spring 事务扩展机制 TransactionSynchro

          本文链接:https://www.haomeiwen.com/subject/bpgrxdtx.html