Hand-Rolling Golang, Go and Microservices: Saga Pattern, Part 5

Author: 老罗话编程 | Published 2021-03-15 15:24

    Background

    I have recently been reading <<Go微服务实战>> (Go Microservices in Action, 刘金亮, 2021.1).
    This series of notes works through the book with practice implementations in golang.

    The Saga Pattern

    • The saga pattern splits a distributed long-running transaction into a series of independent short transactions
    • Each short transaction can be undone by a compensating action
    • Both the transaction actions and the compensating actions are idempotent, so they may be executed repeatedly without side effects
    A Saga consists of a series of sub-transactions "Ti",
    each of which has a corresponding compensation "Ci";
    when Ti goes wrong, Ci is used to clean up the effects of executing Ti.

    The Saga pattern can be understood through the following two formulas:
    T = T1 T2 … Tn
    T = TCT

    The core idea of the Saga pattern is to avoid long transactions that hold locks for a long time (such as the two-phase commit described in section 14.2.2),
    and to instead split the transaction into a group of short transactions that are committed one after another in order.
    The Saga pattern satisfies the ACD (atomicity, consistency, durability) properties.

    Excerpted from <<Go微服务实战>>, 刘金亮, 2021.1
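
    To make the relationship between Ti and Ci concrete, here is a minimal, self-contained Go sketch (illustrative only, not from the book or from the MQ service code below): sub-transactions run in order, and when one fails, the compensations of the steps that already completed run in reverse order.

    package main

    import (
        "errors"
        "fmt"
    )

    // step pairs a sub-transaction Ti with its compensation Ci.
    type step struct {
        name string
        t    func() error // Ti: forward action, assumed idempotent
        c    func()       // Ci: compensating action, assumed idempotent
    }

    // runSaga executes T1..Tn in order; if Tj fails, it runs Cj-1..C1 in reverse.
    func runSaga(steps []step) error {
        for i, s := range steps {
            if err := s.t(); err != nil {
                for j := i - 1; j >= 0; j-- {
                    steps[j].c()
                }
                return fmt.Errorf("saga aborted at %s: %w", s.name, err)
            }
        }
        return nil
    }

    func main() {
        err := runSaga([]step{
            {"reserve stock", func() error { return nil }, func() { fmt.Println("release stock") }},
            {"charge card", func() error { return errors.New("card declined") }, func() { fmt.Println("refund") }},
        })
        fmt.Println(err) // prints "release stock", then: saga aborted at charge card: card declined
    }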
    

    Goal

    • To implement distributed transactions in the saga pattern, we first build a pub/sub transactional message queue service
    • Functional requirements for the transactional message queue service
      • Messages must not be lost: messages are persisted
      • Messages are unique: every message carries a global ID and a sub-transaction ID
      • Delivery is guaranteed: the delivery queue is persisted, delivery status is persisted, and failed deliveries are retried (a rough sketch of the message model these requirements imply follows below)
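
    The code below refers to a models.QueuedMsg type whose definition lives in the models package from earlier installments. Judging only from the fields referenced in this post, it looks roughly like the following sketch; the field types and db tags here are assumptions, not the authoritative definition.

    package models

    // QueuedMsg (rough sketch): one pending delivery, i.e. one row of the delivery_queue table.
    type QueuedMsg struct {
        ID         int    `db:"id"`          // row id in delivery_queue
        MsgID      int    `db:"msg_id"`      // unique message id
        ClientID   string `db:"client_id"`   // subscriber the delivery belongs to
        GlobalID   string `db:"global_id"`   // global transaction id
        SubID      string `db:"sub_id"`      // sub-transaction id
        Topic      string `db:"topic"`
        Content    string `db:"content"`
        StatusFlag int    `db:"status_flag"` // 0 = undelivered, 1 = delivering
        CreateTime int64  `db:"create_time"` // unix nanoseconds
        UpdateTime int64  `db:"update_time"` // unix nanoseconds
    }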

    Sub-goal (Day 5)

    • Refactor and improve the message delivery mechanism
      • iMsgHeap: a heap that buffers pending messages; the message with the smallest creation time is always delivered first
      • iMsgSource: the message source interface; there are two sources: 1. the database; 2. the eventbus
      • iMsgHistoryRing: a ring buffer that records recently delivered messages to prevent duplicate delivery
      • tConcurrentMsgHeap: a thread-safe min-heap ordered by CreateTime; implements the iMsgHeap interface
      • tDBMsgSource: pulls pending messages from the database; implements the iMsgSource interface
      • tLiveMsgSource: listens for delivery messages pushed in real time via the eventbus; implements the iMsgSource interface
      • tMsgHistoryRing: a fixed-size ring buffer of historical messages; implements the iMsgHistoryRing interface and caches recently delivered messages
      • tDeliveryWorker:
        • on initialization, first loads pending messages from the database
        • uses iMsgHeap to buffer pending messages in order
        • uses the iMsgSource interface to receive messages from both the database and the eventbus
        • uses iMsgHistoryRing to cache successfully delivered messages and prevent duplicate delivery

    iMsgHeap.go

    A heap that buffers pending messages; the message with the smallest creation time is always delivered first.

    package delivery
    
    import "learning/gooop/saga/mqs/models"
    
    // iMsgHeap buffers pending messages; Peek/Pop always return the message with the smallest CreateTime.
    type iMsgHeap interface {
        Size() int
        IsEmpty() bool
        IsNotEmpty() bool
    
        Push(msg *models.QueuedMsg)
        Peek() *models.QueuedMsg
        Pop() *models.QueuedMsg
    }
    

    iMsgSource.go

    Defines the message source interface. There are two kinds of message source: 1. the database; 2. the eventbus.

    package delivery
    
    import "learning/gooop/saga/mqs/models"
    
    // iMsgSource is a source of messages to be delivered (database polling or eventbus push).
    type iMsgSource interface {
        MsgChan() <-chan *models.QueuedMsg
    }
    
    // tSourceExpireFunc reports whether the owning worker has expired and the source should stop.
    type tSourceExpireFunc func() bool
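
    As a usage note, any type that exposes a receive-only channel of *models.QueuedMsg satisfies iMsgSource. The sketch below is a hypothetical in-memory source (not part of the series code) that can stand in for the real sources in unit tests: it emits a fixed list of messages and then closes its channel.

    package delivery

    import "learning/gooop/saga/mqs/models"

    // tSliceMsgSource is an illustrative iMsgSource backed by a fixed slice of messages.
    type tSliceMsgSource struct {
        msgChan chan *models.QueuedMsg
    }

    func newSliceMsgSource(msgs ...*models.QueuedMsg) iMsgSource {
        it := &tSliceMsgSource{msgChan: make(chan *models.QueuedMsg, len(msgs))}
        for _, m := range msgs {
            it.msgChan <- m
        }
        close(it.msgChan)
        return it
    }

    func (me *tSliceMsgSource) MsgChan() <-chan *models.QueuedMsg {
        return me.msgChan
    }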
    

    iMsgHistoryRing.go

    Uses a ring buffer to record recently delivered messages and prevent duplicate delivery.

    package delivery
    
    import "learning/gooop/saga/mqs/models"
    
    // iMsgHistoryRing records recently delivered messages so that duplicate deliveries can be detected.
    type iMsgHistoryRing interface {
        Push(msg *models.QueuedMsg)
        Has(id int) bool
    }
    

    tConcurrentMsgHeap.go

    A thread-safe min-heap ordered by CreateTime (smallest first); implements the iMsgHeap interface.

    package delivery
    
    import (
        "learning/gooop/saga/mqs/models"
        "sync"
    )
    
    type tConcurrentMsgHeap struct {
        items []*models.QueuedMsg
        size int
        mutex *sync.Mutex
    }
    
    func newMsgHeap() iMsgHeap {
        it := new(tConcurrentMsgHeap)
        it.init()
        return it
    }
    
    func (me *tConcurrentMsgHeap) init() {
        me.items = make([]*models.QueuedMsg, 0)
        me.size = 0
        me.mutex = new(sync.Mutex)
    }
    
    func (me *tConcurrentMsgHeap) Size() int {
        return me.size
    }
    
    func (me *tConcurrentMsgHeap) IsEmpty() bool {
        return me.size <= 0
    }
    
    func (me *tConcurrentMsgHeap) IsNotEmpty() bool {
        return !me.IsEmpty()
    }
    
    
    func (me *tConcurrentMsgHeap) has(msgID int) bool {
        for _,it := range me.items {
            if it.MsgID == msgID {
                return true
            }
        }
        return false
    }
    
    func (me *tConcurrentMsgHeap) Push(msg *models.QueuedMsg) {
        me.mutex.Lock()
        defer me.mutex.Unlock()
    
        if me.has(msg.MsgID) {
            return
        }
    
        me.ensureSize(me.size + 1)
        me.items[me.size] = msg
        me.size++
    
        me.shiftUp(me.size - 1)
    }
    
    
    func (me *tConcurrentMsgHeap) ensureSize(size int) {
        for ;len(me.items) < size; {
            me.items = append(me.items, nil)
        }
    }
    
    func (me *tConcurrentMsgHeap) parentOf(i int) int {
        return (i - 1) / 2
    }
    
    func (me *tConcurrentMsgHeap) leftChildOf(i int) int {
        return i*2 + 1
    }
    
    func (me *tConcurrentMsgHeap) rightChildOf(i int) int {
        return me.leftChildOf(i) + 1
    }
    
    func (me *tConcurrentMsgHeap) last() (i int, v *models.QueuedMsg) {
        if me.IsEmpty() {
            return -1, nil
        }
    
        i = me.size - 1
        v = me.items[i]
        return i,v
    }
    
    func (me *tConcurrentMsgHeap) shiftUp(i int) {
        if i <= 0 {
            return
        }
        v := me.items[i]
    
        pi := me.parentOf(i)
        pv := me.items[pi]
    
        if me.less(v, pv) {
            me.items[pi], me.items[i] = v, pv
            me.shiftUp(pi)
        }
    }
    
    func (me *tConcurrentMsgHeap) less(a, b *models.QueuedMsg) bool {
        return a.CreateTime < b.CreateTime
    }
    
    func (me *tConcurrentMsgHeap) Pop() *models.QueuedMsg {
        me.mutex.Lock()
        defer me.mutex.Unlock()
    
        if me.IsEmpty() {
            return nil
        }
    
    
        top := me.items[0]
        li, lv := me.last()
        me.items[0] = nil
        me.size--
    
        if me.IsEmpty() {
            return top
        }
    
        me.items[0] = lv
        me.items[li] = nil
    
        me.shiftDown(0)
        return top
    }
    
    
    func (me *tConcurrentMsgHeap) Peek() *models.QueuedMsg {
        me.mutex.Lock()
        defer me.mutex.Unlock()
    
        if me.IsEmpty() {
            return nil
        }
    
        return me.items[0]
    }
    
    func (me *tConcurrentMsgHeap) shiftDown(i int) {
        pv := me.items[i]
        ok, ci, cv := me.minChildOf(i)
        if ok && me.less(cv, pv) {
            me.items[i], me.items[ci] = cv, pv
            me.shiftDown(ci)
        }
    }
    
    func (me *tConcurrentMsgHeap) minChildOf(p int) (ok bool, i int, v *models.QueuedMsg) {
        li := me.leftChildOf(p)
        if li >= me.size {
            return false, 0, nil
        }
        lv := me.items[li]
    
        ri := me.rightChildOf(p)
        if ri >= me.size {
            return true, li, lv
        }
        rv := me.items[ri]
    
        if me.less(lv, rv) {
            return true, li, lv
        } else {
            return true, ri, rv
        }
    }
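
    A quick usage sketch of the heap (illustrative only; it assumes CreateTime holds a unix-nanosecond style timestamp): messages pushed in any order come back out in ascending CreateTime order, and a second Push with the same MsgID is silently ignored.

    package delivery

    import (
        "fmt"

        "learning/gooop/saga/mqs/models"
    )

    func demoMsgHeapOrdering() {
        h := newMsgHeap()
        h.Push(&models.QueuedMsg{MsgID: 2, CreateTime: 200})
        h.Push(&models.QueuedMsg{MsgID: 1, CreateTime: 100})
        h.Push(&models.QueuedMsg{MsgID: 1, CreateTime: 100}) // duplicate MsgID, ignored
        h.Push(&models.QueuedMsg{MsgID: 3, CreateTime: 300})

        for h.IsNotEmpty() {
            fmt.Println(h.Pop().MsgID) // prints 1, 2, 3: oldest CreateTime first
        }
    }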
    

    tDBMsgSource.go

    Pulls pending messages from the database; implements the iMsgSource interface.

    package delivery
    
    import (
        "github.com/jmoiron/sqlx"
        "learning/gooop/saga/mqs/database"
        "learning/gooop/saga/mqs/models"
        "time"
    )
    
    type tDBMsgSource struct {
        clientID string
        expireFunc tSourceExpireFunc
        msgChan chan *models.QueuedMsg
    }
    
    func newDBMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {
        it := new(tDBMsgSource)
        it.init(clientID, expireFunc)
        return it
    }
    
    func (me *tDBMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {
        me.clientID = clientID
        me.expireFunc = expireFunc
        me.msgChan = make(chan *models.QueuedMsg, 1)
        go me.beginPollDB()
    }
    
    func (me *tDBMsgSource) MsgChan() <-chan *models.QueuedMsg {
        return me.msgChan
    }
    
    func (me *tDBMsgSource) beginPollDB() {
        interval := time.Duration(1) * time.Second
        for !me.expireFunc() {
            if len(me.msgChan) <= 0 {
                ok, msg := me.poll()
                if ok {
                    me.msgChan <- msg
                    continue
                }
            }
    
            // poll failed, or chan full
            time.Sleep(interval)
        }
    
        close(me.msgChan)
    }
    
    func (me *tDBMsgSource) poll() (bool, *models.QueuedMsg) {
        msg := &models.QueuedMsg{}
        e := database.DB(func(db *sqlx.DB) error {
            rows, err := db.Queryx(
                "select * from delivery_queue where client_id=? order by create_time asc limit 1",
                me.clientID,
            )
            if err != nil {
                return err
            }
            defer rows.Close()
    
            if rows.Next() {
                err = rows.StructScan(msg)
                if err != nil {
                    return err
                }
                return nil
    
            } else {
                return gEmptyRowsErr
            }
        })
    
        if e != nil {
            return false, nil
        } else {
            return true, msg
        }
    }
    

    tLiveMsgSource.go

    Listens for delivery messages pushed in real time via the eventbus; implements the iMsgSource interface.

    package delivery
    
    import (
        "fmt"
        "learning/gooop/saga/mqs/eventbus"
        "learning/gooop/saga/mqs/logger"
        "learning/gooop/saga/mqs/models"
        "learning/gooop/saga/mqs/models/events"
        "time"
    )
    
    type tLiveMsgSource struct {
        clientID string
        expireFunc tSourceExpireFunc
        msgChan chan *models.QueuedMsg
    }
    
    func newLiveMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {
        it := new(tLiveMsgSource)
        it.init(clientID, expireFunc)
        return it
    }
    
    func (me *tLiveMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {
        me.clientID = clientID
        me.expireFunc = expireFunc
        me.msgChan = make(chan *models.QueuedMsg, 1)
    
        eventbus.GlobalEventBus.Sub(events.MsgPublishedEvent,
            me.id(),
            me.handleMsgPublished)
        go me.beginWatchExpire()
    }
    
    
    func (me *tLiveMsgSource) id() string {
        return fmt.Sprintf("tLiveMsgSource.%s", me.clientID)
    }
    
    func (me *tLiveMsgSource) beginWatchExpire() {
        for range time.Tick(1 * time.Second) {
            if me.expireFunc() {
                me.afterExpired()
                return
            }
        }
    }
    
    func (me *tLiveMsgSource) afterExpired() {
        eventbus.GlobalEventBus.Unsub(events.MsgPublishedEvent, me.id())
        close(me.msgChan)
    }
    
    func (me *tLiveMsgSource) handleMsgPublished(_ string, args interface{}) {
        msg, ok := args.(*models.QueuedMsg)
        if !ok {
            return
        }
    
        if msg.ClientID != me.clientID {
            return
        }
    
        if len(me.msgChan) > 0 {
            // a message is already buffered; skip this one, the db source will pick it up later
            return
        }
    
        logger.Logf(
            "tLiveMsgSource.handleMsgPublished, clientID=%s, msg=%s/%s/%s",
            me.clientID, msg.GlobalID, msg.SubID, msg.Topic)
        me.msgChan <- msg
    }
    
    func (me *tLiveMsgSource) MsgChan() <-chan *models.QueuedMsg {
        return me.msgChan
    }
    

    tMsgHistoryRing.go

    A fixed-size ring buffer of historical messages; implements the iMsgHistoryRing interface and caches recently delivered messages.

    package delivery
    
    import "learning/gooop/saga/mqs/models"
    
    
    type tMsgHistoryRing struct {
        items []*models.QueuedMsg
        capacity int
        index int
    }
    
    func newMsgHistoryRing(capacity int) iMsgHistoryRing {
        it := new(tMsgHistoryRing)
        it.init(capacity)
        return it
    }
    
    func (me *tMsgHistoryRing) init(capacity int) {
        me.items = make([]*models.QueuedMsg, capacity)
        me.capacity = capacity
        me.index = 0
    }
    
    
    func (me *tMsgHistoryRing) Has(id int) bool {
        for _,it := range me.items {
            if it != nil && it.ID == id {
                return true
            }
        }
    
        return false
    }
    
    func (me *tMsgHistoryRing) Push(msg *models.QueuedMsg) {
        me.items[me.index] = msg
    
        me.index++
        if me.index >= me.capacity {
            me.index = 0
        }
    }
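
    A short illustrative sketch of the ring's behaviour: once more than capacity messages have been pushed, the oldest slot is overwritten, so only the most recent deliveries are remembered.

    package delivery

    import (
        "fmt"

        "learning/gooop/saga/mqs/models"
    )

    func demoMsgHistoryRing() {
        ring := newMsgHistoryRing(2)
        ring.Push(&models.QueuedMsg{ID: 1})
        ring.Push(&models.QueuedMsg{ID: 2})
        ring.Push(&models.QueuedMsg{ID: 3}) // overwrites the slot that held ID=1

        fmt.Println(ring.Has(1)) // false: evicted
        fmt.Println(ring.Has(2)) // true
        fmt.Println(ring.Has(3)) // true
    }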
    

    tDeliveryWorker.go

    • on initialization, first loads pending messages from the database
    • uses iMsgHeap to buffer pending messages in order
    • uses the iMsgSource interface to receive messages from both the database and the eventbus
    • uses iMsgHistoryRing to cache successfully delivered messages and prevent duplicate delivery

    package delivery
    
    import (
        "bytes"
        "encoding/json"
        "errors"
        "github.com/jmoiron/sqlx"
        "io/ioutil"
        "learning/gooop/saga/mqs/database"
        "learning/gooop/saga/mqs/logger"
        "learning/gooop/saga/mqs/models"
        "net/http"
        "time"
    )
    
    type tDeliveryWorker struct {
        info        *tWorkerInfo
        successRing iMsgHistoryRing
        dbSource iMsgSource
        liveSource iMsgSource
        msgHeap iMsgHeap
    }
    
    func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
        it := new(tDeliveryWorker)
        it.init(info)
        return it
    }
    
    // init: do initialization, and start initial load
    func (me *tDeliveryWorker) init(info *tWorkerInfo) {
        me.info = info
        me.successRing = newMsgHistoryRing(64)
    
        me.dbSource = newDBMsgSource(info.ClientID, me.isExpired)
        me.liveSource = newLiveMsgSource(info.ClientID, me.isExpired)
        me.msgHeap = newMsgHeap()
    
        go me.beginInitialLoadFromDB()
    }
    
    // beginInitialLoadFromDB: initially, load queued msg from database
    func (me *tDeliveryWorker) beginInitialLoadFromDB() {
        buf := [][]*models.QueuedMsg{ nil }
        for !me.isExpired() {
            err := database.DB(func(db *sqlx.DB) error {
                e, rows := me.loadFromDB(db)
                if e != nil {
                    return e
                }
    
                buf[0] = rows
                return nil
            })
    
            if err != nil {
                logger.Logf("tDeliveryWorker.initialLoadFromDB, clientID=%s, err=%s", me.info.ClientID, err.Error())
                time.Sleep(3 * time.Second)
        } else {
            // initial load done; afterInitialLoad starts the delivery loop, so stop retrying here
            me.afterInitialLoad(buf[0])
            return
        }
        }
    }
    
    // loadFromDB: load queued msg from database
    func (me *tDeliveryWorker) loadFromDB(db *sqlx.DB) (error, []*models.QueuedMsg) {
        rows, err := db.Queryx(
            "select * from delivery_queue where client_id=? order by create_time asc limit ?",
            me.info.ClientID,
            gInitialLoadRows,
        )
        if err != nil {
            return err, nil
        }
        defer rows.Close()
    
        msgList := []*models.QueuedMsg{}
        for rows.Next() {
            msg := &models.QueuedMsg{}
            err = rows.StructScan(msg)
            if err != nil {
                return err, nil
            }
            msgList = append(msgList, msg)
        }
    
        return nil, msgList
    }
    
    // afterInitialLoad: after initial load done, push msgs into heap, and start delivery loop
    func (me *tDeliveryWorker) afterInitialLoad(msgList []*models.QueuedMsg) {
        logger.Logf("tDeliveryWorker.afterInitialLoad, clientID=%s, rows=%d", me.info.ClientID, len(msgList))
        for _,it := range msgList {
            me.msgHeap.Push(it)
        }
    
        go me.beginPollAndDeliver()
    }
    
    // beginPollAndDeliver: poll msg from heap, and then deliver it
    func (me *tDeliveryWorker) beginPollAndDeliver() {
        for !me.isExpired() {
            select {
            case msg, ok := <-me.dbSource.MsgChan():
                // ok is false once the source has expired and closed its channel
                if ok {
                    me.msgHeap.Push(msg)
                }

            case msg, ok := <-me.liveSource.MsgChan():
                if ok {
                    me.msgHeap.Push(msg)
                }
            }
    
            if me.msgHeap.IsEmpty() {
                continue
            }
    
            msg := me.msgHeap.Pop()
            if msg == nil {
                continue
            }
    
            switch msg.StatusFlag {
            case 0:
                // message not yet delivered
                me.handleUndeliveredMsg(msg)

            case 1:
                // message currently being delivered (check for delivery timeout)
                me.handleDeliveringMsg(msg)
            }
        }
    }
    
    // isExpired: is me expired?
    func (me *tDeliveryWorker) isExpired() bool {
        return time.Now().UnixNano() >= me.info.ExpireTime
    }
    
    // handleUndeliveredMsg: if msg unhandled, then try to deliver it
    func (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) {
        err := database.DB(func(db *sqlx.DB) error {
            now := time.Now().UnixNano()
            r,e := db.Exec(
                "update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?",
                now,
                msg.ID,
                msg.UpdateTime,
            )
            if e != nil {
                return e
            }
    
            rows, e := r.RowsAffected()
            if e != nil {
                return e
            }
            if rows != 1 {
                return gOneRowsErr
            }
    
            msg.StatusFlag = 1
            msg.UpdateTime = now
    
            return nil
        })
    
        if err != nil {
            logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
            return
        }
    
        if me.deliver(msg) {
            me.afterDeliverySuccess(msg)
    
        } else {
            me.afterDeliveryFailed(msg)
        }
    }
    
    // deliver: use http.Post function to delivery msg
    func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
        if me.successRing.Has(msg.ID) {
            return true
        }
    
        t := &models.TxMsg{
            GlobalID: msg.GlobalID,
            SubID: msg.SubID,
            Topic: msg.Topic,
            CreateTime: msg.CreateTime,
            Content: msg.Content,
        }
        j,e := json.Marshal(t)
        if e != nil {
            logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
            return false
        }
    
        r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
        if e != nil {
            logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
            return false
        }
    
        defer r.Body.Close()
        rep, e := ioutil.ReadAll(r.Body)
        if e != nil {
            logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
            return false
        }
    
        m := &models.OkMsg{}
        e = json.Unmarshal(rep, m)
        if e != nil {
            logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
            return false
        }
    
        if m.OK {
            return true
        } else {
            logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
            return false
        }
    }
    
    // handleDeliveringMsg: if delivery timeout, then retry delivery
    func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) {
        now := time.Now().UnixNano()
        if msg.UpdateTime + gDeliveryTimeoutNanos > now {
            return
        }
    
        // delivery timeout
        me.afterDeliveryTimeout(msg)
    }
    
    // afterDeliverySuccess: if done, move msg to success queue
    func (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) {
        if me.successRing.Has(msg.ID) {
            return
        }
        me.successRing.Push(msg)
    
        err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {
            r, e := tx.Exec(
                "delete from delivery_queue where id=? and update_time=? and status_flag=1",
                msg.ID,
                msg.UpdateTime,
            )
            if e != nil {
                return e
            }
    
            rows, e := r.RowsAffected()
            if e != nil {
                return e
            }
            if rows != 1 {
                return gOneRowsErr
            }
    
            r, e = tx.Exec(
                "insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",
                msg.ID,
                msg.ClientID,
                time.Now().UnixNano(),
            )
            if e != nil {
                return e
            }
    
            rows, e = r.RowsAffected()
            if e != nil {
                return e
            }
            if rows != 1 {
                return gOneRowsErr
            }
    
            return nil
        })
    
        if err != nil {
            logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
        } else {
            logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
        }
    }
    
    
    // afterDeliveryFailed: if failed, do nothing but just log it
    func (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) {
        logger.Logf("tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
    }
    
    // afterDeliveryTimeout: if timeout, then reset status and retry
    func (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) {
        err := database.DB(func(db *sqlx.DB) error {
            r,e := db.Exec(
                "update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?",
                msg.ID,
                msg.UpdateTime,
            )
            if e != nil {
                return e
            }
    
            rows,e := r.RowsAffected()
            if e != nil {
                return e
            }
    
            if rows != 1 {
                return gOneRowsErr
            }
    
            return nil
        })
    
        if err != nil {
            logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
        } else {
            logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
        }
    }
    
    var gEmptyRowsErr = errors.New("empty rows")
    var gOneRowsErr = errors.New("expecting one row affected")
    var gDeliveryTimeoutNanos = int64(10 * (time.Second / time.Nanosecond))
    var gInitialLoadRows = 100
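
    For completeness, here is a hypothetical subscriber endpoint matching what tDeliveryWorker.deliver expects: the worker POSTs a JSON-encoded models.TxMsg to the client's NotifyURL and treats the delivery as successful only if the response decodes into a models.OkMsg with OK set to true. The URL, port and handler name below are illustrative assumptions.

    package main

    import (
        "encoding/json"
        "log"
        "net/http"

        "learning/gooop/saga/mqs/models"
    )

    // handleNotify acknowledges a delivered message by replying with OkMsg{OK: true}.
    func handleNotify(w http.ResponseWriter, r *http.Request) {
        msg := &models.TxMsg{}
        if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        log.Printf("received msg %s/%s, topic=%s", msg.GlobalID, msg.SubID, msg.Topic)

        _ = json.NewEncoder(w).Encode(&models.OkMsg{OK: true})
    }

    func main() {
        http.HandleFunc("/mq/notify", handleNotify)
        log.Fatal(http.ListenAndServe(":8080", nil))
    }

    A worker for that subscriber would then be created with newDeliveryWorker, passing a tWorkerInfo (defined in an earlier installment) whose ClientID, NotifyURL and ExpireTime fields are the ones referenced in this file.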
    

    (To be continued)
