tcc-transaction源码分析与思考

作者: 土豆肉丝盖浇饭 | 来源:发表于2018-04-27 23:39 被阅读2365次

    tcc介绍

    tcc是分布式事务的一种解决方案,即Try,Commit,Cancel

    Try: 尝试执行业务

    完成所有业务检查(一致性)
    
    预留必须业务资源(准隔离性)
    

    Confirm: 确认执行业务

    真正执行业务
    
    不作任何业务检查
    
    只使用Try阶段预留的业务资源
    
    Confirm操作满足幂等性
    

    Cancel: 取消执行业务

    释放Try阶段预留的业务资源
    
    Cancel操作满足幂等性
    

    本文我会讲解一个实现tcc思想的框架,tcc-transaction


    image.png

    在github还是比较火的,并且应该也有公司生产使用,熟悉它的源码,一方面可以提升自己眼界,扩宽自己编码能力,另一方面,以后需要使用它的时候,难免有坑需要修改,也能为开源贡献一份力量

    tcc-transaction使用

    在tcc-transaction的tcc-transaction-tutorial-sample模块,做相关配置运行即可

    tcc-transaction原理

    这边我们主要讲解tcc-transaction和dubbo的整合
    tcc-transaction的主要原理是Aop,那么以后面试的时候,问用Aop做什么,就可以回答这个了,再把tcc讲一下,完美的连招
    作为一个处理分布式事务的框架,先来讲下tcc-transaction的事务抽象

    Transaction

    tcc的事务,并不是数据库的事务,而是应用层的事务,所以tcc这三个阶段的操作,全部都需要我们手动实现
    先看下Transaction对象的参数

        private TransactionXid xid;
    
        private TransactionStatus status;
    
        private TransactionType transactionType;
    
        private volatile int retriedCount = 0;
    
        private Date createTime = new Date();
    
        private Date lastUpdateTime = new Date();
    
        private long version = 1;
    
        private List<Participant> participants = new ArrayList<Participant>();
    
        private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
    
    参数 解释
    xid 全局事务id,内容相当于uuid,用来保证事务唯一性
    status 事务的状态,可以为TRYING,CONFIRMING,CANCELLING,分别对应tcc三个阶段
    transactionType 事务类型,可以为ROOT,BRANCH,ROOT是主事务,BRANCH是分支事务,事务的发起方法对应主事务,其他被调用的dubbo接口在分支事务上
    retriedCount 事务重试次数,当confirm或者cancel失败重试时增加
    createTime 事务的创建时间
    lastUpdateTime 事务最后一次更新时间
    version 事务的版本号,乐观锁?
    participants 事务的参与者
    attachments 附加参数,暂时没发现被用到

    再看下Transaction中两个比较重要的方法

    public void commit() {
    
        for (Participant participant : participants) {
            participant.commit();
        }
    }
    
    public void rollback() {
        for (Participant participant : participants) {
            participant.rollback();
        }
    }
    

    执行Transaction的commit或rollback方法,会对应执行所有participant的commit或rollback方法

    下面讲解Participant抽象

    Participant

    Participant是对tcc事务参与者的抽象

    public class Participant implements Serializable {
    
        private static final long serialVersionUID = 4127729421281425247L;
    
        private TransactionXid xid;
    
        private InvocationContext confirmInvocationContext;
    
        private InvocationContext cancelInvocationContext;
    
        private Terminator terminator = new Terminator();
    
        Class<? extends TransactionContextEditor> transactionContextEditorClass;
    
        ......
    }
    

    在Participant中使用InvocationContext把事务参与者的confirm和cancel方法的元数据保存下来

    public class InvocationContext implements Serializable {
    
        private static final long serialVersionUID = -7969140711432461165L;
        private Class targetClass;
    
        private String methodName;
    
        private Class[] parameterTypes;
    
        private Object[] args;
    }
    

    元数据包括目标Class,目标方法名,目标参数列表,实际参数列表

    Participant也保存了transactionContextEditorClass,用于提取事务上下文

    接下来看Participant的commit和rollback方法

     public void rollback() {
            terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
        }
    
        public void commit() {
            terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
        }
    

    会通过terminator执行具体的操作,传入构造好的TransactionContext,confirmInvocationContext和transactionContextEditorClass

    public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
    
    
            if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
    
                try {
                    //从spring容器中获取对应事务参与者实现
                    Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();
    
                    Method method = null;
                    //获取对应方法反射对象
                    method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
                    //设置上下文
                    FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
                    //反射调用
                    return method.invoke(target, invocationContext.getArgs());
    
                } catch (Exception e) {
                    throw new SystemException(e);
                }
            }
            return null;
        }
    

    在terminator的调用过程中,我们需要理解的一点的是,Participant其实分两种,第一种是本地的Participant,直接反射调用即可,第二种是远程的Participant,也就是调用的是Dubbo接口,反射调用的同时会执行远程对等端的接口逻辑,所以这里需要用到transactionContextEditorClass来设置上下文信息,传递到远程dubbo接口中去

    TransactionContext

    image.png

    TransactionContext的保存了全局事务id和事务状态,在调用事务参与者Participant的confirm或cancel方法时会传递过去

    TransactionContextEditor

    public interface TransactionContextEditor {
    
        public TransactionContext get(Object target, Method method, Object[] args);
    
        public void set(TransactionContext transactionContext, Object target, Method method, Object[] args);
    
    }
    

    TransactionContextEditor用于调用事务参与者方法时,设置和获取需要传递的TransactionContext,目前有2种实现,DefaultTransactionContextEditor和DubboTransactionContextEditor
    DefaultTransactionContextEditor从方法参数里面提取TransactionContext对象

    @Override
            public TransactionContext get(Object target, Method method, Object[] args) {
                //获取TransactionContext在args中的位置
                int position = getTransactionContextParamPosition(method.getParameterTypes());
    
                if (position >= 0) {
                    return (TransactionContext) args[position];
                }
    
                return null;
            }
    

    DubboTransactionContextEditor从Dubbo Rpc上下文提取TransactionContext对象

    @Override
        public TransactionContext get(Object target, Method method, Object[] args) {
            //从Dubbo Rpc上下文获取
            String context = RpcContext.getContext().getAttachment(TransactionContextConstants.TRANSACTION_CONTEXT);
    
            if (StringUtils.isNotEmpty(context)) {
                return JSON.parseObject(context, TransactionContext.class);
            }
    
            return null;
        }
    

    TransactionManager

    TransactionManager用来控制Transaction的生命周期,Transaction的改变使用TransactionRepository同步到第三方存储,一般使用mysql数据库存储
    TransactionManager包含的方法以及属性如下


    image.png

    变量介绍

    1. transactionRepository
     private TransactionRepository transactionRepository;
    

    TransactionRepository用于对Transaction的持久化操作,如果是JDBC实现,其实就是对一张Transaction表的CRUD,这张表主要用于补偿任务

    1. CURRENT
    private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();
    

    这是一个双向队列,在这个类主要用作栈,用来处理事务的嵌套,因为是ThreadLocal,所以针对每个线程都是独立的

    1. executorService
     private ExecutorService executorService;
    

    线程池,用于异步执行commit或者cancel

    方法介绍

    1. registerTransaction
    private void registerTransaction(Transaction transaction) {
    
        if (CURRENT.get() == null) {
            CURRENT.set(new LinkedList<Transaction>());
        }
    
        CURRENT.get().push(transaction);
    }
    

    把transaction设置到ThreadLocal对象中去,push方法对应入栈

    1. begin
      public Transaction begin() {

      Transaction transaction = new Transaction(TransactionType.ROOT);
      transactionRepository.create(transaction);
      registerTransaction(transaction);
      return transaction;
      }
      开启事务,同步到repository,注册到ThreadLocal,这个方法与用于主事务的创建

    2. propagationNewBegin

    public Transaction propagationNewBegin(TransactionContext transactionContext) {
    
        Transaction transaction = new Transaction(transactionContext);
        transactionRepository.create(transaction);
    
        registerTransaction(transaction);
        return transaction;
    }
    

    这个方法用于从主事务的上下文创建分支事务,xid保持不变,事务类型变化

    1. propagationExistBegin
    public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
        Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());
    
        if (transaction != null) {
            transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));
            registerTransaction(transaction);
            return transaction;
        } else {
            throw new NoExistedTransactionException();
        }
    }
    

    这个方法用于从事务上下文同步事务状态到ThreadLocal

    1. commit
    public void commit(boolean asyncCommit) {
        //从threadlocal得到当前事务
        final Transaction transaction = getCurrentTransaction();
    
        transaction.changeStatus(TransactionStatus.CONFIRMING);
        //数据库更新transaction
        transactionRepository.update(transaction);
    
        if (asyncCommit) {
            try {
                Long statTime = System.currentTimeMillis();
                //通过线程池异步执行事务提交
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        commitTransaction(transaction);
                    }
                });
                logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
            } catch (Throwable commitException) {
                logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);
                throw new ConfirmingException(commitException);
            }
        } else {
            //同步执行事务提交
            commitTransaction(transaction);
        }
    }
    

    这个方法执行事务的commit,实际事务提交在commitTransaction中执行,会执行每个事务参与者的commit方法

    private void commitTransaction(Transaction transaction) {
        try {
            //调用事务参与者的提交方法
            transaction.commit();
            //事务结束,在数据库删除当前事务,如果commit异常,不会把数据库内事务记录删除,为了重试补偿
            transactionRepository.delete(transaction);
        } catch (Throwable commitException) {
            logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
            throw new ConfirmingException(commitException);
        }
    }
    

    看了这段逻辑后,我们可以发现,在commit失败的时候,并不会触发rollback,而是不删除数据库事务记录,之后会有定时任务进行扫描重试,后面会讲到这个定时任务

    1. rollback
    public void rollback(boolean asyncRollback) {
    
        final Transaction transaction = getCurrentTransaction();
        transaction.changeStatus(TransactionStatus.CANCELLING);
        
        transactionRepository.update(transaction);
    
        if (asyncRollback) {
    
            try {
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        rollbackTransaction(transaction);
                    }
                });
            } catch (Throwable rollbackException) {
                logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);
                throw new CancellingException(rollbackException);
            }
        } else {
    
            rollbackTransaction(transaction);
        }
    }
    

    这个方法用于执行事务的回滚逻辑,和commit方法类似,在rollbackTransaction方法中,会执行每个事务参与者的rollback方法

    private void rollbackTransaction(Transaction transaction) {
        try {
            //调用事务参与者的提交方法
            transaction.rollback();
             //事务结束,在数据库删除当前事务,如果rollback异常,不会把数据库内事务记录删除,为了重试补偿
            transactionRepository.delete(transaction);
        } catch (Throwable rollbackException) {
            logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);
            throw new CancellingException(rollbackException);
        }
    }
    
    1. cleanAfterCompletion
    public void cleanAfterCompletion(Transaction transaction) {
        if (isTransactionActive() && transaction != null) {
            Transaction currentTransaction = getCurrentTransaction();
            if (currentTransaction == transaction) {
                //栈操作,后进先出
                CURRENT.get().pop();
            } else {
                throw new SystemException("Illegal transaction when clean after completion");
            }
        }
    }
    

    事务结束,从栈中弹出结束的事务。

    1. enlistParticipant
    public void enlistParticipant(Participant participant) {
            Transaction transaction = this.getCurrentTransaction();
            transaction.enlistParticipant(participant);
            transactionRepository.update(transaction);
        }
    

    给事务绑定事务参与者并同步到repository

    接下来讲下核心的两个切面,这两个切面把上面的所有组件串联在一起使用

    核心Aspect

    在使用tcc-transaction的时候,我们需要对开启tcc事务的方法加上@Compensable注解,这个注解可以设置以下参数

    参数 解释
    propagation 事务传播性,包含REQUIRED(必须存在事务,不存在,创建),SUPPORTS(有事务的话在事务内运行),MANDATORY(必须存在事务),REQUIRES_NEW(不管是否存在,创建新的事务)
    confirmMethod confirm阶段对应的方法
    cancelMethod cancel阶段对应的方法
    transactionContextEditor 设置对应transactionContextEditor
    asyncConfirm 是否异步confirm
    asyncCancel 是否异步cancel

    @Compensable注解的参数会在下面两个切面中使用到

    ConfigurableTransactionAspect

    ConfigurableTransactionAspect主要用来控制Transaction的生命周期,内部通过CompensableTransactionInterceptor实现

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void compensableService() {
    
    }
    
    @Around("compensableService()")
    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
    
        return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
    }
    

    直接看interceptCompensableMethod方法

    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
    
        //解析@Compensable注解
        Method method = CompensableMethodUtils.getCompensableMethod(pjp);
        Compensable compensable = method.getAnnotation(Compensable.class);
        Propagation propagation = compensable.propagation();
        //获取上下文,如果是Root,不会存在上下文,Transaction都还没创建
        TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
        boolean asyncConfirm = compensable.asyncConfirm();
        boolean asyncCancel = compensable.asyncCancel();
        boolean isTransactionActive = transactionManager.isTransactionActive();
    
        if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
            throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
        }
    
        //计算方法类型,Root对应主事务入口方法,Provider对应远程提供者方的方法,Normal是主事务内消费者方的方法(是代理方法)
        MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
    
        switch (methodType) {
            case ROOT:
                //处理主事务切面
                return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
            case PROVIDER:
                //处理提供者事务切面
                return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
            default:
                //消费者事务直接执行,会对应执行远端提供者事务切面
                return pjp.proceed();
        }
    }
    

    在tcc事务内被@Compensable注解的方法分三种

    1. Root方法,就是这次事务的入口方法
    2. Normal方法,在Root方法调用的dubbo接口方法
    3. Provider方法,对应dubbo接口方法的远程实现
      被注解的方法都是try的逻辑,confirm和cancel逻辑配置在@Compensable注解参数中

    对被@Compensable注解的方法执行切面逻辑的时候,会根据这三种方法类型做不同处理
    对于Root方法,执行rootMethodProceed的逻辑

    private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
    
        Object returnValue = null;
    
        Transaction transaction = null;
    
        try {
    
            //创建事务
            transaction = transactionManager.begin();
    
            try {
                returnValue = pjp.proceed();
            } catch (Throwable tryingException) {
    
                if (isDelayCancelException(tryingException)) {
                    transactionManager.syncTransaction();
                } else {
                    logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
    
                    //回滚事务
                    transactionManager.rollback(asyncCancel);
                }
    
                throw tryingException;
            }
    
            //提交事务
            transactionManager.commit(asyncConfirm);
    
        } finally {
            //清理操作
            transactionManager.cleanAfterCompletion(transaction);
        }
    
        return returnValue;
    }
    

    会在被切方法(对应try逻辑)执行前,开启事务,try逻辑执行成功,通过transactionManager的commit方法执行每个事务参与者的commit逻辑,如果try失败,通过transactionManager执行每个参与者的rollback逻辑。

    对于Provider方法

    private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
    
        Transaction transaction = null;
        try {
    
            switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
                case TRYING:
                    //使用transactionContext创建分支事务
                    transaction = transactionManager.propagationNewBegin(transactionContext);
                    //执行被切方法逻辑
                    return pjp.proceed();
                case CONFIRMING:
                    try {
                        //更新事务状态
                        transaction = transactionManager.propagationExistBegin(transactionContext);
                        //提交事务,不走切面的方法
                        transactionManager.commit(asyncConfirm);
                    } catch (NoExistedTransactionException excepton) {
                        //the transaction has been commit,ignore it.
                    }
                    break;
                case CANCELLING:
    
                    try {
                        //更新事务状态
                        transaction = transactionManager.propagationExistBegin(transactionContext);
                        //回滚事务,不走切面的方法
                        transactionManager.rollback(asyncCancel);
                    } catch (NoExistedTransactionException exception) {
                        //the transaction has been rollback,ignore it.
                    }
                    break;
            }
    
        } finally {
            //清理资源
            transactionManager.cleanAfterCompletion(transaction);
        }
    
        Method method = ((MethodSignature) (pjp.getSignature())).getMethod();
    
        //对于 confirm和 cancel 返回空值
        //主要针对原始类型做处理,因为不能为null
        return ReflectionUtils.getNullValue(method.getReturnType());
    }
    

    可以看到在provider类型方法的切面,也就是远程的Participant,如果transaction的status为trying,会通过transactionManager.propagationNewBegin创建分支事务并执行被切方法逻辑,如果是status为confirming或者canceling,会调用对应confirm或cancel配置的方法,跳过被切方法

    对于normal类型
    直接调用,normal类型的方法是封装了对远程dubbo接口方法调用逻辑的本地代理方法,所以直接执行即可

    ConfigurableCoordinatorAspect

    ConfigurableCoordinatorAspect主要是为了设置事务的参与者,在一个事务内,每个被@Compensable注解的方法都是事务参与者

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void transactionContextCall() {
    
    }
    
    @Around("transactionContextCall()")
    public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
        return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
    }
    

    相关逻辑封装在ResourceCoordinatorInterceptor的interceptTransactionContextMethod方法中

    public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
    
        //得到当前事务
        Transaction transaction = transactionManager.getCurrentTransaction();
    
        if (transaction != null) {
    
            switch (transaction.getStatus()) {
                case TRYING:
                    //只需要在trying的时候把参与者信息提取出来,设置到transaction中去
                    enlistParticipant(pjp);
                    break;
                case CONFIRMING:
                    break;
                case CANCELLING:
                    break;
            }
        }
    
        //执行目标方法
        return pjp.proceed(pjp.getArgs());
    }
    

    在trying阶段会把所有参与者加入到事务中去,对于Root方法,创建主事务,加入的参与者会包括Root方法对应本地参与者以及Normal方法对应的远程参与者,对于Provider方法,通过主事务上下文创建分支事务,加入的参与者包括Provider方法对应的本地参与者以及它包含的Normal方法对应的远程参与者。这里的远程参与者又可以开启新的分支事务。层级多了,势必会产生性能问题。

    接下来看enlistParticipant如何生成参与者对象

    private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {
    
        //获取@Compensable信息
        Method method = CompensableMethodUtils.getCompensableMethod(pjp);
        if (method == null) {
            throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
        }
        Compensable compensable = method.getAnnotation(Compensable.class);
    
        String confirmMethodName = compensable.confirmMethod();
        String cancelMethodName = compensable.cancelMethod();
    
        Transaction transaction = transactionManager.getCurrentTransaction();
        TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());
    
        if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
            //设置事务上下文到Editor,Editor用来统一提取上下文,这边对应设置dubbo的rpc上下文中去
            //这边的上下文设置后就会调用try逻辑
            FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
        }
    
        Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());
    
        //目前的用法,其实只要保存调用参数就行,因为具体执行confirm和cancel都是根据transaction的status来判断的
        //confirm的调用上下文
        InvocationContext confirmInvocation = new InvocationContext(targetClass,
                confirmMethodName,
                method.getParameterTypes(), pjp.getArgs());
    
        //cancel的调用上下文
        InvocationContext cancelInvocation = new InvocationContext(targetClass,
                cancelMethodName,
                method.getParameterTypes(), pjp.getArgs());
    
        Participant participant =
                new Participant(
                        xid,
                        confirmInvocation,
                        cancelInvocation,
                        compensable.transactionContextEditor());
    
        //把participant设置到transaction,并且同步到持久化存储
        transactionManager.enlistParticipant(participant);
    
    }
    

    通过从@Compensable注解配置的信息以及当前Transaction来配置Participant。
    在Participant设置到Transaction后,会执行pjp.proceed(pjp.getArgs()),也就执行了对应try逻辑的被切方法

    ConfigurableCoordinatorAspect的逻辑会在ConfigurableTransactionAspect后执行,这和它们设置的order有关,小的order先执行,后切入

    失败补偿机制

    对于失败的Confirm和Cancel操作,会有补偿任务进行重试,具体实现类为RecoverScheduledJob,在这个类的init方法会启动quartz任务

    public void init() {
    
        try {
            MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
            //设置定时任务执行的对象和方法
            jobDetail.setTargetObject(transactionRecovery);
            jobDetail.setTargetMethod("startRecover");
            jobDetail.setName("transactionRecoveryJob");
            jobDetail.setConcurrent(false);
            jobDetail.afterPropertiesSet();
    
            CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
            cronTrigger.setBeanName("transactionRecoveryCronTrigger");
            //设置cron表达式
            cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
            cronTrigger.setJobDetail(jobDetail.getObject());
            cronTrigger.afterPropertiesSet();
    
            scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
            scheduler.start();
    
        } catch (Exception e) {
            throw new SystemException(e);
        }
    }
    

    在这个方法里会使用RecoverConfig的配置初始化定时任务,定时任务具体的执行逻辑使用MethodInvokingJobDetailFactoryBean的targetObject和targetMethod配置,对应为transactionRecovery的startRecover方法,我们来看下这个方法

    public void startRecover() {
            //获取所有没被处理的transaction
            List<Transaction> transactions = loadErrorTransactions();
            //根据规则处理这些transaction
            recoverErrorTransactions(transactions);
        }
    

    分别看下上述两个方法的逻辑

    private List<Transaction> loadErrorTransactions() {
    
    
            long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
    
            TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
            RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();
    
            //获取在RecoverDuration间隔之前未完成的transaction
            return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));
        }
    
     private void recoverErrorTransactions(List<Transaction> transactions) {
    
    
        for (Transaction transaction : transactions) {
    
            //重试次数超过上限的Transaction不再执行补偿
            if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
    
                logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));
                continue;
            }
    
            //如果是分支事务,并且超过最长超时时间忽略
            if (transaction.getTransactionType().equals(TransactionType.BRANCH)
                    && (transaction.getCreateTime().getTime() +
                    transactionConfigurator.getRecoverConfig().getMaxRetryCount() *
                            transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000
                    > System.currentTimeMillis())) {
                continue;
            }
            
            try {
                transaction.addRetriedCount();
    
                //对超时的confiring操作重试
                if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
    
                    transaction.changeStatus(TransactionStatus.CONFIRMING);
                    transactionConfigurator.getTransactionRepository().update(transaction);
                    transaction.commit();
                    transactionConfigurator.getTransactionRepository().delete(transaction);
    
                } else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)//对超时的Canceling操作重试,或者Root超时的trying进行cancel操作
                        || transaction.getTransactionType().equals(TransactionType.ROOT)) {
    
                    transaction.changeStatus(TransactionStatus.CANCELLING);
                    transactionConfigurator.getTransactionRepository().update(transaction);
                    transaction.rollback();
                    transactionConfigurator.getTransactionRepository().delete(transaction);
                }
    
            } catch (Throwable throwable) {
    
                if (throwable instanceof OptimisticLockException
                        || ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {
                    logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
                } else {
                    logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
                }
            }
        }
    }
    

    注意一点,trying阶段不会重试,失败未处理,会触发canceling操作

    思考

    分布式事务解决方案

    下面列举一些分布式事务解决方案的特性

    1. 传统的二/三阶段提交
      这种解决方案会占用数据库事务资源,在互联网公司很少使用
    2. 异步确保型事务



      基于可靠消息的最终一致性,可以异步,但数据绝对不能丢,而且一定要记账成功
      这个难道是依赖mq支持的事务特性?

    3. 最大努力通知型事务



      按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对
      目前项目比较常用的方式

    4. tcc



      适用于实时性要求比较高,数据必须可靠的场景

    我的理解

    依照我目前的工作经验,现在公司对分布式事务的解决方案一般是上述的第三种方法,但是一些对实时性要求比较高,数据必须可靠的场景我们还是可以考虑使用tcc的,但是也没必要全盘tcc,可以和最大努力通知型事务一起使用

    对于tcc还有一个疑问,在高并发情况下,在mq的模式下,由于是异步,能够保证消息最终被消费掉,并且消费速率稳定,而tcc这种模式,会不会导致接口资源不够用,接口资源都占用满,导致不断的try失败。

    由此可见tcc的使用难度不止在业务使用方式上,对于一些极限的场景,需要有经验的人来分析tcc该使用在多大范围内。但是如果是并发量不大的项目,大家可以试着使用。

    朋友们,使用或没使用过tcc的,请留下你的想法。

    最后

    希望大家关注下我的公众号


    image

    资源链接

    tcc-transaction git地址

    相关文章

      网友评论

        本文标题:tcc-transaction源码分析与思考

        本文链接:https://www.haomeiwen.com/subject/aocblftx.html