CQRS架构和事件驱动思想用的越来越广泛,Java开发的同学在实践CQRS架构时,应该基本都用过axon-framework;而GO语言,则没有这么一个框架,本文就一起探索下,如何撸一个类似axon的go版本的框架。
一、背景介绍
1、基于CQRS和事件驱动的架构
如下为基于CQRS的事件驱动架构:
CQRS框架
在这个架构图中,最核心的概念是Command、Event。
命令(Command)通常由比较简单的对象来表示,表明要做什么操作;当Command Bus(命令总线)收到command的时候会把他路由到对应的Command Handlers。每个Command Handler只会接收他指定类型的command并根据command里的内容来执行相应的业务逻辑。
Command Handler从repository获取到领域对象(Aggregates,聚合),然后在领域对象执行对应的方法(method)从而来改变它们的状态。
聚合的状态变化会导致产生领域事件。事件总线(Event bus)会将事件分发给所有感兴趣的事件监听者。他可以是同步方式也可以是异步方式。异步方式允许事件发送出去后就直接返回并将控制权交给用户,而此时的后台应用程序也会对事件进行处理。这样好处就是用户无需等待事件处理完成后才能做其他的事,所以应用程序的响应速度就会更快。另一方面,同步事件处理更简单且符合常理。默认情况下,同步处理方式会将command的处理和event的处理放进同一个事务中。
Event listeners接收事件并处理它们。一些hanndler的操作是更新数据源的数据,而另一些是将消息发送到其他外部系统。您可能会注意到,command handlers完全不知道接下来会发生什么事。这意味着对扩展应用程序新功能是非入侵式的。所以你需要做的是添加另一个事件监听器(event listener)。事件将应用程序中的所有组件很松散地耦合在一起。
仓储(repository)负责提供对聚合根的访问。通常情况下,这些仓储库只会针对通过其唯一标识符来查找聚合进行优化。一种方式是,仓储存储的是聚合本身的状态(例如使用对象关系映射),而另一种是,仓储通过Event Store存储聚合在各种行为中产生的真实事件。仓储还负责持久化聚合产生的事件。
Axon提供了对直接持久化聚合(例如使用对象关系映射方式)和事件溯源 (event sourcing)的支持。
在某些情况下,事件处理需要将新命令发送给应用程序。例如,当我们收到一个订单,这可能意味着客户的账户应该扣掉买商品的钱,并且必须告知运送方准备运送所购买的商品。在许多应用中,逻辑将变得比这更复杂:如果客户没有及时付款呢?您会立即寄出货物,还是先等待付款?saga是CQRS的概念,它负责管理这些复杂的业务事务。
命令查询职责分离(CQRS)是构建软件系统的一种方式,它的思想是将对状态的查询部分与改变状态的部分进行分离。 Axon 框架是一个基于 Java 实现的 CQRS 框架,提供了对大多数重要构建块的实现,例如聚合、命令与事件总线、以及 repository,以帮助开发者在构建应用程序时使用 CQRS 架构模式。
2、解决哪些问题
CQRS 中的命令这方面最重要的部分包括:
- 命令(Command),它负责捕获用户的意图,即接下来应该发生什么事。在 Axon 中,命令被实现为 POJO 对象,因而无需实现任何接口;
- 命令处理器(Command Handler),它负责执行所发送的命令。在 Axon 中,可以选择通过实现某个接口的方式创建它,也可以通过注解符实现;
- 命令总线(Command Bus),负责将命令传递给对应的命令处理器。Axon 总共提供了四种实现方式,Aderemi 选择了一种简单的同步总线用于传递命令。另一种实现方式是通过异步总线以异步的方式处理命令。
事件最重要的部分包括:
- 领域事件(Domain Event),它表示发生于过去的某事,由领域中的状态变化、命令及命令处理器中所初始化的变化所创建;
- 事件总线(Event Bus),它负责将事件传递给查询方。Axon 中提供了多种实现方式,Aderemi 选择了一种较简单的实现;
- 事件处理器(Event Handler),它负责侦听事件,通过事件中所包含的信息,在查询方反应出应用程序的状态。在 Axon 中,可通过注解符定义事件处理器;
- 事件存储,负责持久化事件。
二、命令接口设计和实现
1、Command gateway
Command gateway我们定义三个方法,分别为:1)单发送命令不关注结果;2)发送命令并且等待结果;3)发送命令并且等待结果,但是有超时控制。接口类定义如下:
type CommandGateway interface {
// Sends the given command
Send(ctx context.Context, Command interface{}) error
// Sends the given command and wait for it to execute. The result of the execution is returned when available.
// This method will block indefinitely ,until a result is available,or until the Thread is interrupted. When the
// thread is interrupted, this method returns nil and the detail error.
SendAndWait(ctx context.Context, Command interface{}) (result interface{}, err error)
// Sends the given command and wait for it to execute. The result of the execution is returned when available.
// This method will block until a result is available, or the given {@code timeout} was reached,or until the
// thread is interrupted. When the timeout is reached or the thread is interrupted,this method returns nil
// and the detail error.
SendAndWaitWithTimeout(ctx context.Context, Command interface{}, timeout int64) (result interface{}, err error)
}
Command Gateway的简单实现如下:
type SimpleCommandGateway struct {
CommandBus commandhandling2.CommandBus
}
func (a *SimpleCommandGateway) Send(ctx context.Context, command interface{}) error {
commandMessage := commandhandling2.NewMiddleCommandMessage(command)
a.CommandBus.Dispatch(ctx, commandMessage)
return nil
}
func (a *SimpleCommandGateway) SendAndWait(ctx context.Context, command interface{}) (result interface{}, err error) {
commandMessage := commandhandling2.NewMiddleCommandMessage(command)
resp, err := a.CommandBus.DispatchWithReturn(ctx, commandMessage)
return resp, err
}
func (a *SimpleCommandGateway) SendAndWaitWithTimeout(ctx context.Context, command interface{},
timeout int64) (result interface{}, err error) {
commandMessage := commandhandling2.NewMiddleCommandMessage(command)
resp, err := a.CommandBus.DispatchWithReturnAndTimeout(ctx, commandMessage, timeout)
return resp, err
}
Command Gateway是对外提供的接口类,内部是对于命令的实际路由工作是交给Command bus的,下面看看Command Bus实现。
2、Command Bus
CommandBus是命令总线,提供发布订阅命令功能,接口方法设计如下:
type CommandBus interface {
// Dispatch the given {@code CommandMessage} to the CommandHandler subscribed to the given {@code command}'s commandType.
// No feedback is given about the status of the dispatching process. Implementations may return immediately after
// asserting a valid handler is registered for the given command.
Dispatch(ctx context.Context, command CommandMessage) error
// Dispatch the given {@code CommandMessage} to the CommandHandler subscribed to the given {@code CommandMessage}'s commandType.
// Waiting the result of the processing and return
DispatchWithReturn(ctx context.Context, command CommandMessage) (interface{}, error)
// Dispatch the given {@code CommandMessage} to the CommandHandler subscribed to the given {@code CommandMessage}'s commandType.
// Waiting the result of the processing and return or time out
DispatchWithReturnAndTimeout(ctx context.Context, command CommandMessage,timeout int64) (interface{}, error)
// Subscribe the given {@code CommandHandler} to command {@code command}.
Subscribe(command interface{},handler CommandHandler) error
}
Command Bus我们应该提供同步和异步两种模式,然后根据实际情况,通过配置决定到底使用同步还是异步模式。
同步的Command Bus
// Implementation of the CommandBus that dispatches commands to the handlers subscribed to that specific command's commandType.
type SimpleCommandBus struct {
handlers map[string]CommandHandler
handlersMu sync.RWMutex
}
func (bus *SimpleCommandBus) Dispatch(ctx context.Context, commandMessage CommandMessage) error {
_, err := bus.doDispatch(ctx, commandMessage)
return err
}
func (bus *SimpleCommandBus) DispatchWithReturn(ctx context.Context, commandMessage CommandMessage) (interface{}, error) {
resp, err := bus.doDispatch(ctx, commandMessage)
return resp, err
}
func (bus *SimpleCommandBus) DispatchWithReturnAndTimeout(ctx context.Context, commandMessage CommandMessage, timeout int64) (interface{}, error) {
return bus.DispatchWithReturn(ctx, commandMessage)
}
// Subscribe the given {@code handler} to commands with given {@code command}.
func (bus *SimpleCommandBus) Subscribe(command interface{}, handler CommandHandler) error {
var commandName string
if reflect.TypeOf(command).Kind() == reflect.Struct {
commandName = GetCommandType(command)
} else {
commandName = command.(string)
}
return bus.SubscribeWithTopic(commandName, handler)
}
对于同步模式,DispatchWithReturnAndTimeout接口是不存在什么超不超时的,有结果了就立即返回。
异步的Command Bus实现
同步的Command Bus好实现,那么异步的该如何实现了?超时又怎么控制了?
我们可以用go中的协程来实现异步,然后返回就用chan来实现。定义异步Command Bus类AsynchronousCommandBus如下:
// Implementation of the CommandBus that processed Commands asynchronously in a new goroutines
type AsynchronousCommandBus struct {
SimpleCommandBus
resp chan response
}
type response struct {
data interface{}
err error
}
dispatch command如下:
func (a *AsynchronousCommandBus) dispatch(ctx context.Context, command CommandMessage, handler CommandHandler) (interface{}, error) {
// execute handle command with a new goroutines
go func() {
resp, err := handler.HandleCommand(ctx, command.Payload())
ret := response{
data: resp,
err: err,
}
a.resp <- ret
}()
out := <-a.resp
return out.data, out.err
}
那么超时如何控制了,可以这么控制:
select {
case out := <-a.resp:
return out.data, out.err
case <-time.After(time.Duration(timeout) * time.Millisecond):
return "command handle time out", errors.New("handle time out")
}
超时dispatch完整实现如下:
func (a *AsynchronousCommandBus) dispatchWithTimeout(ctx context.Context, command CommandMessage, handler CommandHandler,
timeout int64) (interface{}, error) {
// execute handle command with a new goroutines
go func() {
resp, err := handler.HandleCommand(ctx, command.Payload())
ret := response{
data: resp,
err: err,
}
a.resp <- ret
}()
select {
case out := <-a.resp:
return out.data, out.err
case <-time.After(time.Duration(timeout) * time.Millisecond):
return "command handle time out", errors.New("handle time out")
}
}
3、Command Handler
// CommandHandler specialization for handlers of command.
type CommandHandler interface {
// Handles the given {@code command}.
HandleCommand(ctx context.Context, command interface{}) (resp interface{}, err error)
}
// CommandHandlerFunc is a function that can be used as a command handler.
type CommandHandlerFunc func(ctx context.Context, command interface{}) (resp interface{}, err error)
// HandleEvent implements the HandleEvent method of the CommandHandler.
func (handlerFunc CommandHandlerFunc) HandleCommand(ctx context.Context, command interface{}) (resp interface{}, err error) {
return handlerFunc(ctx, command)
}
Command Handler框架提供接口定义,具体实现由用本框架的具体应用程序提供。
三、事件接口设计和实现
1、Event Bus
Event Bus接口,定义了发布和订阅两个接口方法,如下:
type EventBus interface {
// Publishes the event on the bus.
Publish(ctx context.Context, eventMessage EventMessage) error
// Subscribe the event on the bus
Subscribe(event interface{}, handler EventHandlerFunc) error
}
对事件感兴趣者,可能是本系统内的,也可能是其他系统的,因此Event Bus需要提供支持本地的和外部的实现。本地的较简单,我们可以用go的chan实现,但是外部的则相对复杂一些,我们需要用Kafka或者其他消息中间件实现。然后根据实际情况,选择是使用仅支持本地的Event Bus,还是选择使用支持外部的Event Bus。这里我们介绍一下本地Event Bus。
// EventBus is a local event bus that delegates handling of published events
// to all matching registered handlers, in order of registration.
type LocalEventBus struct {
messageBus messaging2.MessageBus
}
// NewEventBus creates a EventBus.
func NewLocalEventBus() EventBus {
return &LocalEventBus{
messageBus: messaging2.New(runtime.NumCPU()),
}
}
// Publish implements the method of the bus.EventBus interface.
func (bus *LocalEventBus) Publish(ctx context.Context, eventMessage EventMessage) error {
bus.messageBus.Publish(eventMessage.EventType(), ctx, eventMessage)
return nil
}
// Subscribe implements the method of the bus.EventBus interface.
func (bus *LocalEventBus) Subscribe(event interface{}, handler EventHandlerFunc) error {
var eventName string
if reflect.TypeOf(event).Kind() == reflect.Struct {
eventName = GetEventType(event)
} else {
eventName = event.(string)
}
return bus.messageBus.Subscribe(eventName, handler)
}
其中核心的工作,交给了我们定义的MessageBus,看看怎么实现的:
// MessageBus implements publish/subscribe messaging paradigm
type MessageBus interface {
// Publish publishes arguments to the given topic subscribers
// Publish block only when the buffer of one of the subscribers is full.
Publish(topic string, args ...interface{})
// Close unsubscribe all handlers from given topic
Close(topic string)
// Subscribe subscribes to the given topic
Subscribe(topic string, fn interface{}) error
// Unsubscribe unsubscribe handler from the given topic
Unsubscribe(topic string, fn interface{}) error
}
type handlersMap map[string][]*handler
type handler struct {
callback reflect.Value
queue chan []reflect.Value
}
type messageBus struct {
handlerQueueSize int
mtx sync.RWMutex
handlers handlersMap
}
func (b *messageBus) Publish(topic string, args ...interface{}) {
rArgs := buildHandlerArgs(args)
b.mtx.RLock()
defer b.mtx.RUnlock()
if hs, ok := b.handlers[topic]; ok {
for _, h := range hs {
h.queue <- rArgs
}
}
}
func (b *messageBus) Subscribe(topic string, fn interface{}) error {
if reflect.TypeOf(fn).Kind() != reflect.Func {
return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn))
}
h := &handler{
callback: reflect.ValueOf(fn),
queue: make(chan []reflect.Value, b.handlerQueueSize),
}
go func() {
for args := range h.queue {
h.callback.Call(args)
}
}()
b.mtx.Lock()
defer b.mtx.Unlock()
b.handlers[topic] = append(b.handlers[topic], h)
return nil
}
其中定义了一个handlersMap,保存消息的topic和handler 的map对,其中handler的定义如下:
type handler struct {
callback reflect.Value
queue chan []reflect.Value
}
handler中保存了方法的反射值,以及方法的参数,这样我们即可待用 callback.call(args)即可调用处理方法。
其实在CommandBus的处理上,我们也完全可以用这个封装的MessageBus,两种实现吧,反正没事瞎折腾着玩呗,哈哈!
2、Event Handler
Event Handler接口的定义如下:
// EventHandler is a handler of event.
type EventHandler interface {
// HandleEvent handles an event.
HandleEvent(ctx context.Context, eventMessage EventMessage)
}
// EventHandlerFunc is a function that can be used as a event handler.
type EventHandlerFunc func(ctx context.Context, eventMessage EventMessage)
// HandleEvent implements the HandleEvent method of the EventHandler.
func (h EventHandlerFunc) HandleEvent(ctx context.Context, eventMessage EventMessage) {
h(ctx, eventMessage)
}
同CommandHandler一样,EventHandler的实现由业务决定。
3、Event Store
Event Store接口定义如下:
// EventStore is an interface for an event sourcing event store.
type EventStore interface {
// Save appends all events in the event stream to the store.
Save(ctx context.Context, events []eventhandling.EventMessage) error
// Load loads all events for the aggregate id from the store.
Load(ctx context.Context, id string) ([]eventhandling.EventMessage, error)
}
根据不同存储,我们可以提供Event Store的不同实现,比如存Memory,以及存Mysql。内存EventStore比较简单,这里介绍下Mysql的EventStore如何实现,直接看代码比较好:
type MysqlStore struct {
dbHost string
dbPort int
user string
password string
dataBase string
tableName string
}
// New returns a new MysqlStore
func NewMysqlStore(dbHost string, dbPort int, user string, password string, dataBase string, tableName string) (*MysqlStore, error) {
store := &MysqlStore{
dbHost: dbHost,
dbPort: dbPort,
user: user,
password: password,
dataBase: dataBase,
tableName: tableName,
}
return store, nil
}
func (m *MysqlStore) expand(statement string) string {
return strings.Replace(statement, "${TABLE}", m.tableName, -1)
}
func (m *MysqlStore) Save(ctx context.Context, events []eventhandling.EventMessage) error {
if len(events) == 0 {
return ErrNoEventsToAppend
}
var records []EventRecord
for _, event := range events {
record, _ := serializer.MarshalEvent(event)
records = append(records, record)
}
return m.doSave(ctx, records)
}
func (m *MysqlStore) doSave(ctx context.Context, records []EventRecord) error {
if len(records) == 0 {
return ErrNoEventsToAppend
}
dbSourceName := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", m.user, m.password, m.dbHost, m.dbPort, m.dataBase)
db, err := sql.Open("mysql", dbSourceName)
if err != nil {
return errors.New("mysql save failed, unable to connect to db")
}
defer db.Close()
stmt, err := db.PrepareContext(ctx, m.expand(INSERT_SQL))
if err != nil {
return errors.New("mysql save failed, unable to prepare statement")
}
defer stmt.Close()
for _, record := range records {
_, err = stmt.Exec(record.AggregateId, record.Data, record.Version)
if err != nil {
return errors.New("mysql save failed, execute statement fail")
}
}
return nil
}
func (m *MysqlStore) Load(ctx context.Context, aggregateId string) ([]eventhandling.EventMessage, error) {
records, err := m.doLoad(ctx, aggregateId)
if err != nil {
return nil, err
}
var events []eventhandling.EventMessage
for _, record := range records {
event, err := serializer.UnmarshalEvent(record)
if err != nil {
return nil, err
}
events = append(events, event)
}
return events, nil
}
func (m *MysqlStore) doLoad(ctx context.Context, aggregateId string) (EventRecords, error) {
dbSourceName := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", m.user, m.password, m.dbHost, m.dbPort, m.dataBase)
db, err := sql.Open("mysql", dbSourceName)
if err != nil {
return nil, errors.New("mysql load failed,unable to connect to db")
}
defer db.Close()
rows, err := db.Query(m.expand(SELECT_SQL), aggregateId)
if err != nil {
return nil, errors.New("mysql load failed, unable to query rows")
}
records := EventRecords{}
for rows.Next() {
record := EventRecord{}
if err := rows.Scan(&record.AggregateId, &record.Data, &record.Version); err != nil {
return nil, errors.New("mysql load failed, unable to parse row")
}
records = append(records, record)
}
return records, nil
}
4、Event Sourcing
通过事件朔源然后得出聚合,这是这种模式的精华所在,相对也是比较的复杂的,我们需要做的是:
- 从Event Store中加载出聚合的所有的事件;
- 将所有事件执行一遍,得出最新的聚合状态;
- 为了提升性能,避免每次都从头开始朔源,要提供快照,然后下载朔源,从快照出朔源即可。
我们先实现一个简单的,不带快照版本的:
// EventSourcingRepository provides the primary abstraction to saving and loading events
type EventSourcingRepository struct {
eventExecutor reflect.Type
eventStore EventStore
serializer Serializer
dbMu sync.RWMutex
}
func (r *EventSourcingRepository) Load(ctx context.Context, uid modelling.UinId) (interface{}, error) {
events, err := r.eventStore.Load(ctx, uid.Identifier)
if err != nil {
return nil, errors.New("load event sourcing aggregate fail")
}
aggregate := r.New()
for _, eventItem := range events {
err = aggregate.ExecuteEvent(eventItem)
}
return aggregate, nil
}
func (r *EventSourcingRepository) Save(ctx context.Context, aggregate interface{}) error {
r.dbMu.RLock()
defer r.dbMu.RUnlock()
var Events []eventhandling.EventMessage
if v, ok := aggregate.(modelling.IEvents); ok {
Events = v.AggregateEvents()
}
if len(Events) > 0 {
err := r.SaveMapEvents(ctx, Events)
if err != nil {
return err
}
}
return nil
}
四、快速开始
1、创建聚合
type CustomAggregate struct {
modelling.BaseAggregate
// 定义聚合成员
}
2、发布订阅命令
定义Command
如下为一个测试命令domainCommand,注意我们实现了自定义的CommandType接口,这个接口可以提供一个有别于Command struct名的命令名字。
type domainCommand struct {
Name string `json:"name"`
Age int `json:"age"`
}
// optional
func (t domainCommand) CommandType() string {
return "my type"
}
发布Command
domainCommand为业务自定义的命令,commandGateway为默认的配置,发布命令如下:
resp, err := ddd.CommandGateway().SendAndWait(c, domainCommand)
发布命令的topic为command type,如果command未实现CommandType()接口,那么默认就为struct名,如果实现了,那么就采用实现的接口提供的Command Type。
订阅Command
首先需要topic,topic为command type,订阅者为一个自定义的CommandHandler。
_ = CommandBus().Subscribe(command.CreateAccountCommand{}, handler.CreateAccountCommandHandler())
框架会自动获取command type,其内部逻辑就是如果是实现了CommandType()接口,那么优先采用接口提供的command type,否则默认使用Command Struct名。
其中CommandHanlder接口定义如下:
HandleCommand(ctx context.Context, command interface{}) (resp interface{}, err error)
自定义的CommandHandler,一般形式如下:
func CustomCommandHandler() commandhandling.CommandHandlerFunc {
return func(ctx context.Context, commandMessage interface{}) (result interface{}, err error) {
// 1) get command data
// 2) load aggregate
// 3) operate aggregate
// 4) save aggregate
return resp, nil
}
}
3、发布订阅事件
定义事件
如下为一个领域事件customEvent,注意我们实现了自定义的EventType接口,这个接口可以提供一个有别于Event struct名的Event Type。
type customEvent struct {
Name string
Age int
}
// optional
func (e customEvent) EventType() string {
return "my type"
}
Apply和Publish事件
框架将发出领域事件,分为两步,首先聚合生成领域事件,但是生成后并不立马发送,发送时机有业务决定并显式调用一个publish方法,一般的发布时机为成功存储聚合后。
聚合发布Apply事件如下:
aggregate.ApplyEvent(context, customEvent)
其中,context为上下文,customEvent为业务自定义的事件,发布事件的topic为event type,如果event未实现EventType()接口,那么默认就为struct名,如果实现了接口,那么就采用实现的接口提供的Event Type。
发布聚合方式如下:
aggregate.PublishEvent(ctx)
其中ctx为上下文。
subscribe事件
首先仍然需要topic,topic为event type,,订阅者为一个自定义的EventHandler。
_ = EventBus().Subscribe(customEvent{}, customEventHandler)
框架自动获取event type,其内部逻辑为:如果实现了GetEventType()接口,那么优先采用接口提供的event type,否则默认使用Event Struct名。
the event handler is like this:
func CustomeEventHandler() eventhandling.EventHandlerFunc {
return func(ctx context.Context, eventMessage eventhandling.EventMessage) {
// 1) get event data
// 2) do your business
}
}
五、后记
本框架是我们中心在实践领域驱动设计时,我开发的一个GO语言版的事件驱动框架供大家使用,基本功能都有了,也已在正式服务开发使用。
但是这个框架其实还有很多不足,需要继续开发完善。代码后续开源,如果有感兴趣者一起开发,或许可以开发出一个axon-go也不一定哦!
网友评论