美文网首页GO与微服务
手撸golang GO与微服务 Saga模式之1

手撸golang GO与微服务 Saga模式之1

作者: 老罗话编程 | 来源:发表于2021-03-11 12:02 被阅读0次

    缘起

    最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
    本系列笔记拟采用golang练习之

    Saga模式

    • saga模式将分布式长事务切分为一系列独立短事务
    • 每个短事务是可通过补偿动作进行撤销的
    • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用
    Saga由一系列的子事务“Ti”组成,
    每个Ti都有对应的补偿“Ci”,
    当Ti出现问题时Ci用于处理Ti执行带来的问题。
    
    可以通过下面的两个公式理解Saga模式。
    T = T1 T2 … Tn
    T = TCT
    
    Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,
    而应该将事务切分为一组按序依次提交的短事务,
    Saga模式满足ACD(原子性、一致性、持久性)特征。
    
    摘自 <<Go微服务实战>> 刘金亮, 2021.1
    

    目标

    • 为实现saga模式的分布式事务, 先模拟实现一个简单的pub/sub消息服务
    • 使用sqlx + sqlite3持久化消息, 使用gin作为http框架, 使用toml配置文件

    代码结构

    $ tree saga
    saga
    └── mqs
        ├── cmd
        │   └── boot.go
        ├── config
        │   └── config.go
        ├── database
        │   └── database.go
        ├── handlers
        │   ├── ping.go
        │   └── subscribe.go
        ├── logger
        │   └── logger.go
        └── routers
            └── routers.go
    

    设计

    • config: 读取并解析toml配置文件
    • database: 封装sqlx + sqlite3
    • handlers/ping: 预留http服务保活探针接口
    • handlers/subscribe: 注册一个消息订阅者. 消息订阅者包含订阅者ID, 主题和回调地址(以便推送消息)
    • routers: 注册gin路由

    config.go

    读取并解析toml配置文件

    package config
    
    import  "github.com/BurntSushi/toml"
    
    type tomlConfig struct {
        MQS tServiceConfig
    }
    
    type tServiceConfig struct {
        Port int
        LogDir string
    }
    
    var gServiceConfig = &tServiceConfig{}
    
    func init() {
        cfg := &tomlConfig{}
        if _,err := toml.DecodeFile("./mqs.toml", cfg);err != nil {
            panic(err)
        }
        *gServiceConfig = cfg.MQS
    }
    
    func GetPort() int {
        return gServiceConfig.Port
    }
    
    func GetLogDir() string {
        return gServiceConfig.LogDir
    }
    

    database.go

    封装sqlx + sqlite3

    package database
    
    import "github.com/jmoiron/sqlx"
    import _ "github.com/mattn/go-sqlite3"
    
    type DBFunc func(db *sqlx.DB) error
    type TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error
    
    func init() {
        // must prepare tables
        err := DB(func(db *sqlx.DB) error {
            // table: sub_info
            _,e := db.Exec(`
            create table if not exists sub_info(
                id integer primary key autoincrement,
                client_id varchar(50) unique not null,
                topic varchar(100) not null,
                callback_url varchar(500) not null
            )`)
            return e
        })
        if err != nil {
            panic(err)
        }
    }
    
    func open() (*sqlx.DB, error) {
        return sqlx.Open("sqlite3", "./mqs.db")
    }
    
    func DB(action DBFunc) error {
        db,err := open()
        if err != nil {
            return err
        }
        defer func() { _ = db.Close() }()
    
        return action(db)
    }
    
    func TX(action TXFunc) error {
        return DB(func(db *sqlx.DB) error {
            tx, err := db.Beginx()
            if err != nil {
                return err
            }
    
            err = action(db, tx)
            if err == nil {
                return tx.Commit()
            } else {
                return tx.Rollback()
            }
        })
    }
    

    ping.go

    预留http服务保活探针接口

    package handlers
    
    import (
        "github.com/gin-gonic/gin"
        "net/http"
        "time"
    )
    
    func Ping(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{ "ok": true, "time": time.Now().Format(time.RFC3339)})
    }
    

    subscribe.go

    注册一个消息订阅者. 消息订阅者包含订阅者ID, 主题和回调地址(以便推送消息)

    package handlers
    
    import (
        "github.com/gin-gonic/gin"
        "github.com/gin-gonic/gin/binding"
        "github.com/jmoiron/sqlx"
        "learning/gooop/saga/mqs/database"
        "net/http"
    )
    
    type tSubMsg struct {
        ClientID string
        Topic string
        CallbackUrl string
    }
    
    func Subscribe(c *gin.Context) {
        msg := &tSubMsg{}
        if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {
            c.AbortWithStatusJSON(
                http.StatusInternalServerError,
                gin.H{"error": err.Error()})
            return
        }
        
        err := database.DB(func(db *sqlx.DB) error {
            _,e := db.Exec(
                "replace into sub_info(client_id, topic, callback_url) values(?, ?, ?)",
                msg.ClientID,
                msg.Topic,
                msg.CallbackUrl)
            return e
        })
    
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        } else {
            c.JSON(http.StatusOK, gin.H { "ok": true })
        }
    }
    

    routers.go

    注册gin路由

    package routers
    
    import (
        "github.com/gin-gonic/gin"
        "learning/gooop/saga/mqs/handlers"
    )
    
    func RegisterRouters() *gin.Engine {
        r := gin.Default()
        r.Use(gin.Logger())
        r.GET("/ping", handlers.Ping)
        r.POST("/subscribe", handlers.Subscribe)
        return r
    }
    

    (未完待续)

    相关文章

      网友评论

        本文标题:手撸golang GO与微服务 Saga模式之1

        本文链接:https://www.haomeiwen.com/subject/hzbnqltx.html