美文网首页
[istio源码分析][galley] galley之runti

[istio源码分析][galley] galley之runti

作者: nicktming | 来源:发表于2020-01-24 18:00 被阅读0次

1. 前言

转载请说明原文出处, 尊重他人劳动成果!

源码位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)

1. [istio源码分析][galley] galley之上游(source)
2. [istio源码分析][galley] galley之runtime
3. [istio源码分析][galley] galley之下游(mcp)

2. runtime

本文将着重分析galley中一个承上启下的component.

[root@master pkg]# pwd
/root/go/src/istio.io/istio/galley/pkg
[root@master pkg]# tree -L 1
.
├── authplugin
├── authplugins
├── config
├── crd
├── meshconfig
├── metadata
├── runtime
├── server
├── source
├── testing
└── util

11 directories, 0 files
[root@master pkg]# 

source会产生事件的源头, runtime负责接收source中的事件并交给下游处理. 本文的重点将放到runtime中.

3. source

// Source to be implemented by a source configuration provider.
type Source interface {
    // 开始方法 对k8s而言 就是开始监控一些crd资源交由handler处理
    Start(handler resource.EventHandler) error
    // 停止监控
    Stop()
}

4. processing

4.1 Dispatcher 和 handler

type Handler interface {
    Handle(e resource.Event)
}

type Dispatcher struct {
    handlers map[resource.Collection][]Handler
}
// Dispatcher是一个Handler的实现类
// 并且针对每一种collection 都有其对应的一系列handler
func (d *Dispatcher) Handle(e resource.Event) {
    handlers, found := d.handlers[e.Entry.ID.Collection]
    if !found {
        scope.Warnf("Unhandled resource event: %v", e)
        return
    }

    for _, h := range handlers {
        h.Handle(e)
    }
}

可以看到Dispatcher是以collection为类, 每个collection都有其对应的Handler数组. (collection就是一些crd的名字, 比如istio/networking/v1alpha3/virtualservices)

Dispatcher实现了Handler接口, 针对每一个event会找到其collection所有的handler一个个进行处理.

4.2 Listener

// Listener gets notified when resource of a given collection has changed.
type Listener interface {
    CollectionChanged(c resource.Collection)
}

当某一个collection发生变化时会触发该方法

5. state

// 记录着内存状态的galley
type State struct {
    listener processing.Listener
    config *Config
    // version counter is a nonce that generates unique ids for each updated view of State.
    versionCounter int64
    // entries for per-message-type State.
    entriesLock sync.Mutex
    entries     map[resource.Collection]*resourceTypeState

    // Virtual version numbers for Gateways & VirtualServices for Ingress projected ones
    ingressGWVersion   int64
    ingressVSVersion   int64
    lastIngressVersion int64
    // 等待被发布的事件个数
    pendingEvents int64
    // 上一次发布的时间
    lastSnapshotTime time.Time
}
type resourceTypeState struct {
    // 当前状态的version
    version  int64
    entries  map[resource.FullName]*mcp.Resource
    versions map[resource.FullName]resource.Version
}

5.1 Handler方法

func (s *State) Handle(event resource.Event) {
    pks, found := s.getResourceTypeState(event.Entry.ID.Collection)
    if !found {
        return
    }
    switch event.Kind {
    case resource.Added, resource.Updated:
        // Check to see if the version has changed.
        if curVersion := pks.versions[event.Entry.ID.FullName]; curVersion == event.Entry.ID.Version {
            log.Scope.Debugf("Received event for the current, known version: %v", event)
            return
        }
        // 将事件的entry转成mcp.Resource类型
        entry, ok := s.toResource(event.Entry)
        if !ok {
            return
        }
        // 保存当前对象内存中的值以及版本
        pks.entries[event.Entry.ID.FullName] = entry
        pks.versions[event.Entry.ID.FullName] = event.Entry.ID.Version
        monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
        monitorEntry(event.Entry.ID, true)

    case resource.Deleted:
        // 删除当前对象内存中的值以及版本
        delete(pks.entries, event.Entry.ID.FullName)
        delete(pks.versions, event.Entry.ID.FullName)
        monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
        monitorEntry(event.Entry.ID, false)

    default:
        log.Scope.Errorf("Unknown event kind: %v", event.Kind)
        return
    }
    // 更新version
    s.versionCounter++
    pks.version = s.versionCounter

    log.Scope.Debugf("In-memory State has changed:\n%v\n", s)
    s.pendingEvents++
    // 通知listener对该collection以已经发生变化
    s.listener.CollectionChanged(event.Entry.ID.Collection)
}

func (s *State) getResourceTypeState(name resource.Collection) (*resourceTypeState, bool) {
    s.entriesLock.Lock()
    defer s.entriesLock.Unlock()
    // 根据collection找到当前内存中存在的对象 
    // 比如collection是virtualservice 那就是得到内存中所有virtualservice的对象
    pks, found := s.entries[name]
    return pks, found
}

Handler的主要工作是将当前事件的类型转化成mcp.Resource类型并将其保存到内存中. 那保留在内存中干什么呢? 在s.listener.CollectionChanged(event.Entry.ID.Collection)中会进行处理, 在下面processor中会明白.

6. processor

func NewProcessor(src Source, distributor Distributor, cfg *Config) *Processor {
    stateStrategy := publish.NewStrategyWithDefaults()
    return newProcessor(src, cfg, stateStrategy, distributor, nil)
}
func newProcessor(
    src Source,
    cfg *Config,
    stateStrategy *publish.Strategy,
    distributor Distributor,
    postProcessHook postProcessHookFn) *Processor {
    now := time.Now()
    p := &Processor{
        stateStrategy:   stateStrategy,
        distributor:     distributor,
        source:          src,
        eventCh:         make(chan resource.Event, 1024),
        postProcessHook: postProcessHook,
        worker:          util.NewWorker("runtime processor", scope),
        lastEventTime:   now,
        fullSyncCond:    sync.NewCond(&sync.Mutex{}),
    }
    stateListener := processing.ListenerFromFn(func(c resource.Collection) {
        if p.distribute {
            stateStrategy.OnChange()
        }
    })
    p.state = newState(cfg, stateListener)
    // 这个暂时可以先不用看 以后分析serviceentry的时候需要
    p.serviceEntryHandler = serviceentry.NewHandler(cfg.DomainSuffix, processing.ListenerFromFn(func(_ resource.Collection) {
        scope.Debug("Processor.process: publish serviceEntry")
        s := p.serviceEntryHandler.BuildSnapshot()
        p.distributor.SetSnapshot(groups.SyntheticServiceEntry, s)
    }))
    p.handler = buildDispatcher(p.state, p.serviceEntryHandler)
    p.seedMesh()
    return p
}

1. 初始化p.state, 并且传入了listener.
2. 初始化p.handler, 传入p.state, p.serviceEntryHandler.

func buildDispatcher(state *State, serviceEntryHandler processing.Handler) *processing.Dispatcher {
    b := processing.NewDispatcherBuilder()
    // 所有注册的crds
    stateSchema := resource.NewSchemaBuilder().RegisterSchema(state.config.Schema).Build()
    for _, spec := range stateSchema.All() {
        b.Add(spec.Collection, state)
    }
    if state.config.SynthesizeServiceEntries {
        for _, spec := range serviceentry.Schema.All() {
            b.Add(spec.Collection, serviceEntryHandler)
        }
    }
    return b.Build()
}

可以看到每个collection都有一个基本的handler, 就是传进来的p.state.

6.1 Start方法

func (p *Processor) Start() error {
    // 启动方法
    setupFn := func() error {
        err := p.source.Start(func(e resource.Event) {
            // 将事件e传给管道p.eventCh
            p.eventCh <- e
        })
        if err != nil {
            return fmt.Errorf("runtime unable to Start source: %v", err)
        }
        return nil
    }
    // 运行方法
    runFn := func(ctx context.Context) {
        scope.Info("Starting processor...")
        defer func() {
            scope.Debugf("Process.process: Exiting worker thread")
            p.source.Stop()
            close(p.eventCh)
            p.stateStrategy.Close()
        }()

        scope.Debug("Starting process loop")

        for {
            select {
            case <-ctx.Done():
                // Graceful termination.
                scope.Debug("Processor.process: done")
                return
            case e := <-p.eventCh:
                // 从管道p.eventCh中取出要处理的事件
                p.processEvent(e)
            case <-p.stateStrategy.Publish:
                scope.Debug("Processor.process: publish")
                // 将当前state对象内存中保存的对象建立一个快照
                s := p.state.buildSnapshot()
                // 该快照将交由distributor处理
                p.distributor.SetSnapshot(groups.Default, s)
            }

            if p.postProcessHook != nil {
                p.postProcessHook()
            }
        }
    }
    // 通过工具类来运行这两个方法
    return p.worker.Start(setupFn, runFn)
}

再看processEvent方法

func (p *Processor) processEvent(e resource.Event) {
    if scope.DebugEnabled() {
        scope.Debugf("Incoming source event: %v", e)
    }
    p.recordEvent()

    if e.Kind == resource.FullSync {
        scope.Infof("Synchronization is complete, starting distribution.")

        p.fullSyncCond.L.Lock()
        // 把distribute设置为true
        p.distribute = true
        p.fullSyncCond.Broadcast()
        p.fullSyncCond.L.Unlock()
        // 这个将会触发runFn中的<-p.stateStrategy.Publish
        p.stateStrategy.OnChange()
        return
    }
    // 将该event交由dispatcher处理
    // 现在可以理解为就是p.state来处理, 原因p.handler就是一个dispatcher
    // dispatcher里面每一个collection都注册了一个p.state这样的handler
    p.handler.Handle(e)
}

1. 如果是FullSync, 也就是第一次做同步, 有两个动作:

1.1 将p.distribute设置为true. 现在回头来看一下newProcessor方法.

// processor.go
...
stateListener := processing.ListenerFromFn(func(c resource.Collection) {
        // When the state indicates a change occurred, update the publishing strategy
        if p.distribute {
            stateStrategy.OnChange()
        }
    })
...
// state.go中的Handler方法
func (s *State) Handle(event resource.Event) {
...
    // 通知listener对该collection以已经发生变化
    s.listener.CollectionChanged(event.Entry.ID.Collection)
...
}

所以当p.distribute=true时将调用stateStrategy.OnChange()这个时候就会触发到Processor的Start()方法中的<-p.stateStrategy.Publish:进而调用p.state.buildSnapshot()生成当前内存快照交由p.distributor处理. 这部分在分析mcp的时候会涉及到.

图片.png

1.2 通过p.stateStrategy.OnChange()触发<-p.stateStrategy.Publish.

  1. 调用p.handler.Handle(e)方法, 目前可以理解为调用p.state.Handle(e), 因为p.handlerp.dispatcher并且为每个collection注册了p.statehandler方法.

6.2 buildSnapshot

按照state.entries中的内容创建一个内存快照.

// 返回snapshot.Snapshot
func (s *State) buildSnapshot() snapshot.Snapshot {
    s.entriesLock.Lock()
    defer s.entriesLock.Unlock()

    now := time.Now()
    monitoring.RecordProcessorSnapshotPublished(s.pendingEvents, now.Sub(s.lastSnapshotTime))
    s.lastSnapshotTime = now
    // 创建快照
    b := snapshot.NewInMemoryBuilder()

    for collection, state := range s.entries {
        entries := make([]*mcp.Resource, 0, len(state.entries))
        for _, entry := range state.entries {
            entries = append(entries, entry)
        }
        version := fmt.Sprintf("%d", state.version)
        b.Set(collection.String(), version, entries)
    }

    // Build entities that are derived from existing ones.
    s.buildProjections(b)
    // 将pendingEvents清空
    sn := b.Build()
    s.pendingEvents = 0
    return sn
}

func (s *State) buildProjections(b *snapshot.InMemoryBuilder) {
    s.buildIngressProjectionResources(b)
}

func (s *State) buildIngressProjectionResources(b *snapshot.InMemoryBuilder) {
    ingressByHost := make(map[string]resource.Entry)
    // Build ingress projections
    state := s.entries[metadata.K8sExtensionsV1beta1Ingresses.Collection]
    if state == nil || len(state.entries) == 0 {
        return
    }
    ...
}

7. 总结

上流: sourcek8s或者fs中读取信息并整理成event.
处理: 将source中的事件event放入p.eventch, 并且processEventp.eventCh中读取, 将信息保存在内存中, 然后生成快照.
下流: 将生成的快照交由p.distributor处理.

图片.png

8. 参考

1. istio 1.3.6源码
2. https://cloud.tencent.com/developer/article/1409159

相关文章

网友评论

      本文标题:[istio源码分析][galley] galley之runti

      本文链接:https://www.haomeiwen.com/subject/jkdnzctx.html