美文网首页
Client-go客户端源码解析--EventRecorder

Client-go客户端源码解析--EventRecorder

作者: 队长100 | 来源:发表于2021-08-10 14:42 被阅读0次

    EventRecorder

    Kubernetes的事件是一种资源对像,用于展示集群内发生的情况,Kubernetes 中的各个组件都会将运行时的各种事件上报给Kubernetes API Server,并存储到Etcd集群中,为了避免磁盘空间被填满,对事件的保存强制执行保留策略:在最后一次事件发生后,删除1小时之前的事件。

    示例代码

        eventBroadcaster := record.NewBroadcaster()
        eventBroadcaster.StartLogging(klog.Infof)//注册事件消费者,将事件打印日志
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})//注册事件消费者,将事件记录到Kubernetes API Server,保存到Etcd中
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "myController"})
    ......
    //记录事件(事件生产者)
    recorder.Event(obj,corev1.EventTypeWarning,"test reason","test msg")
    

    eventRecorder机制

    eventRecorder机制
    1. EventRecorder: 事件生产者,各个组件(包括用户自定义的组件)会通过EventRecorder记录事件。

    2. EventBroadcaster: 事件消费者,也称为事件广播器。消费EventRecorder记录的事件并将事件分发给注册的所有的Watcher。分发过程有两种机制----阻塞和非阻塞两种分法机制。

    3. BroadcasterWatcher: Watcher管理器,用于定义事件的具体处理方式。

    EventRecorder

    //接口定义
    type EventRecorder interface {
        //对刚发生的事件进行记录
        Event(object runtime.Object, eventtype, reason, message string)
    
        // Eventf is just like Event, but with Sprintf for the message field.
      //格式化输出事件格式
        Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
    
        // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
      //允许自定义事件发生的事件,以记录过去发生的事件
        PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})
    
        // AnnotatedEventf is just like eventf, but with annotations attached
      //同Eventf,附加了Annotations字段
        AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
    }
    ......
    ......
    ......
    //核心代码
    // NewRecorder returns an EventRecorder that records events with the given event source.
    func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
       return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
    }
    
    type recorderImpl struct {
       scheme *runtime.Scheme
       source v1.EventSource
       *watch.Broadcaster
       clock clock.Clock
    }
    
    func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
       ref, err := ref.GetReference(recorder.scheme, object)
       if err != nil {
          klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
          return
       }
    
       if !util.ValidateEventType(eventtype) {
          klog.Errorf("Unsupported event type: '%v'", eventtype)
          return
       }
    
       event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
       event.Source = recorder.source
    
       go func() {
          // NOTE: events should be a non-blocking operation
         //新建goroutine执行recorder.Action,以达到非阻塞效果(避免c.incoming channel阻塞,导致后续事件无法正常发送)
          defer utilruntime.HandleCrash()
          recorder.Action(watch.Added, event)
       }()
    }
    
    func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
       recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
    }
    
    
    ......
    //vendor/k8s.io/apimachinery/pkg/watch/mux.go
    // Action distributes the given event among all watchers.
    func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
        m.incoming <- Event{action, obj} //incoming 默认缓冲25条消息,后续的会阻塞
    }
    

    EventBroadcaster

    // Creates a new event broadcaster.
    func NewBroadcaster() EventBroadcaster {
       return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
    }
    
    ///vendor/k8s.io/apimachinery/pkg/watch/mux.go
    func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
        m := &Broadcaster{
            watchers:            map[int64]*broadcasterWatcher{},
            incoming:            make(chan Event, incomingQueueLength),//25
            watchQueueLength:    queueLength,//1000
            fullChannelBehavior: fullChannelBehavior,//channel满则丢弃消息
        }
        m.distributing.Add(1)
        go m.loop() //新建goroutine,等待消息到来
        return m
    }
    ......
    // loop receives from m.incoming and distributes to all watchers.
    func (m *Broadcaster) loop() {
        // Deliberately not catching crashes here. Yes, bring down the process if there's a
        // bug in watch.Broadcaster.
        for event := range m.incoming {
            if event.Type == internalRunFunctionMarker { //添加watcher的时候会生成该类型事件,此事件不分发给watcher,只是内部处理
                event.Object.(functionFakeRuntimeObject)()//执行定义好的函数,生成并注册watcher
                continue
            }
            m.distribute(event)//分发消息到所有的watcher
        }
        m.closeAll()
        m.distributing.Done()
    }
    
    // distribute sends event to all watchers. Blocking.
    func (m *Broadcaster) distribute(event Event) {
        m.lock.Lock()
        defer m.lock.Unlock()
        if m.fullChannelBehavior == DropIfChannelFull { //若当前watcher的result channel缓冲已满,则丢弃后续消息
            for _, w := range m.watchers {
                select {
                case w.result <- event: //发送消息到watcher的result channel
                case <-w.stopped:
                default: // Don't block if the event can't be queued. 不阻塞,直接返回,丢弃后续的消息
                }
            }
        } else {
            for _, w := range m.watchers { //若当前watcher的result channel缓冲已满,则阻塞,直到watcher有从result内取出消息
                select {
                case w.result <- event:
                case <-w.stopped:
                }
            }
        }
    }
    

    BroacasterWatcher

    func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
       // The default math/rand package functions aren't thread safe, so create a
       // new Rand object for each StartRecording call.
       randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
       eventCorrelator := NewEventCorrelator(clock.RealClock{})
       return eventBroadcaster.StartEventWatcher(
          func(event *v1.Event) {
             recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
          })
    }
    ......
    func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
        return eventBroadcaster.StartEventWatcher(
            func(e *v1.Event) {
                logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
            })
    }
    
    ......
    func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
        watcher := eventBroadcaster.Watch() //生成watcher并注册
        //新建goroutine启动watcher,等待事件到来并处理
        go func() {
            defer utilruntime.HandleCrash()
            for watchEvent := range watcher.ResultChan() { //从watcher的result channel中读取事件
                event, ok := watchEvent.Object.(*v1.Event)
                if !ok {
                    // This is all local, so there's no reason this should
                    // ever happen.
                    continue
                }
                eventHandler(event)//调用定义好的事件处理函数
            }
        }()
        return watcher
    }
    
    ......
    //vendor/k8s.io/apimachinery/pkg/watch/mux.go
    //生成并注册watcher
    func (m *Broadcaster) Watch() Interface {
        var w *broadcasterWatcher
        m.blockQueue(func() {
            m.lock.Lock()
            defer m.lock.Unlock()
            id := m.nextWatcher
            m.nextWatcher++
        //生成watcher
            w = &broadcasterWatcher{
                result:  make(chan Event, m.watchQueueLength),
                stopped: make(chan struct{}),
                id:      id,
                m:       m,
            }
        //注册watcher到map中,后续分发事件的时候要用
            m.watchers[id] = w
        })
        return w
    }
    
    //生成并注册watcher的时候,阻塞incoming channel.
    //这样做的目的是,确保watcher在某个事件之后被添加,看不到该事件。但是能够看到watcher被添加之后的所有事件。
    func (b *Broadcaster) blockQueue(f func()) {
        var wg sync.WaitGroup
        wg.Add(1)
        b.incoming <- Event{
            Type: internalRunFunctionMarker,
            Object: functionFakeRuntimeObject(func() {
                defer wg.Done()
                f()
            }),
        }
        wg.Wait()
    }
    

    相关文章

      网友评论

          本文标题:Client-go客户端源码解析--EventRecorder

          本文链接:https://www.haomeiwen.com/subject/rznxbltx.html