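I. Problem
The previous article gave a rough overview of the fescar approach. Next, we work through the specific questions raised there one by one. This article focuses on a brief analysis of how the undo log is implemented.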
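II. Analysis
From the previous article we know that fescar rolls back business changes by means of an undo log. So how exactly is the undo log produced?
Let's run the demo and look at the log output of UndoLogManager.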
- account_tbl
2019-02-04 09:58:46,243 [FESCAR] [DubboServerHandler-172.19.5.94:20881-thread-2] INFO com.alibaba.fescar.rm.datasource.undo.UndoLogManager - Flushing UNDO LOG: {"branchId":2807291,"sqlUndoLogs":[{"afterImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"ID","type":4,"value":10},{"keyType":"NULL","name":"MONEY","type":4,"value":599}]}],"tableName":"account_tbl"},"beforeImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"ID","type":4,"value":10},{"keyType":"NULL","name":"MONEY","type":4,"value":999}]}],"tableName":"account_tbl"},"sqlType":"UPDATE","tableName":"account_tbl"}],"xid":"172.19.5.94:8091:2807289"}
- order_tbl
2019-02-04 09:58:46,812 [FESCAR] [DubboServerHandler-172.19.5.94:20883-thread-2] INFO com.alibaba.fescar.rm.datasource.undo.UndoLogManager - Flushing UNDO LOG: {"branchId":2807292,"sqlUndoLogs":[{"afterImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"ID","type":4,"value":20},{"keyType":"NULL","name":"USER_ID","type":12,"value":"U100001"},{"keyType":"NULL","name":"COMMODITY_CODE","type":12,"value":"C00321"},{"keyType":"NULL","name":"COUNT","type":4,"value":2},{"keyType":"NULL","name":"MONEY","type":4,"value":400}]}],"tableName":"order_tbl"},"beforeImage":{"rows":[],"tableName":"order_tbl"},"sqlType":"INSERT","tableName":"order_tbl"}],"xid":"172.19.5.94:8091:2807289"}
- storage_tbl
2019-02-04 09:58:45,824 [FESCAR] [DubboServerHandler-172.19.5.94:20882-thread-2] INFO com.alibaba.fescar.rm.datasource.undo.UndoLogManager - Flushing UNDO LOG: {"branchId":2807290,"sqlUndoLogs":[{"afterImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"ID","type":4,"value":10},{"keyType":"NULL","name":"COUNT","type":4,"value":98}]}],"tableName":"storage_tbl"},"beforeImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"ID","type":4,"value":10},{"keyType":"NULL","name":"COUNT","type":4,"value":100}]}],"tableName":"storage_tbl"},"sqlType":"UPDATE","tableName":"storage_tbl"}],"xid":"172.19.5.94:8091:2807289"}
This functionality is implemented mainly in the RM, which means it runs inside the business process itself.
Package: rm-datasource
AbstractDMLBaseExecutor.java
protected T executeAutoCommitFalse(Object[] args) throws Throwable {
    // Snapshot the affected rows before the business statement runs.
    TableRecords beforeImage = beforeImage();
    // Execute the original business statement.
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    // Snapshot the same rows again after the statement has run.
    TableRecords afterImage = afterImage(beforeImage);
    // Hand both snapshots to the connection proxy so it can prepare the undo log.
    statementProxy.getConnectionProxy().prepareUndoLog(sqlRecognizer.getSQLType(), sqlRecognizer.getTableName(), beforeImage, afterImage);
    return result;
}

protected abstract TableRecords beforeImage() throws SQLException;

protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;
beforeImage is the snapshot taken before the update; afterImage is the snapshot taken after it.
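To make the role of the before image concrete, here is a minimal sketch of how a compensating UPDATE could be derived from one before-image row, using the values of the account_tbl log entry above. The class UndoSqlSketch and the method buildUndoUpdate are hypothetical names invented for this illustration; this is not fescar's actual rollback code, only the general idea of restoring the old values by primary key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of fescar.
final class UndoSqlSketch {

    /**
     * Builds "UPDATE table SET col = ?, ... WHERE pk = ?" from one before-image row
     * and appends the bind values to params in placeholder order.
     */
    static String buildUndoUpdate(String table, String pkName,
                                  Map<String, Object> beforeRow, List<Object> params) {
        StringBuilder sql = new StringBuilder("UPDATE ").append(table).append(" SET ");
        boolean first = true;
        for (Map.Entry<String, Object> column : beforeRow.entrySet()) {
            if (column.getKey().equals(pkName)) {
                continue; // the primary key identifies the row, it is not restored
            }
            if (!first) {
                sql.append(", ");
            }
            sql.append(column.getKey()).append(" = ?");
            params.add(column.getValue());
            first = false;
        }
        sql.append(" WHERE ").append(pkName).append(" = ?");
        params.add(beforeRow.get(pkName));
        return sql.toString();
    }

    public static void main(String[] args) {
        // Values taken from the beforeImage of the account_tbl log entry.
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("ID", 10);
        before.put("MONEY", 999);
        List<Object> params = new ArrayList<>();
        System.out.println(buildUndoUpdate("account_tbl", "ID", before, params));
        System.out.println(params);
    }
}
```

For an INSERT such as the order_tbl entry, where the before image is empty, the compensating statement would presumably be a DELETE by primary key built from the after image instead.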
AbstractDMLBaseExecutor has three subclasses, UpdateExecutor, InsertExecutor, and DeleteExecutor, one for each kind of SQL statement. Here we only analyze UpdateExecutor.
@Override
protected TableRecords beforeImage() throws SQLException {
    SQLUpdateRecognizer visitor = (SQLUpdateRecognizer) sqlRecognizer;
    TableMeta tmeta = getTableMeta();
    List<String> updateColumns = visitor.getUpdateColumns();
    StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
    if (!tmeta.containsPK(updateColumns)) {
        // PK should be included.
        selectSQLAppender.append(tmeta.getPkName() + ", ");
    }
    for (int i = 0; i < updateColumns.size(); i++) {
        selectSQLAppender.append(updateColumns.get(i));
        if (i < (updateColumns.size() - 1)) {
            selectSQLAppender.append(", ");
        }
    }
    String whereCondition = null;
    ArrayList<Object> paramAppender = new ArrayList<>();
    if (statementProxy instanceof ParametersHolder) {
        whereCondition = visitor.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
    } else {
        whereCondition = visitor.getWhereCondition();
    }
    selectSQLAppender.append(" FROM " + tmeta.getTableName() + " WHERE " + whereCondition + " FOR UPDATE");
    String selectSQL = selectSQLAppender.toString();
    TableRecords beforeImage = null;
    PreparedStatement ps = null;
    Statement st = null;
    ResultSet rs = null;
    try {
        if (paramAppender.isEmpty()) {
            st = statementProxy.getConnection().createStatement();
            rs = st.executeQuery(selectSQL);
        } else {
            ps = statementProxy.getConnection().prepareStatement(selectSQL);
            for (int i = 0; i < paramAppender.size(); i++) {
                ps.setObject(i + 1, paramAppender.get(i));
            }
            rs = ps.executeQuery();
        }
        beforeImage = TableRecords.buildRecords(tmeta, rs);
    } finally {
        if (rs != null) {
            rs.close();
        }
        if (st != null) {
            st.close();
        }
        if (ps != null) {
            ps.close();
        }
    }
    return beforeImage;
}
@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
    SQLUpdateRecognizer visitor = (SQLUpdateRecognizer) sqlRecognizer;
    TableMeta tmeta = getTableMeta();
    if (beforeImage == null || beforeImage.size() == 0) {
        return TableRecords.empty(getTableMeta());
    }
    List<String> updateColumns = visitor.getUpdateColumns();
    StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
    if (!tmeta.containsPK(updateColumns)) {
        // PK should be included.
        selectSQLAppender.append(tmeta.getPkName() + ", ");
    }
    for (int i = 0; i < updateColumns.size(); i++) {
        selectSQLAppender.append(updateColumns.get(i));
        if (i < (updateColumns.size() - 1)) {
            selectSQLAppender.append(", ");
        }
    }
    List<Field> pkRows = beforeImage.pkRows();
    selectSQLAppender.append(" FROM " + tmeta.getTableName() + " WHERE " + buildWhereConditionByPKs(pkRows) + " FOR UPDATE");
    String selectSQL = selectSQLAppender.toString();
    TableRecords afterImage = null;
    PreparedStatement pst = null;
    ResultSet rs = null;
    try {
        pst = statementProxy.getConnection().prepareStatement(selectSQL);
        int index = 0;
        for (Field pkField : pkRows) {
            index++;
            pst.setObject(index, pkField.getValue(), pkField.getType());
        }
        rs = pst.executeQuery();
        afterImage = TableRecords.buildRecords(tmeta, rs);
    } finally {
        if (rs != null) {
            rs.close();
        }
        if (pst != null) {
            pst.close();
        }
    }
    return afterImage;
}
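The implementation is actually quite simple. A SELECT ... FOR UPDATE locks the matching rows, so the values read before the update cannot be changed by another transaction in the meantime. After the update, the same rows are queried again by primary key to obtain the updated values.
I added logging to UpdateExecutor to make the queries issued before and after the update directly visible: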
2019-02-04 10:38:22,724 [FESCAR] [DubboServerHandler-172.19.5.94:20882-thread-2] INFO com.alibaba.fescar.rm.datasource.exec.UpdateExecutor - 查询beforeImage...selectSQL:SELECT ID, count FROM storage_tbl WHERE commodity_code = ? FOR UPDATE,param:["C00321"]
2019-02-04 10:38:22,739 [FESCAR] [DubboServerHandler-172.19.5.94:20882-thread-2] INFO com.alibaba.fescar.rm.datasource.exec.UpdateExecutor - 查询afterImage...selectSQL:SELECT ID, count FROM storage_tbl WHERE ID = ? FOR UPDATE,param:[{"name":"ID","keyType":"PrimaryKey","type":4,"value":11}]
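The two logged statements can be reproduced with a small standalone sketch of the string-building steps in UpdateExecutor. The class ImageQuerySketch and its method names are hypothetical, and joining several primary-key predicates with OR is an assumption about what buildWhereConditionByPKs does, since that method is not shown in this article.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical standalone sketch; it mirrors the SQL assembly only, not fescar's classes.
final class ImageQuerySketch {

    /** "SELECT [pk, ]col1, col2": the primary key is prepended unless it is already updated. */
    static String selectList(String pkName, List<String> updateColumns) {
        boolean containsPk = false;
        for (String column : updateColumns) {
            if (column.equalsIgnoreCase(pkName)) {
                containsPk = true;
            }
        }
        return "SELECT " + (containsPk ? "" : pkName + ", ") + String.join(", ", updateColumns);
    }

    /** Before image: reuses the WHERE condition of the original UPDATE and locks the rows. */
    static String beforeImageSql(String table, String pkName,
                                 List<String> updateColumns, String whereCondition) {
        return selectList(pkName, updateColumns)
                + " FROM " + table + " WHERE " + whereCondition + " FOR UPDATE";
    }

    /** After image: selects by the primary keys found in the before image (assumed OR-joined). */
    static String afterImageSql(String table, String pkName,
                                List<String> updateColumns, int pkRowCount) {
        String where = String.join(" OR ", Collections.nCopies(pkRowCount, pkName + " = ?"));
        return selectList(pkName, updateColumns)
                + " FROM " + table + " WHERE " + where + " FOR UPDATE";
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("count");
        System.out.println(beforeImageSql("storage_tbl", "ID", columns, "commodity_code = ?"));
        System.out.println(afterImageSql("storage_tbl", "ID", columns, 1));
    }
}
```

The after-image query deliberately does not reuse the original WHERE condition: the update itself may have changed the columns that condition tests, so only the primary keys captured in the before image reliably identify the same rows.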
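In practice, the queries before and after the update are not that simple. How are the images stored when one transaction runs several update statements? What about updates that span multiple tables? How is an UPDATE statement that uses table aliases handled? All of these details need rigorous testing. However, I have not yet found unit tests covering this part, so it looks like I will have to write them myself before I can verify that this logic is correct.
III. Summary
As a result, fescar adds the cost of two extra queries to the first-phase commit. Is there a way to do this work asynchronously, in a binlog-like manner? If asynchronous operations are involved, we would then have to consider how to guarantee that the undo log is actually written.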