美文网首页
聊聊eventhorizon的EventBus

聊聊eventhorizon的EventBus

作者: go4it | 来源:发表于2021-04-01 23:08 被阅读0次

    本文主要研究一下eventhorizon的EventBus

    EventBus

    eventhorizon/eventbus.go

    type EventBus interface {
        EventHandler
    
        // AddHandler adds a handler for an event. Returns an error if either the
        // matcher or handler is nil, the handler is already added or there was some
        // other problem adding the handler (for networked handlers for example).
        AddHandler(context.Context, EventMatcher, EventHandler) error
    
        // Errors returns an error channel where async handling errors are sent.
        Errors() <-chan EventBusError
    
        // Wait wait for all handlers to be cancelled by their context.
        Wait()
    }
    
    type EventHandler interface {
        // HandlerType is the type of the handler.
        HandlerType() EventHandlerType
    
        // HandleEvent handles an event.
        HandleEvent(context.Context, Event) error
    }
    
    type EventMatcher interface {
        // Match returns true if the matcher matches an event.
        Match(Event) bool
    }
    

    EventBus接口内嵌了EventHandler接口,定义了AddHandler、Errors、Wait方法

    EventBus

    eventhorizon/eventbus/local/eventbus.go

    type EventBus struct {
        group        *Group
        registered   map[eh.EventHandlerType]struct{}
        registeredMu sync.RWMutex
        errCh        chan eh.EventBusError
        wg           sync.WaitGroup
        codec        eh.EventCodec
    }
    
    // HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
    func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
        data, err := b.codec.MarshalEvent(ctx, event)
        if err != nil {
            return fmt.Errorf("could not marshal event: %w", err)
        }
    
        return b.group.publish(ctx, data)
    }
    

    EventBus定义了group、registered、registeredMu、errCh、wg、codec属性;HandleEvent方法先序列化event,然后通过group.publish发布event

    Group

    eventhorizon/eventbus/local/eventbus.go

    type Group struct {
        bus   map[string]chan []byte
        busMu sync.RWMutex
    }
    
    // NewGroup creates a Group.
    func NewGroup() *Group {
        return &Group{
            bus: map[string]chan []byte{},
        }
    }
    
    func (g *Group) publish(ctx context.Context, b []byte) error {
        g.busMu.RLock()
        defer g.busMu.RUnlock()
    
        for _, ch := range g.bus {
            // Marshal and unmarshal the context to both simulate only sending data
            // that would be sent over a network bus and also break any relationship
            // with the old context.
            select {
            case ch <- b:
            default:
                log.Printf("eventhorizon: publish queue full in local event bus")
            }
        }
    
        return nil
    }
    
    // Handles all events coming in on the channel.
    func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
        defer b.wg.Done()
    
        for {
            select {
            case data := <-ch:
                // Artificial delay to simulate network.
                time.Sleep(10 * time.Millisecond)
    
                event, ctx, err := b.codec.UnmarshalEvent(ctx, data)
                if err != nil {
                    err = fmt.Errorf("could not unmarshal event: %w", err)
                    select {
                    case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
                    default:
                        log.Printf("eventhorizon: missed error in local event bus: %s", err)
                    }
                    return
                }
    
                // Ignore non-matching events.
                if !m.Match(event) {
                    continue
                }
    
                // Handle the event if it did match.
                if err := h.HandleEvent(ctx, event); err != nil {
                    err = fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error())
                    select {
                    case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
                    default:
                        log.Printf("eventhorizon: missed error in local event bus: %s", err)
                    }
                }
            case <-ctx.Done():
                return
            }
        }
    }
    

    Group的publish方法遍历bus的channel,通过select写入event;handle方法循环select读取event,然后通过m.Match(event)判断是符合,是的话执行h.HandleEvent

    小结

    eventhorizon的EventBus接口内嵌了EventHandler接口,定义了AddHandler、Errors、Wait方法。

    doc

    相关文章

      网友评论

          本文标题:聊聊eventhorizon的EventBus

          本文链接:https://www.haomeiwen.com/subject/oknxkltx.html