[istio source analysis][pilot] pilot's Service

Author: nicktming | Published: 2020-01-31 23:56

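    1. Preface

    Please credit the original source when reposting, and respect the work of others!

    Source location: https://github.com/nicktming/istio
    Branch: tming-v1.3.6 (based on version 1.3.6)

    The previous post, [istio source analysis][pilot] pilot's configController (mcp client), covered the role of the configController in pilot. This post covers the role of pilot's ServiceController.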

    2. ServiceController

    Let's see how serviceController is initialized.

    // pilot/cmd/pilot-discovery/main.go
    var (
        serverArgs = bootstrap.PilotArgs{
            CtrlZOptions:     ctrlz.DefaultOptions(),
            KeepaliveOptions: keepalive.DefaultOption(),
        }
        ...
    )
    ...
    discoveryServer, err := bootstrap.NewServer(serverArgs)
    ...
    func init() {
        discoveryCmd.PersistentFlags().StringSliceVar(&serverArgs.Service.Registries, "registries",
            []string{string(serviceregistry.KubernetesRegistry)},
            fmt.Sprintf("Comma separated list of platform service registries to read from (choose one or more from {%s, %s, %s, %s})",
                serviceregistry.KubernetesRegistry, serviceregistry.ConsulRegistry, serviceregistry.MCPRegistry, serviceregistry.MockRegistry))
        ...
    }
    
    // pilot/pkg/bootstrap/server.go
    func NewServer(args PilotArgs) (*Server, error) {
       ...
       if err := s.initServiceControllers(&args); err != nil {
            return nil, fmt.Errorf("service controllers: %v", err)
       }
       ...
    }
    

    As shown, initServiceControllers is called to initialize s.ServiceController, and it receives args (which comes from serverArgs).

    2.1 initServiceControllers

    // pilot/pkg/bootstrap/server.go
    func (s *Server) initServiceControllers(args *PilotArgs) error {
        // Create an aggregate controller
        serviceControllers := aggregate.NewController()
        registered := make(map[serviceregistry.ServiceRegistry]bool)
        for _, r := range args.Service.Registries {
            serviceRegistry := serviceregistry.ServiceRegistry(r)
            if _, exists := registered[serviceRegistry]; exists {
                log.Warnf("%s registry specified multiple times.", r)
                continue
            }
            registered[serviceRegistry] = true
            log.Infof("Adding %s registry adapter", serviceRegistry)
            switch serviceRegistry {
            case serviceregistry.MockRegistry:
                ...
            case serviceregistry.KubernetesRegistry:
                if err := s.createK8sServiceControllers(serviceControllers, args); err != nil {
                    return err
                }
            case serviceregistry.ConsulRegistry:
                ...
            case serviceregistry.MCPRegistry:
                ...
            }
        }
        // Use configController to create serviceEntryStore
        serviceEntryStore := external.NewServiceDiscovery(s.configController, s.istioConfigStore)
        // The registry for ServiceEntry
        serviceEntryRegistry := aggregate.Registry{
            Name:             "ServiceEntries",
            Controller:       serviceEntryStore,
            ServiceDiscovery: serviceEntryStore,
        }
        serviceControllers.AddRegistry(serviceEntryRegistry)
        // serviceControllers is a collection of aggregate.Registry values
        s.ServiceController = serviceControllers
        // Run the ServiceController
        s.addStartFunc(func(stop <-chan struct{}) error {
            go s.ServiceController.Run(stop)
            return nil
        })
        return nil
    }
    

    1. Initialize an aggregate.Controller.

    // pilot/pkg/serviceregistry/aggregate/controller.go
    type Controller struct {
        registries []Registry
        storeLock  sync.RWMutex
    }
    func NewController() *Controller {
        return &Controller{
            registries: []Registry{},
        }
    }
    

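    As shown, aggregate.Controller is essentially a collection of Registry values.

    2. From the init function shown at the start, if no registries are specified at runtime, args.Service.Registries defaults to just serviceregistry.KubernetesRegistry.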
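
    To make the "collection of Registry values" idea concrete, here is a minimal sketch. The Registry type is a stripped-down, hypothetical stand-in (the real aggregate.Registry also carries ClusterID, Controller, and ServiceDiscovery), and the Names helper is invented for illustration only.

```go
package main

import (
	"fmt"
	"sync"
)

// Registry is a hypothetical, stripped-down stand-in for aggregate.Registry.
type Registry struct {
	Name string
}

// Controller has the same shape as the struct above: registries plus a lock.
type Controller struct {
	registries []Registry
	storeLock  sync.RWMutex
}

// AddRegistry appends a registry while holding the write lock.
func (c *Controller) AddRegistry(r Registry) {
	c.storeLock.Lock()
	defer c.storeLock.Unlock()
	c.registries = append(c.registries, r)
}

// Names (illustrative helper) returns a snapshot of the registry names
// while holding the read lock.
func (c *Controller) Names() []string {
	c.storeLock.RLock()
	defer c.storeLock.RUnlock()
	names := make([]string, 0, len(c.registries))
	for _, r := range c.registries {
		names = append(names, r.Name)
	}
	return names
}

func main() {
	c := &Controller{}
	c.AddRegistry(Registry{Name: "Kubernetes"})
	c.AddRegistry(Registry{Name: "ServiceEntries"})
	fmt.Println(c.Names())
}
```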

    func (s *Server) createK8sServiceControllers(serviceControllers *aggregate.Controller, args *PilotArgs) (err error) {
        clusterID := string(serviceregistry.KubernetesRegistry)
        log.Infof("Primary Cluster name: %s", clusterID)
        args.Config.ControllerOptions.ClusterID = clusterID
        // Create a controller2.Controller
        kubectl := controller2.NewController(s.kubeClient, args.Config.ControllerOptions)
        s.kubeRegistry = kubectl
        // Wrap it in a Registry and add it to serviceControllers
        serviceControllers.AddRegistry(
            aggregate.Registry{
                Name:             serviceregistry.KubernetesRegistry,
                ClusterID:        clusterID,
                ServiceDiscovery: kubectl,
                Controller:       kubectl,
            })
    
        return
    }
    

    This creates a controller2.Controller (analyzed in detail below), wraps it in a Registry, and adds it to serviceControllers.

    3. Use configController to create serviceEntryStore, wrap it in a Registry, and add it to serviceControllers.
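
    The loop in initServiceControllers skips a registry name that has already been seen. A minimal sketch of that de-duplication step, assuming plain strings instead of serviceregistry.ServiceRegistry and a hypothetical helper name uniqueRegistries:

```go
package main

import "fmt"

// uniqueRegistries (hypothetical helper) keeps the first occurrence of each
// registry name and preserves the original order, like the registered map
// in initServiceControllers.
func uniqueRegistries(names []string) []string {
	seen := make(map[string]bool)
	out := make([]string, 0, len(names))
	for _, n := range names {
		if seen[n] {
			// The real code logs a warning here and continues.
			continue
		}
		seen[n] = true
		out = append(out, n)
	}
	return out
}

func main() {
	fmt.Println(uniqueRegistries([]string{"Kubernetes", "Consul", "Kubernetes"}))
}
```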

    3. ServiceRegistry

    All the types that appear in initServiceControllers live under pilot/pkg/serviceregistry.

    .
    ├── aggregate     // collection of all controllers
    ├── consul        // consul
    ├── external      // used for ServiceEntry
    ├── kube          // k8s environment
    ├── memory        // used for tests
    └── platform.go   // constants
    

    k8s is the most representative case, so it is analyzed first; the other parts are then easier to follow.

    First, the definition of Controller:

    // pilot/pkg/model/controller.go
    type Controller interface {
        // AppendServiceHandler notifies about changes to the service catalog.
        AppendServiceHandler(f func(*Service, Event)) error
        // AppendInstanceHandler notifies about changes to the service instances
        // for a service.
        AppendInstanceHandler(f func(*ServiceInstance, Event)) error
        // Run until a signal is received
        Run(stop <-chan struct{})
    }
    

    There are three methods in total:
    1. Run runs the controller.
    2. AppendServiceHandler and AppendInstanceHandler dynamically add handlers to the controller.
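
    As a rough illustration of this contract, the sketch below implements the same three methods on a toy type. Service, ServiceInstance, and Event are simplified local stand-ins for the model types, and fakeController with its emitService helper is hypothetical, not something from istio.

```go
package main

import "fmt"

// Simplified local stand-ins for model.Event, model.Service, model.ServiceInstance.
type Event int

const (
	EventAdd Event = iota
	EventUpdate
	EventDelete
)

type Service struct{ Hostname string }
type ServiceInstance struct{ Address string }

// Controller repeats the three-method interface shown above.
type Controller interface {
	AppendServiceHandler(f func(*Service, Event)) error
	AppendInstanceHandler(f func(*ServiceInstance, Event)) error
	Run(stop <-chan struct{})
}

// fakeController is a hypothetical in-memory implementation.
type fakeController struct {
	svcHandlers  []func(*Service, Event)
	instHandlers []func(*ServiceInstance, Event)
}

// Compile-time check that fakeController satisfies Controller.
var _ Controller = (*fakeController)(nil)

func (c *fakeController) AppendServiceHandler(f func(*Service, Event)) error {
	c.svcHandlers = append(c.svcHandlers, f)
	return nil
}

func (c *fakeController) AppendInstanceHandler(f func(*ServiceInstance, Event)) error {
	c.instHandlers = append(c.instHandlers, f)
	return nil
}

// Run blocks until the stop channel is closed.
func (c *fakeController) Run(stop <-chan struct{}) { <-stop }

// emitService (illustrative helper) delivers one service event to every handler.
func (c *fakeController) emitService(s *Service, e Event) {
	for _, h := range c.svcHandlers {
		h(s, e)
	}
}

func main() {
	c := &fakeController{}
	_ = c.AppendServiceHandler(func(s *Service, e Event) {
		fmt.Println("service event:", s.Hostname, e)
	})
	c.emitService(&Service{Hostname: "reviews.default.svc.cluster.local"}, EventAdd)
}
```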

    3.1 kube

    So the analysis below looks at how Controller implements these three methods.

    // pilot/pkg/serviceregistry/kube/controller/controller.go
    func NewController(client kubernetes.Interface, options Options) *Controller {
        log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
            options.WatchedNamespace, options.ResyncPeriod)
    
        // Queue requires a time duration for a retry delay after a handler error
        out := &Controller{
            domainSuffix:               options.DomainSuffix,
            client:                     client,
            queue:                      kube.NewQueue(1 * time.Second),
            ClusterID:                  options.ClusterID,
            XDSUpdater:                 options.XDSUpdater,
            servicesMap:                make(map[host.Name]*model.Service),
            externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
        }
    
        sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
    
        svcInformer := sharedInformers.Core().V1().Services().Informer()
        out.services = out.createCacheHandler(svcInformer, "Services")
    
        epInformer := sharedInformers.Core().V1().Endpoints().Informer()
        out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")
    
        nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
        out.nodes = out.createCacheHandler(nodeInformer, "Nodes")
    
        podInformer := sharedInformers.Core().V1().Pods().Informer()
        out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)
    
        return out
    }
    

    As shown, the Controller mainly initializes handling for the Service, Endpoints, Node, and Pod resources.

    3.1.1 createCacheHandler

    In addition, the cacheHandler values are created through createCacheHandler and createEDSCacheHandler.

    // pilot/pkg/serviceregistry/kube/controller/controller.go
    func (c *Controller) createCacheHandler(informer cache.SharedIndexInformer, otype string) cacheHandler {
        handler := &kube.ChainHandler{Funcs: []kube.Handler{c.notify}}
        informer.AddEventHandler(
            cache.ResourceEventHandlerFuncs{
                AddFunc: func(obj interface{}) {
                    incrementEvent(otype, "add")
                    c.queue.Push(kube.Task{Handler: handler.Apply, Obj: obj, Event: model.EventAdd})
                },
                ...
            })
        return cacheHandler{informer: informer, handler: handler}
    }
    

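    A queue is used here as a buffer: Add, Update, and Delete events for a resource are wrapped in a kube.Task and pushed onto c.queue, and tasks are later popped from c.queue and processed. How are they processed? Look at the structure of ChainHandler.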
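
    The buffering idea can be sketched with a slice guarded by a sync.Cond. This is a simplified illustration, not the kube queue itself: the Close method, the string-typed Event, and the lack of retry-on-error are all assumptions made to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// Task bundles an object, its event, and the handler to run (simplified).
type Task struct {
	Handler func(obj interface{}, event string) error
	Obj     interface{}
	Event   string
}

// Queue is a hypothetical FIFO buffer between event producers and one consumer.
type Queue struct {
	mu      sync.Mutex
	cond    *sync.Cond
	tasks   []Task
	closing bool
}

func NewQueue() *Queue {
	q := &Queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Push appends a task and wakes the consumer.
func (q *Queue) Push(t Task) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.closing {
		q.tasks = append(q.tasks, t)
	}
	q.cond.Signal()
}

// Close (assumed here instead of a stop channel) lets Run return once drained.
func (q *Queue) Close() {
	q.mu.Lock()
	q.closing = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

// Run pops tasks in FIFO order and calls their handlers outside the lock.
func (q *Queue) Run() {
	for {
		q.mu.Lock()
		for !q.closing && len(q.tasks) == 0 {
			q.cond.Wait()
		}
		if len(q.tasks) == 0 {
			q.mu.Unlock()
			return
		}
		t := q.tasks[0]
		q.tasks = q.tasks[1:]
		q.mu.Unlock()
		_ = t.Handler(t.Obj, t.Event) // the real queue retries after an error
	}
}

func main() {
	q := NewQueue()
	done := make(chan struct{})
	go func() { q.Run(); close(done) }()
	q.Push(Task{Obj: "svc-a", Event: "add", Handler: func(obj interface{}, event string) error {
		fmt.Println(event, obj)
		return nil
	}})
	q.Close()
	<-done
}
```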

    // pilot/pkg/serviceregistry/kube/controller/controller.go
    type cacheHandler struct {
        informer cache.SharedIndexInformer
        handler  *kube.ChainHandler
    }
    
    // pilot/pkg/serviceregistry/kube/queue.go
    type ChainHandler struct {
        Funcs []Handler
    }
    // Apply is the handler function
    func (ch *ChainHandler) Apply(obj interface{}, event model.Event) error {
        for _, f := range ch.Funcs {
            if err := f(obj, event); err != nil {
                return err
            }
        }
        return nil
    }
    // Append a handler as the last handler in the chain
    func (ch *ChainHandler) Append(h Handler) {
        ch.Funcs = append(ch.Funcs, h)
    }
    
    func (q *queueImpl) Run(stop <-chan struct{}) {
        ...
        for {
            ...
            var item Task
            item, q.queue = q.queue[0], q.queue[1:]
            q.cond.L.Unlock()
            if err := item.Handler(item.Obj, item.Event); err != nil {
                ...
            }
        }
    }
    

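    So once a task leaves c.queue, ChainHandler.Apply runs. Apply executes, in order, the Handlers held by the ChainHandler (added through Append or at construction) and stops at the first error. From createCacheHandler we know that one Handler, c.notify, is already in place at initialization.

    Next, how can outside code add handlers dynamically? That is what the controller's two methods AppendServiceHandler and AppendInstanceHandler are for.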
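
    A small sketch of the chain behaviour: handlers run in the order they were appended, and the first error stops the chain. The Handler signature uses a plain string event, and errStop is a hypothetical sentinel error; both are assumptions for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Handler is simplified: the real one takes a model.Event.
type Handler func(obj interface{}, event string) error

type ChainHandler struct {
	Funcs []Handler
}

// Apply runs every handler in order and returns the first error it sees.
func (ch *ChainHandler) Apply(obj interface{}, event string) error {
	for _, f := range ch.Funcs {
		if err := f(obj, event); err != nil {
			return err
		}
	}
	return nil
}

// Append adds a handler at the end of the chain.
func (ch *ChainHandler) Append(h Handler) {
	ch.Funcs = append(ch.Funcs, h)
}

// errStop is a hypothetical sentinel used to show short-circuiting.
var errStop = errors.New("stop")

func main() {
	var calls []string
	ch := &ChainHandler{}
	ch.Append(func(obj interface{}, event string) error {
		calls = append(calls, "first")
		return nil
	})
	ch.Append(func(obj interface{}, event string) error {
		calls = append(calls, "second")
		return errStop
	})
	ch.Append(func(obj interface{}, event string) error {
		calls = append(calls, "third") // never reached
		return nil
	})
	err := ch.Apply("obj", "add")
	fmt.Println(calls, err)
}
```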

    3.1.2 AppendServiceHandler and AppendInstanceHandler

    func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event)) error {
        c.services.handler.Append(func(obj interface{}, event model.Event) error {
            svc, ok := obj.(*v1.Service)
            ...
            // Convert the k8s.Service into an istio.Service
            svcConv := kube.ConvertService(*svc, c.domainSuffix, c.ClusterID)
            // Build the istio.ServiceInstance values
            instances := kube.ExternalNameServiceInstances(*svc, svcConv)
            switch event {
            case model.EventDelete:
                c.Lock()
                delete(c.servicesMap, svcConv.Hostname)
                delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
                c.Unlock()
            default:
                c.Lock()
                // key:   fmt.Sprintf("%s.%s.svc.%s", name, namespace, domainSuffix)
                // value: istio.Service
                c.servicesMap[svcConv.Hostname] = svcConv
                if instances == nil {
                    delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
                } else {
                    c.externalNameSvcInstanceMap[svcConv.Hostname] = instances
                }
                c.Unlock()
            }
            // Update through c.XDSUpdater
            c.XDSUpdater.SvcUpdate(c.ClusterID, hostname, ports, portsByNum)
            // Call the handler that was passed in
            f(svcConv, event)
            return nil
        })
        return nil
    }
    

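    A few key points here:
    1. The k8s.Service is converted into an istio.Service named svcConv.
    2. controller.servicesMap acts as a local cache of istio.Service values.
    3. If the Service is of type ExternalName, its instances are computed and cached locally in externalNameSvcInstanceMap.
    4. XDSUpdater is used to tell downstream to update its data.
    5. The handler that was passed in is called.
    6. Note that this handler is appended to c.services.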
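
    Points 1 and 2 can be sketched as a hostname-keyed cache that is updated on every event. The key format comes from the comment in the code above; the serviceCache type, its apply method, and the pared-down Service struct are hypothetical simplifications.

```go
package main

import (
	"fmt"
	"sync"
)

// Service is a pared-down stand-in for model.Service.
type Service struct{ Hostname string }

// serviceHostname builds the cache key in the format noted in the code comment.
func serviceHostname(name, namespace, domainSuffix string) string {
	return fmt.Sprintf("%s.%s.svc.%s", name, namespace, domainSuffix)
}

// serviceCache is a hypothetical stand-in for the controller's servicesMap.
type serviceCache struct {
	mu       sync.Mutex
	services map[string]*Service
}

// apply removes the entry on delete and stores it on add or update.
func (c *serviceCache) apply(svc *Service, deleted bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if deleted {
		delete(c.services, svc.Hostname)
		return
	}
	c.services[svc.Hostname] = svc
}

func main() {
	cache := &serviceCache{services: make(map[string]*Service)}
	svc := &Service{Hostname: serviceHostname("reviews", "default", "cluster.local")}
	cache.apply(svc, false)
	fmt.Println(svc.Hostname, len(cache.services))
	cache.apply(svc, true)
	fmt.Println(len(cache.services))
}
```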

    func (c *Controller) AppendInstanceHandler(f func(*model.ServiceInstance, model.Event)) error {
        if c.endpoints.handler == nil {
            return nil
        }
        c.endpoints.handler.Append(func(obj interface{}, event model.Event) error {
            ep, ok := obj.(*v1.Endpoints)
            ...
            c.updateEDS(ep, event)
            return nil
        })
        return nil
    }
    func (c *Controller) updateEDS(ep *v1.Endpoints, event model.Event) {
        hostname := kube.ServiceHostname(ep.Name, ep.Namespace, c.domainSuffix)
        mixerEnabled := c.Env != nil && c.Env.Mesh != nil && (c.Env.Mesh.MixerCheckServer != "" || c.Env.Mesh.MixerReportServer != "")
    
        endpoints := make([]*model.IstioEndpoint, 0)
        if event != model.EventDelete {
            for _, ss := range ep.Subsets {
                for _, ea := range ss.Addresses {
                    pod := c.pods.getPodByIP(ea.IP)
                    ...
                    var labels map[string]string
                    locality, sa, uid := "", "", ""
                    if pod != nil {
                        locality = c.GetPodLocality(pod)
                        sa = kube.SecureNamingSAN(pod)
                        if mixerEnabled {
                            uid = fmt.Sprintf("kubernetes://%s.%s", pod.Name, pod.Namespace)
                        }
                        labels = map[string]string(configKube.ConvertLabels(pod.ObjectMeta))
                    }
                    // Assemble an istio endpoint
                    for _, port := range ss.Ports {
                        endpoints = append(endpoints, &model.IstioEndpoint{
                            Address:         ea.IP,
                            EndpointPort:    uint32(port.Port),
                            ServicePortName: port.Name,
                            Labels:          labels,
                            UID:             uid,
                            ServiceAccount:  sa,
                            Network:         c.endpointNetwork(ea.IP),
                            Locality:        locality,
                            Attributes:      model.ServiceAttributes{Name: ep.Name, Namespace: ep.Namespace},
                        })
                    }
                }
            }
        }
        ...
        _ = c.XDSUpdater.EDSUpdate(c.ClusterID, string(hostname), ep.Namespace, endpoints)
    }
    

    1. The current Endpoints object is expanded by Subsets, Addresses, and Ports into multiple IstioEndpoint values, i.e. endpoints.
    2. c.XDSUpdater.EDSUpdate is used to tell downstream to update its data.
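
    The expansion in point 1 is a nested loop: every address in a subset is paired with every port in that subset. A minimal sketch with hypothetical local types (Subset, Port, Endpoint) in place of the k8s and istio model types:

```go
package main

import "fmt"

// Hypothetical simplified types standing in for v1.EndpointSubset and friends.
type Port struct {
	Name string
	Port int32
}

type Subset struct {
	Addresses []string
	Ports     []Port
}

// Endpoint is a reduced version of model.IstioEndpoint.
type Endpoint struct {
	Address         string
	EndpointPort    uint32
	ServicePortName string
}

// flatten emits one Endpoint per (address, port) pair within each subset.
func flatten(subsets []Subset) []Endpoint {
	endpoints := make([]Endpoint, 0)
	for _, ss := range subsets {
		for _, addr := range ss.Addresses {
			for _, p := range ss.Ports {
				endpoints = append(endpoints, Endpoint{
					Address:         addr,
					EndpointPort:    uint32(p.Port),
					ServicePortName: p.Name,
				})
			}
		}
	}
	return endpoints
}

func main() {
	subsets := []Subset{{
		Addresses: []string{"10.0.0.1", "10.0.0.2"},
		Ports:     []Port{{Name: "http", Port: 8080}, {Name: "grpc", Port: 9090}},
	}}
	for _, ep := range flatten(subsets) {
		fmt.Printf("%s:%d (%s)\n", ep.Address, ep.EndpointPort, ep.ServicePortName)
	}
}
```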

    3.1.3 Run

    Nothing unusual here: it starts the Run method of the pod, service, node, and endpoint informers.

    [Figure: k8s.png]

    3.2 external

    // pilot/pkg/bootstrap/server.go
    serviceEntryStore := external.NewServiceDiscovery(s.configController, s.istioConfigStore)
    
    // pilot/pkg/serviceregistry/external/servicediscovery.go
    func NewServiceDiscovery(callbacks model.ConfigStoreCache, store model.IstioConfigStore) *ServiceEntryStore {
        c := &ServiceEntryStore{
            serviceHandlers:  make([]serviceHandler, 0),
            instanceHandlers: make([]instanceHandler, 0),
            store:            store,
            ip2instance:      map[string][]*model.ServiceInstance{},
            instances:        map[host.Name]map[string][]*model.ServiceInstance{},
            updateNeeded:     true,
        }
        if callbacks != nil {
            callbacks.RegisterEventHandler(model.ServiceEntry.Type, func(config model.Config, event model.Event) {
                // Recomputing the index here is too expensive.
                c.changeMutex.Lock()
                c.lastChange = time.Now()
                c.updateNeeded = true
                c.changeMutex.Unlock()
                // Convert the config into model.Service (istio.Service) values
                services := convertServices(config)
                for _, handler := range c.serviceHandlers {
                    for _, service := range services {
                        go handler(service, event)
                    }
                }
                // Convert the config into model.ServiceInstance values
                instances := convertInstances(config)
                for _, handler := range c.instanceHandlers {
                    for _, instance := range instances {
                        go handler(instance, event)
                    }
                }
            })
        }
        return c
    }
    

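    1. As shown, callbacks is s.configController. From [istio source analysis][pilot] pilot's configController (mcp client) we know that handlers can be registered on it, so events of type ServiceEntry are processed by the handler registered here.

    Now look at the handlers and the Run method: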

    func (d *ServiceEntryStore) AppendServiceHandler(f func(*model.Service, model.Event)) error {
        d.serviceHandlers = append(d.serviceHandlers, f)
        return nil
    }
    func (d *ServiceEntryStore) AppendInstanceHandler(f func(*model.ServiceInstance, model.Event)) error {
        d.instanceHandlers = append(d.instanceHandlers, f)
        return nil
    }
    func (d *ServiceEntryStore) Run(stop <-chan struct{}) {}
    

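    Run does nothing. That is because after the mcp client receives a response from the mcp server (galley), it hands the data to s.configController; when the type is ServiceEntry, the registered handler is called, i.e. the one passed to callbacks.RegisterEventHandler in NewServiceDiscovery.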
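
    The push-style flow can be sketched as follows: the store registers a callback for one config type, and the config side drives everything, so Run has nothing to do. The configCache and store types, the notify helper, and the "service-entry" type string are all hypothetical; the sketch also calls handlers synchronously, whereas the code above starts a goroutine per handler call.

```go
package main

import "fmt"

// configCache is a hypothetical stand-in for the config controller.
type configCache struct {
	handlers map[string][]func(name, event string)
}

// RegisterEventHandler stores a callback for one config type.
func (c *configCache) RegisterEventHandler(typ string, f func(name, event string)) {
	if c.handlers == nil {
		c.handlers = make(map[string][]func(name, event string))
	}
	c.handlers[typ] = append(c.handlers[typ], f)
}

// notify (illustrative helper) simulates the config controller receiving a change.
func (c *configCache) notify(typ, name, event string) {
	for _, f := range c.handlers[typ] {
		f(name, event)
	}
}

// store is a simplified stand-in for ServiceEntryStore.
type store struct {
	updateNeeded    bool
	serviceHandlers []func(host, event string)
}

func newStore(callbacks *configCache) *store {
	s := &store{}
	callbacks.RegisterEventHandler("service-entry", func(name, event string) {
		s.updateNeeded = true // mark the index stale instead of recomputing it
		for _, h := range s.serviceHandlers {
			h(name, event)
		}
	})
	return s
}

func (s *store) AppendServiceHandler(f func(host, event string)) {
	s.serviceHandlers = append(s.serviceHandlers, f)
}

// Run is intentionally empty: events arrive through the registered callback.
func (s *store) Run(stop <-chan struct{}) {}

func main() {
	cfg := &configCache{}
	s := newStore(cfg)
	s.AppendServiceHandler(func(host, event string) { fmt.Println(event, host) })
	cfg.notify("service-entry", "example.com", "add")
}
```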

    3.3 aggregate

    aggregate is essentially a collection of controllers.

    4. References

    1. istio 1.3.6 source code
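
    One plausible way such a collection behaves is to forward each call to every member. The sketch below is an assumption about the shape, not a copy of the istio code: the registry interface, memRegistry, and aggregate types are invented for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

type Event string
type Service struct{ Hostname string }

// registry is a hypothetical minimal interface for one member controller.
type registry interface {
	AppendServiceHandler(f func(*Service, Event)) error
	Run(stop <-chan struct{})
}

// memRegistry is a toy member that only records its handlers.
type memRegistry struct {
	name     string
	handlers []func(*Service, Event)
}

func (m *memRegistry) AppendServiceHandler(f func(*Service, Event)) error {
	m.handlers = append(m.handlers, f)
	return nil
}

func (m *memRegistry) Run(stop <-chan struct{}) { <-stop }

// aggregate forwards every call to all member registries.
type aggregate struct {
	registries []registry
}

// AppendServiceHandler registers f on each member, stopping at the first error.
func (a *aggregate) AppendServiceHandler(f func(*Service, Event)) error {
	for _, r := range a.registries {
		if err := r.AppendServiceHandler(f); err != nil {
			return err
		}
	}
	return nil
}

// Run starts every member in its own goroutine and waits for all to return.
func (a *aggregate) Run(stop <-chan struct{}) {
	var wg sync.WaitGroup
	for _, r := range a.registries {
		wg.Add(1)
		go func(r registry) {
			defer wg.Done()
			r.Run(stop)
		}(r)
	}
	wg.Wait()
}

func main() {
	k8s := &memRegistry{name: "Kubernetes"}
	se := &memRegistry{name: "ServiceEntries"}
	agg := &aggregate{registries: []registry{k8s, se}}
	_ = agg.AppendServiceHandler(func(s *Service, e Event) {})

	stop := make(chan struct{})
	done := make(chan struct{})
	go func() { agg.Run(stop); close(done) }()
	close(stop)
	<-done
	fmt.Println(len(k8s.handlers), len(se.handlers))
}
```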
