美文网首页
分布式事务框架FESCAR执行过程-AT

分布式事务框架FESCAR执行过程-AT

作者: 北交吴志炜 | 来源:发表于2019-01-12 22:13 被阅读0次

    本文主要从代码层面解析FESCAR的执行过程,原理架构的图文解析可以见github 项目主页https://github.com/alibaba/fescar/wiki/%E9%98%BF%E9%87%8C%E5%B7%B4%E5%B7%B4%E5%BC%80%E6%BA%90%E5%88%86%E5%B8%83%E5%BC%8F%E4%BA%8B%E5%8A%A1%E8%A7%A3%E5%86%B3%E6%96%B9%E6%A1%88-FESCAR
    源代码可以见https://github.com/alibaba/fescar

    以git主页上面的quickstart开始分析(purchase包含若干分布式的事务service),号称一个注解即可搞定分布式事务

    We just need an annotation @GlobalTransactional on business method:

     @GlobalTransactional
        public void purchase(String userId, String commodityCode, int orderCount) {
            ......
        }
    

    普通场景,我们使用spring管理本地事务使用@Transactional注解,FESCAR使用的是@GlobalTransactional注解,那么我们就从@GlobalTransactional注解入手 ,显然这个注解通过SpringAop起作用,可以看GlobalTransactionalInterceptor代码

    invoke方法中,其实真正的关键就是transactionalTemplate.execute();
    也就是说,加了@GlobalTransactional注解的方法,会在这个transactionalTemplate.execute()方法中执行真正的业务方法

    @Override
        public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
            final GlobalTransactional anno = getAnnotation(methodInvocation.getMethod());
            if (anno != null) {
                try {
                    return transactionalTemplate.execute(new TransactionalExecutor() {
                        @Override
                        public Object execute() throws Throwable {
                            return methodInvocation.proceed();
                        }
    
                        @Override
                        public int timeout() {
                            return anno.timeoutMills();
                        }
    
                        @Override
                        public String name() {
                            if (anno.name() != null) {
                                return anno.name();
                            }
                            return formatMethod(methodInvocation.getMethod());
                        }
                    });
                } catch (TransactionalExecutor.ExecutionException e) { }
            return methodInvocation.proceed();
        }
    

    逻辑比较清楚,加了这个@GlobalTransactional注解的方法,通过transactionalTemplate.execute来执行。那么transactionalTemplate.execute()到底做了什么呢?

    见TransactionalTemplate类

    
     public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
    
           // 1. get or create a transaction
           GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    
           // 2. begin transaction
           try {
               tx.begin(business.timeout(), business.name());
    
           } catch (TransactionException txe) {
               throw new TransactionalExecutor.ExecutionException(tx, txe,
                   TransactionalExecutor.Code.BeginFailure);
    
           }
    
           Object rs = null;
           try {
    
               // Do Your Business
               rs = business.execute();
    
           } catch (Throwable ex) {
    
               // 3. any business exception, rollback.
               try {
                   tx.rollback();
    
                   // 3.1 Successfully rolled back
                   throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
    
               } catch (TransactionException txe) {
                   // 3.2 Failed to rollback
                   throw new TransactionalExecutor.ExecutionException(tx, txe,
                       TransactionalExecutor.Code.RollbackFailure, ex);
    
               }
    
           }
    
           // 4. everything is fine, commit.
           try {
               tx.commit();
    
           } catch (TransactionException txe) {
               // 4.1 Failed to commit
               throw new TransactionalExecutor.ExecutionException(tx, txe,
                   TransactionalExecutor.Code.CommitFailure);
    
           }
           return rs;
       }
    
    }
    

    这个框架注释不是很多,但是这个类是例外,1,2,3,4,5很清楚。
    简单说就是
    1.获取或者创建一个GlobalTransaction;
    2.开始事务
    3.执行分布式方法,
    4.有异常回滚
    5.无异常提交。

    下面分别说一下commit和rollback

    此处的commit实质上什么也没做,只是维护一下GlobalSession中的分支状态,可以认为是个假的commit,提交这个动作其实第三步execute方法执行分支事务方法的时候就已经做了,也就是说,分布式事务在各自执行的时候就已经提交了,感兴趣的可以看代码DefaultCore#doGlobalCommit,以quickstart为例,在分支方法中打断点,可以看到,断点之前的其他分支方法里面已经写了DB且commit了。

    那么如果某一个分支事务执行失败,其他已经提交了的分支事务还怎么回滚呢?看下面
    rollback的流程,可以看DefaultCore.doGlobalRollback方法
    其中最关键的就是UndoLogManager类,代码很长,感兴趣的自己研究,具体就是,这个undo,其实就是通过undolog来反向生成一个回滚sql,然后执行这个回滚sql来达到rollback的效果。

    public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
            assertDbSupport(dataSourceProxy.getTargetDataSource().getDbType());
    
            Connection conn = null;
            ResultSet rs = null;
            PreparedStatement selectPST = null;
            try {
                conn = dataSourceProxy.getPlainConnection();
    
                // The entire undo process should run in a local transaction.
                conn.setAutoCommit(false);
    
                // Find UNDO LOG
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();
    
                while (rs.next()) {
                    Blob b = rs.getBlob("rollback_info");
                    String rollbackInfo = StringUtils.blob2string(b);
                    BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);
    
                    for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                        TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(conn);
                    }
    
                }
                deleteUndoLog(xid, branchId, conn);
    
                conn.commit();
    
            } catch (Throwable e) {
                if (conn != null) {
                    try {
                        conn.rollback();
                    } catch (SQLException rollbackEx) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                    }
                }
                throw new TransactionException(BranchRollbackFailed_Retriable, String.format("%s/%s", branchId, xid), e);
    
            } finally {
                try {
                    if (rs != null) {
                        rs.close();
                    }
                    if (selectPST != null) {
                        selectPST.close();
                    }
                    if (conn != null) {
                        conn.close();
                    }
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
                }
            }
    
        }
    

    那么这个回滚sql是如何生成的呢,我们以insert的undo为例,可以看到,其实就是通过生成一条delete语句来进行回滚。
    响应的,delete的事务,回滚就是insert一条。

    public class MySQLUndoInsertExecutor extends AbstractUndoExecutor {
    
        @Override
        protected String buildUndoSQL() {
            TableRecords afterImage = sqlUndoLog.getAfterImage();
            List<Row> afterImageRows = afterImage.getRows();
            if (afterImageRows == null || afterImageRows.size() == 0) {
                throw new ShouldNeverHappenException("Invalid UNDO LOG");
            }
            Row row = afterImageRows.get(0);
            StringBuffer mainSQL = new StringBuffer("DELETE FROM " + sqlUndoLog.getTableName());
            StringBuffer where = new StringBuffer(" WHERE ");
            boolean first = true;
            for (Field field : row.getFields()) {
                if (field.getKeyType() == KeyType.PrimaryKey) {
                    where.append(field.getName() + " = ? ");
                }
    
            }
            return mainSQL.append(where).toString();
        }
    

    总结一下,FESCAR其实就是说,把分布式事务当作一批branch本地事务来执行,branch各自执行,各自提交,假如所有branch都成功,那么commit的时候,维护一个状态即可,因为大家已经提交了;假如某一个branch执行失败,那么进行回滚,回滚的方式是根据之前的undolog生成一个反向的回滚sql,各个branch分别执行自己的回滚sql来达到回滚的效果。
    这与XA的两阶段提交相比,优势在哪呢?1.不需要数据库支持XA协议,因为这个proxy是在应用层面。限制更少。2.减少了事务持锁时间,从而提高了事务的并发度

    问题点:由于追求尽量少的持锁时间,那就需要考虑回滚场景下,分支事务提交到回滚sql执行之间是有时间差的,这个时间区间内的数据状态可以认为是一种脏数据。对于这种数据的读写都需要注意,读会产生“脏读”,写需要加乐观锁,不然会产生丢失更新。

    相关文章

      网友评论

          本文标题:分布式事务框架FESCAR执行过程-AT

          本文链接:https://www.haomeiwen.com/subject/tyxhdqtx.html