美文网首页GO与微服务
手撸golang GO与微服务 Saga模式之2

手撸golang GO与微服务 Saga模式之2

作者: 老罗话编程 | 来源:发表于2021-03-12 08:15 被阅读0次

    缘起

    最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
    本系列笔记拟采用golang练习之

    Saga模式

    • saga模式将分布式长事务切分为一系列独立短事务
    • 每个短事务是可通过补偿动作进行撤销的
    • 事务动作和补动作偿都是幂等的, 允许重复执行而不会有副作用
    Saga由一系列的子事务“Ti”组成,
    每个Ti都有对应的补偿“Ci”,
    当Ti出现问题时Ci用于处理Ti执行带来的问题。
    
    可以通过下面的两个公式理解Saga模式。
    T = T1 T2 … Tn
    T = TCT
    
    Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,
    而应该将事务切分为一组按序依次提交的短事务,
    Saga模式满足ACD(原子性、一致性、持久性)特征。
    
    摘自 <<Go微服务实战>> 刘金亮, 2021.1
    

    目标

    • 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
    • 事务消息队列服务的功能性要求
      • 消息不会丢失: 消息的持久化
      • 消息的唯一性: 要求每个消息有全局ID和子事务ID
      • 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

    子目标(Day 2)

    • 消息的设计
    • 消息的持久化
    • 投递队列的持久化
    • 投递状态的持久化

    设计

    • TxMsg: 事务消息模型
    • database: 添加事务消息表, 消息投递表, 成功投递表, 失败投递表
    • publish: 消息发布api
    • routers: 添加/publish路由

    TxMsg.go

    事务消息模型

    package models
    
    // 事务消息体
    type TxMsg struct {
        // 全局事务ID
        GlobalID string
    
        // 子事务ID
        SubID string
    
        // 发送者ID
        SenderID string
    
        // 时间戳, 使用time.Now().UnixNano()
        CreateTime int64
    
        // 主题, 即消息类型
        Topic string
    
        // 消息内容, 一般是json
        Content string
    }
    

    database.go

    添加事务消息表, 消息投递表, 成功投递表, 失败投递表

    package database
    
    import "github.com/jmoiron/sqlx"
    import _ "github.com/mattn/go-sqlite3"
    
    type DBFunc func(db *sqlx.DB) error
    type TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error
    
    func init() {
        // must prepare tables
        err := DB(initDB)
        if err != nil {
            panic(err)
        }
    }
    
    func initDB(db *sqlx.DB) error {
        // 订阅者/消费者: subscriber
        _, e := db.Exec(`create table if not exists subscriber(
        id integer primary key autoincrement,
        client_id varchar(50) unique not null,
        topic varchar(100) not null,
        notify_url varchar(500) not null,
        expire_time integer
    )`)
        if e != nil {
            return e
        }
    
        // 事务消息: tx_msg
        _, e = db.Exec(`create table if not exists tx_msg (
        id integer primary key autoincrement,
        global_id string varchar(50) not null,
        sub_id string varchar(50) unique not null,
        sender_id varchar(50) not null,
        create_time integer not null,
        topic varchar(100) not null,
        content nvarchar(2048) not null
    )`)
        if e != nil {
            return e
        }
    
        // 投递队列: delivery_queue
        _, e = db.Exec(`create table if not exists delivery_queue (
        id integer primary key autoincrement,
        msg_id integer not null,
        sub_id integer not null,
        status_flag integer not null,
        failed_count integer not null
    )`)
        if e != nil {
            return e
        }
    
        // 成功投递队列: success_queue
        _, e = db.Exec(`create table if not exists success_queue (
        id integer primary key autoincrement,
        msg_id integer not null,
        sub_id integer not null
    )`)
        if e != nil {
            return e
        }
    
        // 投递失败队列: failed_queue
        _, e = db.Exec(`create table if not exists failed_queue (
        id integer primary key autoincrement,
        msg_id integer not null,
        sub_id integer not null
    )`)
        if e != nil {
            return e
        }
    
        return nil
    }
    
    func open() (*sqlx.DB, error) {
        return sqlx.Open("sqlite3", "./mqs.db")
    }
    
    func DB(action DBFunc) error {
        db,err := open()
        if err != nil {
            return err
        }
        defer func() { _ = db.Close() }()
    
        return action(db)
    }
    
    func TX(action TXFunc) error {
        return DB(func(db *sqlx.DB) error {
            tx, err := db.Beginx()
            if err != nil {
                return err
            }
    
            err = action(db, tx)
            if err == nil {
                return tx.Commit()
            } else {
                return tx.Rollback()
            }
        })
    }
    

    publish.go

    消息发布api

    package handlers
    
    import (
        "github.com/gin-gonic/gin"
        "github.com/gin-gonic/gin/binding"
        "github.com/jmoiron/sqlx"
        "learning/gooop/saga/mqs/database"
        "learning/gooop/saga/mqs/logger"
        "learning/gooop/saga/mqs/models"
        "net/http"
        "time"
    )
    
    func Publish(c *gin.Context) {
        // parse request
        msg := &models.TxMsg{}
        if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {
            c.AbortWithStatusJSON(
                http.StatusInternalServerError,
                gin.H{ "ok": false, "error": err.Error()})
            return
        }
    
        // fixme: validate msg
    
        // save to db
        ids := []int64{0}
        err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {
            id, e := saveTxMsg(db, tx, msg)
            if e != nil {
                return e
            }
    
            ids[0] = id
            return nil
        })
    
        if ids[0] > 0 {
            newMsgId := ids[0]
            logger.Logf("publish new msg: %d", newMsgId)
    
            // todo: 新增消息, 开始投送
        }
    
        // reply
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{ "ok": false, "error": err.Error()})
        } else {
            c.JSON(http.StatusOK, gin.H { "ok": true })
        }
    }
    
    
    func saveTxMsg(db *sqlx.DB, tx *sqlx.Tx, msg *models.TxMsg) (int64,error) {
        // insert tx_msg
        r, e := db.Exec(
            `replace into tx_msg(global_id, sub_id, sender_id, create_time, topic, content) values(?,?,?,?,?,?)`,
            msg.GlobalID, msg.SubID, msg.SenderID, msg.CreateTime, msg.Topic, msg.Content,
        )
        if e != nil {
            return 0,e
        }
    
        // get last insert id
        id,e := r.LastInsertId()
        if e != nil {
            return 0,e
        }
    
        if id > 0 {
            // copy to delivery queue
            now := time.Now().UnixNano()
            r, e = db.Exec(`
    insert into delivery_queue(msg_id, sub_id, status_flag, failed_count) 
    select
        ?, s.id, 0, 0 
    from
        sub_info s
    where
        s.expire_time>?
    `, id, now)
            if e != nil {
                return 0,e
            }
        }
    
        return id, nil
    }
    

    routers.go

    添加/publish路由

    package routers
    
    import (
        "github.com/gin-gonic/gin"
        "learning/gooop/saga/mqs/handlers"
    )
    
    func RegisterRouters() *gin.Engine {
        r := gin.Default()
        r.Use(gin.Logger())
    
        r.GET("/ping", handlers.Ping)
        r.POST("/subscribe", handlers.Subscribe)
        r.POST("/publish", handlers.Publish)
    
        return r
    }
    

    (未完待续)

    相关文章

      网友评论

        本文标题:手撸golang GO与微服务 Saga模式之2

        本文链接:https://www.haomeiwen.com/subject/gdvaqltx.html