美文网首页GO与微服务
手撸golang GO与微服务 Saga模式之7

手撸golang GO与微服务 Saga模式之7

作者: 老罗话编程 | 来源:发表于2021-03-17 10:52 被阅读0次

    缘起

    最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
    本系列笔记拟采用golang练习之

    Saga模式

    • saga模式将分布式长事务切分为一系列独立短事务
    • 每个短事务是可通过补偿动作进行撤销的
    • 事务动作和补动作偿都是幂等的, 允许重复执行而不会有副作用
    Saga由一系列的子事务“Ti”组成,
    每个Ti都有对应的补偿“Ci”,
    当Ti出现问题时Ci用于处理Ti执行带来的问题。
    
    可以通过下面的两个公式理解Saga模式。
    T = T1 T2 … Tn
    T = TCT
    
    Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,
    而应该将事务切分为一组按序依次提交的短事务,
    Saga模式满足ACD(原子性、一致性、持久性)特征。
    
    摘自 <<Go微服务实战>> 刘金亮, 2021.1
    

    目标

    • 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
    • 事务消息队列服务的功能性要求
      • 消息不会丢失: 消息的持久化
      • 消息的唯一性: 要求每个消息有全局ID和子事务ID
      • 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

    子目标(Day 7)

    • MQS已基本可用, 现在实现一个模拟的订单微服务, 并与MQ联动
      • 长事务: 订单创建后, 联动库存服务, 扣减库存
      • 补偿动作
        • 如果扣库成功, 更新订单状态为已出库(实际系统中, 可能还涉及物流发货等复杂流程)
        • 否则(库存不足), 更新订单状态为出库失败(实际系统中, 可能还涉及退款和通知客户等复杂流程)
    • 流程
      • 创建订单后, 向MQ发布[销售订单.创建]消息
      • 订阅MQ的[销售订单.出库.成功], [销售订单.出库.失败]消息
      • 接收到MQ的出库消息后, 更新订单状态

    设计

    • ISaleOrderService: 订单服务接口
    • SaleOrder: 销售订单实体
    • tSaleOrderService: 模拟订单服务, 实现ISaleOrderService接口
    • NotifyStockOutbound: 接收库存服务的扣库结果消息

    ISaleOrderService.go

    订单服务接口

    package order
    
    // ISaleOrderService to manage sale order creation and modification
    type ISaleOrderService interface {
        // get order info
        Get(orderID string) *SaleOrder
    
        // create new order
        Create(it *SaleOrder) error
    
        // update order status
        Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder)
    }
    

    SaleOrder.go

    销售订单实体

    package order
    
    type SaleOrder struct {
        OrderID string
        CustomerID string
        ProductID string
        Quantity int
        Price float64
        Amount float64
        CreateTime int64
        StatusFlag int32
    }
    
    const StatusNotDelivered int32 = 0
    const StatusStockOutboundDone int32 = 1
    const StatusStockOutboundFailed int32 = 2
    const StatusMQServiceFailed int32 = 3
    

    tSaleOrderService.go

    模拟订单服务, 实现ISaleOrderService接口

    package order
    
    import (
        "bytes"
        "encoding/json"
        "errors"
        "io/ioutil"
        "learning/gooop/saga/mqs/logger"
        "learning/gooop/saga/mqs/models"
        "net/http"
        "sync"
        "sync/atomic"
        "time"
    )
    
    type tSaleOrderService struct {
        rwmutex *sync.RWMutex
        orders map[string]*SaleOrder
        bMQReady bool
        publishQueue chan *SaleOrder
    }
    
    func newSaleOrderService() ISaleOrderService {
        it := new(tSaleOrderService)
        it.init()
        return it
    }
    
    func (me *tSaleOrderService) init() {
        me.rwmutex = new(sync.RWMutex)
        me.orders = make(map[string]*SaleOrder)
        me.bMQReady = false
        me.publishQueue = make(chan *SaleOrder, gMQMaxQueuedMsg)
    
        go me.beginSubscribeMQ()
        go me.beginPublishMQ()
    }
    
    func (me *tSaleOrderService) beginSubscribeMQ() {
        expireDuration := int64(1 * time.Hour)
        subscribeDuration := 20 * time.Minute
        pauseDuration := 3*time.Second
        lastSubscribeTime := int64(0)
    
        for {
            now := time.Now().UnixNano()
            if now - lastSubscribeTime >= int64(subscribeDuration) {
                expireTime := now + expireDuration
                err := fnSubscribeMQ(expireTime)
    
                if err != nil {
                    me.bMQReady = false
                    logger.Logf("tSaleOrderService.beginSubscribeMQ, failed, err=%v", err)
    
                } else {
                    lastSubscribeTime = now
                    me.bMQReady = true
                    logger.Logf("tSaleOrderService.beginSubscribeMQ, done")
                }
            }
            time.Sleep(pauseDuration)
        }
    }
    
    func fnSubscribeMQ(expireTime int64) error {
        msg := &models.SubscribeMsg{
            ClientID: gMQClientID,
            Topic: gMQSubscribeTopic,
            NotifyUrl: gMQServerURL + PathOfNotifyStockOutbound,
            ExpireTime: expireTime,
        }
        url := gMQServerURL + "/subscribe"
        return fnPost(msg, url)
    }
    
    
    func fnPost(msg interface{}, url string) error {
        body,_ := json.Marshal(msg)
        rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
        if err != nil {
            return err
        }
    
        defer rsp.Body.Close()
        j, err := ioutil.ReadAll(rsp.Body)
        if err != nil {
            return err
        }
        ok := &models.OkMsg{}
        err = json.Unmarshal(j, ok)
        if err != nil {
            return err
        }
    
        if !ok.OK {
            return gMQReplyFalse
        }
    
        return nil
    }
    
    
    func (me *tSaleOrderService) beginPublishMQ() {
        for {
            select {
            case msg := <- me.publishQueue :
                me.publishMQ(msg)
                break
            }
        }
    }
    
    
    func (me *tSaleOrderService) Get(orderID string) *SaleOrder {
        me.rwmutex.RLock()
        defer me.rwmutex.RUnlock()
    
        it,ok := me.orders[orderID]
        if ok {
            return it
        } else {
            return nil
        }
    }
    
    func (me *tSaleOrderService) Create(it *SaleOrder) error {
        me.rwmutex.Lock()
        defer me.rwmutex.Unlock()
    
        if len(me.publishQueue) >= gMQMaxQueuedMsg {
            return gMQNotAvailableError
        }
        me.orders[it.OrderID] = it
        me.publishQueue <- it
    
        return nil
    }
    
    
    func (me *tSaleOrderService) publishMQ(it *SaleOrder) {
        url := gMQServerURL + "/publish"
    
        j,_ := json.Marshal(it)
        msg := &models.TxMsg{
            GlobalID: it.OrderID,
            SubID: it.OrderID,
            SenderID: gMQClientID,
            Topic: gMQPublishTopic,
            CreateTime: it.CreateTime,
            Content: string(j),
        }
    
        for i := 0;i < gMQMaxPublishRetry;i++ {
            err := fnPost(msg, url)
            if err != nil {
                logger.Logf("tSaleOrderService.publishMQ, failed, err=%v, order=%v", err, it)
                time.Sleep(gMQPublishInterval)
    
            } else {
                logger.Logf("tSaleOrderService.publishMQ, done, order=%v", it)
                return
            }
        }
    
        // publish failed
        logger.Logf("tSaleOrderService.publishMQ, failed max retries, order=%v", it)
        _,_ = me.Update(it.OrderID, StatusNotDelivered, StatusMQServiceFailed)
    }
    
    
    func (me *tSaleOrderService) Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder) {
        me.rwmutex.RLock()
        defer me.rwmutex.RUnlock()
    
        it, ok := me.orders[orderID]
        if !ok {
            return gNotFoundError, nil
        }
    
        if !atomic.CompareAndSwapInt32(&it.StatusFlag, oldStatusFlag, newStatusFlag) {
            return gStatusChangedError, it
        }
    
        it.StatusFlag = newStatusFlag
        return nil, it
    }
    
    var gMQReplyFalse = errors.New("mq reply false")
    var gMQNotAvailableError = errors.New("mq not ready")
    var gNotFoundError = errors.New("order not found")
    var gStatusChangedError = errors.New("status changed")
    var gMQMaxPublishRetry = 3
    var gMQPublishInterval = 1*time.Second
    var gMQSubscribeTopic = "sale-order.stock.outbound"
    var gMQPublishTopic = "sale-order.created"
    var gMQClientID = "sale-order-service"
    var gMQServerURL = "http://localhost:333"
    var gMQMaxQueuedMsg = 1024
    
    var SaleOrderService = newSaleOrderService()
    

    NotifyStockOutbound.go

    接收库存服务的扣库结果消息

    package order
    
    import (
        "encoding/json"
        "github.com/gin-gonic/gin"
        "io/ioutil"
        "learning/gooop/saga/mqs/logger"
        "learning/gooop/saga/mqs/models"
        "net/http"
    )
    
    func NotifyStockOutbound(c *gin.Context) {
        body := c.Request.Body
        defer body.Close()
    
        j, e := ioutil.ReadAll(body)
        if e != nil {
            logger.Logf("order.NotifyStockOutbound, failed ioutil.ReadAll")
            c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
            return
        }
    
        msg := &models.TxMsg{}
        e = json.Unmarshal(j, msg)
        if e != nil {
            logger.Logf("order.NotifyStockOutbound, failed json.Unmarshal")
            c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
            return
        }
    
        orderID := msg.GlobalID
        succeeded := msg.Content == "1"
        logger.Logf("order.NotifyStockOutbound, orderID=%v, succeeded=%s", orderID, succeeded)
    
        var newStatusFlag int32
        if succeeded {
            newStatusFlag = StatusStockOutboundDone
        } else {
            newStatusFlag = StatusStockOutboundFailed
        }
    
        err, order := SaleOrderService.Update(orderID, StatusNotDelivered, newStatusFlag)
        if err != nil {
            logger.Logf("order.NotifyStockOutbound, failed SaleOrderService.Update, err=%v, order=%v", err, order)
        }
    
        c.JSON(http.StatusOK, gin.H{ "ok": true })
    }
    
    var PathOfNotifyStockOutbound = "/notify/sale-order.stock.outbound"
    

    (未完待续)

    相关文章

      网友评论

        本文标题:手撸golang GO与微服务 Saga模式之7

        本文链接:https://www.haomeiwen.com/subject/grvscltx.html