美文网首页Go语言实践GoGolang 入门资料+笔记
如何撸一个GO版本的基于CQRS的事件驱动框架

如何撸一个GO版本的基于CQRS的事件驱动框架

作者: PioneerYi | 来源:发表于2020-03-22 16:03 被阅读0次

    CQRS架构和事件驱动思想用的越来越广泛,Java开发的同学在实践CQRS架构时,应该基本都用过axon-framework;而GO语言,则没有这么一个框架,本文就一起探索下,如何撸一个类似axon的go版本的框架。

    一、背景介绍

    1、基于CQRS和事件驱动的架构

    如下为基于CQRS的事件驱动架构:


    CQRS框架

    在这个架构图中,最核心的概念是Command、Event。

    命令(Command)通常由比较简单的对象来表示,表明要做什么操作;当Command Bus(命令总线)收到command的时候会把他路由到对应的Command Handlers。每个Command Handler只会接收他指定类型的command并根据command里的内容来执行相应的业务逻辑。

    Command Handler从repository获取到领域对象(Aggregates,聚合),然后在领域对象执行对应的方法(method)从而来改变它们的状态。

    聚合的状态变化会导致产生领域事件。事件总线(Event bus)会将事件分发给所有感兴趣的事件监听者。他可以是同步方式也可以是异步方式。异步方式允许事件发送出去后就直接返回并将控制权交给用户,而此时的后台应用程序也会对事件进行处理。这样好处就是用户无需等待事件处理完成后才能做其他的事,所以应用程序的响应速度就会更快。另一方面,同步事件处理更简单且符合常理。默认情况下,同步处理方式会将command的处理和event的处理放进同一个事务中。

    Event listeners接收事件并处理它们。一些hanndler的操作是更新数据源的数据,而另一些是将消息发送到其他外部系统。您可能会注意到,command handlers完全不知道接下来会发生什么事。这意味着对扩展应用程序新功能是非入侵式的。所以你需要做的是添加另一个事件监听器(event listener)。事件将应用程序中的所有组件很松散地耦合在一起。

    仓储(repository)负责提供对聚合根的访问。通常情况下,这些仓储库只会针对通过其唯一标识符来查找聚合进行优化。一种方式是,仓储存储的是聚合本身的状态(例如使用对象关系映射),而另一种是,仓储通过Event Store存储聚合在各种行为中产生的真实事件。仓储还负责持久化聚合产生的事件。

    Axon提供了对直接持久化聚合(例如使用对象关系映射方式)和事件溯源 (event sourcing)的支持。

    在某些情况下,事件处理需要将新命令发送给应用程序。例如,当我们收到一个订单,这可能意味着客户的账户应该扣掉买商品的钱,并且必须告知运送方准备运送所购买的商品。在许多应用中,逻辑将变得比这更复杂:如果客户没有及时付款呢?您会立即寄出货物,还是先等待付款?saga是CQRS的概念,它负责管理这些复杂的业务事务。

    命令查询职责分离(CQRS)是构建软件系统的一种方式,它的思想是将对状态的查询部分与改变状态的部分进行分离。 Axon 框架是一个基于 Java 实现的 CQRS 框架,提供了对大多数重要构建块的实现,例如聚合、命令与事件总线、以及 repository,以帮助开发者在构建应用程序时使用 CQRS 架构模式。

    2、解决哪些问题

    CQRS 中的命令这方面最重要的部分包括:

    • 命令(Command),它负责捕获用户的意图,即接下来应该发生什么事。在 Axon 中,命令被实现为 POJO 对象,因而无需实现任何接口;
    • 命令处理器(Command Handler),它负责执行所发送的命令。在 Axon 中,可以选择通过实现某个接口的方式创建它,也可以通过注解符实现;
    • 命令总线(Command Bus),负责将命令传递给对应的命令处理器。Axon 总共提供了四种实现方式,Aderemi 选择了一种简单的同步总线用于传递命令。另一种实现方式是通过异步总线以异步的方式处理命令。

    事件最重要的部分包括:

    • 领域事件(Domain Event),它表示发生于过去的某事,由领域中的状态变化、命令及命令处理器中所初始化的变化所创建;
    • 事件总线(Event Bus),它负责将事件传递给查询方。Axon 中提供了多种实现方式,Aderemi 选择了一种较简单的实现;
    • 事件处理器(Event Handler),它负责侦听事件,通过事件中所包含的信息,在查询方反应出应用程序的状态。在 Axon 中,可通过注解符定义事件处理器;
    • 事件存储,负责持久化事件。

    二、命令接口设计和实现

    1、Command gateway

    Command gateway我们定义三个方法,分别为:1)单发送命令不关注结果;2)发送命令并且等待结果;3)发送命令并且等待结果,但是有超时控制。接口类定义如下:

    type CommandGateway interface {
        // Sends the given command
        Send(ctx context.Context, Command interface{}) error
    
        // Sends the given command and wait for it to execute. The result of the execution is returned when available.
        // This method will block indefinitely ,until a result is available,or until the Thread is interrupted. When the
        // thread is interrupted, this method returns nil and the detail error.
        SendAndWait(ctx context.Context, Command interface{}) (result interface{}, err error)
    
        // Sends the given command and wait for it to execute. The result of the execution is returned when available.
        // This method will block until a result is available, or the given {@code timeout} was reached,or until the
        // thread is interrupted. When the timeout is reached or the thread is interrupted,this method returns nil
        // and the detail error.
        SendAndWaitWithTimeout(ctx context.Context, Command interface{}, timeout int64) (result interface{}, err error)
    }
    

    Command Gateway的简单实现如下:

    type SimpleCommandGateway struct {
        CommandBus commandhandling2.CommandBus
    }
    
    func (a *SimpleCommandGateway) Send(ctx context.Context, command interface{}) error {
        commandMessage := commandhandling2.NewMiddleCommandMessage(command)
        a.CommandBus.Dispatch(ctx, commandMessage)
        return nil
    }
    
    func (a *SimpleCommandGateway) SendAndWait(ctx context.Context, command interface{}) (result interface{}, err error) {
        commandMessage := commandhandling2.NewMiddleCommandMessage(command)
        resp, err := a.CommandBus.DispatchWithReturn(ctx, commandMessage)
        return resp, err
    }
    
    func (a *SimpleCommandGateway) SendAndWaitWithTimeout(ctx context.Context, command interface{},
        timeout int64) (result interface{}, err error) {
        commandMessage := commandhandling2.NewMiddleCommandMessage(command)
        resp, err := a.CommandBus.DispatchWithReturnAndTimeout(ctx, commandMessage, timeout)
        return resp, err
    }
    

    Command Gateway是对外提供的接口类,内部是对于命令的实际路由工作是交给Command bus的,下面看看Command Bus实现。

    2、Command Bus

    CommandBus是命令总线,提供发布订阅命令功能,接口方法设计如下:

    type CommandBus interface {
        // Dispatch the given {@code CommandMessage} to the CommandHandler subscribed to the given {@code command}'s commandType.
        // No feedback is given about the status of the dispatching process. Implementations may return immediately after
        // asserting a valid handler is registered for the given command.
        Dispatch(ctx context.Context, command CommandMessage) error
    
        // Dispatch the given {@code CommandMessage} to the CommandHandler subscribed to the given {@code CommandMessage}'s commandType.
        // Waiting the result of the processing and return
        DispatchWithReturn(ctx context.Context, command CommandMessage) (interface{}, error)
    
        // Dispatch the given {@code CommandMessage} to the CommandHandler subscribed to the given {@code CommandMessage}'s commandType.
        // Waiting the result of the processing and return or time out
        DispatchWithReturnAndTimeout(ctx context.Context, command CommandMessage,timeout int64) (interface{}, error)
    
        // Subscribe the given {@code CommandHandler} to command {@code command}.
        Subscribe(command interface{},handler CommandHandler) error
    }
    

    Command Bus我们应该提供同步和异步两种模式,然后根据实际情况,通过配置决定到底使用同步还是异步模式。

    同步的Command Bus

    // Implementation of the CommandBus that dispatches commands to the handlers subscribed to that specific command's commandType.
    type SimpleCommandBus struct {
        handlers   map[string]CommandHandler
        handlersMu sync.RWMutex
    }
    
    func (bus *SimpleCommandBus) Dispatch(ctx context.Context, commandMessage CommandMessage) error {
        _, err := bus.doDispatch(ctx, commandMessage)
        return err
    }
    
    func (bus *SimpleCommandBus) DispatchWithReturn(ctx context.Context, commandMessage CommandMessage) (interface{}, error) {
        resp, err := bus.doDispatch(ctx, commandMessage)
        return resp, err
    }
    
    func (bus *SimpleCommandBus) DispatchWithReturnAndTimeout(ctx context.Context, commandMessage CommandMessage, timeout int64) (interface{}, error) {
        return bus.DispatchWithReturn(ctx, commandMessage)
    }
    
    // Subscribe the given {@code handler} to commands with given {@code command}.
    func (bus *SimpleCommandBus) Subscribe(command interface{}, handler CommandHandler) error {
        var commandName string
        if reflect.TypeOf(command).Kind() == reflect.Struct {
            commandName = GetCommandType(command)
        } else {
            commandName = command.(string)
        }
        return bus.SubscribeWithTopic(commandName, handler)
    }
    

    对于同步模式,DispatchWithReturnAndTimeout接口是不存在什么超不超时的,有结果了就立即返回。

    异步的Command Bus实现

    同步的Command Bus好实现,那么异步的该如何实现了?超时又怎么控制了?

    我们可以用go中的协程来实现异步,然后返回就用chan来实现。定义异步Command Bus类AsynchronousCommandBus如下:

    // Implementation of the CommandBus that processed Commands asynchronously in a new goroutines
    type AsynchronousCommandBus struct {
        SimpleCommandBus
        resp chan response
    }
    
    type response struct {
        data interface{}
        err  error
    }
    

    dispatch command如下:

    func (a *AsynchronousCommandBus) dispatch(ctx context.Context, command CommandMessage, handler CommandHandler) (interface{}, error) {
        // execute handle command with a new goroutines
        go func() {
            resp, err := handler.HandleCommand(ctx, command.Payload())
            ret := response{
                data: resp,
                err:  err,
            }
            a.resp <- ret
        }()
        out := <-a.resp
        return out.data, out.err
    }
    

    那么超时如何控制了,可以这么控制:

    select {
        case out := <-a.resp:
            return out.data, out.err
        case <-time.After(time.Duration(timeout) * time.Millisecond):
            return "command handle time out", errors.New("handle time out")
    }
    

    超时dispatch完整实现如下:

    func (a *AsynchronousCommandBus) dispatchWithTimeout(ctx context.Context, command CommandMessage, handler CommandHandler,
        timeout int64) (interface{}, error) {
        // execute handle command with a new goroutines
        go func() {
            resp, err := handler.HandleCommand(ctx, command.Payload())
            ret := response{
                data: resp,
                err:  err,
            }
            a.resp <- ret
        }()
    
        select {
        case out := <-a.resp:
            return out.data, out.err
        case <-time.After(time.Duration(timeout) * time.Millisecond):
            return "command handle time out", errors.New("handle time out")
        }
    }
    

    3、Command Handler

    // CommandHandler specialization for handlers of command.
    type CommandHandler interface {
        // Handles the given {@code command}.
        HandleCommand(ctx context.Context, command interface{}) (resp interface{}, err error)
    }
    
    // CommandHandlerFunc is a function that can be used as a command handler.
    type CommandHandlerFunc func(ctx context.Context, command interface{}) (resp interface{}, err error)
    
    // HandleEvent implements the HandleEvent method of the CommandHandler.
    func (handlerFunc CommandHandlerFunc) HandleCommand(ctx context.Context, command interface{}) (resp interface{}, err error) {
        return handlerFunc(ctx, command)
    }
    

    Command Handler框架提供接口定义,具体实现由用本框架的具体应用程序提供。

    三、事件接口设计和实现

    1、Event Bus

    Event Bus接口,定义了发布和订阅两个接口方法,如下:

    type EventBus interface {
        // Publishes the event on the bus.
        Publish(ctx context.Context, eventMessage EventMessage) error
    
        // Subscribe the event on the bus
        Subscribe(event interface{}, handler EventHandlerFunc) error
    }
    

    对事件感兴趣者,可能是本系统内的,也可能是其他系统的,因此Event Bus需要提供支持本地的和外部的实现。本地的较简单,我们可以用go的chan实现,但是外部的则相对复杂一些,我们需要用Kafka或者其他消息中间件实现。然后根据实际情况,选择是使用仅支持本地的Event Bus,还是选择使用支持外部的Event Bus。这里我们介绍一下本地Event Bus。

    // EventBus is a local event bus that delegates handling of published events
    // to all matching registered handlers, in order of registration.
    type LocalEventBus struct {
        messageBus messaging2.MessageBus
    }
    
    // NewEventBus creates a EventBus.
    func NewLocalEventBus() EventBus {
        return &LocalEventBus{
            messageBus: messaging2.New(runtime.NumCPU()),
        }
    }
    
    // Publish implements the method of the bus.EventBus interface.
    func (bus *LocalEventBus) Publish(ctx context.Context, eventMessage EventMessage) error {
        bus.messageBus.Publish(eventMessage.EventType(), ctx, eventMessage)
        return nil
    }
    
    // Subscribe implements the method of the bus.EventBus interface.
    func (bus *LocalEventBus) Subscribe(event interface{}, handler EventHandlerFunc) error {
        var eventName string
        if reflect.TypeOf(event).Kind() == reflect.Struct {
            eventName = GetEventType(event)
        } else {
            eventName = event.(string)
        }
        return bus.messageBus.Subscribe(eventName, handler)
    }
    

    其中核心的工作,交给了我们定义的MessageBus,看看怎么实现的:

    // MessageBus implements publish/subscribe messaging paradigm
    type MessageBus interface {
        // Publish publishes arguments to the given topic subscribers
        // Publish block only when the buffer of one of the subscribers is full.
        Publish(topic string, args ...interface{})
        // Close unsubscribe all handlers from given topic
        Close(topic string)
        // Subscribe subscribes to the given topic
        Subscribe(topic string, fn interface{}) error
        // Unsubscribe unsubscribe handler from the given topic
        Unsubscribe(topic string, fn interface{}) error
    }
    
    type handlersMap map[string][]*handler
    
    type handler struct {
        callback reflect.Value
        queue    chan []reflect.Value
    }
    
    type messageBus struct {
        handlerQueueSize int
        mtx              sync.RWMutex
        handlers         handlersMap
    }
    
    func (b *messageBus) Publish(topic string, args ...interface{}) {
        rArgs := buildHandlerArgs(args)
    
        b.mtx.RLock()
        defer b.mtx.RUnlock()
    
        if hs, ok := b.handlers[topic]; ok {
            for _, h := range hs {
                h.queue <- rArgs
            }
        }
    }
    
    func (b *messageBus) Subscribe(topic string, fn interface{}) error {
        if reflect.TypeOf(fn).Kind() != reflect.Func {
            return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn))
        }
    
        h := &handler{
            callback: reflect.ValueOf(fn),
            queue:    make(chan []reflect.Value, b.handlerQueueSize),
        }
    
        go func() {
            for args := range h.queue {
                h.callback.Call(args)
            }
        }()
    
        b.mtx.Lock()
        defer b.mtx.Unlock()
    
        b.handlers[topic] = append(b.handlers[topic], h)
    
        return nil
    }
    

    其中定义了一个handlersMap,保存消息的topic和handler 的map对,其中handler的定义如下:

    type handler struct {
        callback reflect.Value
        queue    chan []reflect.Value
    }
    

    handler中保存了方法的反射值,以及方法的参数,这样我们即可待用 callback.call(args)即可调用处理方法。

    其实在CommandBus的处理上,我们也完全可以用这个封装的MessageBus,两种实现吧,反正没事瞎折腾着玩呗,哈哈!

    2、Event Handler

    Event Handler接口的定义如下:

    // EventHandler is a handler of event.
    type EventHandler interface {
        // HandleEvent handles an event.
        HandleEvent(ctx context.Context, eventMessage EventMessage)
    }
    
    // EventHandlerFunc is a function that can be used as a event handler.
    type EventHandlerFunc func(ctx context.Context, eventMessage EventMessage)
    
    // HandleEvent implements the HandleEvent method of the EventHandler.
    func (h EventHandlerFunc) HandleEvent(ctx context.Context, eventMessage EventMessage) {
        h(ctx, eventMessage)
    }
    

    同CommandHandler一样,EventHandler的实现由业务决定。

    3、Event Store

    Event Store接口定义如下:

    // EventStore is an interface for an event sourcing event store.
    type EventStore interface {
        // Save appends all events in the event stream to the store.
        Save(ctx context.Context, events []eventhandling.EventMessage) error
    
        // Load loads all events for the aggregate id from the store.
        Load(ctx context.Context, id string) ([]eventhandling.EventMessage, error)
    }
    

    根据不同存储,我们可以提供Event Store的不同实现,比如存Memory,以及存Mysql。内存EventStore比较简单,这里介绍下Mysql的EventStore如何实现,直接看代码比较好:

    
    type MysqlStore struct {
        dbHost    string
        dbPort    int
        user      string
        password  string
        dataBase  string
        tableName string
    }
    
    // New returns a new MysqlStore
    func NewMysqlStore(dbHost string, dbPort int, user string, password string, dataBase string, tableName string) (*MysqlStore, error) {
        store := &MysqlStore{
            dbHost:    dbHost,
            dbPort:    dbPort,
            user:      user,
            password:  password,
            dataBase:  dataBase,
            tableName: tableName,
        }
        return store, nil
    }
    
    func (m *MysqlStore) expand(statement string) string {
        return strings.Replace(statement, "${TABLE}", m.tableName, -1)
    }
    
    func (m *MysqlStore) Save(ctx context.Context, events []eventhandling.EventMessage) error {
        if len(events) == 0 {
            return ErrNoEventsToAppend
        }
        var records []EventRecord
        for _, event := range events {
            record, _ := serializer.MarshalEvent(event)
            records = append(records, record)
        }
        return m.doSave(ctx, records)
    }
    
    func (m *MysqlStore) doSave(ctx context.Context, records []EventRecord) error {
        if len(records) == 0 {
            return ErrNoEventsToAppend
        }
        dbSourceName := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", m.user, m.password, m.dbHost, m.dbPort, m.dataBase)
        db, err := sql.Open("mysql", dbSourceName)
    
        if err != nil {
            return errors.New("mysql save failed, unable to connect to db")
        }
        defer db.Close()
    
        stmt, err := db.PrepareContext(ctx, m.expand(INSERT_SQL))
        if err != nil {
            return errors.New("mysql save failed, unable to prepare statement")
        }
        defer stmt.Close()
    
        for _, record := range records {
            _, err = stmt.Exec(record.AggregateId, record.Data, record.Version)
            if err != nil {
                return errors.New("mysql save failed, execute statement fail")
            }
        }
        return nil
    }
    
    func (m *MysqlStore) Load(ctx context.Context, aggregateId string) ([]eventhandling.EventMessage, error) {
        records, err := m.doLoad(ctx, aggregateId)
        if err != nil {
            return nil, err
        }
    
        var events []eventhandling.EventMessage
        for _, record := range records {
            event, err := serializer.UnmarshalEvent(record)
            if err != nil {
                return nil, err
            }
            events = append(events, event)
        }
        return events, nil
    }
    
    func (m *MysqlStore) doLoad(ctx context.Context, aggregateId string) (EventRecords, error) {
        dbSourceName := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", m.user, m.password, m.dbHost, m.dbPort, m.dataBase)
        db, err := sql.Open("mysql", dbSourceName)
        if err != nil {
            return nil, errors.New("mysql load failed,unable to connect to db")
        }
        defer db.Close()
    
        rows, err := db.Query(m.expand(SELECT_SQL), aggregateId)
        if err != nil {
            return nil, errors.New("mysql load failed, unable to query rows")
        }
    
        records := EventRecords{}
        for rows.Next() {
            record := EventRecord{}
            if err := rows.Scan(&record.AggregateId, &record.Data, &record.Version); err != nil {
                return nil, errors.New("mysql load failed, unable to parse row")
            }
            records = append(records, record)
        }
        return records, nil
    }
    

    4、Event Sourcing

    通过事件朔源然后得出聚合,这是这种模式的精华所在,相对也是比较的复杂的,我们需要做的是:

    • 从Event Store中加载出聚合的所有的事件;
    • 将所有事件执行一遍,得出最新的聚合状态;
    • 为了提升性能,避免每次都从头开始朔源,要提供快照,然后下载朔源,从快照出朔源即可。
      我们先实现一个简单的,不带快照版本的:
    // EventSourcingRepository provides the primary abstraction to saving and loading events
    type EventSourcingRepository struct {
        eventExecutor reflect.Type
        eventStore    EventStore
        serializer    Serializer
        dbMu          sync.RWMutex
    }
    
    func (r *EventSourcingRepository) Load(ctx context.Context, uid modelling.UinId) (interface{}, error) {
        events, err := r.eventStore.Load(ctx, uid.Identifier)
        if err != nil {
            return nil, errors.New("load event sourcing aggregate fail")
        }
        aggregate := r.New()
        for _, eventItem := range events {
            err = aggregate.ExecuteEvent(eventItem)
        }
        return aggregate, nil
    }
    
    func (r *EventSourcingRepository) Save(ctx context.Context, aggregate interface{}) error {
        r.dbMu.RLock()
        defer r.dbMu.RUnlock()
    
        var Events []eventhandling.EventMessage
        if v, ok := aggregate.(modelling.IEvents); ok {
            Events = v.AggregateEvents()
        }
        if len(Events) > 0 {
            err := r.SaveMapEvents(ctx, Events)
            if err != nil {
                return err
            }
        }
        return  nil
    }
    

    四、快速开始

    1、创建聚合

    type CustomAggregate struct {
          modelling.BaseAggregate
        
          // 定义聚合成员
    }
    

    2、发布订阅命令

    定义Command

    如下为一个测试命令domainCommand,注意我们实现了自定义的CommandType接口,这个接口可以提供一个有别于Command struct名的命令名字。

    type domainCommand struct {
        Name string `json:"name"`
        Age  int    `json:"age"`
    }
    
    // optional
    func (t domainCommand) CommandType() string {
        return "my type"
    }
    

    发布Command

    domainCommand为业务自定义的命令,commandGateway为默认的配置,发布命令如下:

    resp, err := ddd.CommandGateway().SendAndWait(c, domainCommand)
    

    发布命令的topic为command type,如果command未实现CommandType()接口,那么默认就为struct名,如果实现了,那么就采用实现的接口提供的Command Type。

    订阅Command

    首先需要topic,topic为command type,订阅者为一个自定义的CommandHandler。

    _ = CommandBus().Subscribe(command.CreateAccountCommand{}, handler.CreateAccountCommandHandler())
    

    框架会自动获取command type,其内部逻辑就是如果是实现了CommandType()接口,那么优先采用接口提供的command type,否则默认使用Command Struct名。

    其中CommandHanlder接口定义如下:

    HandleCommand(ctx context.Context, command interface{}) (resp interface{}, err error)
    

    自定义的CommandHandler,一般形式如下:

    func CustomCommandHandler() commandhandling.CommandHandlerFunc {
        return func(ctx context.Context, commandMessage interface{}) (result interface{}, err error) {
    
            // 1) get command data
            
            // 2) load aggregate
    
            // 3)  operate aggregate    
    
            // 4) save aggregate
    
            return resp, nil
        }
    }
    

    3、发布订阅事件

    定义事件

    如下为一个领域事件customEvent,注意我们实现了自定义的EventType接口,这个接口可以提供一个有别于Event struct名的Event Type。

    type customEvent struct {
        Name string
        Age  int
    }
    
    // optional
    func (e customEvent) EventType() string {
        return "my type"
    }
    

    Apply和Publish事件

    框架将发出领域事件,分为两步,首先聚合生成领域事件,但是生成后并不立马发送,发送时机有业务决定并显式调用一个publish方法,一般的发布时机为成功存储聚合后。

    聚合发布Apply事件如下:

    aggregate.ApplyEvent(context, customEvent)
    

    其中,context为上下文,customEvent为业务自定义的事件,发布事件的topic为event type,如果event未实现EventType()接口,那么默认就为struct名,如果实现了接口,那么就采用实现的接口提供的Event Type。

    发布聚合方式如下:

    aggregate.PublishEvent(ctx)
    

    其中ctx为上下文。

    subscribe事件

    首先仍然需要topic,topic为event type,,订阅者为一个自定义的EventHandler。

    _ = EventBus().Subscribe(customEvent{}, customEventHandler)
    

    框架自动获取event type,其内部逻辑为:如果实现了GetEventType()接口,那么优先采用接口提供的event type,否则默认使用Event Struct名。

    the event handler is like this:

    func CustomeEventHandler() eventhandling.EventHandlerFunc {
        return func(ctx context.Context, eventMessage eventhandling.EventMessage) { 
            // 1) get event data
            
            // 2) do your business  
        }
    }
    

    五、后记

    本框架是我们中心在实践领域驱动设计时,我开发的一个GO语言版的事件驱动框架供大家使用,基本功能都有了,也已在正式服务开发使用。

    但是这个框架其实还有很多不足,需要继续开发完善。代码后续开源,如果有感兴趣者一起开发,或许可以开发出一个axon-go也不一定哦!

    相关文章

      网友评论

        本文标题:如何撸一个GO版本的基于CQRS的事件驱动框架

        本文链接:https://www.haomeiwen.com/subject/khbmyhtx.html