背景
集群内client访问apiserver时,常见的认证方式是使用token,而该token来源于service account,主要有两种:
一种是永久性的token,不会过期,一旦泄漏风险较大
另一种是kubelet通过service account的CreateToken接口创建的token,以projected volume的方式挂载到pod内;此token与pod绑定且有过期时间,kubelet会定期刷新,client-go会定期重新加载token文件
源码
kubelet
关于SetUp如何被触发,参见 https://www.jianshu.com/p/425e9ce6103d ;由于此处RequiresRemount返回true,kubelet会反复重新调用SetUp,token文件因此得以刷新
简单总结,调用链为:projectedVolumeMounter -> kubeletVolumeHost -> tokenManager -> apiserver
pkg/volume/projected/projected.go 中
// RequiresRemount always returns true for the projected volume plugin,
// regardless of the spec passed in.
func (plugin *projectedPlugin) RequiresRemount(spec *volume.Spec) bool {
return true
}
// SetUp sets the volume up at the mounter's own path: it resolves the
// directory with GetPath and hands it, together with mounterArgs, to SetUpAt.
func (s *projectedVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
	targetDir := s.GetPath()
	return s.SetUpAt(targetDir, mounterArgs)
}
// SetUpAt writes the projected volume's data into dir.
// The visible steps are: collect the payload with collectData, create a
// writer for dir with volumeutil.NewAtomicWriter, and write the payload
// through it. Every failure is logged with klog.Errorf and returned.
// Parts of the body are elided ("...") in this excerpt.
func (s *projectedVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
...
// Gather the files to project (see collectData).
data, err := s.collectData(mounterArgs)
if err != nil {
klog.Errorf("Error preparing data for projected volume %v for pod %v/%v: %s", s.volName, s.pod.Namespace, s.pod.Name, err.Error())
return err
}
...
// writerContext is defined in the elided part above.
writer, err := volumeutil.NewAtomicWriter(dir, writerContext)
if err != nil {
klog.Errorf("Error creating atomic writer: %v", err)
return err
}
...
// setPerms is defined in the elided part above.
err = writer.Write(data, setPerms)
if err != nil {
klog.Errorf("Error writing payload to dir: %v", err)
return err
}
...
}
// Init stores the volume host on the plugin and caches the accessor
// functions the host provides for secrets, config maps and service account
// tokens. It always returns nil.
func (plugin *projectedPlugin) Init(host volume.VolumeHost) error {
plugin.host = host
// Unchecked type assertion: this panics if host does not implement
// volume.KubeletVolumeHost.
plugin.kvHost = host.(volume.KubeletVolumeHost)
plugin.getSecret = host.GetSecretFunc()
plugin.getConfigMap = host.GetConfigMapFunc()
// These two function values are what collectData later uses to obtain
// (and what the plugin uses to delete) service account tokens.
plugin.getServiceAccountToken = host.GetServiceAccountTokenFunc()
plugin.deleteServiceAccountToken = host.DeleteServiceAccountTokenFunc()
return nil
}
// collectData builds the map of files to project into the volume.
// The visible part of this excerpt handles the service account token source:
// it requests a token through the plugin's getServiceAccountToken function
// and stores the returned token in the payload under tp.Path.
// auds, tp, mode and payload are defined in the elided ("...") parts.
func (s *projectedVolumeMounter) collectData(mounterArgs volume.MounterArgs) (map[string]volumeutil.FileProjection, error) {
...
// Request a token for the pod's service account. The request carries the
// audiences and expiration from the projection and references this pod by
// name and UID as the bound object.
tr, err := s.plugin.getServiceAccountToken(s.pod.Namespace, s.pod.Spec.ServiceAccountName, &authenticationv1.TokenRequest{
Spec: authenticationv1.TokenRequestSpec{
Audiences: auds,
ExpirationSeconds: tp.ExpirationSeconds,
BoundObjectRef: &authenticationv1.BoundObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: s.pod.Name,
UID: s.pod.UID,
},
},
})
...
// The token string from the response status becomes the file content.
payload[tp.Path] = volumeutil.FileProjection{
Data: []byte(tr.Status.Token),
Mode: mode,
FsUser: mounterArgs.FsUser,
}
...
}
pkg/kubelet/volume_host.go中
volumehost
// GetServiceAccountTokenFunc returns the token manager's
// GetServiceAccountToken method as a function value, so callers obtain
// service account tokens through the kubelet's token manager.
func (kvh *kubeletVolumeHost) GetServiceAccountTokenFunc() func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
	getToken := kvh.tokenManager.GetServiceAccountToken
	return getToken
}
pkg/kubelet/token/token_manager.go中
tokenmanager
// NewManager builds a token Manager backed by the given clientset.
// The Manager gets a getToken function that creates tokens through the API
// server, an empty cache map and a real clock.
// Parts of the body are elided ("...") in this excerpt.
func NewManager(c clientset.Interface) *Manager {
...
m := &Manager{
// getToken takes (name, namespace) in that order, unlike
// GetServiceAccountToken which takes (namespace, name).
getToken: func(name, namespace string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
// Without a client there is no API server to ask for tokens.
if c == nil {
return nil, errors.New("cannot use TokenManager when kubelet is in standalone mode")
}
tokenRequest, err := c.CoreV1().ServiceAccounts(namespace).CreateToken(context.TODO(), name, tr, metav1.CreateOptions{})
// A NotFound error combined with tokenRequestsSupported() returning false
// is reported as the endpoint being disabled rather than as a plain NotFound.
if apierrors.IsNotFound(err) && !tokenRequestsSupported() {
return nil, fmt.Errorf("the API server does not have TokenRequest endpoints enabled")
}
return tokenRequest, err
},
cache: make(map[string]*authenticationv1.TokenRequest),
clock: clock.RealClock{},
}
...
}
// GetServiceAccountToken requests a token through m.getToken and, in the
// visible part of this excerpt, decides what to return when that request fails.
// NOTE(review): ok, ctr and key are set in the elided part above; presumably
// ctr is the cached TokenRequest for key and ok reports whether one was
// found — confirm against the full source.
func (m *Manager) GetServiceAccountToken(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
...
// Note the argument order: getToken takes name before namespace.
tr, err := m.getToken(name, namespace, tr)
if err != nil {
switch {
// ok is false: there is nothing to fall back on, so the failure is returned.
case !ok:
return nil, fmt.Errorf("failed to fetch token: %v", err)
// ctr is expired, so it cannot be returned in place of a fresh token.
case m.expired(ctr):
return nil, fmt.Errorf("token %s expired and refresh failed: %v", key, err)
// Otherwise the failure is only logged and ctr is returned as-is.
default:
klog.ErrorS(err, "Couldn't update token", "cacheKey", key)
return ctr, nil
}
}
...
}
client go
kubernetes/clientset.go中
构建clientset
// NewForConfig builds a Clientset from a rest.Config. The only step visible
// in this excerpt is creating the HTTP client with rest.HTTPClientFor from
// configShallowCopy, which is defined in the elided ("...") part.
func NewForConfig(c *rest.Config) (*Clientset, error) {
...
httpClient, err := rest.HTTPClientFor(&configShallowCopy)
...
}
rest/transport.go中(下文中的New属于transport包,位于transport/transport.go)
构建transport
// HTTPClientFor returns an *http.Client for the given config. The visible
// step obtains the round tripper from TransportFor; the rest is elided.
func HTTPClientFor(config *Config) (*http.Client, error) {
transport, err := TransportFor(config)
...
}
// TransportFor converts the rest config into a transport config and builds
// an http.RoundTripper from it with transport.New. A failed conversion is
// returned as-is with a nil round tripper.
func TransportFor(config *Config) (http.RoundTripper, error) {
	transportCfg, err := config.TransportConfig()
	if err == nil {
		return transport.New(transportCfg)
	}
	return nil, err
}
// New returns an http.RoundTripper for the given config. In this excerpt the
// base round tripper rt is built in the elided ("...") part and then passed
// through HTTPWrappersForConfig.
func New(config *Config) (http.RoundTripper, error) {
...
return HTTPWrappersForConfig(config, rt)
}
// HTTPWrappersForConfig converts the rest config into a transport config and
// delegates to transport.HTTPWrappersForConfig to wrap rt. A failed
// conversion is returned as-is with a nil round tripper.
func HTTPWrappersForConfig(config *Config, rt http.RoundTripper) (http.RoundTripper, error) {
	transportCfg, err := config.TransportConfig()
	if err == nil {
		return transport.HTTPWrappersForConfig(transportCfg, rt)
	}
	return nil, err
}
transport/round_trippers.go中
构建transportwrapper
// HTTPWrappersForConfig wraps rt according to the transport config. The case
// shown here belongs to a switch whose header is in the elided ("...") part.
func HTTPWrappersForConfig(config *Config, rt http.RoundTripper) (http.RoundTripper, error) {
...
// Token auth configured: wrap rt with a bearer-auth round tripper built
// from the static token and the token file path.
case config.HasTokenAuth():
var err error
rt, err = NewBearerAuthWithRefreshRoundTripper(config.BearerToken, config.BearerTokenFile, rt)
if err != nil {
return nil, err
}
...
}
// NewBearerAuthWithRefreshRoundTripper returns a bearerAuthRoundTripper that
// holds the static bearer token, a token source and the wrapped round tripper.
// In this excerpt the source is a cached file token source for tokenFile;
// the conditions around it are in the elided ("...") parts.
func NewBearerAuthWithRefreshRoundTripper(bearer string, tokenFile string, rt http.RoundTripper) (http.RoundTripper, error) {
...
source := NewCachedFileTokenSource(tokenFile)
...
return &bearerAuthRoundTripper{bearer, source, rt}, nil
}
// RoundTrip sets the Authorization header to "Bearer <token>" and forwards
// the request to the wrapped round tripper.
// NOTE(review): token is declared in the elided part above — presumably
// initialized from the static bearer token; confirm against the full source.
func (rt *bearerAuthRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
...
// Prefer the token from the source when one is configured. If the source
// returns an error, the error is ignored and token keeps its prior value.
if rt.source != nil {
if refreshedToken, err := rt.source.Token(); err == nil {
token = refreshedToken.AccessToken
}
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
return rt.rt.RoundTrip(req)
}
transport/token_source.go中
构建tokensource
缓存
// NewCachedFileTokenSource returns a caching token source layered over a
// file-backed source that reads the token from path. The cache uses
// time.Now as its clock and a leeway of ten seconds.
func NewCachedFileTokenSource(path string) *cachingTokenSource {
	fileSource := &fileTokenSource{
		path: path,
		// One minute is half of the window between the kubelet refreshing a
		// projected service account token and the original token expiring:
		// the default token lifetime is 10 minutes and the kubelet starts
		// refreshing at 80% of that lifetime. Re-reading at this frequency
		// should work with the token volume source.
		period: time.Minute,
	}
	return &cachingTokenSource{
		now:    time.Now,
		leeway: 10 * time.Second,
		base:   fileSource,
	}
}
// Token returns a token from the caching source. The visible part of this
// excerpt covers fetching from the underlying source ts.base and what
// happens when that fetch fails; the rest is elided ("...").
func (ts *cachingTokenSource) Token() (*oauth2.Token, error) {
...
tok, err := ts.base.Token()
if err != nil {
// No token stored in ts.tok: nothing to fall back on, return the error.
if ts.tok == nil {
return nil, err
}
// Otherwise log the failure and keep serving the token held in ts.tok.
klog.Errorf("Unable to rotate token: %v", err)
return ts.tok, nil
}
...
}
文件加载
// Token reads the token file at ts.path and returns its content as an
// oauth2.Token. A read failure is returned as an error that names the path.
// tok is derived from the file content in the elided ("...") part.
func (ts *fileTokenSource) Token() (*oauth2.Token, error) {
tokb, err := os.ReadFile(ts.path)
if err != nil {
return nil, fmt.Errorf("failed to read token file %q: %v", ts.path, err)
}
...
// Expiry is set to now plus ts.period, independent of the token content.
return &oauth2.Token{
AccessToken: tok,
Expiry: time.Now().Add(ts.period),
}, nil
}
网友评论