美文网首页
Kubernetes源码分析 -- Kubelet配置文件加载

Kubernetes源码分析 -- Kubelet配置文件加载

作者: 何约什 | 来源:发表于2018-12-21 15:17 被阅读46次

前面分析过Kubernetes的资源的序列化、编解码、版本转换功能,该功能除了用于消息的传输,还能用于加载配置文件。kubelet在启动过程中,可以通过参数来指定各个配置项,而kubernetes从1.10开始,倾向于让我们基于配置文件来带入这些参数。本文主要研究一下,Kubelet配置文件的加载原理,从而再次重新温习一下编解码内容。

Kubelet的启动

Kubelet的启动也使用了Cobra框架,用来解析命令行参数,并执行对应的方法,当解析出参数中,如果指定了配置文件,那就需要加载配置文件,从而进而本文的主题。

            if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
                kubeletConfig, err = loadConfigFile(configFile)
                if err != nil {
                    glog.Fatal(err)
                }
                                ......
                      }

编解码初始化

加载配置文件在loadConfigFile方法中,这个方法比较简单,这里就不列举,它使用了一个配置文件加载器,加载器的实例的生成代码在:k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/configfiles/configfiles.go

func NewFsLoader(fs utilfs.Filesystem, kubeletFile string) (Loader, error) {
    _, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
    if err != nil {
        return nil, err
    }

    return &fsLoader{
        fs:            fs,
        kubeletCodecs: kubeletCodecs,
        kubeletFile:   kubeletFile,
    }, nil
}

这里我们看到了熟悉的Scheme和Codecs,这也是配置文件解析的关键,NewSchemeAndCodecs代码在:k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/schema/scheme.go

func NewSchemeAndCodecs() (*runtime.Scheme, *serializer.CodecFactory, error) {
    scheme := runtime.NewScheme()
    if err := kubeletconfig.AddToScheme(scheme); err != nil {
        return nil, nil, err
    }
    if err := v1alpha1.AddToScheme(scheme); err != nil {
        return nil, nil, err
    }
    codecs := serializer.NewCodecFactory(scheme)
    return scheme, &codecs, nil
}

可以看到,我们总共调用了kubeletconfig.AddToScheme和v1alpha1.AddToScheme,用来把对应版本的资源添加到Scheme中,

  • kubeletconfig.AddToScheme,代码位于:k8s/kubernetes/pkg/kubelet/apis/kuberconfig/register.go。
var (
    SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
    AddToScheme   = SchemeBuilder.AddToScheme
)

var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}
......
func addKnownTypes(scheme *runtime.Scheme) error {
    // TODO this will get cleaned up with the scheme types are fixed
    scheme.AddKnownTypes(SchemeGroupVersion,
        &KubeletConfiguration{},
    )
    return nil
}

从代码中可以看出,添加了KubeletConfiguration这样的类型到Scheme中,对应的版本为GroupVersion为
{“kuberletconfig”, "__internal"}

  • kubeletconfig.AddToScheme,代码位于:k8s/kubernetes/pkg/kubelet/apis/kuberconfig/v1alpha1/register.go。
var (
    // TODO: move SchemeBuilder with zz_generated.deepcopy.go to k8s.io/api.
    // localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes.
    SchemeBuilder      runtime.SchemeBuilder
    localSchemeBuilder = &SchemeBuilder
    AddToScheme        = localSchemeBuilder.AddToScheme
)

var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha1"}

// 自动调用
func init() {
    // We only register manually written functions here. The registration of the
    // generated functions takes place in the generated files. The separation
    // makes the code compile even when the generated files are missing.
    localSchemeBuilder.Register(addKnownTypes, addDefaultingFuncs)
}

func addKnownTypes(scheme *runtime.Scheme) error {
    scheme.AddKnownTypes(SchemeGroupVersion,
        &KubeletConfiguration{},
    )
    return nil
}

这里添加的GroupVersion为{"kubeletconfig", "v1alpha1"},对象类型仍然为KubeletConfiguration。

另外前面在调用localSchemaBuilder.Register(addKnownTypes, addDefaultingFuncs)还有后者,就是缺省函数,缺省函数负责初始化空对象,让它具备初始值,代码如下所示:

func addDefaultingFuncs(scheme *kruntime.Scheme) error {
    return RegisterDefaults(scheme)
}
// RegisterDefaults adds defaulters functions to the given scheme.
// Public to allow building arbitrary schemes.
// All generated defaulters are covering - they call all nested defaulters.
func RegisterDefaults(scheme *runtime.Scheme) error {
    scheme.AddTypeDefaultingFunc(&KubeletConfiguration{}, func(obj interface{}) { SetObjectDefaults_KubeletConfiguration(obj.(*KubeletConfiguration)) })
    return nil
}

func SetObjectDefaults_KubeletConfiguration(in *KubeletConfiguration) {
    SetDefaults_KubeletConfiguration(in)
}


从上面可以看出KubeletConfiguration的缺省值设置函数为:SetObjectDefaults_KubeletConfiguration,该函数调用了SetDefaults_KubeletConfiguration方法,所以KubeletConfiguration对象的缺省值最终会通过调用SetDefaults_KubeletConfiguration来完成,该方法可以通过scheme.Default方法来调用来触发。

这里我们在刚开始创建KubeletConfiguration的实例时会创建。

// NewKubeletConfiguration will create a new KubeletConfiguration with default values
func NewKubeletConfiguration() (*kubeletconfig.KubeletConfiguration, error) {
    scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
    if err != nil {
        return nil, err
    }
    versioned := &v1alpha1.KubeletConfiguration{}  // 输入带版本的对象实例
    scheme.Default(versioned)  // 缺省值,为什么要这么做呢?因为只有v1alpha1这个版本才设置了缺省函数。
    config := &kubeletconfig.KubeletConfiguration{}
    if err := scheme.Convert(versioned, config, nil); err != nil {  // 转换成累内部版本
        return nil, err
    }
    return config, nil
}

加载配置文件

梳理完全面编解码的生成过程以后,下面来分析配置文件的加载就很简单了,代码如下:

func (loader *fsLoader) Load() (*kubeletconfig.KubeletConfiguration, error) {
    data, err := loader.fs.ReadFile(loader.kubeletFile) // 读取kubelet配置文件为二进制数据
    if err != nil {
        return nil, fmt.Errorf("failed to read kubelet config file %q, error: %v", loader.kubeletFile, err)
    }

    // no configuration is an error, some parameters are required
    if len(data) == 0 {
        return nil, fmt.Errorf("kubelet config file %q was empty", loader.kubeletFile)
    }

    kc, err := utilcodec.DecodeKubeletConfiguration(loader.kubeletCodecs, data)  // 使用前面构建的编解码器解码。
    if err != nil {
        return nil, err
    }

    // make all paths absolute
    resolveRelativePaths(kubeletconfig.KubeletConfigurationPathRefs(kc), filepath.Dir(loader.kubeletFile))
    return kc, nil
}

前面调用了utilcodec.DecodeKubeletConfiguration来完成解码的过程,这里看看具体的代码:

// DecodeKubeletConfiguration decodes a serialized KubeletConfiguration to the internal type
func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []byte) (*kubeletconfig.KubeletConfiguration, error) {
    // the UniversalDecoder runs defaulting and returns the ** internal type ** by default

        // 这里UniversionDecoder有没有没有带参数,将会解码为Internel版本
        // 采用Universal能够自动采用合适的编解码器来进行解码
    obj, gvk, err := kubeletCodecs.UniversalDecoder().Decode(data, nil, nil)
    if err != nil {
        return nil, fmt.Errorf("failed to decode, error: %v", err)
    }

    internalKC, ok := obj.(*kubeletconfig.KubeletConfiguration)
    if !ok {
        return nil, fmt.Errorf("failed to cast object to KubeletConfiguration, unexpected type: %v", gvk)
    }

    return internalKC, nil
}

相关文章

网友评论

      本文标题:Kubernetes源码分析 -- Kubelet配置文件加载

      本文链接:https://www.haomeiwen.com/subject/vxgitqtx.html