先做下记录
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
}
BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacked result: " + status);
}
}
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
StackTraceLogger.info(LOGGER, te,
"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
for (; ; ) {
try {
conn = dataSourceProxy.getPlainConnection();
// The entire undo process should run in a local transaction.
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
// Find UNDO LOG
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
boolean exists = false;
while (rs.next()) {
exists = true;
// It is possible that the server repeatedly sends a rollback request to roll back
// the same branch transaction to multiple processes,
// ensuring that only the undo_log in the normal state is processed.
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
if (!canUndo(state)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
}
return;
}
String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = parseContext(contextString);
byte[] rollbackInfo = getRollbackInfo(rs);
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
: UndoLogParserFactory.getInstance(serializer);
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(), sqlUndoLog);
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}
// If undo_log exists, it means that the branch transaction has completed the first phase,
// we can directly roll back and clean the undo_log
// Otherwise, it indicates that there is an exception in the branch transaction,
// causing undo_log not to be written to the database.
// For example, the business processing timeout, the global transaction is the initiator rolls back.
// To ensure data consistency, we can insert an undo_log with GlobalFinished state
// to prevent the local transaction of the first phase of other programs from being correctly submitted.
// See https://github.com/seata/seata/issues/489
if (exists) {
deleteUndoLog(xid, branchId, conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
State.GlobalFinished.name());
}
} else {
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
State.GlobalFinished.name());
}
}
return;
} catch (SQLIntegrityConstraintViolationException e) {
// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
}
} catch (Throwable e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
}
}
throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
branchId, e.getMessage()), e);
} finally {
try {
if (rs != null) {
rs.close();
}
if (selectPST != null) {
selectPST.close();
}
if (conn != null) {
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
conn.close();
}
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
}
}
}
}
public void executeOn(Connection conn) throws SQLException {
if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
return;
}
try {
String undoSQL = buildUndoSQL();
PreparedStatement undoPST = conn.prepareStatement(undoSQL);
TableRecords undoRows = getUndoRows();
for (Row undoRow : undoRows.getRows()) {
ArrayList<Field> undoValues = new ArrayList<>();
List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
for (Field field : undoRow.getFields()) {
if (field.getKeyType() != KeyType.PRIMARY_KEY) {
undoValues.add(field);
}
}
undoPrepare(undoPST, undoValues, pkValueList);
undoPST.executeUpdate();
}
} catch (Exception ex) {
if (ex instanceof SQLException) {
throw (SQLException) ex;
} else {
throw new SQLException(ex);
}
}
}
protected boolean dataValidationAndGoOn(Connection conn) throws SQLException {
TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
TableRecords afterRecords = sqlUndoLog.getAfterImage();
// Compare current data with before data
// No need undo if the before data snapshot is equivalent to the after data snapshot.
Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);
if (beforeEqualsAfterResult.getResult()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Stop rollback because there is no data change " +
"between the before data snapshot and the after data snapshot.");
}
// no need continue undo.
return false;
}
// Validate if data is dirty.
TableRecords currentRecords = queryCurrentRecords(conn);
// compare with current data and after image.
Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);
if (!afterEqualsCurrentResult.getResult()) {
// If current data is not equivalent to the after data, then compare the current data with the before
// data, too. No need continue to undo if current data is equivalent to the before data snapshot
Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);
if (beforeEqualsCurrentResult.getResult()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Stop rollback because there is no data change " +
"between the before data snapshot and the current data snapshot.");
}
// no need continue undo.
return false;
} else {
if (LOGGER.isInfoEnabled()) {
if (StringUtils.isNotBlank(afterEqualsCurrentResult.getErrMsg())) {
LOGGER.info(afterEqualsCurrentResult.getErrMsg(), afterEqualsCurrentResult.getErrMsgParams());
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("check dirty datas failed, old and new data are not equal," +
"tableName:[" + sqlUndoLog.getTableName() + "]," +
"oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +
"newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");
}
throw new SQLException("Has dirty records when undo.");
}
}
return true;
}
// Validate if data is dirty.
TableRecords currentRecords = queryCurrentRecords(conn);
网友评论