美文网首页GO与微服务
手撸golang GO与微服务 Saga模式之4

手撸golang GO与微服务 Saga模式之4

作者: 老罗话编程 | 来源:发表于2021-03-14 22:47 被阅读0次

    缘起

    最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
    本系列笔记拟采用golang练习之

    Saga模式

    • saga模式将分布式长事务切分为一系列独立短事务
    • 每个短事务是可通过补偿动作进行撤销的
    • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用
    Saga由一系列的子事务“Ti”组成,
    每个Ti都有对应的补偿“Ci”,
    当Ti出现问题时Ci用于处理Ti执行带来的问题。
    
    可以通过下面的两个公式理解Saga模式。
    T = T1 T2 … Tn
    T = T1 T2 … Tj Cj … C2 C1 (0 < j < n)
    
    Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,
    而应该将事务切分为一组按序依次提交的短事务,
    Saga模式满足ACD(原子性、一致性、持久性)特征。
    
    摘自 <<Go微服务实战>> 刘金亮, 2021.1
    

    目标

    • 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
    • 事务消息队列服务的功能性要求
      • 消息不会丢失: 消息的持久化
      • 消息的唯一性: 要求每个消息有全局ID和子事务ID
      • 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

    子目标(Day 4)

    • 完善投递worker
      • 未处理消息: 标记, 并尝试投递
      • 已处理消息: 判断是否超时, 并重试投递
      • 投递成功: 移动到成功投递表
      • 投递失败: 重置标记, 下轮重试
    • 数据库表相应的细节调整
      • delivery_queue: 去掉failed_count, 增加update_time时间戳
      • success_queue: 去掉sub_id, 改为client_id, 并增加create_time时间戳
      • failed_queue: 因为不允许失败, 因此删除失败投递表

    tDeliveryWorker.go

    • 完善投递worker
      • 未处理消息: 标记, 并尝试投递
      • 已处理消息: 判断是否超时, 并重试投递
      • 投递成功: 移动到成功投递表
      • 投递失败: 重置标记, 下轮重试
    package delivery
    
    import (
        "bytes"
        "encoding/json"
        "errors"
        "github.com/jmoiron/sqlx"
        "io/ioutil"
        "learning/gooop/saga/mqs/database"
        "learning/gooop/saga/mqs/logger"
        "learning/gooop/saga/mqs/models"
        "net/http"
        "time"
    )
    
    // tDeliveryWorker is a delivery worker bound to one tWorkerInfo; its main
    // loop is started by newDeliveryWorker.
    type tDeliveryWorker struct {
        // info supplies the ClientID, NotifyURL and ExpireTime read by the worker's methods.
        info *tWorkerInfo
    }
    
    // newDeliveryWorker builds a worker for the given info and launches its
    // main loop on a new goroutine before handing the worker back.
    func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
        worker := &tDeliveryWorker{info: info}
        go worker.beginMainLoop()
        return worker
    }
    
    // beginMainLoop polls the delivery queue until the worker expires.
    //
    // Each round peeks the oldest queued message for this client and dispatches
    // on its status flag:
    //   - 0 (undelivered): claim it and attempt delivery, then immediately look
    //     for the next message;
    //   - 1 (delivering): check whether the in-flight delivery has timed out.
    //
    // Whenever the round made no delivery attempt (empty queue, peek failure,
    // a message still in flight, or an unknown status flag) the loop sleeps for
    // one second. Without that pause an in-flight message would be re-peeked in
    // a tight loop, reopening the database on every iteration until its
    // timeout elapsed.
    func (me *tDeliveryWorker) beginMainLoop() {
        for !me.isExpired() {
            ok, msg := me.peek()
            if ok {
                switch msg.StatusFlag {
                case 0:
                    // Undelivered message: try it now and move on without pausing.
                    me.handleUndeliveredMsg(msg)
                    continue

                case 1:
                    // Message already marked as being delivered: only the
                    // timeout check applies, then fall through to the pause.
                    me.handleDeliveringMsg(msg)
                }
            }

            time.Sleep(time.Second)
        }
    }
    
    
    // isExpired reports whether the current time, in Unix nanoseconds, has
    // reached or passed the worker's ExpireTime.
    func (me *tDeliveryWorker) isExpired() bool {
        now := time.Now().UnixNano()
        return me.info.ExpireTime <= now
    }
    
    // peek fetches the oldest record (by create_time) queued for this worker's
    // client from delivery_queue.
    //
    // It returns (true, msg) when a row was found and scanned, and (false, nil)
    // when the queue is empty or the lookup failed. Failures other than an
    // empty queue are logged so that database problems are not mistaken for
    // "nothing to do".
    func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) {
        msg := &models.QueuedMsg{}
        e := database.DB(func(db *sqlx.DB) error {
            rows, err := db.Queryx(
                "select * from delivery_queue where client_id=? order by create_time asc limit 1",
                me.info.ClientID,
            )
            if err != nil {
                return err
            }
            // Always release the result set; the original never closed it.
            defer func() { _ = rows.Close() }()

            if !rows.Next() {
                // Next returns false both when the set is exhausted and when
                // iteration failed, so tell the two cases apart.
                if err := rows.Err(); err != nil {
                    return err
                }
                return gEmptyRowsErr
            }

            return rows.StructScan(msg)
        })

        if e != nil {
            if !errors.Is(e, gEmptyRowsErr) {
                logger.Logf("tDeliveryWorker.peek, failed, id=%v, err=%s", me.info.ClientID, e.Error())
            }
            return false, nil
        }
        return true, msg
    }
    
    
    // handleUndeliveredMsg claims an undelivered message and attempts delivery.
    //
    // The claim is a conditional update: status_flag is switched from 0 to 1 and
    // update_time is refreshed only if the row still has the status and
    // update_time that were read by peek. If exactly one row is not affected
    // the claim is treated as failed, the error is logged and no delivery is
    // attempted. On a successful claim msg.UpdateTime is set to the new
    // timestamp so later conditional statements match the row.
    func (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) {
        err := database.DB(func(db *sqlx.DB) error {
            now := time.Now().UnixNano()
            r, e := db.Exec(
                "update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?",
                now,
                msg.ID,
                msg.UpdateTime,
            )
            if e != nil {
                return e
            }

            rows, e := r.RowsAffected()
            if e != nil {
                return e
            }
            if rows != 1 {
                return gOneRowsErr
            }

            msg.UpdateTime = now
            return nil
        })

        if err != nil {
            // The log label now matches this function's name (it used to say handleNewMsg).
            logger.Logf("tDeliveryWorker.handleUndeliveredMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
            return
        }

        if me.deliver(msg) {
            me.afterDeliverySuccess(msg)
            return
        }
        me.afterDeliveryFailed(msg)
    }
    
    // deliver posts the message as JSON to the worker's NotifyURL.
    //
    // The payload is a models.TxMsg built from the queued message. It returns
    // true only when the reply body decodes into a models.OkMsg whose OK field
    // is set; any marshalling, transport, read or decode failure is logged with
    // its error and reported as false.
    //
    // The request is bounded by gDeliveryTimeoutNanos: the default client used
    // by http.Post has no timeout, so an unresponsive subscriber would block
    // this worker indefinitely.
    func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
        t := &models.TxMsg{
            GlobalID:   msg.GlobalID,
            SubID:      msg.SubID,
            Topic:      msg.Topic,
            CreateTime: msg.CreateTime,
            Content:    msg.Content,
        }
        j, e := json.Marshal(t)
        if e != nil {
            logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
            return false
        }

        // A nil Transport falls back to http.DefaultTransport, so connections
        // are still pooled even though the client value is created per call.
        client := &http.Client{Timeout: time.Duration(gDeliveryTimeoutNanos)}
        r, e := client.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
        if e != nil {
            logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
            return false
        }
        defer func() { _ = r.Body.Close() }()

        rep, e := ioutil.ReadAll(r.Body)
        if e != nil {
            logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
            return false
        }

        m := &models.OkMsg{}
        if e = json.Unmarshal(rep, m); e != nil {
            logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, e.Error())
            return false
        }

        if !m.OK {
            logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
            return false
        }
        return true
    }
    
    // handleDeliveringMsg checks a message that is already flagged as being
    // delivered. Nothing happens while its update time plus the delivery
    // timeout is still in the future; once that deadline has been reached the
    // timeout handler is invoked.
    func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) {
        deadline := msg.UpdateTime + gDeliveryTimeoutNanos
        if time.Now().UnixNano() >= deadline {
            // delivery timed out
            me.afterDeliveryTimeout(msg)
        }
    }
    
    // afterDeliverySuccess moves a delivered message from delivery_queue to
    // success_queue.
    //
    // Both statements are executed on the transaction handle, so the delete and
    // the insert take effect together or not at all. The original issued them
    // on db, i.e. outside the transaction that database.TX had opened, which
    // meant a failed insert could leave the message already deleted from
    // delivery_queue.
    //
    // The delete only matches the row this worker claimed (same id, same
    // update_time, status_flag=1); each statement must affect exactly one row
    // or gOneRowsErr is returned. The outcome is logged either way.
    func (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) {
        err := database.TX(func(_ *sqlx.DB, tx *sqlx.Tx) error {
            r, e := tx.Exec(
                "delete from delivery_queue where id=? and update_time=? and status_flag=1",
                msg.ID,
                msg.UpdateTime,
            )
            if e != nil {
                return e
            }

            rows, e := r.RowsAffected()
            if e != nil {
                return e
            }
            if rows != 1 {
                return gOneRowsErr
            }

            r, e = tx.Exec(
                "insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",
                msg.ID,
                msg.ClientID,
                time.Now().UnixNano(),
            )
            if e != nil {
                return e
            }

            rows, e = r.RowsAffected()
            if e != nil {
                return e
            }
            if rows != 1 {
                return gOneRowsErr
            }

            return nil
        })

        if err != nil {
            logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
            return
        }
        logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
    }
    
    
    // afterDeliveryFailed records a failed delivery attempt in the log and
    // changes nothing else.
    func (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) {
        clientID := me.info.ClientID
        logger.Logf(
            "tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s",
            clientID,
            msg.GlobalID,
            msg.SubID,
        )
    }
    
    // afterDeliveryTimeout puts a timed-out message back into the undelivered
    // state by resetting status_flag to 0. The update only matches a row that
    // still has status_flag=1 and the update_time held in msg; anything other
    // than exactly one affected row is reported as gOneRowsErr. The outcome is
    // logged either way.
    func (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) {
        resetFlag := func(db *sqlx.DB) error {
            result, execErr := db.Exec(
                "update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?",
                msg.ID,
                msg.UpdateTime,
            )
            if execErr != nil {
                return execErr
            }

            affected, countErr := result.RowsAffected()
            switch {
            case countErr != nil:
                return countErr
            case affected != 1:
                return gOneRowsErr
            }
            return nil
        }

        if err := database.DB(resetFlag); err != nil {
            logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
            return
        }
        logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
    }
    
    // Sentinel errors and the delivery timeout shared by the worker's methods.
    var (
        // gEmptyRowsErr signals that a query produced no row.
        gEmptyRowsErr = errors.New("empty rows")

        // gOneRowsErr signals that a statement did not affect exactly one row.
        gOneRowsErr = errors.New("expecting one row affected")

        // gDeliveryTimeoutNanos is the delivery timeout, ten seconds expressed in nanoseconds.
        gDeliveryTimeoutNanos = int64(10 * time.Second)
    )
    

    database.go

    • 数据库表相应的细节调整
      • delivery_queue: 去掉failed_count, 增加update_time时间戳
      • success_queue: 去掉sub_id, 改为client_id, 并增加create_time时间戳
      • failed_queue: 因为不允许失败, 因此删除失败投递表
    package database
    
    import "github.com/jmoiron/sqlx"
    import _ "github.com/mattn/go-sqlite3"
    
    // DBFunc is a callback run by DB against an opened database handle; the error it returns is passed back to DB's caller.
    type DBFunc func(db *sqlx.DB) error
    // TXFunc is a callback run by TX with the database handle and a transaction begun on it; TX commits when it returns nil and rolls back otherwise.
    type TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error
    
    // init prepares the tables when the package is loaded and panics if that
    // preparation fails.
    func init() {
        if err := DB(initDB); err != nil {
            panic(err)
        }
    }
    
    // initDB creates the tables used by the message queue service if they do
    // not exist yet. The statements run in the listed order and the first
    // failure is returned immediately. The former failed_queue table is no
    // longer created.
    func initDB(db *sqlx.DB) error {
        schema := []string{
            // subscriber: subscribers / consumers
            `create table if not exists subscriber(
        id integer primary key autoincrement,
        client_id varchar(50) unique not null,
        topic varchar(100) not null,
        notify_url varchar(500) not null,
        expire_time integer
    )`,

            // tx_msg: transactional messages
            `create table if not exists tx_msg (
        id integer primary key autoincrement,
        global_id string varchar(50) not null,
        sub_id string varchar(50) unique not null,
        sender_id varchar(50) not null,
        create_time integer not null,
        topic varchar(100) not null,
        content nvarchar(2048) not null
    )`,

            // delivery_queue: messages waiting to be delivered
            `create table if not exists delivery_queue (
        id integer primary key autoincrement,
        
        client_id varchar(50) not null,
        notify_url varchar(500) not null,
        
        msg_id integer not null,
        global_id string varchar(50) not null,
        sub_id string varchar(50) unique not null,
        sender_id varchar(50) not null,
        create_time integer not null,
        topic varchar(100) not null,
        content nvarchar(2048) not null,
        
        status_flag integer not null,
        update_time integer not null
    )`,

            // success_queue: successfully delivered messages
            `create table if not exists success_queue (
        id integer primary key autoincrement,
        msg_id integer not null,
        client_id varchar(50) not null,
        create_time integer not null
    )`,
        }

        for _, ddl := range schema {
            if _, err := db.Exec(ddl); err != nil {
                return err
            }
        }

        return nil
    }
    
    // open returns a sqlx handle for the sqlite3 database file ./mqs.db.
    func open() (*sqlx.DB, error) {
        const (
            driver = "sqlite3"
            dsn    = "./mqs.db"
        )
        return sqlx.Open(driver, dsn)
    }
    
    // DB opens the database, passes the handle to action and closes the handle
    // once action has returned. It returns the open error if opening fails,
    // otherwise whatever action returns; an error from closing is ignored.
    func DB(action DBFunc) error {
        conn, openErr := open()
        if openErr != nil {
            return openErr
        }
        defer conn.Close()

        return action(conn)
    }
    
    // TX runs action inside a database transaction.
    //
    // A transaction is begun on a freshly opened handle and passed to action
    // together with that handle. When action returns nil the transaction is
    // committed and the commit result is returned. When action fails the
    // transaction is rolled back and action's error is returned.
    //
    // The original returned the result of Rollback in the failure case, which
    // is nil after a clean rollback, so callers saw success even though the
    // work had been undone.
    func TX(action TXFunc) error {
        return DB(func(db *sqlx.DB) error {
            tx, err := db.Beginx()
            if err != nil {
                return err
            }

            if err = action(db, tx); err != nil {
                // Best-effort rollback: the action's error is the one the caller
                // needs, so a secondary rollback failure is not allowed to mask it.
                _ = tx.Rollback()
                return err
            }

            return tx.Commit()
        })
    }
    

    (未完待续)

    相关文章

      网友评论

        本文标题:手撸golang GO与微服务 Saga模式之4

        本文链接:https://www.haomeiwen.com/subject/gothcltx.html