代码库: https://github.com/dtm-labs/client
子事务屏障原理
在本地数据库(资源管理器,即执行try、confirm、cancel的子事务端),创建一个分支操作状态表,使用全局事务id-分支事务id-分支操作(try|confirm|cancel)作为唯一键。
屏障判断流程如下:
- 开启本地事务(主应用程序开启Tcc事务)
- 对于当前操作(try|confirm|cancel),插入一条数据,通过唯一键进行约束,如果插入不成功,提交事务返回成功。
- 如果当前操作是cancel,在执行cancel之前想分支状态表插入一个唯一键为gid-branchid-try 的数据,如果插入成功,这直接提交事务返回成功。(空回滚)
- 执行屏障内业务逻辑,如果业务返回成功则成功,失败则失败。
这个流程是如何解决空回滚、幂等和悬挂的异常情况呢?
- 空回滚产生的情况是执行cancel的时候try没有执行(一般情况下是try的时候超时了,被主程序捕捉异常然后触发了cancel),这个时候根据第3步,在cancel之前插入一个try的操作,如果插入成功则说明try没有正确执行直接返回事务成功不进行补偿了。
- 幂等的控制会被分支状态表的唯一键约束了。
- 悬挂的产生是因为try延迟了,然后执行了cancel,这个时候try执行了。这个情况下因为第三部的存在,及时try没有被执行,但是cancel检测到之后会插入一个try的操作在返回,所以当try再执行的时候也会产生主键冲突,也不会发生悬挂的情况了。
dtm 的 Tcc 模式下屏障源码
主程序调用:
gid := shortuuid.New()
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
resp, err := tcc.CallBranch(&busi.ReqHTTP{Amount: 30}, busi.Busi+"/TccBTransOutTry",busi.Busi+"/TccBTransOutConfirm", busi.Busi+"/TccBTransOutCancel")
if err != nil {
return resp, err
}
return tcc.CallBranch(&busi.ReqHTTP{Amount:30},busi.Busi+"/TccBTransInTry",busi.Busi+"/TccBTransInConfirm",busi.Busi+"/TccBTransInCancel")})
资源管理器执行分支事务操作
req := reqFrom(c)
if req.TransOutResult != "" {
return string2DtmError(req.TransOutResult)
}
bb := MustBarrierFromGin(c)
if req.Store == Redis {
return bb.RedisCheckAdjustAmount(RedisGet(),GetRedisAccountKey(TransOutUID), req.Amount, 7*86400)
} else if req.Store == Mongo {
return bb.MongoCall(MongoGet(), func(sc mongo.SessionContext) error {
return SagaMongoAdjustBalance(sc, sc.Client(), TransOutUID, -req.Amount, "")
})
}
return bb.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
return tccAdjustTrading(tx, TransOutUID, -req.Amount)
})
主要的屏障逻辑代码在 bb.CallWithDB
中(逻辑注释在代码中)
func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) {
bid := bb.newBarrierID()
defer dtmimp.DeferDo(&rerr, func() error {
return tx.Commit()
}, func() error {
return tx.Rollback()
})
// 这里是一个操作映射,tcc模式下执行cancel会先检测try是否已经执行
originOp := map[string]string{
dtmimp.OpCancel: dtmimp.OpTry, // tcc
dtmimp.OpCompensate: dtmimp.OpAction, // saga
dtmimp.OpRollback: dtmimp.OpAction, // workflow
}[bb.Op]
// 这里也是主要用与cancel,执行cancel会插入try
originAffected, oerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op, bb.DBType, bb.BarrierTableName)
// 向本地分支状态表插入当前的op操作记录 以 gid-branchid-op 为唯一键约束
currentAffected, rerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op, bb.DBType, bb.BarrierTableName)
logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
// 幂等判断
if rerr == nil && bb.Op == dtmimp.MsgDoOp && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
return ErrDuplicated
}
if rerr == nil {
rerr = oerr
}
// 空回滚判断
if (bb.Op == dtmimp.OpCancel || bb.Op == dtmimp.OpCompensate || bb.Op == dtmimp.OpRollback) && originAffected > 0 || // null compensate
currentAffected == 0 { // repeated request or dangled request
return
}
// 没有错误
if rerr == nil {
rerr = busiCall(tx)
}
return
}
总的来说,Tcc模式下的子事务屏障原理还是相对简单,应该也是比较容易理解,也很巧妙的统一解决了分布式事务下容易产生的悬挂,幂等和空回滚问题,也可做学习借鉴!
网友评论