fescar源码分析1

作者: leiwingqueen | 来源:发表于2019-01-24 17:54 被阅读25次

一、问题

上一篇文章对fescar的方案做了一个大致的了解。接下来我们尝试针对上一篇文章提到的具体问题做一一的分析。这里先重点对undo log的实现机制做一个简单的分析。

二、分析

上一篇文章我们知道fescar对业务实现回滚是通过undo log来实现的,那么undo log具体是如何产生的呢?
我们来运行一下demo案例,并观察UndoLogManager产生的日志。

  • account_tbl

2019-02-04 09:58:46,243 [FESCAR] [DubboServerHandler-172.19.5.94:20881-thread-2] INFO com.alibaba.fescar.rm.datasource.undo.UndoLogManager - Flushing UNDO LOG: {"branchId":2807291,"sqlUndoLogs":[{"afterImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"ID","type":4,"value":10},{"keyType":"NULL","name":"MONEY","type":4,"value":599}]}],"tableName":"account_tbl"},"beforeImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"ID","type":4,"value":10},{"keyType":"NULL","name":"MONEY","type":4,"value":999}]}],"tableName":"account_tbl"},"sqlType":"UPDATE","tableName":"account_tbl"}],"xid":"172.19.5.94:8091:2807289"}

  • order_tbl

2019-02-04 09:58:46,812 [FESCAR] [DubboServerHandler-172.19.5.94:20883-thread-2] INFO com.alibaba.fescar.rm.datasource.undo.UndoLogManager - Flushing UNDO LOG: {"branchId":2807292,"sqlUndoLogs":[{"afterImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"ID","type":4,"value":20},{"keyType":"NULL","name":"USER_ID","type":12,"value":"U100001"},{"keyType":"NULL","name":"COMMODITY_CODE","type":12,"value":"C00321"},{"keyType":"NULL","name":"COUNT","type":4,"value":2},{"keyType":"NULL","name":"MONEY","type":4,"value":400}]}],"tableName":"order_tbl"},"beforeImage":{"rows":[],"tableName":"order_tbl"},"sqlType":"INSERT","tableName":"order_tbl"}],"xid":"172.19.5.94:8091:2807289"}

  • storage_tbl

2019-02-04 09:58:45,824 [FESCAR] [DubboServerHandler-172.19.5.94:20882-thread-2] INFO com.alibaba.fescar.rm.datasource.undo.UndoLogManager - Flushing UNDO LOG: {"branchId":2807290,"sqlUndoLogs":[{"afterImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"ID","type":4,"value":10},{"keyType":"NULL","name":"COUNT","type":4,"value":98}]}],"tableName":"storage_tbl"},"beforeImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"ID","type":4,"value":10},{"keyType":"NULL","name":"COUNT","type":4,"value":100}]}],"tableName":"storage_tbl"},"sqlType":"UPDATE","tableName":"storage_tbl"}],"xid":"172.19.5.94:8091:2807289"}

这个功能主要在RM里面实现,也就是说它会和业务进程集成在一起。
包:rm-datasource
AbstractDMLBaseExecutor.java

/**
 * Executes the target statement and passes the row images captured around it
 * to {@code prepareUndoLog} on the connection proxy.
 *
 * @param args arguments forwarded to the statement callback
 * @return the value returned by the statement callback
 * @throws Throwable if capturing an image or executing the statement fails
 */
protected T executeAutoCommitFalse(Object[] args) throws Throwable {
        // The before image must be read prior to running the statement itself.
        final TableRecords before = beforeImage();
        final T outcome = statementCallback.execute(statementProxy.getTargetStatement(), args);
        // afterImage receives the before image as its input.
        final TableRecords after = afterImage(before);
        statementProxy.getConnectionProxy().prepareUndoLog(
                sqlRecognizer.getSQLType(), sqlRecognizer.getTableName(), before, after);
        return outcome;
    }
    /**
     * Produces the row image taken before the statement runs; invoked by
     * {@code executeAutoCommitFalse} ahead of the statement callback.
     *
     * @return the records captured before execution
     * @throws SQLException if the image cannot be read
     */
    protected abstract TableRecords beforeImage() throws SQLException;

    /**
     * Produces the row image taken after the statement has run; invoked by
     * {@code executeAutoCommitFalse} once the statement callback has returned.
     *
     * @param beforeImage the image previously returned by {@link #beforeImage()}
     * @return the records captured after execution
     * @throws SQLException if the image cannot be read
     */
    protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;

beforeImage为更新前的快照,afterImage为更新后的快照。
AbstractDMLBaseExecutor有3个子类:UpdateExecutor、InsertExecutor、DeleteExecutor,分别是针对不同类型sql的实现类。这里我们只分析UpdateExecutor。

/**
 * Builds the before image for an UPDATE statement: selects the primary key and the
 * updated columns of the rows matched by the statement's WHERE condition, using
 * {@code SELECT ... FOR UPDATE}.
 *
 * @return the records read before the update is executed
 * @throws SQLException if the snapshot query fails
 */
@Override
    protected TableRecords beforeImage() throws SQLException {
        SQLUpdateRecognizer visitor = (SQLUpdateRecognizer) sqlRecognizer;
        TableMeta tmeta = getTableMeta();
        List<String> updateColumns = visitor.getUpdateColumns();

        // The buffer never leaves this method, so the unsynchronized builder is sufficient.
        StringBuilder selectSQLAppender = new StringBuilder("SELECT ");
        if (!tmeta.containsPK(updateColumns)) {
            // PK should be included.
            selectSQLAppender.append(tmeta.getPkName()).append(", ");
        }
        for (int i = 0; i < updateColumns.size(); i++) {
            if (i > 0) {
                selectSQLAppender.append(", ");
            }
            selectSQLAppender.append(updateColumns.get(i));
        }

        // Kept as ArrayList because it is handed to getWhereCondition as declared originally.
        ArrayList<Object> paramAppender = new ArrayList<>();
        String whereCondition;
        if (statementProxy instanceof ParametersHolder) {
            whereCondition = visitor.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
        } else {
            whereCondition = visitor.getWhereCondition();
        }

        selectSQLAppender.append(" FROM ").append(tmeta.getTableName());
        // If the recognizer yields no condition (presumably an UPDATE without a WHERE
        // clause - confirm against the recognizer), concatenating it would produce
        // "WHERE null" or "WHERE ", i.e. an empty snapshot or invalid SQL. Omit the clause.
        if (whereCondition != null && !whereCondition.trim().isEmpty()) {
            selectSQLAppender.append(" WHERE ").append(whereCondition);
        }
        selectSQLAppender.append(" FOR UPDATE");
        String selectSQL = selectSQLAppender.toString();

        // try-with-resources closes the result set and the statement even when an
        // earlier close() throws; the manual finally block could leak the statement.
        if (paramAppender.isEmpty()) {
            try (Statement st = statementProxy.getConnection().createStatement();
                 ResultSet rs = st.executeQuery(selectSQL)) {
                return TableRecords.buildRecords(tmeta, rs);
            }
        }
        try (PreparedStatement ps = statementProxy.getConnection().prepareStatement(selectSQL)) {
            for (int i = 0; i < paramAppender.size(); i++) {
                // JDBC parameter indexes are 1-based.
                ps.setObject(i + 1, paramAppender.get(i));
            }
            try (ResultSet rs = ps.executeQuery()) {
                return TableRecords.buildRecords(tmeta, rs);
            }
        }
    }

    /**
     * Builds the after image for an UPDATE statement: re-reads the primary key and the
     * updated columns of the rows recorded in the before image, locating them by
     * primary key with {@code SELECT ... FOR UPDATE}.
     *
     * @param beforeImage the image captured before the update; may be {@code null} or empty
     * @return the records read after the update, or an empty record set when
     *         {@code beforeImage} is {@code null} or has no rows
     * @throws SQLException if the snapshot query fails
     */
    @Override
    protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
        SQLUpdateRecognizer visitor = (SQLUpdateRecognizer) sqlRecognizer;
        TableMeta tmeta = getTableMeta();
        if (beforeImage == null || beforeImage.size() == 0) {
            // No rows in the before image, so there is nothing to re-read.
            return TableRecords.empty(tmeta);
        }
        List<String> updateColumns = visitor.getUpdateColumns();

        // The buffer never leaves this method, so the unsynchronized builder is sufficient.
        StringBuilder selectSQLAppender = new StringBuilder("SELECT ");
        if (!tmeta.containsPK(updateColumns)) {
            // PK should be included.
            selectSQLAppender.append(tmeta.getPkName()).append(", ");
        }
        for (int i = 0; i < updateColumns.size(); i++) {
            if (i > 0) {
                selectSQLAppender.append(", ");
            }
            selectSQLAppender.append(updateColumns.get(i));
        }
        List<Field> pkRows = beforeImage.pkRows();
        selectSQLAppender.append(" FROM ").append(tmeta.getTableName())
                .append(" WHERE ").append(buildWhereConditionByPKs(pkRows))
                .append(" FOR UPDATE");
        String selectSQL = selectSQLAppender.toString();

        // try-with-resources closes the result set and the statement even when an
        // earlier close() throws; the manual finally block could leak the statement.
        try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
            int index = 0;
            for (Field pkField : pkRows) {
                // JDBC parameter indexes are 1-based, hence the pre-increment.
                pst.setObject(++index, pkField.getValue(), pkField.getType());
            }
            try (ResultSet rs = pst.executeQuery()) {
                return TableRecords.buildRecords(tmeta, rs);
            }
        }
    }

其实实现也很简单:更新前使用select ... for update锁定记录,保证更新前的值的准确性;更新后再按主键查询一次,得到更新后的值。
我在UpdateExecutor上增加日志来直观观察下更新前后的查询sql

2019-02-04 10:38:22,724 [FESCAR] [DubboServerHandler-172.19.5.94:20882-thread-2] INFO com.alibaba.fescar.rm.datasource.exec.UpdateExecutor - 查询beforeImage...selectSQL:SELECT ID, count FROM storage_tbl WHERE commodity_code = ? FOR UPDATE,param:["C00321"]
2019-02-04 10:38:22,739 [FESCAR] [DubboServerHandler-172.19.5.94:20882-thread-2] INFO com.alibaba.fescar.rm.datasource.exec.UpdateExecutor - 查询afterImage...selectSQL:SELECT ID, count FROM storage_tbl WHERE ID = ? FOR UPDATE,param:[{"name":"ID","keyType":"PrimaryKey","type":4,"value":11}]

事实上更新前后的sql查询实现并没有那么简单:涉及到多条sql的更新如何保存?多个表的更新如何处理?更新语句带有别名的情况又如何处理?这些细节都要经过严格的测试才行。然而这块我目前没看到对应的单元测试,感觉得自己补充完整才能验证这块逻辑的正确性了。

三、总结

因此fescar在第一阶段提交的时候其实会增加两次查询的成本。有没有办法通过类似binlog的方式来异步实现这个逻辑呢?如果涉及到异步的操作,我们可能就需要考虑如何保证undo log能够写入成功的问题了。

相关文章

网友评论

    本文标题:fescar源码分析1

    本文链接:https://www.haomeiwen.com/subject/czdujqtx.html