美文网首页
[istio源码分析][galley] galley之上游(so

[istio源码分析][galley] galley之上游(so

作者: nicktming | 来源:发表于2020-01-29 22:00 被阅读0次

    1. 前言

    转载请说明原文出处, 尊重他人劳动成果!

    源码位置: https://github.com/nicktming/istio
    分支: tming-v1.3.6 (基于1.3.6版本)

    1. [istio源码分析][galley] galley之上游(source)
    2. [istio源码分析][galley] galley之runtime
    3. [istio源码分析][galley] galley之下游(mcp)

    [istio源码分析][galley] galley之runtime 中分析了galley整个机制中一个承上启下的组件, 在 [istio源码分析][galley] galley之下游(mcp) 中分析了galley中负责下游处理的mcp, 本文将分析galley的上游, 也就是信息来源source.

    2. source

    cd $GOPATH/src/istio.io/istio/galley/pkg/source
    > tree -L 1
    ├── fs
    ├── kube
    

    可以看到当前source支持两个来源fs(文件) 和 kube(k8s集群).

    3. fs

    galley的来源是文件, 也就是增/删/改文件来进行对象的添加删除修改操作. 另外galley可以感知道其操作.

    3.1 例子

    如果没有k8s集群, 则可以使用文件来进行测试, 这里写了个简单的例子https://github.com/nicktming/istio/tree/tming-1.3.6/galley/pkg/source/test来帮助理解.

    // main.go
    ...
    func main()  {
        dir := "./fs"
        shutdown := make(chan os.Signal, 1)
        // 监控文件变化
        appsignals.FileTrigger(dir, syscall.SIGUSR1, shutdown)
        // 创建source
        s := newOrFail(dir)
        // 启动source
        ch := startOrFail(s)
        // 下游接收到的事件
        receive(ch)
    }
    

    例子中的文件夹fs里面保存了一些yaml文件, 文件里面有一些对象包括VirtualServiceService等. 运行:

    receive event:[Event](Added: [VKey](istio/networking/v1alpha3/virtualservices:route-for-myapp @v0)), entry:{{0001-01-01 00:00:00 +0000 UTC map[] map[]} [VKey](istio/networking/v1alpha3/virtualservices:route-for-myapp @v0) hosts:"some.example.com" gateways:"some-ingress" http:<route:<destination:<host:"some.example.internal" > > > }
    receive event:[Event](Added: [VKey](k8s/core/v1/services:kube-system/kube-dns @v0)), entry:{{2018-02-12 23:48:44 +0800 CST map[lk1:lv1] map[ak1:av1]} [VKey](k8s/core/v1/services:kube-system/kube-dns @v0) &ServiceSpec{Ports:[{dns-tcp TCP 53 {0 53 } 0}],Selector:map[string]string{},ClusterIP:10.43.240.10,Type:ClusterIP,ExternalIPs:[],SessionAffinity:,LoadBalancerIP:,LoadBalancerSourceRanges:[],ExternalName:,ExternalTrafficPolicy:,HealthCheckNodePort:0,PublishNotReadyAddresses:false,SessionAffinityConfig:nil,}}
    receive event:[Event](FullSync), entry:{{0001-01-01 00:00:00 +0000 UTC map[] map[]} [VKey](: @) <nil>}
    
    

    可以看到有两个Added事件和一个FullSync事件. 接着手动修改文件中的某个值, 比如在名字为kube-dnsService中添加了一个labelenv: test,此时再次查看日志:

    receive event:[Event](Updated: [VKey](k8s/core/v1/services:kube-system/kube-dns @v1)), entry:{{2018-02-12 23:48:44 +0800 CST map[env:test lk1:lv1] map[ak1:av1]} [VKey](k8s/core/v1/services:kube-system/kube-dns @v1) &ServiceSpec{Ports:[{dns-tcp TCP 53 {0 53 } 0}],Selector:map[string]string{},ClusterIP:10.43.240.10,Type:ClusterIP,ExternalIPs:[],SessionAffinity:,LoadBalancerIP:,LoadBalancerSourceRanges:[],ExternalName:,ExternalTrafficPolicy:,HealthCheckNodePort:0,PublishNotReadyAddresses:false,SessionAffinityConfig:nil,}}
    

    添加了一个关于kube-system/kube-dns这个Service的更新事件(Updated).

    3.2 分析

    初始化一个文件类型的source.

    func New(root string, schema *schema.Instance, config *converter.Config) (runtime.Source, error) {
        fs := &source{
            config:  config,
            root:    root,
            kinds:   map[string]bool{},
            shas:    map[fileResourceKey][sha1.Size]byte{},
            worker:  util.NewWorker("fs source", log.Scope),
            version: 0,
        }
        // 支持的schema 比如VirtualService, Service, Pod等
        for _, spec := range schema.All() {
            fs.kinds[spec.Kind] = true
        }
        return fs, nil
    }
    

    1. root为根文件夹path.
    2. schema为该source支持的类型, 比如VirtualService, ServicePod等.
    3. config在将yaml文件转成对象时会用到.
    4. shas在内存中保存着所有yaml文件的内容.

    3.2.2 Start
    func (s *source) Start(handler resource.EventHandler) error {
        return s.worker.Start(nil, func(ctx context.Context) {
            // 初始化s.handler处理event
            s.handler = handler
            // 初始加载所有文件
            s.initialCheck()
            // 注册一个signal 可以通过FileTrigger来监控文件 这样文件变化就发送signal到此channel c
            c := make(chan appsignals.Signal, 1)
            appsignals.Watch(c)
    
            for {
                select {
                case <-ctx.Done():
                    return
                case trigger := <-c:
                    if trigger.Signal == syscall.SIGUSR1 {
                        log.Scope.Infof("Triggering reload in response to: %v", trigger.Source)
                        s.reload()
                    }
                }
            }
        })
    }
    

    1. 初始化s.handler
    2. 初始加载所有文件, 并生成事件发送出去.
    3. 注册一个signal可以通过FileTrigger来监控文件 这样文件变化就发送signal到此c(channel)
    4. 如果c(channel)中接收一个syscall.SIGUSR1信号, 就是表明监控的文件夹中的文件有变化, 所以调用s.reload()发送新事件.

    3.2.2 initialCheck

    func (s *source) initialCheck() {
        // 得到该文件夹下所有文件转化成的对象 以map形式存储
        newData := s.readFiles(s.root)
        s.mu.Lock()
        defer s.mu.Unlock()
        for k, r := range newData {
            s.process(resource.Added, k, r)
            s.shas[k] = r.sha
        }
        s.handler(resource.FullSyncEvent)
    }
    

    readFiles方法可以将某个目录下面的所有yaml文件转化成当前source支持的schema, 以map形式保存在newData中, 关于readFiles的实现这里就不分析了.

    3.2.3 process

    func (s *source) process(eventKind resource.EventKind, key fileResourceKey, r *fileResource) {
        version := resource.Version(fmt.Sprintf("v%d", s.version))
    
        var event resource.Event
        switch eventKind {
        case resource.Added, resource.Updated:
            event = resource.Event{
                Kind: eventKind,
                Entry: resource.Entry{
                    ID: resource.VersionedKey{
                        Key: resource.Key{
                            Collection: r.spec.Target.Collection,
                            FullName:   key.fullName,
                        },
                        // 当前版本
                        Version: version,
                    },
                    Item:     r.entry.Resource,
                    Metadata: r.entry.Metadata,
                },
            }
        case resource.Deleted:
            spec := kubeMeta.Types.Get(key.kind)
            event = resource.Event{
                Kind: eventKind,
                Entry: resource.Entry{
                    ID: resource.VersionedKey{
                        Key: resource.Key{
                            Collection: spec.Target.Collection,
                            FullName:   key.fullName,
                        },
                        Version: version,
                    },
                },
            }
        }
    
        log.Scope.Debugf("Dispatching source event: %v", event)
        s.handler(event)
    }
    

    这里的处理方式也很简单, 组装成resource.Event交由s.handler发到runtime中.
    这里需要注意一下Version, 这个版本号在什么时候会变化? 再次回到Start方法, 当文件发生变化时会触发reload方法, 接下来看一下reload方法.

    3.2.4 reload

    func (s *source) reload() {
        // 再次读取所有文件
        newData := s.readFiles(s.root)
        s.mu.Lock()
        defer s.mu.Unlock()
        newShas := map[fileResourceKey][sha1.Size]byte{}
        // Compute the deltas using sha comparisons
        nextVersion := s.version + 1
        // sha 为上一个版本的数据内容
        // newData为当前版本的数据内容
        // 用sha和newData对比就可以得到所有事件内容
        // 最后更新sha为当前版本的数据内容
        for k, r := range newData {
            newShas[k] = r.sha
            sha, exists := s.shas[k]
            if exists && sha != r.sha {
                if s.version != nextVersion {
                    s.version = nextVersion
                }
                s.process(resource.Updated, k, r)
            } else if !exists {
                if s.version != nextVersion {
                    s.version = nextVersion
                }
                s.process(resource.Added, k, r)
            }
        }
        for k := range s.shas {
            if _, exists := newShas[k]; !exists {
                s.process(resource.Deleted, k, nil)
            }
        }
        s.shas = newShas
    }
    

    主要内容如下:
    1. sha 为上一个版本的数据内容
    2. newData为当前版本的数据内容
    3.shanewData对比就可以得到所有事件内容并通过process方法发送给runtime.
    4. 最后更新sha为当前版本的数据内容.

    3.2.5 总结

    source.png

    1. 初始的时候通过initalCheck -> readFile -> process -> handler -> runtime.
    2. 初始完的时候会发送一个FullSync事件表明第一次初始化结束.
    3. 通过FileTrigger监控文件变化, 如有变化通过reload方法重新加载文件并更新内容对象, 并且根据当前内容和上一个版本内容对比发送对应事件给runtime.

    3. k8s

    有了针对文件作为source的理解, 对于k8s的理解就会更简单了, 从原理上讲, fs监控文件的变化, k8s使用informer机制监控k8s中原生资源和crd资源的变化即可.

    3.1 source

    // galley/pkg/source/kube/source.go
    func New(interfaces client.Interfaces, resyncPeriod time.Duration, schema *schema.Instance,
        cfg *kubeConverter.Config) (runtime.Source, error) {
    
        var err error
        var cl kubernetes.Interface
        var dynClient kubeDynamic.Interface
        var sharedInformers informers.SharedInformerFactory
    
        log.Scope.Info("creating sources for kubernetes resources")
        sources := make([]runtime.Source, 0)
        for i, spec := range schema.All() {
            log.Scope.Infof("[%d]", i)
            log.Scope.Infof("  Source:      %s", spec.CanonicalResourceName())
            log.Scope.Infof("  Collection:  %s", spec.Target.Collection)
    
            // If it's a known type, use a custom (optimized) source.
            if builtin.IsBuiltIn(spec.Kind) {
                // Lazy create the kube client.
                if cl, err = getKubeClient(cl, interfaces); err != nil {
                    return nil, err
                }
                sharedInformers = getSharedInformers(sharedInformers, cl, resyncPeriod)
                // 创建k8s原生资源的source 比如pod, service
                source, err := builtin.New(sharedInformers, spec)
                if err != nil {
                    return nil, err
                }
                sources = append(sources, source)
            } else {
                // Lazy-create the dynamic client
                if dynClient, err = getDynamicClient(dynClient, interfaces); err != nil {
                    return nil, err
                }
                // Unknown types use the dynamic source.
                // 创建crd资源的source 比如virtualService, gateway等等
                source, err := dynamic.New(dynClient, resyncPeriod, spec, cfg)
                if err != nil {
                    return nil, err
                }
                sources = append(sources, source)
            }
        }
        return &aggregate{
            sources: sources,
        }, nil
    }
    

    可以看到k8ssource使用了aggregate结构体来收集到所有的source. 主要来自两大类:
    1. k8s原生资源比如pod, Service, 使用原生client-go api即可. 每个资源都是一个source.
    2. crd资源比如VirtualService, gateway, 使用dynamic client. 每个资源都是一个source.

    3.2 Start

    func (s *aggregate) Start(handler resource.EventHandler) error {
        s.mu.Lock()
        defer s.mu.Unlock()
        syncGroup := sync.WaitGroup{}
        syncGroup.Add(len(s.sources))
        syncHandler := func(e resource.Event) {
            if e.Kind == resource.FullSync {
                // 如果某一个资源同步完了 就减少一个
                syncGroup.Done()
            } else {
                // Not a sync event, just pass on to the real handler.
                // 不是同步操作 调用传入的handler
                handler(e)
            }
        }
        for _, source := range s.sources {
            // 为每个资源调用各自的Start方法
            if err := source.Start(syncHandler); err != nil {
                return err
            }
        }
        go func() {
            // 等待所有资源同步完
            syncGroup.Wait()
            // 发送一个FullSync给runtime 表明所有资源以及同步完成
            handler(resource.FullSyncEvent)
        }()
        return nil
    }
    

    1. 可以看到Start方法是将所有的source全部启动, 也就是list and watch所有的资源.
    2. 等到所有的资源都已经同步完了, 也就是通过handler(e)发送给runtime了, 该Start才会发送一个FullSync事件给runtime.
    3. 关于某个资源的Start就不多说了, 了解informer机制原理就明白了.

    4. server

    这里结合一个galleyserver端启动程序看看是如何调用的.

    // galley/pkg/server/components/processing.go
    func (p *Processing) Start() (err error) {
          var mesh meshconfig.Cache
        var src runtime.Source
    
        if mesh, err = newMeshConfigCache(p.args.MeshConfigFile); err != nil {
            return
        }
        if src, err = p.createSource(mesh); err != nil {
            return
        }
        ...
        p.processor = runtime.NewProcessor(src, p.distributor, &processorCfg)
        ...
    }
    func (p *Processing) createSource(mesh meshconfig.Cache) (src runtime.Source, err error) {
        ...
        sourceSchema := p.getSourceSchema()
        if p.args.ConfigPath != "" {
            if src, err = fsNew(p.args.ConfigPath, sourceSchema, converterCfg); err != nil {
                return
            }
        } else {
            var k client.Interfaces
            if k, err = newKubeFromConfigFile(p.args.KubeConfig); err != nil {
                return
            }
            var found []schema.ResourceSpec
            ...
            sourceSchema = schema.New(found...)
            if src, err = newSource(k, p.args.ResyncPeriod, sourceSchema, converterCfg); err != nil {
                return
            }
        }
        return
    }
    

    可以看到创建的source将为参数传入到NewProcessor中, 这个在 [istio源码分析][galley] galley之runtime 中已经分析过了, 所以现在已经和runtime对接上了.

    full_source.png

    5.参考

    1. istio 1.3.6源码
    2. https://cloud.tencent.com/developer/article/1409159

    相关文章

      网友评论

          本文标题:[istio源码分析][galley] galley之上游(so

          本文链接:https://www.haomeiwen.com/subject/nulmthtx.html