1. Preface
Please credit the original source when reposting; respect others' work!
Source code: https://github.com/nicktming/istio
Branch: tming-v1.3.6 (based on version 1.3.6)
1. [istio source code analysis][galley] galley upstream (source)
2. [istio source code analysis][galley] galley runtime
3. [istio source code analysis][galley] galley downstream (mcp)
2. runtime
This article focuses on runtime, the component in galley that links the upstream and the downstream.
[root@master pkg]# pwd
/root/go/src/istio.io/istio/galley/pkg
[root@master pkg]# tree -L 1
.
├── authplugin
├── authplugins
├── config
├── crd
├── meshconfig
├── metadata
├── runtime
├── server
├── source
├── testing
└── util
11 directories, 0 files
[root@master pkg]#
source is where events originate; runtime receives the events from source and hands them to the downstream for processing. This article concentrates on runtime.
3. source
// Source to be implemented by a source configuration provider.
type Source interface {
    // Start watching; for k8s this means watching certain CRD resources and handing events to the handler
    Start(handler resource.EventHandler) error
    // Stop watching
    Stop()
}
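To make this contract concrete, here is a minimal, hypothetical Source implementation with simplified stand-in types (not istio's real code): it emits a fixed list of events on Start and stops cleanly.

package main

import "fmt"

// Simplified stand-ins for galley's resource.Event and resource.EventHandler.
type Event struct{ Kind, Name string }
type EventHandler func(e Event)

// Source mirrors the galley Source interface above.
type Source interface {
    Start(handler EventHandler) error
    Stop()
}

// fakeSource emits a fixed list of events on Start; a real k8s source
// would instead run informers watching CRDs and forward their events.
type fakeSource struct {
    events []Event
    done   chan struct{}
}

func (s *fakeSource) Start(handler EventHandler) error {
    s.done = make(chan struct{})
    go func() {
        defer close(s.done)
        for _, e := range s.events {
            handler(e) // hand each event to the consumer's handler
        }
    }()
    return nil
}

// Stop waits for the emitting goroutine to drain.
func (s *fakeSource) Stop() { <-s.done }

func main() {
    src := &fakeSource{events: []Event{{"Added", "vs-1"}, {"FullSync", ""}}}
    _ = src.Start(func(e Event) { fmt.Printf("event: %+v\n", e) })
    src.Stop()
}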
4. processing
4.1 Dispatcher and Handler
type Handler interface {
    Handle(e resource.Event)
}
type Dispatcher struct {
    handlers map[resource.Collection][]Handler
}
// Dispatcher is an implementation of Handler;
// for each collection it keeps a corresponding list of handlers
func (d *Dispatcher) Handle(e resource.Event) {
    handlers, found := d.handlers[e.Entry.ID.Collection]
    if !found {
        scope.Warnf("Unhandled resource event: %v", e)
        return
    }
    for _, h := range handlers {
        h.Handle(e)
    }
}
As shown above, Dispatcher groups handlers by collection: each collection has its own array of Handlers. (A collection is simply the name of a CRD, e.g. istio/networking/v1alpha3/virtualservices.) Dispatcher itself implements the Handler interface; for each event it looks up all the handlers registered for that event's collection and invokes them one by one, as the standalone sketch below illustrates.
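The same fan-out idea, reduced to a self-contained sketch with simplified types (not the real istio API):

package main

import "fmt"

type Collection string

type Event struct {
    Collection Collection
    Name       string
}

type Handler interface{ Handle(e Event) }

// Dispatcher fans an event out to every handler registered for its collection.
type Dispatcher struct {
    handlers map[Collection][]Handler
}

func (d *Dispatcher) Handle(e Event) {
    hs, found := d.handlers[e.Collection]
    if !found {
        fmt.Printf("unhandled event: %+v\n", e)
        return
    }
    for _, h := range hs {
        h.Handle(e)
    }
}

// printHandler stands in for a real handler such as galley's State.
type printHandler struct{ name string }

func (p printHandler) Handle(e Event) { fmt.Printf("%s handled %+v\n", p.name, e) }

func main() {
    vs := Collection("istio/networking/v1alpha3/virtualservices")
    d := &Dispatcher{handlers: map[Collection][]Handler{
        vs: {printHandler{"state"}, printHandler{"audit"}},
    }}
    // Dispatcher itself satisfies Handler; both registered handlers run.
    d.Handle(Event{Collection: vs, Name: "vs-1"})
}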
4.2 Listener
// Listener gets notified when resource of a given collection has changed.
type Listener interface {
    CollectionChanged(c resource.Collection)
}
This method is triggered whenever a resource in the given collection changes.
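The newProcessor code below wraps plain functions into Listeners via processing.ListenerFromFn. A minimal sketch of that adapter idiom, with simplified types:

package main

import "fmt"

type Collection string

// Listener mirrors galley's processing.Listener.
type Listener interface{ CollectionChanged(c Collection) }

// listenerFromFn adapts a bare function into a Listener,
// the same idiom processing.ListenerFromFn uses.
type listenerFromFn struct{ fn func(c Collection) }

func (l listenerFromFn) CollectionChanged(c Collection) { l.fn(c) }

func main() {
    l := listenerFromFn{fn: func(c Collection) { fmt.Println("changed:", c) }}
    l.CollectionChanged("istio/networking/v1alpha3/virtualservices")
}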
5. state
// State holds galley's in-memory state
type State struct {
    listener processing.Listener
    config   *Config
    // version counter is a nonce that generates unique ids for each updated view of State.
    versionCounter int64
    // entries for per-message-type State.
    entriesLock sync.Mutex
    entries     map[resource.Collection]*resourceTypeState
    // Virtual version numbers for Gateways & VirtualServices for Ingress projected ones
    ingressGWVersion   int64
    ingressVSVersion   int64
    lastIngressVersion int64
    // number of events waiting to be published
    pendingEvents int64
    // time of the last snapshot publish
    lastSnapshotTime time.Time
}
type resourceTypeState struct {
    // version of the current state
    version  int64
    entries  map[resource.FullName]*mcp.Resource
    versions map[resource.FullName]resource.Version
}
5.1 The Handle method
func (s *State) Handle(event resource.Event) {
    pks, found := s.getResourceTypeState(event.Entry.ID.Collection)
    if !found {
        return
    }
    switch event.Kind {
    case resource.Added, resource.Updated:
        // Check to see if the version has changed.
        if curVersion := pks.versions[event.Entry.ID.FullName]; curVersion == event.Entry.ID.Version {
            log.Scope.Debugf("Received event for the current, known version: %v", event)
            return
        }
        // convert the event's entry into an mcp.Resource
        entry, ok := s.toResource(event.Entry)
        if !ok {
            return
        }
        // store the object's value and version in memory
        pks.entries[event.Entry.ID.FullName] = entry
        pks.versions[event.Entry.ID.FullName] = event.Entry.ID.Version
        monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
        monitorEntry(event.Entry.ID, true)
    case resource.Deleted:
        // remove the object's value and version from memory
        delete(pks.entries, event.Entry.ID.FullName)
        delete(pks.versions, event.Entry.ID.FullName)
        monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
        monitorEntry(event.Entry.ID, false)
    default:
        log.Scope.Errorf("Unknown event kind: %v", event.Kind)
        return
    }
    // bump the version
    s.versionCounter++
    pks.version = s.versionCounter
    log.Scope.Debugf("In-memory State has changed:\n%v\n", s)
    s.pendingEvents++
    // notify the listener that this collection has changed
    s.listener.CollectionChanged(event.Entry.ID.Collection)
}
func (s *State) getResourceTypeState(name resource.Collection) (*resourceTypeState, bool) {
    s.entriesLock.Lock()
    defer s.entriesLock.Unlock()
    // look up the in-memory state for this collection,
    // e.g. for virtualservices, the map of all virtualservice objects currently in memory
    pks, found := s.entries[name]
    return pks, found
}
Handle's main job is to convert the event's entry into an mcp.Resource and store it in memory. What is that in-memory copy for? It gets consumed via s.listener.CollectionChanged(event.Entry.ID.Collection), which will become clear in the processor section below.
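For intuition, toResource essentially packs the entry's proto into an mcp.Resource envelope. A hedged sketch, assuming mcp.Resource carries a Metadata plus an Any-packed Body as in istio.io/api/mcp/v1alpha1 (CreateTime handling and error logging are omitted):

package main

import (
    "fmt"

    "github.com/gogo/protobuf/proto"
    "github.com/gogo/protobuf/types"
    mcp "istio.io/api/mcp/v1alpha1"
)

// toResourceSketch packs a config proto into an mcp.Resource envelope,
// roughly what State.toResource does (an assumption-labeled sketch,
// not the actual istio function).
func toResourceSketch(name, version string, item proto.Message) (*mcp.Resource, error) {
    body, err := types.MarshalAny(item) // serialize the proto into a types.Any
    if err != nil {
        return nil, err
    }
    return &mcp.Resource{
        Metadata: &mcp.Metadata{Name: name, Version: version},
        Body:     body,
    }, nil
}

func main() {
    // Any proto works as the body; a simple duration is used here for brevity.
    r, err := toResourceSketch("default/vs-1", "v42", types.DurationProto(0))
    fmt.Println(r, err)
}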
6. processor
func NewProcessor(src Source, distributor Distributor, cfg *Config) *Processor {
    stateStrategy := publish.NewStrategyWithDefaults()
    return newProcessor(src, cfg, stateStrategy, distributor, nil)
}
func newProcessor(
    src Source,
    cfg *Config,
    stateStrategy *publish.Strategy,
    distributor Distributor,
    postProcessHook postProcessHookFn) *Processor {
    now := time.Now()
    p := &Processor{
        stateStrategy:   stateStrategy,
        distributor:     distributor,
        source:          src,
        eventCh:         make(chan resource.Event, 1024),
        postProcessHook: postProcessHook,
        worker:          util.NewWorker("runtime processor", scope),
        lastEventTime:   now,
        fullSyncCond:    sync.NewCond(&sync.Mutex{}),
    }
    stateListener := processing.ListenerFromFn(func(c resource.Collection) {
        if p.distribute {
            stateStrategy.OnChange()
        }
    })
    p.state = newState(cfg, stateListener)
    // this can be skipped for now; it becomes relevant when analyzing serviceentry
    p.serviceEntryHandler = serviceentry.NewHandler(cfg.DomainSuffix, processing.ListenerFromFn(func(_ resource.Collection) {
        scope.Debug("Processor.process: publish serviceEntry")
        s := p.serviceEntryHandler.BuildSnapshot()
        p.distributor.SetSnapshot(groups.SyntheticServiceEntry, s)
    }))
    p.handler = buildDispatcher(p.state, p.serviceEntryHandler)
    p.seedMesh()
    return p
}
1. Initialize p.state, passing in the listener.
2. Initialize p.handler, passing in p.state and p.serviceEntryHandler.
func buildDispatcher(state *State, serviceEntryHandler processing.Handler) *processing.Dispatcher {
    b := processing.NewDispatcherBuilder()
    // all registered CRDs
    stateSchema := resource.NewSchemaBuilder().RegisterSchema(state.config.Schema).Build()
    for _, spec := range stateSchema.All() {
        b.Add(spec.Collection, state)
    }
    if state.config.SynthesizeServiceEntries {
        for _, spec := range serviceentry.Schema.All() {
            b.Add(spec.Collection, serviceEntryHandler)
        }
    }
    return b.Build()
}
As shown, every collection gets one basic handler: the p.state passed in.
6.1 The Start method
func (p *Processor) Start() error {
    // setup function
    setupFn := func() error {
        err := p.source.Start(func(e resource.Event) {
            // push event e into channel p.eventCh
            p.eventCh <- e
        })
        if err != nil {
            return fmt.Errorf("runtime unable to Start source: %v", err)
        }
        return nil
    }
    // run function
    runFn := func(ctx context.Context) {
        scope.Info("Starting processor...")
        defer func() {
            scope.Debugf("Process.process: Exiting worker thread")
            p.source.Stop()
            close(p.eventCh)
            p.stateStrategy.Close()
        }()
        scope.Debug("Starting process loop")
        for {
            select {
            case <-ctx.Done():
                // Graceful termination.
                scope.Debug("Processor.process: done")
                return
            case e := <-p.eventCh:
                // take the next event to process from channel p.eventCh
                p.processEvent(e)
            case <-p.stateStrategy.Publish:
                scope.Debug("Processor.process: publish")
                // build a snapshot of the objects currently held in state's memory
                s := p.state.buildSnapshot()
                // the snapshot is handed to the distributor
                p.distributor.SetSnapshot(groups.Default, s)
            }
            if p.postProcessHook != nil {
                p.postProcessHook()
            }
        }
    }
    // run both functions via the worker utility
    return p.worker.Start(setupFn, runFn)
}
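Note that p.stateStrategy acts as a debounce: OnChange() marks the state dirty, and the Publish channel fires only after changes quiet down, so a burst of events yields one snapshot rather than many. A simplified, self-contained sketch of that idea (the real publish.Strategy adds a max-wait bound and other details):

package main

import (
    "fmt"
    "time"
)

// strategy coalesces many OnChange calls into few Publish signals.
type strategy struct {
    Publish chan struct{}
    changes chan struct{}
}

func newStrategy(quiet time.Duration) *strategy {
    s := &strategy{Publish: make(chan struct{}, 1), changes: make(chan struct{}, 1)}
    go func() {
        var timer *time.Timer
        var fire <-chan time.Time
        for {
            select {
            case <-s.changes:
                // (re)arm the quiet-period timer on every change
                if timer != nil {
                    timer.Stop()
                }
                timer = time.NewTimer(quiet)
                fire = timer.C
            case <-fire:
                fire = nil
                s.Publish <- struct{}{} // quiet period elapsed: signal a publish
            }
        }
    }()
    return s
}

func (s *strategy) OnChange() {
    select {
    case s.changes <- struct{}{}:
    default: // a change is already pending
    }
}

func main() {
    s := newStrategy(50 * time.Millisecond)
    for i := 0; i < 10; i++ {
        s.OnChange() // a burst of changes...
    }
    <-s.Publish // ...produces a single publish signal
    fmt.Println("published once after burst")
}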
Next, look at the processEvent method:
func (p *Processor) processEvent(e resource.Event) {
    if scope.DebugEnabled() {
        scope.Debugf("Incoming source event: %v", e)
    }
    p.recordEvent()
    if e.Kind == resource.FullSync {
        scope.Infof("Synchronization is complete, starting distribution.")
        p.fullSyncCond.L.Lock()
        // set distribute to true
        p.distribute = true
        p.fullSyncCond.Broadcast()
        p.fullSyncCond.L.Unlock()
        // this will trigger <-p.stateStrategy.Publish in runFn
        p.stateStrategy.OnChange()
        return
    }
    // hand the event to the dispatcher; effectively p.state handles it,
    // since p.handler is a dispatcher in which every collection
    // has p.state registered as a handler
    p.handler.Handle(e)
}
1. If the event is FullSync, i.e. the initial synchronization has completed, two things happen:
1.1 p.distribute is set to true. Look back at the newProcessor method:
// processor.go
...
stateListener := processing.ListenerFromFn(func(c resource.Collection) {
    // When the state indicates a change occurred, update the publishing strategy
    if p.distribute {
        stateStrategy.OnChange()
    }
})
...
// the Handle method in state.go
func (s *State) Handle(event resource.Event) {
    ...
    // notify the listener that this collection has changed
    s.listener.CollectionChanged(event.Entry.ID.Collection)
    ...
}
So once p.distribute=true, stateStrategy.OnChange() is called, which triggers the <-p.stateStrategy.Publish: case in Processor's Start() method; that in turn calls p.state.buildSnapshot() to produce a snapshot of the current in-memory state and hands it to p.distributor. That part comes up again when analyzing mcp.
1.2 p.stateStrategy.OnChange() triggers <-p.stateStrategy.Publish.
2. Otherwise, p.handler.Handle(e) is called, which for now can be understood as calling p.state.Handle(e): p.handler is the dispatcher, and p.state is registered as the handler for every collection. The gating sketch below shows how p.distribute controls when state changes actually schedule a publish.
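The p.distribute flag therefore acts as a gate: before FullSync, Handle updates memory silently; afterwards every change schedules a publish. A compact illustration with simplified types:

package main

import "fmt"

type processor struct {
    distribute bool
    onChange   func()
}

// collectionChanged mirrors the stateListener closure in newProcessor:
// state changes only schedule a publish once distribution is enabled.
func (p *processor) collectionChanged(collection string) {
    if p.distribute {
        p.onChange()
    }
}

func main() {
    published := 0
    p := &processor{onChange: func() { published++ }}

    p.collectionChanged("virtualservices") // before FullSync: ignored
    p.distribute = true                    // FullSync received
    p.collectionChanged("virtualservices") // now schedules a publish

    fmt.Println("publishes scheduled:", published) // -> 1
}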
6.2 buildSnapshot
buildSnapshot creates an in-memory snapshot from the contents of state.entries.
// returns a snapshot.Snapshot
func (s *State) buildSnapshot() snapshot.Snapshot {
    s.entriesLock.Lock()
    defer s.entriesLock.Unlock()
    now := time.Now()
    monitoring.RecordProcessorSnapshotPublished(s.pendingEvents, now.Sub(s.lastSnapshotTime))
    s.lastSnapshotTime = now
    // build the snapshot
    b := snapshot.NewInMemoryBuilder()
    for collection, state := range s.entries {
        entries := make([]*mcp.Resource, 0, len(state.entries))
        for _, entry := range state.entries {
            entries = append(entries, entry)
        }
        version := fmt.Sprintf("%d", state.version)
        b.Set(collection.String(), version, entries)
    }
    // Build entities that are derived from existing ones.
    s.buildProjections(b)
    sn := b.Build()
    // reset pendingEvents
    s.pendingEvents = 0
    return sn
}
func (s *State) buildProjections(b *snapshot.InMemoryBuilder) {
    s.buildIngressProjectionResources(b)
}
func (s *State) buildIngressProjectionResources(b *snapshot.InMemoryBuilder) {
    ingressByHost := make(map[string]resource.Entry)
    // Build ingress projections
    state := s.entries[metadata.K8sExtensionsV1beta1Ingresses.Collection]
    if state == nil || len(state.entries) == 0 {
        return
    }
    ...
}
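A short usage sketch of the builder/snapshot round trip, assuming the InMemory snapshot from pkg/mcp/snapshot exposes per-collection Resources and Version lookups (the Set call matches its use in buildSnapshot above):

package main

import (
    "fmt"

    mcp "istio.io/api/mcp/v1alpha1"
    "istio.io/istio/pkg/mcp/snapshot"
)

func main() {
    b := snapshot.NewInMemoryBuilder()

    // Set stores the full entry list for one collection under one version,
    // exactly how buildSnapshot publishes each resourceTypeState.
    collection := "istio/networking/v1alpha3/virtualservices"
    entries := []*mcp.Resource{
        {Metadata: &mcp.Metadata{Name: "default/vs-1", Version: "v1"}},
    }
    b.Set(collection, "7", entries)

    sn := b.Build()

    // Downstream (the distributor / mcp server) reads it back per collection.
    fmt.Println("version:", sn.Version(collection))
    fmt.Println("resources:", len(sn.Resources(collection)))
}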
7. Summary
Upstream: source reads information from k8s or fs and turns it into events.
Processing: events from source are pushed into p.eventCh; processEvent reads them from p.eventCh and stores the information in memory, from which snapshots are built.
Downstream: the generated snapshots are handed to p.distributor.
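The whole flow can be condensed into one self-contained toy model (simplified types only, for illustration):

package main

import "fmt"

// A condensed model of the pipeline:
// source -> eventCh -> in-memory state -> snapshot -> distributor.
type Event struct {
    Kind       string // "Added", "Deleted", "FullSync"
    Collection string
    Name       string
    Body       string
}

type Snapshot map[string]map[string]string // collection -> name -> body

func main() {
    eventCh := make(chan Event, 16)

    // Upstream: a source pushes events into the channel.
    go func() {
        eventCh <- Event{"Added", "virtualservices", "vs-1", "spec-a"}
        eventCh <- Event{"Added", "gateways", "gw-1", "spec-b"}
        eventCh <- Event{"FullSync", "", "", ""}
        close(eventCh)
    }()

    // Processing: maintain in-memory state; publish only after FullSync.
    state := Snapshot{}
    distribute := false
    for e := range eventCh {
        switch e.Kind {
        case "FullSync":
            distribute = true
        case "Added":
            if state[e.Collection] == nil {
                state[e.Collection] = map[string]string{}
            }
            state[e.Collection][e.Name] = e.Body
        case "Deleted":
            delete(state[e.Collection], e.Name)
        }
        if distribute {
            // Downstream: hand the snapshot to the "distributor".
            fmt.Printf("distributor received snapshot: %v\n", state)
        }
    }
}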
8. References
1. istio 1.3.6 source code
2. https://cloud.tencent.com/developer/article/1409159