美文网首页Go语言实践GoGolang 入门资料+笔记
如何撸一个GO版本的基于CQRS的事件驱动框架

如何撸一个GO版本的基于CQRS的事件驱动框架

作者: PioneerYi | 来源:发表于2020-03-22 16:03 被阅读0次

CQRS架构和事件驱动思想用的越来越广泛,Java开发的同学在实践CQRS架构时,应该基本都用过axon-framework;而GO语言,则没有这么一个框架,本文就一起探索下,如何撸一个类似axon的go版本的框架。

一、背景介绍

1、基于CQRS和事件驱动的架构

如下为基于CQRS的事件驱动架构:


CQRS框架

在这个架构图中,最核心的概念是Command、Event。

命令(Command)通常由比较简单的对象来表示,表明要做什么操作;当Command Bus(命令总线)收到command的时候会把他路由到对应的Command Handlers。每个Command Handler只会接收他指定类型的command并根据command里的内容来执行相应的业务逻辑。

Command Handler从repository获取到领域对象(Aggregates,聚合),然后在领域对象执行对应的方法(method)从而来改变它们的状态。

聚合的状态变化会导致产生领域事件。事件总线(Event bus)会将事件分发给所有感兴趣的事件监听者。他可以是同步方式也可以是异步方式。异步方式允许事件发送出去后就直接返回并将控制权交给用户,而此时的后台应用程序也会对事件进行处理。这样好处就是用户无需等待事件处理完成后才能做其他的事,所以应用程序的响应速度就会更快。另一方面,同步事件处理更简单且符合常理。默认情况下,同步处理方式会将command的处理和event的处理放进同一个事务中。

Event listeners接收事件并处理它们。一些hanndler的操作是更新数据源的数据,而另一些是将消息发送到其他外部系统。您可能会注意到,command handlers完全不知道接下来会发生什么事。这意味着对扩展应用程序新功能是非入侵式的。所以你需要做的是添加另一个事件监听器(event listener)。事件将应用程序中的所有组件很松散地耦合在一起。

仓储(repository)负责提供对聚合根的访问。通常情况下,这些仓储库只会针对通过其唯一标识符来查找聚合进行优化。一种方式是,仓储存储的是聚合本身的状态(例如使用对象关系映射),而另一种是,仓储通过Event Store存储聚合在各种行为中产生的真实事件。仓储还负责持久化聚合产生的事件。

Axon提供了对直接持久化聚合(例如使用对象关系映射方式)和事件溯源 (event sourcing)的支持。

在某些情况下,事件处理需要将新命令发送给应用程序。例如,当我们收到一个订单,这可能意味着客户的账户应该扣掉买商品的钱,并且必须告知运送方准备运送所购买的商品。在许多应用中,逻辑将变得比这更复杂:如果客户没有及时付款呢?您会立即寄出货物,还是先等待付款?saga是CQRS的概念,它负责管理这些复杂的业务事务。

命令查询职责分离(CQRS)是构建软件系统的一种方式,它的思想是将对状态的查询部分与改变状态的部分进行分离。 Axon 框架是一个基于 Java 实现的 CQRS 框架,提供了对大多数重要构建块的实现,例如聚合、命令与事件总线、以及 repository,以帮助开发者在构建应用程序时使用 CQRS 架构模式。

2、解决哪些问题

CQRS 中的命令这方面最重要的部分包括:

  • 命令(Command),它负责捕获用户的意图,即接下来应该发生什么事。在 Axon 中,命令被实现为 POJO 对象,因而无需实现任何接口;
  • 命令处理器(Command Handler),它负责执行所发送的命令。在 Axon 中,可以选择通过实现某个接口的方式创建它,也可以通过注解符实现;
  • 命令总线(Command Bus),负责将命令传递给对应的命令处理器。Axon 总共提供了四种实现方式,Aderemi 选择了一种简单的同步总线用于传递命令。另一种实现方式是通过异步总线以异步的方式处理命令。

事件最重要的部分包括:

  • 领域事件(Domain Event),它表示发生于过去的某事,由领域中的状态变化、命令及命令处理器中所初始化的变化所创建;
  • 事件总线(Event Bus),它负责将事件传递给查询方。Axon 中提供了多种实现方式,Aderemi 选择了一种较简单的实现;
  • 事件处理器(Event Handler),它负责侦听事件,通过事件中所包含的信息,在查询方反应出应用程序的状态。在 Axon 中,可通过注解符定义事件处理器;
  • 事件存储,负责持久化事件。

二、命令接口设计和实现

1、Command gateway

Command gateway我们定义三个方法,分别为:1)单发送命令不关注结果;2)发送命令并且等待结果;3)发送命令并且等待结果,但是有超时控制。接口类定义如下:

type CommandGateway interface {
    // Sends the given command
    Send(ctx context.Context, Command interface{}) error

    // Sends the given command and wait for it to execute. The result of the execution is returned when available.
    // This method will block indefinitely ,until a result is available,or until the Thread is interrupted. When the
    // thread is interrupted, this method returns nil and the detail error.
    SendAndWait(ctx context.Context, Command interface{}) (result interface{}, err error)

    // Sends the given command and wait for it to execute. The result of the execution is returned when available.
    // This method will block until a result is available, or the given {@code timeout} was reached,or until the
    // thread is interrupted. When the timeout is reached or the thread is interrupted,this method returns nil
    // and the detail error.
    SendAndWaitWithTimeout(ctx context.Context, Command interface{}, timeout int64) (result interface{}, err error)
}

Command Gateway的简单实现如下:

type SimpleCommandGateway struct {
    CommandBus commandhandling2.CommandBus
}

func (a *SimpleCommandGateway) Send(ctx context.Context, command interface{}) error {
    commandMessage := commandhandling2.NewMiddleCommandMessage(command)
    a.CommandBus.Dispatch(ctx, commandMessage)
    return nil
}

func (a *SimpleCommandGateway) SendAndWait(ctx context.Context, command interface{}) (result interface{}, err error) {
    commandMessage := commandhandling2.NewMiddleCommandMessage(command)
    resp, err := a.CommandBus.DispatchWithReturn(ctx, commandMessage)
    return resp, err
}

func (a *SimpleCommandGateway) SendAndWaitWithTimeout(ctx context.Context, command interface{},
    timeout int64) (result interface{}, err error) {
    commandMessage := commandhandling2.NewMiddleCommandMessage(command)
    resp, err := a.CommandBus.DispatchWithReturnAndTimeout(ctx, commandMessage, timeout)
    return resp, err
}

Command Gateway是对外提供的接口类,内部是对于命令的实际路由工作是交给Command bus的,下面看看Command Bus实现。

2、Command Bus

CommandBus是命令总线,提供发布订阅命令功能,接口方法设计如下:

type CommandBus interface {
    // Dispatch the given {@code CommandMessage} to the CommandHandler subscribed to the given {@code command}'s commandType.
    // No feedback is given about the status of the dispatching process. Implementations may return immediately after
    // asserting a valid handler is registered for the given command.
    Dispatch(ctx context.Context, command CommandMessage) error

    // Dispatch the given {@code CommandMessage} to the CommandHandler subscribed to the given {@code CommandMessage}'s commandType.
    // Waiting the result of the processing and return
    DispatchWithReturn(ctx context.Context, command CommandMessage) (interface{}, error)

    // Dispatch the given {@code CommandMessage} to the CommandHandler subscribed to the given {@code CommandMessage}'s commandType.
    // Waiting the result of the processing and return or time out
    DispatchWithReturnAndTimeout(ctx context.Context, command CommandMessage,timeout int64) (interface{}, error)

    // Subscribe the given {@code CommandHandler} to command {@code command}.
    Subscribe(command interface{},handler CommandHandler) error
}

Command Bus我们应该提供同步和异步两种模式,然后根据实际情况,通过配置决定到底使用同步还是异步模式。

同步的Command Bus

// Implementation of the CommandBus that dispatches commands to the handlers subscribed to that specific command's commandType.
type SimpleCommandBus struct {
    handlers   map[string]CommandHandler
    handlersMu sync.RWMutex
}

func (bus *SimpleCommandBus) Dispatch(ctx context.Context, commandMessage CommandMessage) error {
    _, err := bus.doDispatch(ctx, commandMessage)
    return err
}

func (bus *SimpleCommandBus) DispatchWithReturn(ctx context.Context, commandMessage CommandMessage) (interface{}, error) {
    resp, err := bus.doDispatch(ctx, commandMessage)
    return resp, err
}

func (bus *SimpleCommandBus) DispatchWithReturnAndTimeout(ctx context.Context, commandMessage CommandMessage, timeout int64) (interface{}, error) {
    return bus.DispatchWithReturn(ctx, commandMessage)
}

// Subscribe the given {@code handler} to commands with given {@code command}.
func (bus *SimpleCommandBus) Subscribe(command interface{}, handler CommandHandler) error {
    var commandName string
    if reflect.TypeOf(command).Kind() == reflect.Struct {
        commandName = GetCommandType(command)
    } else {
        commandName = command.(string)
    }
    return bus.SubscribeWithTopic(commandName, handler)
}

对于同步模式,DispatchWithReturnAndTimeout接口是不存在什么超不超时的,有结果了就立即返回。

异步的Command Bus实现

同步的Command Bus好实现,那么异步的该如何实现了?超时又怎么控制了?

我们可以用go中的协程来实现异步,然后返回就用chan来实现。定义异步Command Bus类AsynchronousCommandBus如下:

// Implementation of the CommandBus that processed Commands asynchronously in a new goroutines
type AsynchronousCommandBus struct {
    SimpleCommandBus
    resp chan response
}

type response struct {
    data interface{}
    err  error
}

dispatch command如下:

func (a *AsynchronousCommandBus) dispatch(ctx context.Context, command CommandMessage, handler CommandHandler) (interface{}, error) {
    // execute handle command with a new goroutines
    go func() {
        resp, err := handler.HandleCommand(ctx, command.Payload())
        ret := response{
            data: resp,
            err:  err,
        }
        a.resp <- ret
    }()
    out := <-a.resp
    return out.data, out.err
}

那么超时如何控制了,可以这么控制:

select {
    case out := <-a.resp:
        return out.data, out.err
    case <-time.After(time.Duration(timeout) * time.Millisecond):
        return "command handle time out", errors.New("handle time out")
}

超时dispatch完整实现如下:

func (a *AsynchronousCommandBus) dispatchWithTimeout(ctx context.Context, command CommandMessage, handler CommandHandler,
    timeout int64) (interface{}, error) {
    // execute handle command with a new goroutines
    go func() {
        resp, err := handler.HandleCommand(ctx, command.Payload())
        ret := response{
            data: resp,
            err:  err,
        }
        a.resp <- ret
    }()

    select {
    case out := <-a.resp:
        return out.data, out.err
    case <-time.After(time.Duration(timeout) * time.Millisecond):
        return "command handle time out", errors.New("handle time out")
    }
}

3、Command Handler

// CommandHandler specialization for handlers of command.
type CommandHandler interface {
    // Handles the given {@code command}.
    HandleCommand(ctx context.Context, command interface{}) (resp interface{}, err error)
}

// CommandHandlerFunc is a function that can be used as a command handler.
type CommandHandlerFunc func(ctx context.Context, command interface{}) (resp interface{}, err error)

// HandleEvent implements the HandleEvent method of the CommandHandler.
func (handlerFunc CommandHandlerFunc) HandleCommand(ctx context.Context, command interface{}) (resp interface{}, err error) {
    return handlerFunc(ctx, command)
}

Command Handler框架提供接口定义,具体实现由用本框架的具体应用程序提供。

三、事件接口设计和实现

1、Event Bus

Event Bus接口,定义了发布和订阅两个接口方法,如下:

type EventBus interface {
    // Publishes the event on the bus.
    Publish(ctx context.Context, eventMessage EventMessage) error

    // Subscribe the event on the bus
    Subscribe(event interface{}, handler EventHandlerFunc) error
}

对事件感兴趣者,可能是本系统内的,也可能是其他系统的,因此Event Bus需要提供支持本地的和外部的实现。本地的较简单,我们可以用go的chan实现,但是外部的则相对复杂一些,我们需要用Kafka或者其他消息中间件实现。然后根据实际情况,选择是使用仅支持本地的Event Bus,还是选择使用支持外部的Event Bus。这里我们介绍一下本地Event Bus。

// EventBus is a local event bus that delegates handling of published events
// to all matching registered handlers, in order of registration.
type LocalEventBus struct {
    messageBus messaging2.MessageBus
}

// NewEventBus creates a EventBus.
func NewLocalEventBus() EventBus {
    return &LocalEventBus{
        messageBus: messaging2.New(runtime.NumCPU()),
    }
}

// Publish implements the method of the bus.EventBus interface.
func (bus *LocalEventBus) Publish(ctx context.Context, eventMessage EventMessage) error {
    bus.messageBus.Publish(eventMessage.EventType(), ctx, eventMessage)
    return nil
}

// Subscribe implements the method of the bus.EventBus interface.
func (bus *LocalEventBus) Subscribe(event interface{}, handler EventHandlerFunc) error {
    var eventName string
    if reflect.TypeOf(event).Kind() == reflect.Struct {
        eventName = GetEventType(event)
    } else {
        eventName = event.(string)
    }
    return bus.messageBus.Subscribe(eventName, handler)
}

其中核心的工作,交给了我们定义的MessageBus,看看怎么实现的:

// MessageBus implements publish/subscribe messaging paradigm
type MessageBus interface {
    // Publish publishes arguments to the given topic subscribers
    // Publish block only when the buffer of one of the subscribers is full.
    Publish(topic string, args ...interface{})
    // Close unsubscribe all handlers from given topic
    Close(topic string)
    // Subscribe subscribes to the given topic
    Subscribe(topic string, fn interface{}) error
    // Unsubscribe unsubscribe handler from the given topic
    Unsubscribe(topic string, fn interface{}) error
}

type handlersMap map[string][]*handler

type handler struct {
    callback reflect.Value
    queue    chan []reflect.Value
}

type messageBus struct {
    handlerQueueSize int
    mtx              sync.RWMutex
    handlers         handlersMap
}

func (b *messageBus) Publish(topic string, args ...interface{}) {
    rArgs := buildHandlerArgs(args)

    b.mtx.RLock()
    defer b.mtx.RUnlock()

    if hs, ok := b.handlers[topic]; ok {
        for _, h := range hs {
            h.queue <- rArgs
        }
    }
}

func (b *messageBus) Subscribe(topic string, fn interface{}) error {
    if reflect.TypeOf(fn).Kind() != reflect.Func {
        return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn))
    }

    h := &handler{
        callback: reflect.ValueOf(fn),
        queue:    make(chan []reflect.Value, b.handlerQueueSize),
    }

    go func() {
        for args := range h.queue {
            h.callback.Call(args)
        }
    }()

    b.mtx.Lock()
    defer b.mtx.Unlock()

    b.handlers[topic] = append(b.handlers[topic], h)

    return nil
}

其中定义了一个handlersMap,保存消息的topic和handler 的map对,其中handler的定义如下:

type handler struct {
    callback reflect.Value
    queue    chan []reflect.Value
}

handler中保存了方法的反射值,以及方法的参数,这样我们即可待用 callback.call(args)即可调用处理方法。

其实在CommandBus的处理上,我们也完全可以用这个封装的MessageBus,两种实现吧,反正没事瞎折腾着玩呗,哈哈!

2、Event Handler

Event Handler接口的定义如下:

// EventHandler is a handler of event.
type EventHandler interface {
    // HandleEvent handles an event.
    HandleEvent(ctx context.Context, eventMessage EventMessage)
}

// EventHandlerFunc is a function that can be used as a event handler.
type EventHandlerFunc func(ctx context.Context, eventMessage EventMessage)

// HandleEvent implements the HandleEvent method of the EventHandler.
func (h EventHandlerFunc) HandleEvent(ctx context.Context, eventMessage EventMessage) {
    h(ctx, eventMessage)
}

同CommandHandler一样,EventHandler的实现由业务决定。

3、Event Store

Event Store接口定义如下:

// EventStore is an interface for an event sourcing event store.
type EventStore interface {
    // Save appends all events in the event stream to the store.
    Save(ctx context.Context, events []eventhandling.EventMessage) error

    // Load loads all events for the aggregate id from the store.
    Load(ctx context.Context, id string) ([]eventhandling.EventMessage, error)
}

根据不同存储,我们可以提供Event Store的不同实现,比如存Memory,以及存Mysql。内存EventStore比较简单,这里介绍下Mysql的EventStore如何实现,直接看代码比较好:


type MysqlStore struct {
    dbHost    string
    dbPort    int
    user      string
    password  string
    dataBase  string
    tableName string
}

// New returns a new MysqlStore
func NewMysqlStore(dbHost string, dbPort int, user string, password string, dataBase string, tableName string) (*MysqlStore, error) {
    store := &MysqlStore{
        dbHost:    dbHost,
        dbPort:    dbPort,
        user:      user,
        password:  password,
        dataBase:  dataBase,
        tableName: tableName,
    }
    return store, nil
}

func (m *MysqlStore) expand(statement string) string {
    return strings.Replace(statement, "${TABLE}", m.tableName, -1)
}

func (m *MysqlStore) Save(ctx context.Context, events []eventhandling.EventMessage) error {
    if len(events) == 0 {
        return ErrNoEventsToAppend
    }
    var records []EventRecord
    for _, event := range events {
        record, _ := serializer.MarshalEvent(event)
        records = append(records, record)
    }
    return m.doSave(ctx, records)
}

func (m *MysqlStore) doSave(ctx context.Context, records []EventRecord) error {
    if len(records) == 0 {
        return ErrNoEventsToAppend
    }
    dbSourceName := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", m.user, m.password, m.dbHost, m.dbPort, m.dataBase)
    db, err := sql.Open("mysql", dbSourceName)

    if err != nil {
        return errors.New("mysql save failed, unable to connect to db")
    }
    defer db.Close()

    stmt, err := db.PrepareContext(ctx, m.expand(INSERT_SQL))
    if err != nil {
        return errors.New("mysql save failed, unable to prepare statement")
    }
    defer stmt.Close()

    for _, record := range records {
        _, err = stmt.Exec(record.AggregateId, record.Data, record.Version)
        if err != nil {
            return errors.New("mysql save failed, execute statement fail")
        }
    }
    return nil
}

func (m *MysqlStore) Load(ctx context.Context, aggregateId string) ([]eventhandling.EventMessage, error) {
    records, err := m.doLoad(ctx, aggregateId)
    if err != nil {
        return nil, err
    }

    var events []eventhandling.EventMessage
    for _, record := range records {
        event, err := serializer.UnmarshalEvent(record)
        if err != nil {
            return nil, err
        }
        events = append(events, event)
    }
    return events, nil
}

func (m *MysqlStore) doLoad(ctx context.Context, aggregateId string) (EventRecords, error) {
    dbSourceName := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", m.user, m.password, m.dbHost, m.dbPort, m.dataBase)
    db, err := sql.Open("mysql", dbSourceName)
    if err != nil {
        return nil, errors.New("mysql load failed,unable to connect to db")
    }
    defer db.Close()

    rows, err := db.Query(m.expand(SELECT_SQL), aggregateId)
    if err != nil {
        return nil, errors.New("mysql load failed, unable to query rows")
    }

    records := EventRecords{}
    for rows.Next() {
        record := EventRecord{}
        if err := rows.Scan(&record.AggregateId, &record.Data, &record.Version); err != nil {
            return nil, errors.New("mysql load failed, unable to parse row")
        }
        records = append(records, record)
    }
    return records, nil
}

4、Event Sourcing

通过事件朔源然后得出聚合,这是这种模式的精华所在,相对也是比较的复杂的,我们需要做的是:

  • 从Event Store中加载出聚合的所有的事件;
  • 将所有事件执行一遍,得出最新的聚合状态;
  • 为了提升性能,避免每次都从头开始朔源,要提供快照,然后下载朔源,从快照出朔源即可。
    我们先实现一个简单的,不带快照版本的:
// EventSourcingRepository provides the primary abstraction to saving and loading events
type EventSourcingRepository struct {
    eventExecutor reflect.Type
    eventStore    EventStore
    serializer    Serializer
    dbMu          sync.RWMutex
}

func (r *EventSourcingRepository) Load(ctx context.Context, uid modelling.UinId) (interface{}, error) {
    events, err := r.eventStore.Load(ctx, uid.Identifier)
    if err != nil {
        return nil, errors.New("load event sourcing aggregate fail")
    }
    aggregate := r.New()
    for _, eventItem := range events {
        err = aggregate.ExecuteEvent(eventItem)
    }
    return aggregate, nil
}

func (r *EventSourcingRepository) Save(ctx context.Context, aggregate interface{}) error {
    r.dbMu.RLock()
    defer r.dbMu.RUnlock()

    var Events []eventhandling.EventMessage
    if v, ok := aggregate.(modelling.IEvents); ok {
        Events = v.AggregateEvents()
    }
    if len(Events) > 0 {
        err := r.SaveMapEvents(ctx, Events)
        if err != nil {
            return err
        }
    }
    return  nil
}

四、快速开始

1、创建聚合

type CustomAggregate struct {
      modelling.BaseAggregate
    
      // 定义聚合成员
}

2、发布订阅命令

定义Command

如下为一个测试命令domainCommand,注意我们实现了自定义的CommandType接口,这个接口可以提供一个有别于Command struct名的命令名字。

type domainCommand struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

// optional
func (t domainCommand) CommandType() string {
    return "my type"
}

发布Command

domainCommand为业务自定义的命令,commandGateway为默认的配置,发布命令如下:

resp, err := ddd.CommandGateway().SendAndWait(c, domainCommand)

发布命令的topic为command type,如果command未实现CommandType()接口,那么默认就为struct名,如果实现了,那么就采用实现的接口提供的Command Type。

订阅Command

首先需要topic,topic为command type,订阅者为一个自定义的CommandHandler。

_ = CommandBus().Subscribe(command.CreateAccountCommand{}, handler.CreateAccountCommandHandler())

框架会自动获取command type,其内部逻辑就是如果是实现了CommandType()接口,那么优先采用接口提供的command type,否则默认使用Command Struct名。

其中CommandHanlder接口定义如下:

HandleCommand(ctx context.Context, command interface{}) (resp interface{}, err error)

自定义的CommandHandler,一般形式如下:

func CustomCommandHandler() commandhandling.CommandHandlerFunc {
    return func(ctx context.Context, commandMessage interface{}) (result interface{}, err error) {

        // 1) get command data
        
        // 2) load aggregate

        // 3)  operate aggregate    

        // 4) save aggregate

        return resp, nil
    }
}

3、发布订阅事件

定义事件

如下为一个领域事件customEvent,注意我们实现了自定义的EventType接口,这个接口可以提供一个有别于Event struct名的Event Type。

type customEvent struct {
    Name string
    Age  int
}

// optional
func (e customEvent) EventType() string {
    return "my type"
}

Apply和Publish事件

框架将发出领域事件,分为两步,首先聚合生成领域事件,但是生成后并不立马发送,发送时机有业务决定并显式调用一个publish方法,一般的发布时机为成功存储聚合后。

聚合发布Apply事件如下:

aggregate.ApplyEvent(context, customEvent)

其中,context为上下文,customEvent为业务自定义的事件,发布事件的topic为event type,如果event未实现EventType()接口,那么默认就为struct名,如果实现了接口,那么就采用实现的接口提供的Event Type。

发布聚合方式如下:

aggregate.PublishEvent(ctx)

其中ctx为上下文。

subscribe事件

首先仍然需要topic,topic为event type,,订阅者为一个自定义的EventHandler。

_ = EventBus().Subscribe(customEvent{}, customEventHandler)

框架自动获取event type,其内部逻辑为:如果实现了GetEventType()接口,那么优先采用接口提供的event type,否则默认使用Event Struct名。

the event handler is like this:

func CustomeEventHandler() eventhandling.EventHandlerFunc {
    return func(ctx context.Context, eventMessage eventhandling.EventMessage) { 
        // 1) get event data
        
        // 2) do your business  
    }
}

五、后记

本框架是我们中心在实践领域驱动设计时,我开发的一个GO语言版的事件驱动框架供大家使用,基本功能都有了,也已在正式服务开发使用。

但是这个框架其实还有很多不足,需要继续开发完善。代码后续开源,如果有感兴趣者一起开发,或许可以开发出一个axon-go也不一定哦!

相关文章

网友评论

    本文标题:如何撸一个GO版本的基于CQRS的事件驱动框架

    本文链接:https://www.haomeiwen.com/subject/khbmyhtx.html