美文网首页
[istio源码分析][galley] galley之runti

[istio源码分析][galley] galley之runti

作者: nicktming | 来源:发表于2020-01-24 18:00 被阅读0次

    1. 前言

    转载请说明原文出处, 尊重他人劳动成果!

    源码位置: https://github.com/nicktming/istio
    分支: tming-v1.3.6 (基于1.3.6版本)

    1. [istio源码分析][galley] galley之上游(source)
    2. [istio源码分析][galley] galley之runtime
    3. [istio源码分析][galley] galley之下游(mcp)

    2. runtime

    本文将着重分析galley中一个承上启下的component.

    [root@master pkg]# pwd
    /root/go/src/istio.io/istio/galley/pkg
    [root@master pkg]# tree -L 1
    .
    ├── authplugin
    ├── authplugins
    ├── config
    ├── crd
    ├── meshconfig
    ├── metadata
    ├── runtime
    ├── server
    ├── source
    ├── testing
    └── util
    
    11 directories, 0 files
    [root@master pkg]# 
    

    source会产生事件的源头, runtime负责接收source中的事件并交给下游处理. 本文的重点将放到runtime中.

    3. source

    // Source to be implemented by a source configuration provider.
    type Source interface {
        // 开始方法 对k8s而言 就是开始监控一些crd资源交由handler处理
        Start(handler resource.EventHandler) error
        // 停止监控
        Stop()
    }
    

    4. processing

    4.1 Dispatcher 和 handler

    type Handler interface {
        Handle(e resource.Event)
    }
    
    type Dispatcher struct {
        handlers map[resource.Collection][]Handler
    }
    // Dispatcher是一个Handler的实现类
    // 并且针对每一种collection 都有其对应的一系列handler
    func (d *Dispatcher) Handle(e resource.Event) {
        handlers, found := d.handlers[e.Entry.ID.Collection]
        if !found {
            scope.Warnf("Unhandled resource event: %v", e)
            return
        }
    
        for _, h := range handlers {
            h.Handle(e)
        }
    }
    

    可以看到Dispatcher是以collection为类, 每个collection都有其对应的Handler数组. (collection就是一些crd的名字, 比如istio/networking/v1alpha3/virtualservices)

    Dispatcher实现了Handler接口, 针对每一个event会找到其collection所有的handler一个个进行处理.

    4.2 Listener

    // Listener gets notified when resource of a given collection has changed.
    type Listener interface {
        CollectionChanged(c resource.Collection)
    }
    

    当某一个collection发生变化时会触发该方法

    5. state

    // 记录着内存状态的galley
    type State struct {
        listener processing.Listener
        config *Config
        // version counter is a nonce that generates unique ids for each updated view of State.
        versionCounter int64
        // entries for per-message-type State.
        entriesLock sync.Mutex
        entries     map[resource.Collection]*resourceTypeState
    
        // Virtual version numbers for Gateways & VirtualServices for Ingress projected ones
        ingressGWVersion   int64
        ingressVSVersion   int64
        lastIngressVersion int64
        // 等待被发布的事件个数
        pendingEvents int64
        // 上一次发布的时间
        lastSnapshotTime time.Time
    }
    type resourceTypeState struct {
        // 当前状态的version
        version  int64
        entries  map[resource.FullName]*mcp.Resource
        versions map[resource.FullName]resource.Version
    }
    

    5.1 Handler方法

    func (s *State) Handle(event resource.Event) {
        pks, found := s.getResourceTypeState(event.Entry.ID.Collection)
        if !found {
            return
        }
        switch event.Kind {
        case resource.Added, resource.Updated:
            // Check to see if the version has changed.
            if curVersion := pks.versions[event.Entry.ID.FullName]; curVersion == event.Entry.ID.Version {
                log.Scope.Debugf("Received event for the current, known version: %v", event)
                return
            }
            // 将事件的entry转成mcp.Resource类型
            entry, ok := s.toResource(event.Entry)
            if !ok {
                return
            }
            // 保存当前对象内存中的值以及版本
            pks.entries[event.Entry.ID.FullName] = entry
            pks.versions[event.Entry.ID.FullName] = event.Entry.ID.Version
            monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
            monitorEntry(event.Entry.ID, true)
    
        case resource.Deleted:
            // 删除当前对象内存中的值以及版本
            delete(pks.entries, event.Entry.ID.FullName)
            delete(pks.versions, event.Entry.ID.FullName)
            monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
            monitorEntry(event.Entry.ID, false)
    
        default:
            log.Scope.Errorf("Unknown event kind: %v", event.Kind)
            return
        }
        // 更新version
        s.versionCounter++
        pks.version = s.versionCounter
    
        log.Scope.Debugf("In-memory State has changed:\n%v\n", s)
        s.pendingEvents++
        // 通知listener对该collection以已经发生变化
        s.listener.CollectionChanged(event.Entry.ID.Collection)
    }
    
    func (s *State) getResourceTypeState(name resource.Collection) (*resourceTypeState, bool) {
        s.entriesLock.Lock()
        defer s.entriesLock.Unlock()
        // 根据collection找到当前内存中存在的对象 
        // 比如collection是virtualservice 那就是得到内存中所有virtualservice的对象
        pks, found := s.entries[name]
        return pks, found
    }
    

    Handler的主要工作是将当前事件的类型转化成mcp.Resource类型并将其保存到内存中. 那保留在内存中干什么呢? 在s.listener.CollectionChanged(event.Entry.ID.Collection)中会进行处理, 在下面processor中会明白.

    6. processor

    func NewProcessor(src Source, distributor Distributor, cfg *Config) *Processor {
        stateStrategy := publish.NewStrategyWithDefaults()
        return newProcessor(src, cfg, stateStrategy, distributor, nil)
    }
    func newProcessor(
        src Source,
        cfg *Config,
        stateStrategy *publish.Strategy,
        distributor Distributor,
        postProcessHook postProcessHookFn) *Processor {
        now := time.Now()
        p := &Processor{
            stateStrategy:   stateStrategy,
            distributor:     distributor,
            source:          src,
            eventCh:         make(chan resource.Event, 1024),
            postProcessHook: postProcessHook,
            worker:          util.NewWorker("runtime processor", scope),
            lastEventTime:   now,
            fullSyncCond:    sync.NewCond(&sync.Mutex{}),
        }
        stateListener := processing.ListenerFromFn(func(c resource.Collection) {
            if p.distribute {
                stateStrategy.OnChange()
            }
        })
        p.state = newState(cfg, stateListener)
        // 这个暂时可以先不用看 以后分析serviceentry的时候需要
        p.serviceEntryHandler = serviceentry.NewHandler(cfg.DomainSuffix, processing.ListenerFromFn(func(_ resource.Collection) {
            scope.Debug("Processor.process: publish serviceEntry")
            s := p.serviceEntryHandler.BuildSnapshot()
            p.distributor.SetSnapshot(groups.SyntheticServiceEntry, s)
        }))
        p.handler = buildDispatcher(p.state, p.serviceEntryHandler)
        p.seedMesh()
        return p
    }
    

    1. 初始化p.state, 并且传入了listener.
    2. 初始化p.handler, 传入p.state, p.serviceEntryHandler.

    func buildDispatcher(state *State, serviceEntryHandler processing.Handler) *processing.Dispatcher {
        b := processing.NewDispatcherBuilder()
        // 所有注册的crds
        stateSchema := resource.NewSchemaBuilder().RegisterSchema(state.config.Schema).Build()
        for _, spec := range stateSchema.All() {
            b.Add(spec.Collection, state)
        }
        if state.config.SynthesizeServiceEntries {
            for _, spec := range serviceentry.Schema.All() {
                b.Add(spec.Collection, serviceEntryHandler)
            }
        }
        return b.Build()
    }
    

    可以看到每个collection都有一个基本的handler, 就是传进来的p.state.

    6.1 Start方法

    func (p *Processor) Start() error {
        // 启动方法
        setupFn := func() error {
            err := p.source.Start(func(e resource.Event) {
                // 将事件e传给管道p.eventCh
                p.eventCh <- e
            })
            if err != nil {
                return fmt.Errorf("runtime unable to Start source: %v", err)
            }
            return nil
        }
        // 运行方法
        runFn := func(ctx context.Context) {
            scope.Info("Starting processor...")
            defer func() {
                scope.Debugf("Process.process: Exiting worker thread")
                p.source.Stop()
                close(p.eventCh)
                p.stateStrategy.Close()
            }()
    
            scope.Debug("Starting process loop")
    
            for {
                select {
                case <-ctx.Done():
                    // Graceful termination.
                    scope.Debug("Processor.process: done")
                    return
                case e := <-p.eventCh:
                    // 从管道p.eventCh中取出要处理的事件
                    p.processEvent(e)
                case <-p.stateStrategy.Publish:
                    scope.Debug("Processor.process: publish")
                    // 将当前state对象内存中保存的对象建立一个快照
                    s := p.state.buildSnapshot()
                    // 该快照将交由distributor处理
                    p.distributor.SetSnapshot(groups.Default, s)
                }
    
                if p.postProcessHook != nil {
                    p.postProcessHook()
                }
            }
        }
        // 通过工具类来运行这两个方法
        return p.worker.Start(setupFn, runFn)
    }
    

    再看processEvent方法

    func (p *Processor) processEvent(e resource.Event) {
        if scope.DebugEnabled() {
            scope.Debugf("Incoming source event: %v", e)
        }
        p.recordEvent()
    
        if e.Kind == resource.FullSync {
            scope.Infof("Synchronization is complete, starting distribution.")
    
            p.fullSyncCond.L.Lock()
            // 把distribute设置为true
            p.distribute = true
            p.fullSyncCond.Broadcast()
            p.fullSyncCond.L.Unlock()
            // 这个将会触发runFn中的<-p.stateStrategy.Publish
            p.stateStrategy.OnChange()
            return
        }
        // 将该event交由dispatcher处理
        // 现在可以理解为就是p.state来处理, 原因p.handler就是一个dispatcher
        // dispatcher里面每一个collection都注册了一个p.state这样的handler
        p.handler.Handle(e)
    }
    

    1. 如果是FullSync, 也就是第一次做同步, 有两个动作:

    1.1 将p.distribute设置为true. 现在回头来看一下newProcessor方法.

    // processor.go
    ...
    stateListener := processing.ListenerFromFn(func(c resource.Collection) {
            // When the state indicates a change occurred, update the publishing strategy
            if p.distribute {
                stateStrategy.OnChange()
            }
        })
    ...
    // state.go中的Handler方法
    func (s *State) Handle(event resource.Event) {
    ...
        // 通知listener对该collection以已经发生变化
        s.listener.CollectionChanged(event.Entry.ID.Collection)
    ...
    }
    

    所以当p.distribute=true时将调用stateStrategy.OnChange()这个时候就会触发到Processor的Start()方法中的<-p.stateStrategy.Publish:进而调用p.state.buildSnapshot()生成当前内存快照交由p.distributor处理. 这部分在分析mcp的时候会涉及到.

    图片.png

    1.2 通过p.stateStrategy.OnChange()触发<-p.stateStrategy.Publish.

    1. 调用p.handler.Handle(e)方法, 目前可以理解为调用p.state.Handle(e), 因为p.handlerp.dispatcher并且为每个collection注册了p.statehandler方法.

    6.2 buildSnapshot

    按照state.entries中的内容创建一个内存快照.

    // 返回snapshot.Snapshot
    func (s *State) buildSnapshot() snapshot.Snapshot {
        s.entriesLock.Lock()
        defer s.entriesLock.Unlock()
    
        now := time.Now()
        monitoring.RecordProcessorSnapshotPublished(s.pendingEvents, now.Sub(s.lastSnapshotTime))
        s.lastSnapshotTime = now
        // 创建快照
        b := snapshot.NewInMemoryBuilder()
    
        for collection, state := range s.entries {
            entries := make([]*mcp.Resource, 0, len(state.entries))
            for _, entry := range state.entries {
                entries = append(entries, entry)
            }
            version := fmt.Sprintf("%d", state.version)
            b.Set(collection.String(), version, entries)
        }
    
        // Build entities that are derived from existing ones.
        s.buildProjections(b)
        // 将pendingEvents清空
        sn := b.Build()
        s.pendingEvents = 0
        return sn
    }
    
    func (s *State) buildProjections(b *snapshot.InMemoryBuilder) {
        s.buildIngressProjectionResources(b)
    }
    
    func (s *State) buildIngressProjectionResources(b *snapshot.InMemoryBuilder) {
        ingressByHost := make(map[string]resource.Entry)
        // Build ingress projections
        state := s.entries[metadata.K8sExtensionsV1beta1Ingresses.Collection]
        if state == nil || len(state.entries) == 0 {
            return
        }
        ...
    }
    

    7. 总结

    上流: sourcek8s或者fs中读取信息并整理成event.
    处理: 将source中的事件event放入p.eventch, 并且processEventp.eventCh中读取, 将信息保存在内存中, 然后生成快照.
    下流: 将生成的快照交由p.distributor处理.

    图片.png

    8. 参考

    1. istio 1.3.6源码
    2. https://cloud.tencent.com/developer/article/1409159

    相关文章

      网友评论

          本文标题:[istio源码分析][galley] galley之runti

          本文链接:https://www.haomeiwen.com/subject/jkdnzctx.html