美文网首页kubernetes架构
kubernetes1.9源码阅读 replication co

kubernetes1.9源码阅读 replication co

作者: 范彬2017 | 来源:发表于2018-02-03 20:56 被阅读53次

    replication controller是kube-controller-manager中一个重要的控制器,主要是rs进行控制,确保pods的数量恰好和rs的规定一致。因此replication controller主要对这两类进行watch,一类是replicationset,另一类是pods。本文是replication controller的源码阅读笔记,会包括client-go的Informer机制,希望帮助开始阅读kubernetes源码的小伙伴们,更希望与对kubernetes源码阅读感兴趣的小伙伴儿们交流,有错误的地方也希望能指出,共同进步。

    入口程序

    cmd/kube-controller-manager/controller-manager.go main

    1. 调用options.NewCMServer,构建CMServer;

    2. 调用app.Run方法,运行CMServer;

    启动CMServer

    cmd/kube-controller-manager/app/controllermanager.go Run

    1. 调用createClients, 创建apiserver客户端,通过REST方式访问APIserver提供的API服务;

    2. 启动协程调用go startHTTP,运行http Server;

    3. 调用record.NewBroadcaster,创建eventBroadcaster对象,接收EventBroadcaster发送的event,输出到logging中,并输出到EventSink,并使用recorder记录"controller-mananger"的事件;

    4. 调用CreateControllerContext,在CreateControllerContext方法中,会调用informers.NewSharedInformerFactory,创建sharedInformerFactory(client-go/informers/factory.go)对象;

    5. 调用StartControllers,启动Controllers;

    6. 调用ctx.InformerFactory.Start,在这里是调用sharedInformerFactory.start(client-go/informers/factory.go);

    7. saTokenControllerInitFunc和NewControllerInitializers定义了controllers的InitFunc;

    startReplicationController

    cmd/kube-controller-manager/app/core.go startReplicationController

    1. 协程启动调用replicationcontroller.NewReplicationManager,构建ReplicationManager;

    (1) 调用ctx.InformerFactory.Core().V1().Pods(),这里调用sharedInformerFactory(client-go/informers/factory.go)对象的Core().V1().Pods()方法,将会构建PodInformer对象,

    (2) 以此方式,创建ReplicationControllersInformer;

    2. 运行ReplicationManager;

    NewReplicationManager

    pkg/controller/replication/replication_set.go NewReplicationManager

    1. 在NewReplicationManager中,

    (1) 首先,调用record.NewBroadcaster,创建eventBroadcaster对象,调用eventBroadcaster.StartLogging,接收EventBroadcaster发送的event,输出到logging中;调用 eventBroadcaster.StartRecordingToSink,event输出到EventSink,并调用eventBroadcaster.NewRecorder记录"replication-controller"的事件;

    (2) 将调用NewBaseController;

    2. 在NewBaseController方法中,

    (1) 构建ReplicaSetController对象,包括了podControl,它定义了对Pod的操作,是由RealPodControl去调用apiserver完成创建实现;

    (2) 将调用 rsInformer.Informer().AddEventHandler,这将调用rsInformer的构造函数NewReplicaSetInformer,rsInformer将event handler包装成listerner,然后添加到s.processor.listeners中,并定义对象处理的回调函数AddFunc、UpdateFunc、DeleteFunc;

    (3) 同时,调用rsInformer的Lister方法;

    (4) 最后,调用rsInformer.Informer().HasSynced,判断是否缓存完成;

    (5) 以此方式,调用podInformer.Informer().AddEventHandler、podInformer的Lister方法及podInformer.Informer().HasSynced;

    (6) 设置rsc.syncHandler;syncHandler负责pod与rc的同步,确保Pod副本数与rc规定的相同;

    PodInformer

    client-go/informers/core/v1/pod.go NewPodInformer

    1. 构建cache.listWatch对象,定义了ListFunc和WatchFunc;

    2. 调用cache.NewSharedIndexInformer;

    NewSharedIndexInformer

    client-go/tools/cache/shared_informer.go NewSharedIndexInformer

    运行ReplicationManager

    pkg/controller/replicaset/replica_set.go Run

    1. 调用controller.WaitForCacheSync方法,在controller.WaitForCacheSync中,将调用ca che.WaitForCacheSync;

    2. 调用rsc.worker, 将启动workers调用rsc.syncHandler,syncHandler负责pod与rc的同步,确保Pod副本数与rc规定的相同;

    sharedInformerFactory.Start

    client-go/informers/factory.go Start

    1. 调用Informer.Run,这里调用SharedIndexInformer.Run;

    SharedIndexInformer.Run

    client-go/tools/cache/shared_informer.go Run

    1. 调用NewDeltaFIFO,创建queue;

    2. 定义Deltas处理函数s.HandleDeltas;

    3. 调用New(cfg),构建sharedIndexInformer的controller;

    4. 调用s.cacheMutationDetector.Run,检查缓存对象是否变化;

    5. 调用s.processor.run,将调用sharedProcessor.run,会调用Listener.run和Listener.pop,执行处理queue的函数;

    6. 调用s.controller.Run,构建Reflector,进行对etcd的缓存;

    sharedIndexedInformer.controller.Run

    client-go/tools/cache/controller.go Run

    1. 调用NewReflector,构建Reflector;

    (1) Reflector对象,包括ListerWatcher、ObjectType、Queue、FullResyncPeriod;

    2. 调用r.run,将调用reflector.ListAndWatch,执行r.List、r.watch、r.watchHandler,进行对etcd的缓存;

    3. 调用c.processLoop,reflector向queue里面添加数据,processLoop会不停去消费这里这些数据;

    controller.processLoop

    client-go/tools/cache/controller.go processLoop

    1. cache.PopProcessFunc(c.config.Process)将前面Process函数传递进去;

    DeltaFIFO.Pop

    client-go/tools/cache/delta_fifo.go Pop

    1. 主要从f.items取出object,然后调用process函数进行处理;

    处理DeltaFIFO

    client-go/tools/cache/shared_informer.go HandleDeltas

    1. 调用s.process.distribute,将调用Listener.add,负责将watch的资源传到listener;

    Listener.add/pop/run

    client-go/tools/cache/shared_informer.go sharedProcessor.run/add/pop;

    1. listenser的add函数负责将notify装进pendingNotifications;

    2. pop函数取出pendingNotifications的第一个nofify,输出到nextCh channel;

    3. run函数则负责取出notify,然后根据notify的类型(增加、删除、更新)触发相应的处理函数,这些函数在ReplicaSetController注册,分别是:rsc.addPod、rsc.updatePod、rsc.deletePod、rsc.enqueueReplicaSet、rsc.updateRS、rsc.enqueueReplicaSet

    rsc.addPod

    pkg/controller/replicaset/replica_set.go addPod

    1. 首先会根据pod返回rc,当pod不属于任何rc时,则返回。找到rc以后,更新rm.expectations.CreationObserved这个rc的期望值,也就是假如一个rc有4个pod,现在检测到创建了一个pod,则会将这个rc的期望值减少,变为3。然后将这个rc放入队列;

    2. 调用rsc.enqueueReplicaSet,将调用rsc.queue.Add;

    rsc.worker

    pkg/controller/replicaset/replica_set.go worker()

    1. 调用rsc.syncHandler,这里会调用rsc.syncReplicaSet,syncReplicaSet负责pod与rc的同步,确保Pod副本数与rc规定的相同;

    相关阅读:

    1. http://licyhust.com/

    相关文章

      网友评论

        本文标题:kubernetes1.9源码阅读 replication co

        本文链接:https://www.haomeiwen.com/subject/gthpzxtx.html