
[Istio source code analysis][pilot] pilot's configController

Author: nicktming | Published 2020-01-30 23:30

    1. Preface

    Please credit the original source when reposting; respect the author's work!

    Source location: https://github.com/nicktming/istio
    Branch: tming-v1.3.6 (based on version 1.3.6)

    1. [Istio source code analysis][galley] galley upstream (source)
    2. [Istio source code analysis][galley] galley runtime
    3. [Istio source code analysis][galley] galley downstream (mcp)
    The previous articles analyzed the whole galley pipeline: galley ultimately pushes the data obtained from its sources (fs, k8s) from the mcp server to the mcp client. This article analyzes how pilot's configController uses the mcp client to receive that data and how it processes it.

    2. ConfigController

    First, let's look at how the configController is initialized in pilot.

    // pilot/cmd/pilot-discovery/main.go
    var (
        serverArgs = bootstrap.PilotArgs{
            CtrlZOptions:     ctrlz.DefaultOptions(),
            KeepaliveOptions: keepalive.DefaultOption(),
        }
        ...
    )
    ...
    discoveryServer, err := bootstrap.NewServer(serverArgs)
    ...
    
    // pilot/pkg/bootstrap/server.go
    func NewServer(args PilotArgs) (*Server, error) {
        if err := s.initMesh(&args); err != nil {
            return nil, fmt.Errorf("mesh: %v", err)
        }
        ...
        if err := s.initConfigController(&args); err != nil {
            return nil, fmt.Errorf("config controller: %v", err)
        }
    }
    

    Code that does not affect the analysis has been trimmed.

    func (s *Server) initConfigController(args *PilotArgs) error {
        if len(s.mesh.ConfigSources) > 0 {
            // if config sources are configured, set up the mcp client(s)
            if err := s.initMCPConfigController(args); err != nil {
                return err
            }
        } 
        ...
        // Create the config store.
        s.istioConfigStore = model.MakeIstioStore(s.configController)
        return nil
    }
    

    1. Note that s.istioConfigStore is essentially just s.configController, wrapped by model.MakeIstioStore (see the sketch below).
    2. We focus on the mcp configuration here; for the mesh configuration, see [Istio source code analysis] a simple installation of a debuggable istio source build.
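
    To make note 1 concrete, here is a minimal sketch of the idea behind model.MakeIstioStore: it wraps the generic config store/controller and exposes typed accessors built on the store's List/Get methods. The wrapper type and the VirtualServices accessor below are illustrative only, not the actual istio implementation.

    // Illustrative sketch only (not the actual istio implementation): wrap a
    // generic ConfigStore and expose typed accessors built on its List method.
    type istioConfigStoreSketch struct {
        model.ConfigStore
    }

    func makeIstioStoreSketch(store model.ConfigStore) *istioConfigStoreSketch {
        return &istioConfigStoreSketch{ConfigStore: store}
    }

    // VirtualServices is a hypothetical typed accessor: it lists the
    // virtual-service type from the underlying store (here, the mcp controller).
    func (s *istioConfigStoreSketch) VirtualServices(namespace string) ([]model.Config, error) {
        return s.List(model.VirtualService.Type, namespace)
    }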

    2.1 initMCPConfigController

    func (s *Server) initMCPConfigController(args *PilotArgs) error {
        clientNodeID := ""
        collections := make([]sink.CollectionOptions, len(model.IstioConfigTypes))
        for i, t := range model.IstioConfigTypes {
            // all are Istio CRD collections; no native k8s resources such as Pod, Service, etc.
            collections[i] = sink.CollectionOptions{Name: t.Collection, Incremental: false}
        }
    
        options := coredatamodel.Options{
            DomainSuffix: args.Config.ControllerOptions.DomainSuffix,
            // used later
            ClearDiscoveryServerCache: func() {
                s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})
            },
        }
        ...
        for _, configSource := range s.mesh.ConfigSources {
            if strings.Contains(configSource.Address, fsScheme+"://") {
                ...
            }
            // set up secure access; this will come up later when analyzing policy
            securityOption := grpc.WithInsecure()
            if configSource.TlsSettings != nil &&
                configSource.TlsSettings.Mode != istio_networking_v1alpha3.TLSSettings_DISABLE {
                ...
            }
            ...
            conn, err := grpc.DialContext(
                ctx, configSource.Address,
                securityOption, msgSizeOption, keepaliveOption, initialWindowSizeOption, initialConnWindowSizeOption)
            ...
            // create a controller
            mcpController := coredatamodel.NewController(options)
            sinkOptions := &sink.Options{
                CollectionOptions: collections,
                Updater:           mcpController,
                ID:                clientNodeID,
                Reporter:          reporter,
            }
            // create the mcp client
            cl := mcpapi.NewResourceSourceClient(conn)
            mcpClient := sink.NewClient(cl, sinkOptions)
            configz.Register(mcpClient)
            clients = append(clients, mcpClient)
    
            conns = append(conns, conn)
            // add this controller to configStores
            configStores = append(configStores, mcpController)
        }
        ...
        // Wrap the config controller with a cache.
        aggregateMcpController, err := configaggregate.MakeCache(configStores)
        if err != nil {
            return err
        }
        s.configController = aggregateMcpController
        return nil
    }
    

    1. Note options.ClearDiscoveryServerCache; it will be used later.
    2. coredatamodel.NewController(options) creates a controller.
    3. sink.NewClient(cl, sinkOptions) creates an mcp client; note that sinkOptions.Updater is exactly the controller created in 2. The mcp server side lives in galley.
    4. configaggregate.MakeCache(configStores) groups all the controllers by the collections they support (e.g. which controllers handle virtualService); see the conceptual sketch below.
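
    To illustrate note 4, here is a conceptual sketch of what the aggregate cache does (simplified; the real code lives in pilot/pkg/config/aggregate, and the function name below is made up for illustration): it indexes the underlying stores by the config types they declare, so Get/List for a given type can be routed to the store(s) holding it.

    // Conceptual sketch only: index every store by the config types it declares
    // in ConfigDescriptor(), then route reads for a type to the matching stores.
    func makeCacheSketch(stores []model.ConfigStoreCache) map[string][]model.ConfigStoreCache {
        storesByType := make(map[string][]model.ConfigStoreCache)
        for _, store := range stores {
            for _, schema := range store.ConfigDescriptor() {
                storesByType[schema.Type] = append(storesByType[schema.Type], store)
            }
        }
        return storesByType
    }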

    2.2 mcp client

    // pkg/mcp/sink/client_sink.go
    func NewClient(client mcp.ResourceSourceClient, options *Options) *Client {
        return &Client{
            Sink:     New(options),
            reporter: options.Reporter,
            client:   client,
        }
    }
    // pkg/mcp/sink/sink.go
    func New(options *Options) *Sink { // nolint: lll
        nodeInfo := &mcp.SinkNode{
            Id:          options.ID,
            Annotations: options.Metadata,
        }
        state := make(map[string]*perCollectionState)
        // state is built from options.CollectionOptions
        for _, collection := range options.CollectionOptions {
            state[collection.Name] = &perCollectionState{
                versions:           make(map[string]string),
                requestIncremental: collection.Incremental,
            }
        }
        return &Sink{
            ...
        }
    }
    

    Note the following here:
    1. The Sink's state comes from options.CollectionOptions, which traces back to model.IstioConfigTypes in initMCPConfigController.

    IstioConfigTypes = ConfigDescriptor{
            VirtualService,
            Gateway,
            ServiceEntry,
            DestinationRule,
            EnvoyFilter,
            Sidecar,
            HTTPAPISpec,
            HTTPAPISpecBinding,
            QuotaSpec,
            QuotaSpecBinding,
            AuthenticationPolicy,
            AuthenticationMeshPolicy,
            ServiceRole,
            ServiceRoleBinding,
            RbacConfig,
            ClusterRbacConfig,
        }
    

    As you can see, model.IstioConfigTypes contains only Istio CRD resources; in other words, the config resources obtained from galley are limited to these types, with no native k8s resources such as Pod.
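
    For reference, each entry in model.IstioConfigTypes is a model.ProtoSchema describing one CRD and the MCP collection it maps to. As an example, the VirtualService descriptor looks roughly like this (abridged from the 1.3 source; exact field values may differ slightly):

    // Abridged sketch of one descriptor (see pilot/pkg/model for the full definition).
    VirtualService = ProtoSchema{
        Type:        "virtual-service",
        Plural:      "virtual-services",
        Group:       "networking",
        Version:     "v1alpha3",
        MessageName: "istio.networking.v1alpha3.VirtualService",
        // Collection is the MCP collection name requested from galley.
        Collection:  "istio/networking/v1alpha3/virtualservices",
    }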

    2.3 mcp client Run

    // pkg/mcp/sink/client_sink.go
    func (c *Client) Run(ctx context.Context) {
        ...
        for {
            // establish the stream connection
            for {
                ...
                stream, err := c.client.EstablishResourceStream(ctx)
                ...
            }
            // process the stream
            err := c.ProcessStream(c.stream)
            ...
        }
    }
    // pkg/mcp/sink/sink.go
    func (sink *Sink) ProcessStream(stream Stream) error {
        // send an initial request for each supported collection
        initialRequests := sink.createInitialRequests()
        for {
            var req *mcp.RequestResources
            if len(initialRequests) > 0 {
                // send an initial request first
                req = initialRequests[0]
                initialRequests = initialRequests[1:]
            } else {
                // receive a response from the server
                resources, err := stream.Recv()
                if err != nil {
                    if err != io.EOF {
                        sink.reporter.RecordRecvError(err, status.Code(err))
                        scope.Errorf("Error receiving MCP resource: %v", err)
                    }
                    return err
                }
                // after processing, the client must send an ACK/NACK,
                // so handling the response produces a new request to send back
                req = sink.handleResponse(resources)
            }
    
            sink.journal.RecordRequestResources(req)
        // send the request to the server
            if err := stream.Send(req); err != nil {
                sink.reporter.RecordSendError(err, status.Code(err))
                scope.Errorf("Error sending MCP request: %v", err)
                return err
            }
        }
    }
    

    The interaction between the mcp client and server was already covered in [Istio source code analysis][galley] galley downstream (mcp); here is a brief recap.

    For more on mcp, see https://github.com/istio/api/tree/master/mcp; the following diagram helps illustrate the flow.

    (figure: mcp.png — mcp client/server interaction)

    Comparing this diagram with ProcessStream:
    1. The client sends an initial request (initialRequests) for each supported collection.
    2. The server returns a response.
    3. The client must reply with an ACK/NACK, which is why sink.handleResponse(resources) in ProcessStream constructs a new request after processing the response and returns it to the server.

    So let's first see which collections are requested:

    func (sink *Sink) createInitialRequests() []*mcp.RequestResources {
        sink.mu.Lock()
    
        initialRequests := make([]*mcp.RequestResources, 0, len(sink.state))
        // sink.state comes from model.IstioConfigTypes set up in initMCPConfigController
        for collection, state := range sink.state {
            var initialResourceVersions map[string]string
            if state.requestIncremental {
                ...
            }
            req := &mcp.RequestResources{
                SinkNode:                sink.nodeInfo,
                Collection:              collection,
                InitialResourceVersions: initialResourceVersions,
                Incremental:             state.requestIncremental,
            }
            initialRequests = append(initialRequests, req)
        }
        sink.mu.Unlock()
        return initialRequests
    }
    

    As you can see, the collections requested are exactly those from model.IstioConfigTypes in initMCPConfigController.
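
    Since pilot starts with no state and Incremental is false for every collection, each initial request is little more than the sink node info plus a collection name. Roughly (a sketch; the collection string is shown for illustration):

    // Sketch: what one initial request looks like for a fresh pilot.
    func exampleInitialRequest(nodeInfo *mcp.SinkNode) *mcp.RequestResources {
        return &mcp.RequestResources{
            SinkNode:                nodeInfo,
            Collection:              "istio/networking/v1alpha3/virtualservices",
            InitialResourceVersions: nil,   // empty: pilot holds no resources yet
            Incremental:             false, // as set in initMCPConfigController
            // ResponseNonce is left empty, marking this as a new subscription
            // rather than an ACK/NACK of a previous response.
        }
    }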

    2.4 handleResponse

    func (sink *Sink) handleResponse(resources *mcp.Resources) *mcp.RequestResources {
        if handleResponseDoneProbe != nil {
            defer handleResponseDoneProbe()
        }
        // the collection must be one of the supported types
        state, ok := sink.state[resources.Collection]
        if !ok {
            errDetails := status.Errorf(codes.Unimplemented, "unsupported collection %v", resources.Collection)
            return sink.sendNACKRequest(resources, errDetails)
        }
    
        change := &Change{
            Collection:        resources.Collection,
            Objects:           make([]*Object, 0, len(resources.Resources)),
            Removed:           resources.RemovedResources,
            Incremental:       resources.Incremental,
            SystemVersionInfo: resources.SystemVersionInfo,
        }
        
        for _, resource := range resources.Resources {
            var dynamicAny types.DynamicAny
            if err := types.UnmarshalAny(resource.Body, &dynamicAny); err != nil {
                return sink.sendNACKRequest(resources, err)
            }
    
            // TODO - use galley metadata to verify collection and type_url match?
            object := &Object{
                TypeURL:  resource.Body.TypeUrl,
                Metadata: resource.Metadata,
                Body:     dynamicAny.Message,
            }
            change.Objects = append(change.Objects, object)
        }
    
        if err := sink.updater.Apply(change); err != nil {
            // send a NACK
            errDetails := status.Error(codes.InvalidArgument, err.Error())
            return sink.sendNACKRequest(resources, errDetails)
        }
        ...
        // ACK
        sink.reporter.RecordRequestAck(resources.Collection, 0)
        req := &mcp.RequestResources{
            SinkNode:      sink.nodeInfo,
            Collection:    resources.Collection,
            ResponseNonce: resources.Nonce,
            Incremental:   useIncremental,
        }
        return req
    }
    

    1. The data in the response is assembled into a change, which is passed to sink.updater.Apply.
    2. Any error results in a NACK being sent to the server; otherwise an ACK is sent (see the sketch below).
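
    As note 2 mentions, an ACK and a NACK are the same message type; a NACK simply echoes the nonce of the offending response and carries an error detail. A minimal sketch of what sendNACKRequest builds (simplified; the real function in pkg/mcp/sink also logs the error and records metrics):

    // Simplified sketch of sendNACKRequest: echo the collection and nonce of the
    // rejected response and attach the error, so the server knows it was NACKed.
    func (sink *Sink) sendNACKRequestSketch(response *mcp.Resources, err error) *mcp.RequestResources {
        errorDetails, _ := status.FromError(err)
        return &mcp.RequestResources{
            SinkNode:      sink.nodeInfo,
            Collection:    response.Collection,
            ResponseNonce: response.Nonce,
            ErrorDetail:   errorDetails.Proto(),
        }
    }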

    So what is sink.updater? Looking back at initMCPConfigController:

            options := coredatamodel.Options{
                DomainSuffix: args.Config.ControllerOptions.DomainSuffix,
            // used later
                ClearDiscoveryServerCache: func() {
                    s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})
                 },
            }
            ...
            mcpController := coredatamodel.NewController(options)
            sinkOptions := &sink.Options{
                CollectionOptions: collections,
                Updater:           mcpController,
                ID:                clientNodeID,
                Reporter:          reporter,
            }
        // create the mcp client
            cl := mcpapi.NewResourceSourceClient(conn)
            mcpClient := sink.NewClient(cl, sinkOptions)
    

    So sink.updater is exactly the mcpController.
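
    The reason mcpController can be plugged in as the Updater is that the sink package only depends on this small interface (from pkg/mcp/sink), which the coredatamodel Controller satisfies with the Apply method analyzed below:

    // pkg/mcp/sink: the only thing the sink requires of its updater.
    type Updater interface {
        // Apply is called for every change received from the server;
        // if it returns an error, the sink NACKs the response.
        Apply(*Change) error
    }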

    3. Controller

    func NewController(options Options) CoreDataModel {
        descriptorsByMessageName := make(map[string]model.ProtoSchema, len(model.IstioConfigTypes))
        synced := make(map[string]bool)
        for _, descriptor := range model.IstioConfigTypes {
            // don't register duplicate descriptors for the same collection
            if _, ok := descriptorsByMessageName[descriptor.Collection]; !ok {
                descriptorsByMessageName[descriptor.Collection] = descriptor
                synced[descriptor.Collection] = false
            }
        }
        return &Controller{
            ...
        }
    }
    

    Just note how descriptorsByMessageName is built; the sketch below shows where it ends up on the Controller.
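
    For context, the Controller fields that matter for this article look roughly like this (abridged; see pilot/pkg/config/coredatamodel for the full definition). Note that descriptorsByMessageName, despite its name, is keyed by collection and is stored on the Controller as descriptorsByCollection, which Apply uses next.

    // Abridged sketch of the Controller fields used below.
    type Controller struct {
        // configStore is keyed as [type][namespace][name]
        configStore             map[string]map[string]map[string]*model.Config
        configStoreMu           sync.RWMutex
        descriptorsByCollection map[string]model.ProtoSchema
        options                 Options
        eventHandlers           map[string][]func(model.Config, model.Event)
        synced                  map[string]bool
        syncedMu                sync.Mutex
    }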

    3.1 Apply

    func (c *Controller) Apply(change *sink.Change) error {
        descriptor, ok := c.descriptorsByCollection[change.Collection]
        if !ok {
            return fmt.Errorf("apply type not supported %s", change.Collection)
        }
    
        schema, valid := c.ConfigDescriptor().GetByType(descriptor.Type)
        if !valid {
            return fmt.Errorf("descriptor type not supported %s", descriptor.Type)
        }
    
        c.syncedMu.Lock()
        c.synced[change.Collection] = true
        c.syncedMu.Unlock()
    
        // innerStore is [namespace][name]
        innerStore := make(map[string]map[string]*model.Config)
        // build innerStore from the information in the change
        for _, obj := range change.Objects {
            // construct one innerStore entry per object:
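            // Rough sketch of the elided body (based on the 1.3.6 source; create-time
            // handling and error checks are omitted, and helper names are approximate).
            // obj.Metadata.Name is of the form "namespace/name".
            namespace, name := extractNameNamespace(obj.Metadata.Name)
            conf := &model.Config{
                ConfigMeta: model.ConfigMeta{
                    Type:            descriptor.Type,
                    Group:           descriptor.Group,
                    Version:         descriptor.Version,
                    Name:            name,
                    Namespace:       namespace,
                    ResourceVersion: obj.Metadata.Version,
                    Labels:          obj.Metadata.Labels,
                    Annotations:     obj.Metadata.Annotations,
                    Domain:          c.options.DomainSuffix,
                },
                Spec: obj.Body,
            }
            if _, ok := innerStore[namespace]; !ok {
                innerStore[namespace] = make(map[string]*model.Config)
            }
            innerStore[namespace][name] = conf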
        }
    
        var prevStore map[string]map[string]*model.Config
    
        c.configStoreMu.Lock()
        prevStore = c.configStore[descriptor.Type]
        c.configStore[descriptor.Type] = innerStore
        c.configStoreMu.Unlock()
    
        if descriptor.Type == model.ServiceEntry.Type {
            c.serviceEntryEvents(innerStore, prevStore)
        } else {
            c.options.ClearDiscoveryServerCache()
        }
    
        return nil
    }
    

    1. An innerStore is built from the change, and it replaces the content for that type in c.configStore.
    2. The old content prevStore and the new content innerStore are then used to dispatch events.

    2.1 If the type is ServiceEntry, the serviceEntryEvents method is called.

    func (c *Controller) serviceEntryEvents(currentStore, prevStore map[string]map[string]*model.Config) {
        dispatch := func(model model.Config, event model.Event) {}
        if handlers, ok := c.eventHandlers[model.ServiceEntry.Type]; ok {
            dispatch = func(model model.Config, event model.Event) {
                log.Debugf("MCP event dispatch: key=%v event=%v", model.Key(), event.String())
                for _, handler := range handlers {
                    handler(model, event)
                }
            }
        }
    
        // add/update
        for namespace, byName := range currentStore {
            for name, config := range byName {
                if prevByNamespace, ok := prevStore[namespace]; ok {
                    if prevConfig, ok := prevByNamespace[name]; ok {
                        if config.ResourceVersion != prevConfig.ResourceVersion {
                            dispatch(*config, model.EventUpdate)
                        }
                    } else {
                        dispatch(*config, model.EventAdd)
                    }
                } else {
                    dispatch(*config, model.EventAdd)
                }
            }
        }
        ...
    }
    func (c *Controller) RegisterEventHandler(typ string, handler func(model.Config, model.Event)) {
        c.eventHandlers[typ] = append(c.eventHandlers[typ], handler)
    }
    

    The generated events are handled by the handlers that were registered earlier (an illustrative registration is sketched below).
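
    For illustration, a handler could be registered like this (a hypothetical handler, shown only as a usage sketch; pilot registers its real handlers during bootstrap, and they typically trigger an xDS push):

    // Hypothetical usage sketch: register a handler for ServiceEntry events.
    mcpController.RegisterEventHandler(model.ServiceEntry.Type, func(cfg model.Config, event model.Event) {
        log.Infof("service entry %s/%s changed: %v", cfg.Namespace, cfg.Name, event)
    })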

    2.2 If the type is not ServiceEntry, the ClearDiscoveryServerCache callback is invoked:

    s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})
    

    This will be analyzed in a later article.

    4. Summary

    (figure: pilot.png — overview of pilot consuming config from galley over MCP)

    After receiving data from the mcp server, the client's handleResponse calls the controller's Apply method, which processes the change according to its type. When processing is done, an ACK/NACK is returned to the server.

    5. References

    1. istio 1.3.6 source code
    2. https://cloud.tencent.com/developer/article/1409159
