美文网首页
client go的token如何做到热更新

client go的token如何做到热更新

作者: wwq2020 | 来源:发表于2023-12-15 17:47 被阅读0次

背景

集群内的 client 访问 apiserver 时,常见的认证方式是使用 token,而该 token 来源于 service account,分为两种:
一种是永久性的 token,不会过期,存在泄漏的风险;
另一种是 kubelet 通过 service account 的 CreateToken 接口创建的 token,创建后挂载到 pod 内。此 token 会关联到 pod,且有过期时间,由 kubelet 定期更新;client-go 则会定期重新加载 token 文件。

源码

kubelet

关于 SetUp 是如何被触发的,参见 https://www.jianshu.com/p/425e9ce6103d 。此处 RequiresRemount 返回 true,因此该 volume 会被 kubelet 反复重新挂载(即反复调用 SetUp),token 文件也随之被更新。

简单总结,调用链为:projectedVolumeMounter -> kubeletVolumeHost -> tokenManager -> apiserver

pkg/volume/projected/projected.go 中

// RequiresRemount reports whether the volume needs to be remounted.
// The projected plugin answers true unconditionally; spec is not consulted.
func (plugin *projectedPlugin) RequiresRemount(spec *volume.Spec) bool {
    return true
}


// SetUp sets the volume up at the mounter's own path by delegating to
// SetUpAt with s.GetPath(), and returns whatever SetUpAt returns.
func (s *projectedVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
    return s.SetUpAt(s.GetPath(), mounterArgs)
}

// SetUpAt (excerpt; "..." marks elided upstream code) gathers the projected
// payload with collectData and writes it under dir through an atomic writer.
// Every failure shown here is logged and then returned to the caller.
func (s *projectedVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
...
    // Collect the file payload for this volume.
    data, err := s.collectData(mounterArgs)
    if err != nil {
        klog.Errorf("Error preparing data for projected volume %v for pod %v/%v: %s", s.volName, s.pod.Namespace, s.pod.Name, err.Error())
        return err
    }
...
    // writerContext is defined in the elided part above.
    writer, err := volumeutil.NewAtomicWriter(dir, writerContext)
    if err != nil {
        klog.Errorf("Error creating atomic writer: %v", err)
        return err
    }

...
    // Write the collected payload; setPerms is defined in the elided part above.
    err = writer.Write(data, setPerms)
    if err != nil {
        klog.Errorf("Error writing payload to dir: %v", err)
        return err
    }
...
}

// Init stores the volume host on the plugin and caches the accessor functions
// the host exposes for secrets, config maps and service account tokens.
// It always returns nil.
func (plugin *projectedPlugin) Init(host volume.VolumeHost) error {
    plugin.host = host
    // Unchecked type assertion: this panics if host does not implement
    // volume.KubeletVolumeHost.
    plugin.kvHost = host.(volume.KubeletVolumeHost)
    plugin.getSecret = host.GetSecretFunc()
    plugin.getConfigMap = host.GetConfigMapFunc()
    // Token getter/deleter used later when collecting service account tokens.
    plugin.getServiceAccountToken = host.GetServiceAccountTokenFunc()
    plugin.deleteServiceAccountToken = host.DeleteServiceAccountTokenFunc()
    return nil
}

// collectData (excerpt; "..." marks elided upstream code) builds the map of
// file projections for the volume. The part shown handles a service account
// token source: it requests a token for the pod's service account through the
// plugin's getServiceAccountToken and stores the token bytes under tp.Path.
func (s *projectedVolumeMounter) collectData(mounterArgs volume.MounterArgs) (map[string]volumeutil.FileProjection, error) {
...
            // auds and tp are defined in the elided part above.
            tr, err := s.plugin.getServiceAccountToken(s.pod.Namespace, s.pod.Spec.ServiceAccountName, &authenticationv1.TokenRequest{
                Spec: authenticationv1.TokenRequestSpec{
                    Audiences:         auds,
                    ExpirationSeconds: tp.ExpirationSeconds,
                    // Bind the requested token to this pod by name and UID.
                    BoundObjectRef: &authenticationv1.BoundObjectReference{
                        APIVersion: "v1",
                        Kind:       "Pod",
                        Name:       s.pod.Name,
                        UID:        s.pod.UID,
                    },
                },
            })
...
            // The returned token becomes the content of the file at tp.Path.
            payload[tp.Path] = volumeutil.FileProjection{
                Data:   []byte(tr.Status.Token),
                Mode:   mode,
                FsUser: mounterArgs.FsUser,
            }
...
}

pkg/kubelet/volume_host.go中
volumehost

// GetServiceAccountTokenFunc returns the token manager's
// GetServiceAccountToken method as the function volume plugins use to obtain
// service account tokens.
func (kvh *kubeletVolumeHost) GetServiceAccountTokenFunc() func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
    return kvh.tokenManager.GetServiceAccountToken
}

pkg/kubelet/token/token_manager.go中
tokenmanager

// NewManager (excerpt; "..." marks elided upstream code) builds a Manager with
// an empty token cache, a real clock, and a getToken function that creates
// tokens through the clientset's ServiceAccounts(namespace).CreateToken call.
func NewManager(c clientset.Interface) *Manager {
...
    m := &Manager{
        // Note the parameter order (name, namespace): it is the reverse of
        // GetServiceAccountToken's (namespace, name).
        getToken: func(name, namespace string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
            // A nil clientset is rejected with an error instead of being dereferenced.
            if c == nil {
                return nil, errors.New("cannot use TokenManager when kubelet is in standalone mode")
            }
            tokenRequest, err := c.CoreV1().ServiceAccounts(namespace).CreateToken(context.TODO(), name, tr, metav1.CreateOptions{})
            // A NotFound error while tokenRequestsSupported() is false is reported
            // as the TokenRequest endpoints not being enabled.
            if apierrors.IsNotFound(err) && !tokenRequestsSupported() {
                return nil, fmt.Errorf("the API server does not have TokenRequest endpoints enabled")
            }
            return tokenRequest, err
        },
        cache: make(map[string]*authenticationv1.TokenRequest),
        clock: clock.RealClock{},
    }
...
}

// GetServiceAccountToken (excerpt; "..." marks elided upstream code) requests
// a token via m.getToken and, if that fails, decides whether a previously
// obtained token request can be returned instead.
// NOTE(review): key, ctr and ok are defined in the elided part; presumably
// they are the cache key and the result of the cache lookup — confirm against
// the full source.
func (m *Manager) GetServiceAccountToken(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
...
    // getToken takes (name, namespace), so the arguments are swapped here.
    tr, err := m.getToken(name, namespace, tr)
    if err != nil {
        switch {
        case !ok:
            // Nothing to fall back on: surface the fetch failure.
            return nil, fmt.Errorf("failed to fetch token: %v", err)
        case m.expired(ctr):
            // ctr is reported as expired, so it cannot be used as a fallback.
            return nil, fmt.Errorf("token %s expired and refresh failed: %v", key, err)
        default:
            // The refresh failed but ctr is not expired: log and keep using ctr.
            klog.ErrorS(err, "Couldn't update token", "cacheKey", key)
            return ctr, nil
        }
    }
...
}

client go

kubernetes/clientset.go中
构建clientset


// NewForConfig (excerpt; "..." marks elided upstream code) obtains the HTTP
// client for the clientset from rest.HTTPClientFor, passing the address of
// configShallowCopy, which is defined in the elided part.
func NewForConfig(c *rest.Config) (*Clientset, error) {
...
    httpClient, err := rest.HTTPClientFor(&configShallowCopy)

...
}

rest/transport.go中(其中 New 函数位于 transport 包,即 transport/transport.go)
构建transport

// HTTPClientFor (excerpt; "..." marks elided upstream code) starts by building
// the transport for config with TransportFor.
func HTTPClientFor(config *Config) (*http.Client, error) {
    transport, err := TransportFor(config)
...
}
// TransportFor converts config into a transport configuration and returns the
// round tripper that transport.New builds from it. A conversion error is
// returned unchanged with a nil round tripper.
func TransportFor(config *Config) (http.RoundTripper, error) {
    cfg, err := config.TransportConfig()
    if err != nil {
        return nil, err
    }
    return transport.New(cfg)
}

// New (excerpt; "..." marks elided upstream code) finishes by passing the
// round tripper rt, built in the elided part, through HTTPWrappersForConfig
// and returning the result.
func New(config *Config) (http.RoundTripper, error) {
...
    return HTTPWrappersForConfig(config, rt)
}
// HTTPWrappersForConfig converts config into a transport configuration and
// delegates to transport.HTTPWrappersForConfig to wrap rt. A conversion error
// is returned unchanged with a nil round tripper.
func HTTPWrappersForConfig(config *Config, rt http.RoundTripper) (http.RoundTripper, error) {
    cfg, err := config.TransportConfig()
    if err != nil {
        return nil, err
    }
    return transport.HTTPWrappersForConfig(cfg, rt)
}

transport/round_trippers.go中
构建transportwrapper

// HTTPWrappersForConfig (excerpt; "..." marks elided upstream code) wraps rt
// according to config. The case shown applies when config.HasTokenAuth() is
// true: rt is wrapped by NewBearerAuthWithRefreshRoundTripper, which receives
// both the static bearer token and the token file path from config.
func HTTPWrappersForConfig(config *Config, rt http.RoundTripper) (http.RoundTripper, error) {
...
    case config.HasTokenAuth():
        var err error
        rt, err = NewBearerAuthWithRefreshRoundTripper(config.BearerToken, config.BearerTokenFile, rt)
        if err != nil {
            return nil, err
        }
...
}

// NewBearerAuthWithRefreshRoundTripper (excerpt; "..." marks elided upstream
// code) creates a cached file token source for tokenFile and returns a
// bearerAuthRoundTripper holding the static bearer token, that source, and
// the wrapped round tripper rt.
func NewBearerAuthWithRefreshRoundTripper(bearer string, tokenFile string, rt http.RoundTripper) (http.RoundTripper, error) {
...
    source := NewCachedFileTokenSource(tokenFile)
...
    return &bearerAuthRoundTripper{bearer, source, rt}, nil
}


// RoundTrip (excerpt; "..." marks elided upstream code) sets the request's
// Authorization header to "Bearer <token>" and forwards the request to the
// wrapped round tripper. token is initialised in the elided part.
func (rt *bearerAuthRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
...
    if rt.source != nil {
        // Prefer the token from the source on every request. If the source
        // returns an error, the error is dropped and token keeps its prior value.
        if refreshedToken, err := rt.source.Token(); err == nil {
            token = refreshedToken.AccessToken
        }
    }
    req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
    return rt.rt.RoundTrip(req)
}

transport/token_source.go中
构建tokensource

缓存
// NewCachedFileTokenSource returns a cachingTokenSource that uses time.Now as
// its clock, has a 10-second leeway, and wraps a fileTokenSource reading the
// token from path with a one-minute period.
func NewCachedFileTokenSource(path string) *cachingTokenSource {
    return &cachingTokenSource{
        now:    time.Now,
        leeway: 10 * time.Second,
        base: &fileTokenSource{
            path: path,
            // This period was picked because it is half of the duration between when the kubelet
            // refreshes a projected service account token and when the original token expires.
            // Default token lifetime is 10 minutes, and the kubelet starts refreshing at 80% of lifetime.
            // This should induce re-reading at a frequency that works with the token volume source.
            period: time.Minute,
        },
    }
}

// Token (excerpt; "..." marks elided upstream code) fetches a token from the
// base source. If that fails and no token is held in ts.tok, the error is
// returned; if a token is held, the failure is logged and ts.tok is returned.
func (ts *cachingTokenSource) Token() (*oauth2.Token, error) {
...

    tok, err := ts.base.Token()
    if err != nil {
        // No previously stored token to fall back on.
        if ts.tok == nil {
            return nil, err
        }
        // Keep serving the stored token rather than failing the caller.
        klog.Errorf("Unable to rotate token: %v", err)
        return ts.tok, nil
    }
...
}


文件加载
// Token (excerpt; "..." marks elided upstream code) reads the token file at
// ts.path and returns it as an oauth2.Token whose Expiry is the current time
// plus ts.period. A read failure is returned as an error naming the path.
func (ts *fileTokenSource) Token() (*oauth2.Token, error) {
    tokb, err := os.ReadFile(ts.path)
    if err != nil {
        return nil, fmt.Errorf("failed to read token file %q: %v", ts.path, err)
    }
...
    // tok is derived from tokb in the elided part above.
    return &oauth2.Token{
        AccessToken: tok,
        Expiry:      time.Now().Add(ts.period),
    }, nil
}

相关文章

网友评论

      本文标题:client go的token如何做到热更新

      本文链接:https://www.haomeiwen.com/subject/orlggdtx.html