[k8s source code analysis][client-go] informer: re

Author: nicktming | Published: 2019-10-20 18:00

1. Preface

Please credit the original source when reposting and respect the author's work!

Source location: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
Branch: tming-v13.0 (based on v13.0)

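This analysis builds on [k8s source code analysis][client-go] informer: store and index. Within the informer machinery, the reflector sits between two other components: above it is the ListerWatcher, which fetches data from the k8s API, and below it is the DeltaFIFO. In other words, the reflector takes what the k8s API returns and stores it in the DeltaFIFO.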

2. Type

type Reflector struct {
    // Name of this reflector
    name string
    metrics *reflectorMetrics
    // Type of object this reflector expects to receive
    expectedType reflect.Type
    // Destination store; in the informer machinery this is a DeltaFIFO
    store Store
    // Performs lists and watches against the api-server
    listerWatcher ListerWatcher
    period       time.Duration
    resyncPeriod time.Duration
    ShouldResync func() bool
    clock clock.Clock
    // ResourceVersion observed at the last sync
    lastSyncResourceVersion string
    // Guards lastSyncResourceVersion
    lastSyncResourceVersionMutex sync.RWMutex
    // WatchListPageSize is the requested chunk size of initial and resync watch lists.
    WatchListPageSize int64
}

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    r := &Reflector{
        name:          name,
        listerWatcher: lw,
        store:         store,
        expectedType:  reflect.TypeOf(expectedType),
        period:        time.Second,
        resyncPeriod:  resyncPeriod,
        clock:         &clock.RealClock{},
    }
    return r
}

var internalPackages = []string{"client-go/tools/cache/"}

As shown, Reflector holds a listerWatcher ListerWatcher, which lists objects from the api-server and watches them for changes. It also holds a store Store, which in the informer machinery is a DeltaFIFO instance.

3. Methods

Start with the Run method. Given what a Reflector does, it has to loop, continuously moving data from the api-server into the DeltaFIFO.


func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

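wait.Until invokes the function passed to it every r.period. The next invocation starts only after the previous one has finished, and wait.Until returns only once stopCh receives a value or is closed.

So, put simply, ListAndWatch keeps being called until stopCh signals shutdown, at which point Run returns.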


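Note that k8s handles concurrent updates through ResourceVersion: every time an object is modified on the api-server, its ResourceVersion changes. Clients should treat the value as an opaque string rather than as a counter that grows by exactly 1.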

func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
    var resourceVersion string
    // ResourceVersion "0" lets the api-server answer the List from its cache;
    // the result may lag slightly, and the Watch further down catches up.
    options := metav1.ListOptions{ResourceVersion: "0"}

    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
                }
            }()
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                return r.listerWatcher.List(opts)
            }))
            if r.WatchListPageSize != 0 {
                pager.PageSize = r.WatchListPageSize
            }
            // Pager falls back to full list if paginated list calls fail due to an "Expired" error.
            list, err = pager.List(context.Background(), options)
            close(listCh)
        }()
        // Wait for the list above to complete
        select {
        case <-stopCh:
            return nil
        case r := <-panicCh:
            panic(r)
        case <-listCh:
        }
        if err != nil {
            return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
        }
        initTrace.Step("Objects listed")
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
        }
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        initTrace.Step("Objects extracted")
        // Call Replace (through syncWith) to swap out the contents of DeltaFIFO
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        initTrace.Step("SyncWith done")
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }

    // Start an asynchronous goroutine that calls
    // DeltaFIFO's Resync every r.resyncPeriod
    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

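    // Create a watch starting from the current ResourceVersion,
    // observe every later change, and push each one into DeltaFIFO
    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // Stop watches that receive no events within the timeout window
            TimeoutSeconds: &timeoutSeconds,
        }

        // Watch for changes after the current resourceVersion
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            // Error handling is abbreviated in this excerpt: report the error and leave ListAndWatch
            utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
            return nil
        }
        // Call watchHandler
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case apierrs.IsResourceExpired(err):
                    klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
                }
            }
            return nil
        }
    }
}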

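1. List all objects and call DeltaFIFO.Replace (through syncWith) to replace the existing elements with the list; this also yields the latest ResourceVersion. Replace is analyzed in detail in [k8s source code analysis][client-go] informer: delta_fifo.
2. Start an asynchronous goroutine that calls DeltaFIFO.Resync every r.resyncPeriod. Resync is also analyzed in [k8s source code analysis][client-go] informer: delta_fifo.
3. Create a watch from the latest ResourceVersion and keep monitoring subsequent changes.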


func (r *Reflector) LastSyncResourceVersion() string {
    r.lastSyncResourceVersionMutex.RLock()
    defer r.lastSyncResourceVersionMutex.RUnlock()
    return r.lastSyncResourceVersion
}

func (r *Reflector) setLastSyncResourceVersion(v string) {
    r.lastSyncResourceVersionMutex.Lock()
    defer r.lastSyncResourceVersionMutex.Unlock()
    r.lastSyncResourceVersion = v
}

func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()

loop:
    for {
        select {
        case <-stopCh:
            // stopCh was closed: leave watchHandler
            return errorStopRequested
        case err := <-errc:
            // DeltaFIFO's Resync failed: leave watchHandler
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                // The watch channel was closed: break out of the loop
                break loop
            }
            if event.Type == watch.Error {
                // Leave watchHandler
                return apierrs.FromObject(event.Object)
            }
            // The object's type differs from the type this reflector watches,
            // e.g. the reflector handles Pod objects and a Service arrives
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                continue
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            // Read the new ResourceVersion
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                // Add the object to DeltaFIFO
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                // Update the object in DeltaFIFO
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                // Delete the object from DeltaFIFO
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark:
                // A `Bookmark` means watch has synced here, just update the resourceVersion
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            // Update the ResourceVersion
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            // Increment the number of handled events
            eventCount++
        }
    }

    // The current watch channel was closed
    watchDuration := r.clock.Since(start)
    // If this watch handled no events and lasted less than one second, something is
    // probably wrong, so return an error. watchHandler exits, ListAndWatch exits as well,
    // and wait.Until in Run calls ListAndWatch again.
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    // Returning nil does not exit ListAndWatch: its for loop creates a new watch and calls watchHandler again
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
    return nil
}

1. watchHandler pushes each observed change into DeltaFIFO.
2. When watchHandler returns nil, ListAndWatch does not return. When watchHandler returns an error, ListAndWatch returns, and control goes back to Run, where wait.Until calls ListAndWatch again.

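4. Summary


Having walked through these methods, the reflector's job is clear: list all objects, then watch for subsequent changes starting from the resourceVersion obtained by that list, and push them into DeltaFIFO.

The work itself is done by watchHandler. When it returns a non-nil error, ListAndWatch is called again, which re-lists and replaces the contents of the attached DeltaFIFO. When it returns nil, the watch was closed; watchHandler returns to ListAndWatch, which creates a new watch and calls watchHandler again to keep monitoring.


The informer machinery is an important part of the k8s codebase, and understanding informers makes it easier to understand how k8s works.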


1. [k8s source code analysis][client-go] informer: store and index
2. [k8s source code analysis][client-go] informer: delta_fifo
3. [k8s source code analysis][client-go] informer: reflector
4. [k8s source code analysis][client-go] informer: controller and shared_informer (1)
5. [k8s source code analysis][client-go] informer: controller and shared_informer (2)
6. [k8s source code analysis][client-go] informer: SharedInformerFactory



