美文网首页Golang语言社区
领域事件在微服务内的一个应用案例

领域事件在微服务内的一个应用案例

作者: _张晓龙_ | 来源:发表于2018-02-04 13:33 被阅读168次

    领域事件

    Evans 在《领域驱动设计》这本书中并没有给出领域事件的正式定义,这种模式是在该书出版之后才提出来的:

    领域专家所关心的发生在领域中的一些事件
    将领域中发生的活动建模成一系列的离散事件。每个事件都用领域对象来表示......领域事件是领域模型的组成部分,表示领域中发生的事情[Evans, Ref, p20]

    events.png

    当团队成员对领域事件达成一致之后,领域事件便是通用语言的正式组成部分了。当领域事件到达目的地之后,无论目的地是本地 BC(Bounded Context,限界上下文) 还是远程 BC,我们都将领域事件用于维护数据的一致性。
    聚合有多个原则,其中的一个原则是在单个事务中,只允许对一个聚合实例进行修改,由此产生的其他修改都必须在该事务中完成。因此,本地 BC 的一个聚合实例和其他聚合实例的数据同步便可以通过领域事件来实现。另外,领域事件还可以用于本地 BC 和远程 BC 的数据同步,这时 BC 间的数据同步不再是强一致性,而是最终一致性。

    下图向我们展示了领域事件的产生、存储、分发和使用。领域事件既可以由本地的 BC 消费,也可以由远程的 BC 消费:


    domain-event.png

    在微服务的架构实践中,人们大量地借用了 DDD 中的概念和技术,比如一个微服务应该对应 DDD 中的一个 BC,在微服务设计中应该首先识别出 DDD 中的 AR(Aggregate Root,聚合根),等等。本文的关注点是领域事件在 BC 内的应用,即领域事件在微服务内的应用,下面将详细介绍一个应用案例。

    微服务内的一个应用案例

    问题域

    假设我们的问题域是性能的时间汇聚:Task1 是 5minute 汇聚任务,Task2 是 1day 汇聚任务,Task3 是 1week 汇聚任务,Task4 是 1month 汇聚任务,并且这四个任务的执行有一定的时序关系,即当 Task1 执行完后才能开始执行 Task2,当 Task2 执行完后才能开始并发执行 Task3 和 Task4。


    task-Δt.png

    解决方案

    使用领域事件的解决方案:每个任务是一个聚合实例,上游任务发布领域事件,下游任务订阅领域事件,多任务间通过领域事件进行数据同步,完全解耦


    task-domain-event.png

    代码实现

    定义领域事件

    我们定义两个领域事件:task1HasCompleted 和 task2HasCompleted

    const (
        task1HasCompleted eventhandlers.Event = "Task1 has completed"
        task2HasCompleted eventhandlers.Event = "Task2 has completed"
    )
    

    仅订阅领域事件

    Task3 仅订阅领域事件 task2HasCompleted:

    type Task3 struct {
    
    }
    
    func (t *Task3) Exec() {
        task3Handler := new(Task3Handler)
        ehs := eventhandlers.GetInstance()
        ehs.Sub(task2HasCompleted, task3Handler)
        fmt.Println("task3 sub task2HasCompleted")
    }
    
    type Task3Handler struct {
    
    }
    
    func (t *Task3Handler) Handle() {
        fmt.Println("task3 handler start")
        time.Sleep(200 * time.Millisecond)
        fmt.Println("task3 handler end")
    }
    
    

    Task4 的代码和 Task3 类似。

    即订阅又发布领域事件

    Task2 既订阅领域事件 task1HasCompleted,又发布领域事件 task2HasCompleted:

    type Task2 struct {
    
    }
    
    func (t *Task2) Exec() {
        task2Handler := new(Task2Handler)
        ehs := eventhandlers.GetInstance()
        ehs.Sub(task1HasCompleted, task2Handler)
        fmt.Println("task2 sub task1HasCompleted")
    }
    
    type Task2Handler struct {
    
    }
    
    func (t *Task2Handler) Handle() {
        fmt.Println("task2 handler start")
        time.Sleep(100 * time.Millisecond)
        fmt.Println("task2 handler end")
        ehs := eventhandlers.GetInstance()
        ehs.Pub(task2HasCompleted)
        fmt.Println("task2 pub task2HasCompleted")
    }
    

    仅发布领域事件

    Task1 仅发布领域事件 task1HasCompleted:

    type Task1 struct {
    
    }
    
    func (t *Task1) Exec() {
        task1Handler := new(Task1Handler)
        task1Handler.Handle()
    }
    
    type Task1Handler struct {
    
    }
    
    func (t *Task1Handler) Handle() {
        fmt.Println("task1 handler start")
        time.Sleep(50 * time.Millisecond)
        fmt.Println("task1 handler end")
        ehs := eventhandlers.GetInstance()
        ehs.Pub(task1HasCompleted)
        fmt.Println("task1 pub task1HasCompleted")
    }
    

    事件处理器

    BC 内的事件处理器通过一个 map 简单实现,key 为 Event,value 为 []Handler,提供了 Pub 和 Sub 两个方法:

    package event handlers
    
    import (
        "sync"
    )
    
    type Event string
    
    type Handler interface {
        Handle()
    }
    
    type EventHandlers struct {
        ehsMap map[Event][]Handler
        lock sync.RWMutex
    }
    
    var inst *EventHandlers
    var once sync.Once
    func GetInstance() *EventHandlers {
        once.Do(func() {
            inst = &EventHandlers{ehsMap: make(map[Event][]Handler)}
        })
        return inst
    }
    
    func (e *EventHandlers) Pub(event Event) {
        e.lock.RLock()
        defer e.lock.RUnlock()
        if handlers, ok := e.ehsMap[event]; ok {
            for _, handler := range handlers {
                go handler.Handle()
            }
        }
    
    }
    
    func (e *EventHandlers) Sub(event Event, handler Handler) {
        e.lock.Lock()
        defer e.lock.Unlock()
        e.ehsMap[event] = append(e.ehsMap[event], handler)
    }
    

    注:笔者本地环境的 go 版本是 1.8,所以使用了 sync.RWMutex。如果读者本地环境的 go 版本是 1.9,那么可以直接使用 sync.Map。

    模拟多个任务的运行

    在main函数中主协程启动四个子协程来处理四个任务,同时主协程休眠1秒来等待子协程将任务完成:

    package main
    
    import "domain/task"
    
    func main() {
        task1 := new(task.Task1)
        go task1.Exec()
        task2 := new(task.Task2)
        go task2.Exec()
        task3 := new(task.Task3)
        go task3.Exec()
        task4 := new(task.Task4)
        go task4.Exec()
        time.Sleep(time.Second)
    }
    

    运行程序:输出符合期望

    $go run main.go 
    task3 sub task2HasCompleted
    task2 sub task1HasCompleted
    task1 handler start
    task4 sub task2HasCompleted
    task1 handler end
    task1 pub task1HasCompleted
    task2 handler start
    task2 handler end
    task2 pub task2HasCompleted
    task4 handler start
    task3 handler start
    task3 handler end
    task4 handler end
    

    小结

    领域事件是DDD战术设计中的一种模式,人们常用于解耦微服务间的依赖,各个微服务将达到最终一致性,而本文关注的是领域事件在微服务内的应用,并给出了一个详细的案例,希望对读者熟练应用领域事件模式有一定的帮助。

    相关文章

      网友评论

        本文标题:领域事件在微服务内的一个应用案例

        本文链接:https://www.haomeiwen.com/subject/bkemzxtx.html