美文网首页GO与微服务
手撸golang GO与微服务 ES-CQRS模式之2

手撸golang GO与微服务 ES-CQRS模式之2

作者: 老罗话编程 | 来源:发表于2021-03-23 22:53 被阅读0次

    手撸golang GO与微服务 ES-CQRS模式之2

    缘起

    最近阅读 [Go微服务实战] (刘金亮, 2021.1)
    本系列笔记拟采用golang练习之
    gitee:

    ES-CQRS模式

    ES(Event Sourcing)事件溯源非常好理解,
    指的是将每次的事件都记录下来,
    而不是去记录对象的状态。
    比如新建、修改等都会作为事件记录下来,
    当需要最新的状态时,通过事件的堆叠来计算最新的状态。
    按照事件溯源的模式进行架构设计,
    就是事件驱动架构(Event DrivenArchitecture, EDA)。
    
    命令查询职责分离(CQRS)最早来自Betrand Meyer写的
    Object-OrientedSoftware Construction一书,
    指的是命令查询分离(Command Query Separation,CQS)。
    其基本思想是任何一个对象的方法都可以分为以下两大类:
    ▪ 命令(Command):不返回任何结果(void),但会改变对象的状态。
    ▪ 查询(Query):返回结果,但是不会改变对象的状态,对系统没有副作用。
    CQRS的核心出发点就是把系统分为读和写两部分,从而方便分别进行优化。
    

    目标(Day 2)

    • 根据ES-CQRS模式, 大幅重构Day 1的设计, 并进行单元测试

    设计

    • TodoDTO: 待办事宜数值对象
    • OperationTag: todo写入事件的类型标记
    • TodoEvent: todo写入事件
    • ClassTag: json序列化的类型标记
    • tJSONData: json序列化的数据容器
    • IEventBus: 事件总线接口
    • iTodoEventSerializer: 事件序列化到JSON数据的接口
    • iTodoReader: todo读取接口
    • iTodoWriter: todo写入接口
    • iJSONStore: json文件读写接口
    • ITodoService: todo待办事宜服务接口
    • tEventBus: 事件总线的实现
    • tTodoEventSerializer: 事件序列化到JSON的实现
    • tTodoWriter: 事件写入器的实现, 监听write指令, 并持久化到json存储
    • tMockJSONStore: 虚拟的JSON文件读写实现
    • tTodoReader: 待办事宜读取器, 监听write和load指令, 并计算todo列表的当前状态
    • tMockTodoService: 待办事宜服务的实现

    单元测试

    todo_app_test.go

    package es_cqrs
    
    import (
        td "learning/gooop/es_cqrs/todo_app"
        "testing"
    )
    
    func fnAssertTrue (t *testing.T, b bool, msg string) {
        if !b {
            t.Fatal(msg)
        }
    }
    
    func Test_TodoApp(t *testing.T) {
        t1 := &td.TodoDTO{ 1, "title-1", "content-1" }
        td.MockTodoService.Create(t1)
    
        all := td.MockTodoService.GetAll()
        fnAssertTrue(t, len(all) == 1, "expecting 1 item")
        fnAssertTrue(t, all[0].Title == t1.Title, "expecting " + t1.Title)
        t.Log("pass creating")
    
        t1.Content = "content-1 updated"
        t1.Title = "title-1 updated"
        td.MockTodoService.Update(t1)
        all = td.MockTodoService.GetAll()
        fnAssertTrue(t, len(all) == 1, "expecting 1 item")
        fnAssertTrue(t, all[0].Content == t1.Content, "expecting " + t1.Content)
        t.Log("pass updating")
    
        td.MockTodoService.Delete(t1)
        all = td.MockTodoService.GetAll()
        fnAssertTrue(t, len(all) == 0, "expecting 0 items")
        t.Log("pass deleting")
    }
    

    测试输出

    $ go test -v todo_app_test.go 
    === RUN   Test_TodoApp
    22:38:08.180382833 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
    22:38:08.180533659 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}
    22:38:08.180539669 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
    22:38:08.180552255 tTodoReader.items: [&{1 title-1 content-1}]
    22:38:08.180557245 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
    22:38:08.180560995 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
        todo_app_test.go:21: pass creating
    22:38:08.180580644 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
    22:38:08.180604465 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}
    22:38:08.180612665 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
    22:38:08.180618512 tTodoReader.items: [&{1 title-1 updated content-1 updated}]
    22:38:08.18062244 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
    22:38:08.180626445 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
        todo_app_test.go:29: pass updating
    22:38:08.180642172 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
    22:38:08.180656612 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}, {"Tag":3,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}
    22:38:08.180669129 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
    22:38:08.180672774 tTodoReader.items: []
    22:38:08.180675952 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
    22:38:08.180679309 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
        todo_app_test.go:34: pass deleting
    --- PASS: Test_TodoApp (0.00s)
    PASS
    ok      command-line-arguments  0.002s
    

    TodoDTO.go

    待办事宜数值对象

    package todo_app
    
    type TodoDTO struct {
        NO      int
        Title   string
        Content string
    }
    
    func (me *TodoDTO) Clone() *TodoDTO {
        return &TodoDTO{
            me.NO, me.Title, me.Content,
        }
    }
    

    OperationTag.go

    todo写入事件的类型标记

    package todo_app
    
    
    type OperationTag int
    
    const OPCreated OperationTag = 1
    const OPUpdated OperationTag = 2
    const OPDeleted OperationTag = 3
    

    TodoEvent.go

    todo写入事件

    package todo_app
    
    type TodoEvent struct {
        Tag OperationTag
        Data *TodoDTO
    }
    

    ClassTag.go

    json序列化的类型标记

    package todo_app
    
    type ClassTag int
    
    const TodoEventClass ClassTag = 1
    
    

    tJSONData.go

    json序列化的数据容器

    package todo_app
    
    import "encoding/json"
    
    type tJSONData struct {
        Tag     ClassTag
        Content []byte
    }
    
    
    
    func (me *tJSONData) Set(tag ClassTag, it interface{}) error {
        me.Tag = tag
    
        j, e := json.Marshal(it)
        if e != nil {
            return e
        }
        me.Content = j
        return nil
    }
    
    func (me *tJSONData) Get(it interface{}) error {
        return json.Unmarshal(me.Content, it)
    }
    

    IEventBus.go

    事件总线接口

    package todo_app
    
    type EventHandleFunc func(e string, args interface{})
    type EventHandler struct {
        ID      string
        Handler EventHandleFunc
    }
    
    type IEventBus interface {
        Pub(e string, args interface{})
        Sub(e string, id string, handleFunc EventHandleFunc)
        Unsub(e string, id string)
    }
    
    const EventWriteTodoCmd = "todo.write.cmd"
    const EventReadTodoCmd = "todo.read.cmd"
    const EventReadTodoRet = "todo.read.ret"
    const EventLoadTodoCmd = "todo.load.cmd"
    

    iTodoEventSerializer.go

    事件序列化到JSON数据的接口

    package todo_app
    
    type iTodoEventSerializer interface {
        Serialize(it *TodoEvent) *tJSONData
    }
    
    

    iTodoReader.go

    todo读取接口

    package todo_app
    
    type iTodoReader interface {
        All() []*TodoDTO
        HandleTodoEvent(e *TodoEvent)
    }
    

    iTodoWriter.go

    todo写入接口

    package todo_app
    
    type iTodoWriter interface {
        HandleTodoEvent(e *TodoEvent)
    }
    
    

    iJSONStore.go

    json文件读写接口

    package todo_app
    
    type iJSONStore interface {
        Load()
        Append(it *tJSONData)
    }
    
    

    ITodoService.go

    todo待办事宜服务接口

    package todo_app
    
    type ITodoService interface {
        Create(it *TodoDTO)
        Update(it *TodoDTO)
        Delete(it *TodoDTO)
    
        GetAll() []*TodoDTO
    }
    

    tEventBus.go

    事件总线的实现

    package todo_app
    
    import (
        "learning/gooop/saga/mqs/logger"
        "sync"
    )
    
    type tEventBus struct {
        rwmutex *sync.RWMutex
        items   map[string][]*EventHandler
    }
    
    func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler {
        return &EventHandler{
            id, handleFunc,
        }
    }
    
    func newEventBus() IEventBus {
        it := new(tEventBus)
        it.init()
        return it
    }
    
    func (me *tEventBus) init() {
        me.rwmutex = new(sync.RWMutex)
        me.items = make(map[string][]*EventHandler)
    }
    
    func (me *tEventBus) Pub(e string, args interface{}) {
        me.rwmutex.RLock()
        defer me.rwmutex.RUnlock()
    
        handlers, ok := me.items[e]
        if ok {
            for _, it := range handlers {
                logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID)
                it.Handler(e, args)
            }
        }
    }
    
    func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) {
        me.rwmutex.Lock()
        defer me.rwmutex.Unlock()
    
        handler := newEventHandler(id, handleFunc)
        handlers, ok := me.items[e]
    
        if ok {
            me.items[e] = append(handlers, handler)
        } else {
            me.items[e] = []*EventHandler{handler}
        }
    }
    
    func (me *tEventBus) Unsub(e string, id string) {
        me.rwmutex.Lock()
        defer me.rwmutex.Unlock()
    
        handlers, ok := me.items[e]
        if ok {
            for i, it := range handlers {
                if it.ID == id {
                    lastI := len(handlers) - 1
                    if i != lastI {
                        handlers[i], handlers[lastI] = handlers[lastI], handlers[i]
                    }
                    me.items[e] = handlers[:lastI]
                }
            }
        }
    }
    
    var GlobalEventBus = newEventBus()
    
    

    tTodoEventSerializer.go

    事件序列化到JSON的实现

    package todo_app
    
    type tTodoEventSerializer struct {
    }
    
    func newEventSeiralizer() iTodoEventSerializer {
        it := new(tTodoEventSerializer)
        return it
    }
    
    func (me *tTodoEventSerializer) Serialize(e *TodoEvent) *tJSONData {
        it := new(tJSONData)
        err := it.Set(TodoEventClass, e)
        if err != nil {
            return nil
        }
        return it
    }
    
    var gDefaultEventSerializer = newEventSeiralizer()
    
    

    tTodoWriter.go

    事件写入器的实现, 监听write指令, 并持久化到json存储

    package todo_app
    
    import (
        "fmt"
        "sync/atomic"
    )
    
    type tTodoWriter struct {
        id string
    }
    
    func newTodoWriter() iTodoWriter {
        it := new(tTodoWriter)
        it.init()
        return it
    }
    
    func (me *tTodoWriter) init() {
        me.id = fmt.Sprintf("tTodoWriter.%d", atomic.AddInt32(&gWriterCounter, 1))
    
        GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleWriteTodoCmd)
    }
    
    func (me *tTodoWriter) handleWriteTodoCmd(e string, args interface{}) {
        switch e {
        case EventWriteTodoCmd:
            if it, ok := args.(*TodoEvent); ok {
                me.HandleTodoEvent(it)
            }
            break
        }
    }
    
    func (me *tTodoWriter) HandleTodoEvent(e *TodoEvent) {
        j := gDefaultEventSerializer.Serialize(e)
        if j != nil {
            MockJSONStore.Append(j)
        }
    }
    
    var gWriterCounter int32 = 0
    

    tMockJSONStore.go

    虚拟的JSON文件读写实现

    package todo_app
    
    import (
        "fmt"
        "learning/gooop/saga/mqs/logger"
        "strings"
        "sync"
    )
    
    type tMockJSONStore struct {
        rwmutex *sync.RWMutex
        once    sync.Once
        items   []*tJSONData
    }
    
    func newMockJSONStore() iJSONStore {
        it := new(tMockJSONStore)
        it.init()
        return it
    }
    
    func (me *tMockJSONStore) init() {
        me.rwmutex = new(sync.RWMutex)
        me.items = []*tJSONData{}
    }
    
    func (me *tMockJSONStore) Load() {
        me.once.Do(func() {
            me.rwmutex.RLock()
            defer me.rwmutex.RUnlock()
    
            for _, it := range me.items {
                switch it.Tag {
                case TodoEventClass:
                    v := new(TodoEvent)
                    e := it.Get(v)
                    if e == nil {
                        GlobalEventBus.Pub(EventLoadTodoCmd, e)
                    }
                    break
                }
            }
        })
    }
    
    func (me *tMockJSONStore) Append(it *tJSONData) {
        me.rwmutex.Lock()
        defer me.rwmutex.Unlock()
    
        me.items = append(me.items, it)
    
        lines := []string{}
        for _,it := range me.items {
            lines = append(lines, fmt.Sprintf("%s", string(it.Content)))
        }
        logger.Logf("tMockJSONStore.items: %s", strings.Join(lines, ", "))
    }
    
    var MockJSONStore = newMockJSONStore()
    
    

    tTodoReader.go

    待办事宜读取器, 监听write和load指令, 并计算todo列表的当前状态

    package todo_app
    
    import (
        "fmt"
        "learning/gooop/saga/mqs/logger"
        "strings"
        "sync"
        "sync/atomic"
    )
    
    type tTodoReader struct {
        id string
        rwmutex *sync.RWMutex
        items []*TodoDTO
    }
    
    func newTodoReader() iTodoReader {
        it := new(tTodoReader)
        it.init()
        return it
    }
    
    func (me *tTodoReader) init() {
        id := fmt.Sprintf("tTodoReader.%d", atomic.AddInt32(&gReaderCounter, 1))
        me.id = id
        me.rwmutex = new(sync.RWMutex)
    
        GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleEvent)
        GlobalEventBus.Sub(EventLoadTodoCmd, me.id, me.handleEvent)
        GlobalEventBus.Sub(EventReadTodoCmd, me.id, me.handleEvent)
    }
    
    
    func (me *tTodoReader) handleEvent(e string, args interface{}) {
        switch e {
        case EventWriteTodoCmd:
            fallthrough
        case EventLoadTodoCmd:
            if v,ok := args.(*TodoEvent);ok {
                me.HandleTodoEvent(v)
            }
            break
    
        case EventReadTodoCmd:
            me.handleReadTodoList()
        }
    }
    
    func (me *tTodoReader) handleReadTodoList() {
        GlobalEventBus.Pub(EventReadTodoRet, me.All())
    }
    
    func (me *tTodoReader) All() []*TodoDTO {
        me.rwmutex.RLock()
        defer me.rwmutex.RUnlock()
    
        lst := make([]*TodoDTO, len(me.items))
        for i,it := range me.items {
            lst[i] = it
        }
        return lst
    }
    
    func (me *tTodoReader) HandleTodoEvent(e *TodoEvent) {
        me.rwmutex.Lock()
        defer me.rwmutex.Unlock()
    
        switch e.Tag {
        case OPCreated:
            me.items = append(me.items, e.Data.Clone())
            break
    
        case OPUpdated:
            for i,it := range me.items {
                if it.NO == e.Data.NO {
                    me.items[i] = e.Data.Clone()
                    break
                }
            }
            break
    
        case OPDeleted:
            for i,it := range me.items {
                if it.NO == e.Data.NO {
                    lastI := len(me.items) - 1
                    if i == lastI {
                        me.items[i] = nil
                    } else {
                        me.items[i], me.items[lastI] = me.items[lastI], nil
                    }
    
                    me.items = me.items[:lastI]
                    break
                }
            }
            break
        }
    
        lines := []string{}
        for _,it := range me.items {
            lines = append(lines, fmt.Sprintf("%v", it))
        }
        logger.Logf("tTodoReader.items: [%s]", strings.Join(lines, ", "))
    }
    
    var gReaderCounter int32 = 1
    

    tMockTodoService.go

    待办事宜服务的实现, 提供todo项的CRUD

    package todo_app
    
    type tMockTodoService struct {
        items []*TodoDTO
        writer iTodoWriter
        reader iTodoReader
    }
    
    func newMockTodoService() ITodoService {
        it := new(tMockTodoService)
        it.init()
        return it
    }
    
    func (me *tMockTodoService) init() {
        me.writer = newTodoWriter()
        me.reader = newTodoReader()
    
        GlobalEventBus.Sub(EventReadTodoRet, "tMockTodoService", me.handleReadTodoRet)
    }
    
    func (me *tMockTodoService) handleReadTodoRet(e string, args interface{}) {
        switch e {
        case EventReadTodoRet:
            if it,ok := args.([]*TodoDTO);ok {
                me.items = it
            }
            break
        }
    }
    
    func (me *tMockTodoService) Create(it *TodoDTO) {
        GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPCreated, it.Clone() })
    }
    
    func (me *tMockTodoService) Update(it *TodoDTO) {
        GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPUpdated, it.Clone() })
    }
    
    func (me *tMockTodoService) Delete(it *TodoDTO) {
        GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPDeleted, it.Clone() })
    }
    
    func (me *tMockTodoService) GetAll() []*TodoDTO {
        me.items = nil
        GlobalEventBus.Pub(EventReadTodoCmd, nil)
    
        lst := me.items
        me.items = nil
        return lst
    }
    
    var MockTodoService = newMockTodoService()
    
    

    (ES-CQRS end)

    相关文章

      网友评论

        本文标题:手撸golang GO与微服务 ES-CQRS模式之2

        本文链接:https://www.haomeiwen.com/subject/lmwqhltx.html