Golang事务模型

作者: _张晓龙_ | 来源:发表于2016-12-29 21:27 被阅读3066次

    序言

    笔者在《软件设计的演变过程》一文中,将通信系统软件的DDD分层模型最终演进为五层模型,即调度层(Schedule)、事务层(Transaction DSL)、环境层(Context)、领域层(Domain)和基础设施层(Infrastructure),我们简单回顾一下:

    ddd-layer-with-dci-dsl.png
    1. 调度层:维护UE的状态模型,只包括业务的本质状态,将接收到的消息派发给事务层。
    2. 事务层:对应一个业务流程,比如UE Attach,将各个同步消息或异步消息的处理组合成一个事务,当事务失败时,进行回滚。当事务层收到调度层的消息后,委托环境层的Action进行处理。
    3. 环境层:以Action为单位,处理一条同步消息或异步消息,将Domain层的领域对象cast成合适的role,让role交互起来完成业务逻辑。
    4. 领域层:不仅包括领域对象及其之间关系的建模,还包括对象的角色role的显式建模。
    5. 基础实施层:为其他层提供通用的技术能力,比如事务模型的框架、消息通信机制、对象持久化机制和通用的算法等

    本文将聚焦于事务层,主要讨论事务模型,代码抽象层次和业务流程图一一对应。

    同步模型

    毫无疑问,异步模型是复杂的。但在管理域的组件中,对实时性和性能并没有极致的要求,同时协程(比如,Goroutine)非常轻量级,所以使用同步模型是一种非常聪明且简单的处理方式,如下图所示:

    synchronous-model.png

    在一个同步模型里,一个系统一旦发出一个请求消息,并需要等待其应答,则当前协程就会进入休眠态,直到应答消息来临或超时为止。协程可以看做是用户态轻量级的线程,占用资源非常少,当前系统同时可以有成百上千个协程运行。

    假定Action是一条同步消息的交互,那么业务的流程图就对应一个Action序列。

    事务

    事务(Transaction,简写为Trans)一词来源于数据处理的概念,下面是Wikipedia 对事务的定义:

    In computer science, transaction processing is information processing thatis divided into individual, indivisible operations, called transactions. Each transaction must succeed or fail as a complete unit; it cannot remain in an intermediate state.

    一般情况下,一个单一场景的用户流程图就对应一个事务,而事务则由一个Action序列组成。

    transaction.png

    从S1到S2的一次同步请求处理过程中,站在S1的视角是一个Action,而站在S2的视角却是一个事务。

    事务过程控制

    基础数据结构

    TransInfo

    TransInfo是事务模型中一个非常重要的数据结构,用于事务执行过程中的数据传递,比如事务层注入到环境层的数据,Action之间串联的数据。
    TransInfo在基础设施层的trans-dsl框架中定义,如下:

    type TransInfo struct {
        // trans dsl framework params
        Times int
        RepeatIdx int
    
        // user app info
        AppInfo interface{}
    }
    

    S1Obj

    当前系统为S2,当收到来自S1的同步请求时,S2启动一个协程处理该请求。当该协程调用到调度层后,执行相关事务。如果该事务执行失败,则进行回滚。

    一个简化的事务接口调用代码,如下:

    func scheduleS1ReqTrans(req []byte) error {
        transInfo := &transdsl.TransInfo{AppInfo: &context.S2Info{}}
        s1ReqTrans := trans.NewS1ReqTrans()
        err = s1ReqTrans.Exec(transInfo)
        if err != nil {
            s1ReqTrans.RollBack(transInfo)
        }
        return err
    }
    

    Fragment

    从语义层次上看,一个Fragment是一个流程片段。
    从代码层次上看,一个Fragment是一个interface。

    Fragment在基础设施层的trans-dsl框架中定义,如下:

    type Fragment interface {
        Exec(transInfo *TransInfo) error
        RollBack(transInfo *TransInfo)
    }
    

    Action

    Action是一条同步消息的交互。所有具体的Action在环境层中定义,是提供给事务层的原子操作,是一个粒度最小的流程片段,需要实现Fragment接口,具体实现和业务紧密相关。

    Procedure

    Procedure是多条关系紧密的同步消息的交互,在基础设施层的trans-dsl框架中定义,是一个比Action更大的复用单元,也是一个流程片段,需要实现Fragment接口。
    Procedure本身又是一个由Action或Procedre组成的序列,其中Action是叶子节点,Procedure是中间节点,所以Procedre是一棵多叉树。

    Procedure的定义如下:

    type Procedure struct {
        Fragments  []Fragment
    }
    

    在事务层创建一个具体的Procedure的代码如下:

    func newProcedure1() transdsl.Fragment {
        procedure := &transdsl.Procedure {
            Fragments: []transdsl.Fragment {
                new(context.Action11),
                newProcedure2(),
                new(context.Action12),
            },
        }
        return procedure
    }
    
    func newProcedure2() transdsl.Fragment {
        procedure := &transdsl.Procedure {
            Fragments: []transdsl.Fragment {
                new(context.Action21),
                new(context.Action22),
            },
        }
        return procedure
    }
    

    Transaction

    Transaction对应一次业务处理,在基础设施层的trans-dsl框架中定义,是该业务中最大的Procedure,当然也是一个流程片段,需要实现Fragment接口。

    Transaction的定义如下:

    type Transaction struct {
        Fragments  []Fragment
        errIndex int
    }
    

    说明:errIndex用于事务回滚。

    在事务层创建一个具体的Transaction的代码如下:

    func NewS1ReqTrans() *transdsl.Transaction {
        trans := &transdsl.Transaction {
            Fragments: []transdsl.Fragment {
                new(context.Action1),
                newProcedure1(),
                new(context.Action2),
            },
        }
        return trans
    }
    

    Repeat

    从语义层次来看,Repeat用来修饰Action或Procedure,说明该Action或Procedure可以执行多次,并且至少执行一次,同时产生一个新的Procedure。
    从代码层次上看,Repeat在基础设施层的trans-dsl框架中定义,实现了接口Fragment。

    Repeat的定义如下:

    type Repeat struct {
        Fragments []Fragment
        FuncVar fun() Fragment
    }
    

    说明:Fragments用于事务回滚。

    Repeat的执行次数是动态确定的,即由上一个Action写入TransInfo。

    有了Repeat后,我们可以在事务层定义一个事务如下:

    func NewS1ReqTrans() *transdsl.Transaction {
        trans := &transdsl.Transaction {
            Fragments: []transdsl.Fragment {
                new(context.Action1),
                new(context.Action2),
                &transdsl.Repeat {
                    FuncVar: newProcedure1,
                },
                new(context.Action3),
            },
        }
        return trans
    }
    

    Optional

    Optional与Repeat类似-
    从语义层次来看,Optional用来修饰Action或Procedure,说明该Action或Procedure最多执行一次,并且可以不执行。
    从代码层次来看,Optional在基础设施层的trans-dsl框架中定义,实现了接口Fragment。

    Optional的定义如下:

    type Optional struct {
        Spec Specification
        Fragment Fragment
        isExec bool
    }
    

    说明:isExec用于事务回滚。

    Optional的执行次数由谓词Specification确定,Specification是一个interface,它也在环境层定义,如下:

    type Specification interface {
        Ok(transInfo *TransInfo) bool
    }
    

    谓词的实例来自两个方面的确认:

    1. 系统的某个开关是否打开,即开关打开时,谓词为真,执行一次Action或Procedure,否则执行零次。
    2. 系统的当前状态是否满足某个条件,即条件满足时,谓词为真,执行一次Action或Procedure,否则执行零次。

    有了Optional后,我们可以在事务层定义一个事务如下:

    func NewS1ReqTrans() *transdsl.Transaction {
        trans := &transdsl.Transaction {
            Fragments: []transdsl.Fragment {
                new(context.Action1),
                &transdsl.Optional {
                    Spec: new(context.ShouldExecAction2),
                    Fragment: new(context.Action2),
                },
                &transdsl.Repeat {
                    FuncVar: newProcedure1,
                },
                new(context.Action3),
            },
        }
        return trans
    }
    

    默认

    从语义层次来看, 没有Repeat或Optional修饰的Action或Procedure就是默认的情况,说明Action或Procedure仅且执行一次。

    事务回滚

    对于事务来说,执行要么成功,要么失败。当事务执行失败时,必须触发回滚,使得系统无资源泄露或残留。
    当事务执行失败时,肯定是在某一个Fragment执行时失败,我们记作fragments[i],事务回滚的过程为:

    1. fragments[i]完成自己已分配的资源的回收和自己已写入的数据的清理;
    2. 从fragments[i-1]到fragments[0],依次调用它的RollBack方法。

    Action

    Action是事务的原子执行者,从叶子节点来看,事务都是Action序列。
    当某个Action执行失败时,在Exec方法内进行该Action相关的资源回收或数据清理,不会调用该Action的RollBack函数。
    Action的RollBack方法实现很简单,仅进行该Action相关的所有资源回收和数据清理。

    举个例子:

    Action5在执行失败前,打开了文件file1,在表table1中写了一条记录,那么它在返回error前要删除表table1中的记录,并关闭文件file1,即逆序的进行资源回收和数据清理。
    至于Action1到Action4中打开了什么资源或写了什么数据,Action5一点都不care。
    Action5返回错误后,事务回滚框架会自动依次调用[Action4,Action3, Action2, Action1]的Rollback函数,从而完成事务的回滚。

    Procedure

    如果Procedure执行失败,则在Exec方法中进行“错误处理”:

    func (this *Procedure) Exec(transInfo *TransInfo) error {
        index, err := forEachFragments(this.Fragments, transInfo)
        if err != nil {
            if index <= 0 {
                return err
            }
            backEachFragments(this.Fragments, transInfo, index)
        }
        return err
    }
    

    Exec方法在实现中使用了事务层的原语forEachFragments和backEachFragments:

    1. 对于forEachFragments原语,正向遍历Fragments,依次调用它的Exec方法。
    2. 对于backEachFragments原语,从index - 1开始反向遍历Fragments,依次调用它的RollBack方法。

    如果Procedure执行成功,回滚时直接调用RollBack方法即可:

    func (this *Procedure) RollBack(transInfo *TransInfo) {
        backEachFragments(this.Fragments, transInfo, len(this.Fragments))
    }
    

    Repeat

    如果Repeat执行失败,则进行“错误处理”:

    func (this *Repeat) Exec(transInfo *TransInfo) error {
        this.Fragments = make([]Fragment, transInfo.Times)
        for i := 0; i < transInfo.Times; i++ {
            transInfo.RepeatIdx = i
            this.Fragments[i] = this.FuncVar()
            err := this.Fragments[i].Exec(transInfo)
            if err != nil {
                if IsErrorEqual(err, ErrContinue) {
                    continue
                }
                if i == 0 {
                    return err
                }
                i--
                for j := i; j >= 0; j-- {
                    transInfo.RepeatIdx = j
                    this.Fragments[j].RollBack(transInfo)
                }
                return err
            }
        }
        return nil
    }
    

    这里的transInfo.RepeatIdx需要解释一下:

    1. 在this.Fragments[i].Exec之前赋值为i,是为了Repeat在执行Action或Procedure时,找到对应的领域对象。
    2. this.Fragments[j].RollBack之前赋值为j,是为了Repeat在“错误处理”时,即回滚已经完成的Action或Procedure时,找到对应的领域对象。举个例子,比如Repeat的最大次数是5,当进行到第4次时发生了错误,这时需要回滚前3次的Action或Procedure。

    如果repeat执行成功,回滚时直接调用RollBack方法即可:

    func (this *Repeat) RollBack(transInfo *TransInfo) {
        for i := transInfo.Times; i >= 0; i-- {
            transInfo.RepeatIdx = i
            this.Fragments[i].RollBack(transInfo)
        }
    }
    

    Optional

    optional就比较简单了,如果执行过程中发生了错误,则啥也不用干,因为Action或Procedure已完成了错误处理,如下所示:

    func (this *Optional) Exec(transInfo *TransInfo) error {
        if this.Spec.Ok(s1Obj, transInfo) {
            this.isExec = true
            return this.Fragment.Exec(s1Obj, transInfo)
        }
        return nil
    }
    

    如果optional执行成功,回滚时需要根据是否执行过Action或Procedure来进行Action或Procedure的回滚,如下所示:

    func (this *Optional) RollBack(transInfo *TransInfo) {
        if this.isExec {
            this.Fragment.RollBack(transInfo)
        }
    }
    

    事务并发

    事务的执行过程是一个同步模型,而事务之间却是异步的。多个事务间可能共享资源,所以要对事务进行并发控制。
    在Golang中,协程之间的并发控制一般使用channel,非常简单且高效。
    假设一组协程使用一个共享资源,这时通过一个channel控制,那么多组协程就需要多个channel来控制。我们可以使用map,key为shareId,value为channel。

    读channel

    根据业务流程,要在某个Specification(谓词,Optional的第一个参数)中读channel。假设该谓词为IsSomethingNotExist,示例代码如下:

    func (this *IsSomethingNotExist) Ok(transInfo *transdsl.TransInfo) bool {
        ...
        s2Info := transInfo.AppInfo.(*S2Info)
        <- s2Info.Chan
        s2Info.ChanFlag = true
        ...
    }
    

    要读channel,必须先注入。根据局部化原则,我们在谓词IsSomethingNotExist中进行注入,而不在前面的Action或Specification中进行注入,于是示例代码变为:

    func (this *IsSomethingNotExist) Ok(transInfo *transdsl.TransInfo) bool {
        ...
        s2Info := transInfo.AppInfo.(*S2Info)
        concurrencyctrl.ChanMapLock.Lock()
        value, ok := concurrencyctrl.ChanMap[shareId]
        if ok {
            s2Info.Chan = value
        } else {
            s2Info.Chan = make(chan int, 1)
            s2Info.Chan <- 1
            concurrencyctrl.ChanMap[shareId] = s2Info.Chan
        }
        concurrencyctrl.ChanMapLock.Unlock()
    
        <- s2Info.Chan
        s2Info.ChanFlag = true
        ...
    }
    

    写channel

    根据业务流程,要在读channel的Specification之后的某个Action中写channel。假设该Action为DiscussAction,示例代码如下:

    func (this *DiscussAction) Exec(transInfo *transdsl.TransInfo) error {
        ...
        s2Info := transInfo.AppInfo.(*S2Info)
        s2Info.Chan <- 1
        s2Info.ChanFlag = false
        ...
    }
    

    细心的读者可能已经发现,上面的描述“要在读channel的Specification之后的某个Action中写channel”存在两种情况:

    1. 该Specification是optional的第一个参数,而该Action或包含该Action的Procedure是对应的第二个参数
    2. 该Action在该Specification对应的optional操作之后

    不管Specification的Ok方法是否返回true,第二种情况总是会进行写channel操作,而第一种情况则未必,即当Specification的Ok方法返回为false时,并不会进行写channel操作,所以有瑕疵。该瑕疵的修复方法是在该Specification的Ok方法内进行判断,如果返回值为false,则进行写channel操作。假设该谓词为IsSomethingNeedDel,则示例代码为:

    func (this *IsSomethingNeedDel) Ok(transInfo *transdsl.TransInfo) bool {
        ...
        s2Info := transInfo.AppInfo.(*S2Info)
        concurrencyctrl.ChanMapLock.Lock()
        value, ok := concurrencyctrl.ChanMap[shareId]
        if ok {
            s2Info.Chan = value
        } else {
            s2Info.Chan = make(chan int, 1)
            s2Info.Chan <- 1
            concurrencyctrl.ChanMap[shareId] = s2Info.Chan
        }
        concurrencyctrl.ChanMapLock.Unlock()
    
        <- s2Info.Chan
        s2Info.ChanFlag = true
        ...
    
        if flag {
            log.Infof("***IsSomethingNeedDel: true***")
        } else {
            s2Info.Chan <- 1
            s2Info.ChanFlag = false
            log.Infof("***IsSomethingNeedDel: false***")
        }
        return flag
    }
    

    错误和异常处理

    channel的闭合操作

    在事务执行过程中,不管是遇到错误还是发生了异常(panic),可能会出现对于channel读了没有写的情况,即在事务处理过程中没有实现channel的闭合操作,这将导致该组的其他协程(Goroutine)也阻塞了。

    该问题的解决思路是在事务调度的入口方法中使用defer修饰的闭包对异常进行捕获,同时针对错误或异常都对channel尝试闭合操作,示例代码如下:

    func scheduleS1ReqTrans(req []byte) (err error) {
        transInfo := &transdsl.TransInfo{AppInfo: &context.S2Info{}}
        defer func() {
            if p := recover(); p != nil {
                str, ok := p.(string)
                if ok {
                    err = errors.New(str)
                } else {
                    err = errors.New("panic")
                }
    
                log.Info("S1ReqTrans panic recover start!")
                log.Error("Stack:", string(debug.Stack()))
                log.Info("S1ReqTrans panic recover end!")
            }
                    s2Info := transInfo.AppInfo.(*S2Info)
            if s2Info.ChanFlag {
                s2Info.Chan <- 1
            }
        }()
    
        s1ReqTrans := trans.NewS1ReqTrans()
        err = s1ReqTrans.Exec(transInfo)
        if err != nil {
            s1ReqTrans.RollBack(transInfo)
        }
        return err
    }
    

    事务的闭合操作

    在事务执行的过程中,如果发生了panic,则会被调度层的recover函数恢复,然后将panic转换为error。虽然知道这次业务是失败的,但是却没有触发事务回滚操作,从而释放已经申请的资源,所以必须在Context层显式的对panic进行处理:

    1. 在Action的defer函数中释放已经成功申请的资源
    2. 在Specification或Action的defer函数中将panic转换为error,触发事务回滚流程

    小结

    在管理域的组件中,对实时性和性能并没有极致的要求,同时Goroutine非常轻量级,所以使用同步模型是一种非常聪明且简单的处理方式。本文所讨论的事务模型针对的就是同步过程,先详细阐述了事务的过程控制,然后对事务的回滚给出了通用的设计框架,最后对事务的并发控制给出了简单高效的解决方案。事务模型在DDD的分层架构中位于第四层,代码抽象层次高且表达力强,和业务流程图一一对应,同时代码可以以Action或Procedure为粒度进行复用。

    相关文章

      网友评论

        本文标题:Golang事务模型

        本文链接:https://www.haomeiwen.com/subject/jampvttx.html