美文网首页fescar
Fescar - RM SelectForUpdateExecu

Fescar - RM SelectForUpdateExecu

作者: 晴天哥_王志 | 来源:发表于2019-02-17 18:38 被阅读6次

    开篇

     这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。

     个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。这篇文章讲解Executor的实现类SelectForUpdateExecutor。

    类依赖图


    说明:
    • 着重讲解SelectForUpdateExecutor实现类。
    • SelectForUpdateExecutor的实现和其他的Executor从类图实现上不同,因为select不需要保存镜像。

    SelectForUpdateExecutor方法介绍

    public class SelectForUpdateExecutor<S extends Statement> extends BaseTransactionalExecutor<ResultSet, S> {
    
        public SelectForUpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<ResultSet, S> statementCallback, SQLRecognizer sqlRecognizer) {
            super(statementProxy, statementCallback, sqlRecognizer);
        }
    
        @Override
        public Object doExecute(Object... args) throws Throwable {
            SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer;
    
            Connection conn = statementProxy.getConnection();
            ResultSet rs = null;
            Savepoint sp = null;
            LockRetryController lockRetryController = new LockRetryController();
            boolean originalAutoCommit = conn.getAutoCommit();
    
            StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
            selectSQLAppender.append(getColumnNameInSQL(getTableMeta().getPkName()));
            selectSQLAppender.append(" FROM " + getFromTableInSQL());
            String whereCondition = null;
            ArrayList<Object> paramAppender = new ArrayList<>();
            if (statementProxy instanceof ParametersHolder) {
                whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
            } else {
                whereCondition = recognizer.getWhereCondition();
            }
            if (!StringUtils.isEmpty(whereCondition)) {
                selectSQLAppender.append(" WHERE " + whereCondition);
            }
            selectSQLAppender.append(" FOR UPDATE");
            String selectPKSQL = selectSQLAppender.toString();
    
            try {
                if (originalAutoCommit) {
                    conn.setAutoCommit(false);
                }
                sp = conn.setSavepoint();
                rs = statementCallback.execute(statementProxy.getTargetStatement(), args);
    
                while (true) {
                    // Try to get global lock of those rows selected
                    Statement stPK = null;
                    PreparedStatement pstPK = null;
                    ResultSet rsPK = null;
                    try {
                        if (paramAppender.isEmpty()) {
                            stPK = statementProxy.getConnection().createStatement();
                            rsPK = stPK.executeQuery(selectPKSQL);
                        } else {
                            pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL);
                            for (int i = 0; i < paramAppender.size(); i++) {
                                pstPK.setObject(i + 1, paramAppender.get(i));
                            }
                            rsPK = pstPK.executeQuery();
                        }
    
                        TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
                        statementProxy.getConnectionProxy().checkLock(selectPKRows);
                        break;
    
                    } catch (LockConflictException lce) {
                        conn.rollback(sp);
                        lockRetryController.sleep(lce);
    
                    } finally {
                        if (rsPK != null) {
                            rsPK.close();
                        }
                        if (stPK != null) {
                            stPK.close();
                        }
                        if (pstPK != null) {
                            pstPK.close();
                        }
                    }
                }
    
            } finally {
                if (sp != null) {
                    conn.releaseSavepoint(sp);
                }
                if (originalAutoCommit) {
                    conn.setAutoCommit(true);
                }
            }
            return rs;
        }
    }
    

    说明:

    • 从类的依赖图可以看出SelectForUpdateExecutor的实现方式和其他Executor不一样。
    • SelectForUpdateExecutor因为涉及到的查询操作,所以没有执行前后镜像的问题。
    • SelectForUpdateExecutor的执行逻辑在于设置回滚点,conn.setSavepoint()。
    • SelectForUpdateExecutor通过拼接查询主键的SQL语句获取查询记录的主键key。
    • SelectForUpdateExecutor根据查询主键的记录判断是否可以锁checkLock,如果不能锁则直接回滚,然后进行重试。

    相关文章

      网友评论

        本文标题:Fescar - RM SelectForUpdateExecu

        本文链接:https://www.haomeiwen.com/subject/ghydeqtx.html