序
本文主要研究一下eventhorizon的EventBus
EventBus
eventhorizon/eventbus.go
// EventBus is an EventHandler that other handlers can be registered on.
// On top of the embedded EventHandler methods it offers handler
// registration, a channel of asynchronous handling errors, and a way to
// wait for the registered handlers to finish.
type EventBus interface {
EventHandler
// AddHandler adds a handler for an event. Returns an error if either the
// matcher or handler is nil, the handler is already added or there was some
// other problem adding the handler (for networked handlers for example).
AddHandler(context.Context, EventMatcher, EventHandler) error
// Errors returns an error channel where async handling errors are sent.
Errors() <-chan EventBusError
// Wait waits for all handlers to be cancelled by their context.
Wait()
}
// EventHandler is anything that can report its handler type and process a
// single event.
type EventHandler interface {
// HandlerType is the type of the handler.
HandlerType() EventHandlerType
// HandleEvent handles an event. A non-nil error reports that handling
// failed.
HandleEvent(context.Context, Event) error
}
// EventMatcher is a predicate over events; it is passed to
// EventBus.AddHandler together with the handler being registered.
type EventMatcher interface {
// Match returns true if the matcher matches an event.
Match(Event) bool
}
EventBus接口内嵌了EventHandler接口,定义了AddHandler、Errors、Wait方法
EventBus
eventhorizon/eventbus/local/eventbus.go
// EventBus is the local event bus: events are serialized with codec and
// published to group.
type EventBus struct {
// group is the set of channels that HandleEvent publishes serialized events to.
group *Group
// registered is a set keyed by handler type.
// NOTE(review): presumably tracks already-added handlers and is guarded by
// registeredMu; neither is used in the code shown here — confirm.
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
// errCh receives errors raised while unmarshalling or handling events.
errCh chan eh.EventBusError
// wg is marked done by each handle loop when it returns.
wg sync.WaitGroup
// codec marshals events before publishing and unmarshals them before handling.
codec eh.EventCodec
}
// HandleEvent implements the HandleEvent method of the
// eventhorizon.EventHandler interface.
//
// The event is first serialized with the bus codec; the resulting bytes are
// then published to the bus group. A marshalling failure is returned wrapped,
// otherwise the result of the publish call is returned as-is.
func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
	payload, marshalErr := b.codec.MarshalEvent(ctx, event)
	if marshalErr != nil {
		return fmt.Errorf("could not marshal event: %w", marshalErr)
	}

	return b.group.publish(ctx, payload)
}
EventBus定义了group、registered、registeredMu、errCh、wg、codec属性;HandleEvent方法先序列化event,然后通过group.publish发布event
Group
eventhorizon/eventbus/local/eventbus.go
// Group is a collection of byte-slice channels that published data is
// fanned out to.
type Group struct {
// bus holds one channel per key; publish sends to every channel in it.
// NOTE(review): the meaning of the string key is not visible here — confirm.
bus map[string]chan []byte
// busMu guards bus; publish holds the read lock while iterating.
busMu sync.RWMutex
}
// NewGroup creates a Group with an empty, ready-to-use channel map.
func NewGroup() *Group {
	g := new(Group)
	g.bus = make(map[string]chan []byte)
	return g
}
// publish offers b to every channel in the group while holding the read
// lock. Sends never block: when a channel cannot accept the data right away
// the data is dropped for that channel and a message is logged. The returned
// error is always nil.
func (g *Group) publish(ctx context.Context, b []byte) error {
	g.busMu.RLock()
	defer g.busMu.RUnlock()

	for name := range g.bus {
		queue := g.bus[name]

		// Non-blocking send; fall through to the log line only when the
		// channel is not ready to receive.
		select {
		case queue <- b:
			continue
		default:
		}

		log.Printf("eventhorizon: publish queue full in local event bus")
	}

	return nil
}
// handle consumes serialized events from ch until ctx is cancelled. Each
// payload is unmarshalled with the bus codec, checked against m, and passed
// to h when it matches.
//
// Errors are reported on b.errCh without blocking; if the channel cannot
// accept the error it is logged instead. b.wg.Done is called when the loop
// returns.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
	defer b.wg.Done()

	for {
		select {
		case data := <-ch:
			// Artificial delay to simulate network.
			time.Sleep(10 * time.Millisecond)

			// ctx is shadowed for the rest of this case by the context the
			// codec returns; the outer ctx still drives the loop's
			// cancellation.
			event, ctx, err := b.codec.UnmarshalEvent(ctx, data)
			if err != nil {
				err = fmt.Errorf("could not unmarshal event: %w", err)
				select {
				case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
				default:
					log.Printf("eventhorizon: missed error in local event bus: %s", err)
				}
				// NOTE(review): returning here ends this handler for good
				// after a single undecodable payload — confirm this is
				// intended rather than skipping the payload.
				return
			}

			// Ignore non-matching events.
			if !m.Match(event) {
				continue
			}

			// Handle the event if it did match.
			if err := h.HandleEvent(ctx, event); err != nil {
				// Wrap with %w (like the marshal/unmarshal errors) so the
				// cause stays reachable through errors.Is / errors.As.
				err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
				select {
				case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
				default:
					log.Printf("eventhorizon: missed error in local event bus: %s", err)
				}
			}
		case <-ctx.Done():
			return
		}
	}
}
Group的publish方法遍历bus的channel,通过select写入event;handle方法循环select读取event,然后通过m.Match(event)判断是否符合,是的话执行h.HandleEvent
小结
eventhorizon的EventBus接口内嵌了EventHandler接口,定义了AddHandler、Errors、Wait方法。
网友评论