美文网首页
Dtm子事务屏障

Dtm子事务屏障

作者: airmy丶 | 来源:发表于2022-10-12 18:40 被阅读0次

    代码库: https://github.com/dtm-labs/client

    子事务屏障原理

    在本地数据库(资源管理器,即执行try、confirm、cancel的子事务端),创建一个分支操作状态表,使用全局事务id-分支事务id-分支操作(try|confirm|cancel)作为唯一键。

    屏障判断流程如下:

    1. 开启本地事务(主应用程序开启Tcc事务)
    2. 对于当前操作(try|confirm|cancel),插入一条数据,通过唯一键进行约束,如果插入不成功,提交事务返回成功。
    3. 如果当前操作是cancel,在执行cancel之前想分支状态表插入一个唯一键为gid-branchid-try 的数据,如果插入成功,这直接提交事务返回成功。(空回滚)
    4. 执行屏障内业务逻辑,如果业务返回成功则成功,失败则失败。

    这个流程是如何解决空回滚、幂等和悬挂的异常情况呢?

    • 空回滚产生的情况是执行cancel的时候try没有执行(一般情况下是try的时候超时了,被主程序捕捉异常然后触发了cancel),这个时候根据第3步,在cancel之前插入一个try的操作,如果插入成功则说明try没有正确执行直接返回事务成功不进行补偿了。
    • 幂等的控制会被分支状态表的唯一键约束了。
    • 悬挂的产生是因为try延迟了,然后执行了cancel,这个时候try执行了。这个情况下因为第三部的存在,及时try没有被执行,但是cancel检测到之后会插入一个try的操作在返回,所以当try再执行的时候也会产生主键冲突,也不会发生悬挂的情况了。

    dtm 的 Tcc 模式下屏障源码

    主程序调用:

    gid := shortuuid.New()
    err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
    resp, err := tcc.CallBranch(&busi.ReqHTTP{Amount: 30}, busi.Busi+"/TccBTransOutTry",busi.Busi+"/TccBTransOutConfirm", busi.Busi+"/TccBTransOutCancel")
    if err != nil {
        return resp, err
    }
    return tcc.CallBranch(&busi.ReqHTTP{Amount:30},busi.Busi+"/TccBTransInTry",busi.Busi+"/TccBTransInConfirm",busi.Busi+"/TccBTransInCancel")})
    

    资源管理器执行分支事务操作

    req := reqFrom(c)
    if req.TransOutResult != "" {
        return string2DtmError(req.TransOutResult)
    }
    bb := MustBarrierFromGin(c)
    if req.Store == Redis {
        return bb.RedisCheckAdjustAmount(RedisGet(),GetRedisAccountKey(TransOutUID), req.Amount, 7*86400)
    } else if req.Store == Mongo {
        return bb.MongoCall(MongoGet(), func(sc mongo.SessionContext) error {
            return SagaMongoAdjustBalance(sc, sc.Client(), TransOutUID, -req.Amount, "")
        })
    }
    
    return bb.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
        return tccAdjustTrading(tx, TransOutUID, -req.Amount)
    })
    

    主要的屏障逻辑代码在 bb.CallWithDB 中(逻辑注释在代码中)

    func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) {
        bid := bb.newBarrierID()
        defer dtmimp.DeferDo(&rerr, func() error {
            return tx.Commit()
        }, func() error {
            return tx.Rollback()
        })
      
        // 这里是一个操作映射,tcc模式下执行cancel会先检测try是否已经执行
        originOp := map[string]string{
            dtmimp.OpCancel:     dtmimp.OpTry,    // tcc
            dtmimp.OpCompensate: dtmimp.OpAction, // saga
            dtmimp.OpRollback:   dtmimp.OpAction, // workflow
        }[bb.Op]
    
        // 这里也是主要用与cancel,执行cancel会插入try
        originAffected, oerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op, bb.DBType, bb.BarrierTableName)
        // 向本地分支状态表插入当前的op操作记录 以 gid-branchid-op 为唯一键约束
        currentAffected, rerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op, bb.DBType, bb.BarrierTableName)
        logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
    
        // 幂等判断
        if rerr == nil && bb.Op == dtmimp.MsgDoOp && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
            return ErrDuplicated
        }
    
        if rerr == nil {
            rerr = oerr
        }
      
        // 空回滚判断
        if (bb.Op == dtmimp.OpCancel || bb.Op == dtmimp.OpCompensate || bb.Op == dtmimp.OpRollback) && originAffected > 0 || // null compensate
            currentAffected == 0 { // repeated request or dangled request
            return
        }
      
        // 没有错误
        if rerr == nil {
            rerr = busiCall(tx)
        }
        return
    }
    

    总的来说,Tcc模式下的子事务屏障原理还是相对简单,应该也是比较容易理解,也很巧妙的统一解决了分布式事务下容易产生的悬挂,幂等和空回滚问题,也可做学习借鉴!

    相关文章

      网友评论

          本文标题:Dtm子事务屏障

          本文链接:https://www.haomeiwen.com/subject/lewfzrtx.html