美文网首页
使用临时容器排查问题

使用临时容器排查问题

作者: wwq2020 | 来源:发表于2021-10-20 17:24 被阅读0次

背景

构建镜像时候,为了减少镜像大小或者安全性,很多工具都没装,甚至没shell,出问题时候,如果需要登录pod去查看,会很麻烦

如何开启

--feature-gates=中添加EphemeralContainers=true

如何使用

kubectl debug -it tool-pod --image=busybox --target=bizpod

相关源码

kubelet部分

func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *Dependencies,
    crOptions *config.ContainerRuntimeOptions,
    containerRuntime string,
    runtimeCgroups string,
    remoteRuntimeEndpoint string,
    remoteImageEndpoint string,
    nonMasqueradeCIDR string) error {
    ...
    if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
        return err
    }
    ...
}

pkg/kubelet/server/server.go中

func (s *Server) InstallDebuggingHandlers() {
    ...
    ws = new(restful.WebService)
    ws.
        Path("/attach")
    ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
        To(s.getAttach).
        Operation("getAttach"))
    ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
        To(s.getAttach).
        Operation("getAttach"))
    ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
        To(s.getAttach).
        Operation("getAttach"))
    ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
        To(s.getAttach).
        Operation("getAttach"))
    s.restfulCont.Add(ws)
    ...
}

func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
    ...
    url, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts)
    if err != nil {
        streaming.WriteError(err, response.ResponseWriter)
        return
    }

    proxyStream(response.ResponseWriter, request.Request, url)
    ...
}

pkg/kubelet/kubelet_pods.go中

func (kl *Kubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error) {
    ...
    return kl.streamingRuntime.GetAttach(container.ID, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, tty)
}

pkg/kubelet/kuberuntime/kuberuntime_container.go中

func NewKubeGenericRuntimeManager(
    recorder record.EventRecorder,
    livenessManager proberesults.Manager,
    readinessManager proberesults.Manager,
    startupManager proberesults.Manager,
    rootDirectory string,
    machineInfo *cadvisorapi.MachineInfo,
    podStateProvider podStateProvider,
    osInterface kubecontainer.OSInterface,
    runtimeHelper kubecontainer.RuntimeHelper,
    httpClient types.HTTPGetter,
    imageBackOff *flowcontrol.Backoff,
    serializeImagePulls bool,
    imagePullQPS float32,
    imagePullBurst int,
    imageCredentialProviderConfigFile string,
    imageCredentialProviderBinDir string,
    cpuCFSQuota bool,
    cpuCFSQuotaPeriod metav1.Duration,
    runtimeService internalapi.RuntimeService,
    imageService internalapi.ImageManagerService,
    internalLifecycle cm.InternalContainerLifecycle,
    legacyLogProvider LegacyLogProvider,
    logManager logs.ContainerLogManager,
    runtimeClassManager *runtimeclass.Manager,
    seccompDefault bool,
    memorySwapBehavior string,
    getNodeAllocatable func() v1.ResourceList,
    memoryThrottlingFactor float64,
) (KubeGenericRuntime, error) {
    ...
    kubeRuntimeManager := &kubeGenericRuntimeManager{
        ...
        runtimeService:         newInstrumentedRuntimeService(runtimeService),
        ...
    }
    ...
}
func (m *kubeGenericRuntimeManager) GetAttach(id kubecontainer.ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) {
    req := &runtimeapi.AttachRequest{
        ContainerId: id.ID,
        Stdin:       stdin,
        Stdout:      stdout,
        Stderr:      stderr,
        Tty:         tty,
    }
    resp, err := m.runtimeService.Attach(req)
    if err != nil {
        return nil, err
    }
    return url.Parse(resp.Url)
}

pkg/kubelet/cri/remote/remote_runtime.go中

func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) {
    klog.V(3).InfoS("Connecting to runtime service", "endpoint", endpoint)
    addr, dialer, err := util.GetAddressAndDialer(endpoint)
    if err != nil {
        return nil, err
    }
    ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
    defer cancel()

    conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
    if err != nil {
        klog.ErrorS(err, "Connect remote runtime failed", "address", addr)
        return nil, err
    }

    return &remoteRuntimeService{
        timeout:       connectionTimeout,
        runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
        logReduction:  logreduction.NewLogReduction(identicalErrorDelay),
    }, nil
}

func (r *remoteRuntimeService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
    klog.V(10).InfoS("[RemoteRuntimeService] Attach", "containerID", req.ContainerId, "timeout", r.timeout)
    ctx, cancel := getContextWithTimeout(r.timeout)
    defer cancel()

    resp, err := r.runtimeClient.Attach(ctx, req)
    if err != nil {
        klog.ErrorS(err, "Attach container from runtime service failed", "containerID", req.ContainerId)
        return nil, err
    }
    klog.V(10).InfoS("[RemoteRuntimeService] Attach Response", "containerID", req.ContainerId)

    if resp.Url == "" {
        errorMessage := "URL is not set"
        err := errors.New(errorMessage)
        klog.ErrorS(err, "Attach failed")
        return nil, err
    }
    return resp, nil
}

vendor/k8s.io/cri-api/pkg/apis/runtime/v1alpha2/api.pb.go中

func NewRuntimeServiceClient(cc *grpc.ClientConn) RuntimeServiceClient {
    return &runtimeServiceClient{cc}
}

func (c *runtimeServiceClient) Attach(ctx context.Context, in *AttachRequest, opts ...grpc.CallOption) (*AttachResponse, error) {
    out := new(AttachResponse)
    err := c.cc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/Attach", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

pkg/kubelet/kuberuntime/instrumented_services.go中

func newInstrumentedRuntimeService(service internalapi.RuntimeService) internalapi.RuntimeService {
    return &instrumentedRuntimeService{service: service}
}

func (in instrumentedRuntimeService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
    const operation = "attach"
    defer recordOperation(operation, time.Now())

    resp, err := in.service.Attach(req)
    recordError(operation, err)
    return resp, err
}

apiserver部分

staging/src/k8s.io/apiserver/pkg/endpoints/installer.go中

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
    ...
    case "CONNECT":
        ...
        handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulConnectResource(connecter, reqScope, admit, path, isSubresource))

    ...
}

vendor/k8s.io/apiserver/pkg/endpoints/installer.go中

func restfulConnectResource(connecter rest.Connecter, scope handlers.RequestScope, admit admission.Interface, restPath string, isSubresource bool) restful.RouteFunction {
    return func(req *restful.Request, res *restful.Response) {
        handlers.ConnectResource(connecter, &scope, admit, restPath, isSubresource)(res.ResponseWriter, req.Request)
    }
}

vendor/k8s.io/apiserver/pkg/endpoints/handlers/rest.go中

func ConnectResource(connecter rest.Connecter, scope *RequestScope, admit admission.Interface, restPath string, isSubresource bool) http.HandlerFunc {
    ...
    metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
            handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w})
            if err != nil {
                scope.err(err, w, req)
                return
            }
            handler.ServeHTTP(w, req)
        })
    ...
}

pkg/registry/core/rest/storage_core.go中

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    ...
    podStorage, err := podstore.NewStorage(
        restOptionsGetter,
        nodeStorage.KubeletConnectionInfo,
        c.ProxyTransport,
        podDisruptionClient,
    )
    ...
    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
    ...
}

pkg/registry/core/pod/storage/storage.go中

func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {

    store := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &api.Pod{} },
        NewListFunc:              func() runtime.Object { return &api.PodList{} },
        PredicateFunc:            registrypod.MatchPod,
        DefaultQualifiedResource: api.Resource("pods"),

        CreateStrategy:      registrypod.Strategy,
        UpdateStrategy:      registrypod.Strategy,
        DeleteStrategy:      registrypod.Strategy,
        ResetFieldsStrategy: registrypod.Strategy,
        ReturnDeletedObject: true,

        TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
    }
    options := &generic.StoreOptions{
        RESTOptions: optsGetter,
        AttrFunc:    registrypod.GetAttrs,
        TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
        Indexers:    registrypod.Indexers(),
    }
    if err := store.CompleteWithOptions(options); err != nil {
        return PodStorage{}, err
    }

    statusStore := *store
    statusStore.UpdateStrategy = registrypod.StatusStrategy
    statusStore.ResetFieldsStrategy = registrypod.StatusStrategy
    ephemeralContainersStore := *store
    ephemeralContainersStore.UpdateStrategy = registrypod.EphemeralContainersStrategy

    bindingREST := &BindingREST{store: store}
    return PodStorage{
        Pod:                 &REST{store, proxyTransport},
        Binding:             &BindingREST{store: store},
        LegacyBinding:       &LegacyBindingREST{bindingREST},
        Eviction:            newEvictionStorage(store, podDisruptionBudgetClient),
        Status:              &StatusREST{store: &statusStore},
        EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
        Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
        Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
        Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
        Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
        PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
    }, nil
}

pkg/registry/core/pod/rest/subresources.go中

func (r *AttachREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
    attachOpts, ok := opts.(*api.PodAttachOptions)
    if !ok {
        return nil, fmt.Errorf("Invalid options object: %#v", opts)
    }
    location, transport, err := pod.AttachLocation(ctx, r.Store, r.KubeletConn, name, attachOpts)
    if err != nil {
        return nil, err
    }
    return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}

kubectl部分

vendor/k8s.io/kubectl/pkg/cmd/debug/debug.go中

func NewCmdDebug(f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
    ...
    cmdutil.CheckErr(o.Complete(f, cmd, args))
    cmdutil.CheckErr(o.Validate(cmd))
    cmdutil.CheckErr(o.Run(f, cmd))
    ...
}

func (o *DebugOptions) Run(f cmdutil.Factory, cmd *cobra.Command) error {
    ...
    opts.AttachFunc = attach.DefaultAttachFunc
    if err := handleAttachPod(ctx, f, o.podClient, debugPod.Namespace, debugPod.Name, containerName, opts); err != nil {
        return err
    }
    ...
}

func handleAttachPod(ctx context.Context, f cmdutil.Factory, podClient corev1client.PodsGetter, ns, podName, containerName string, opts *attach.AttachOptions) error {
    ...
    if err := opts.Run(); err != nil {
        fmt.Fprintf(opts.ErrOut, "Error attaching, falling back to logs: %v\n", err)
        return logOpts(f, pod, opts)
    }
    ...
}

vendor/k8s.io/kubectl/pkg/cmd/attach/attach.go中

func (o *AttachOptions) Run() error {
    ...
    if err := t.Safe(o.AttachFunc(o, containerToAttach, t.Raw, sizeQueue)); err != nil {
        return err
    }
    ...
}

vendor/k8s.io/kubectl/pkg/cmd/attach/attach.go中

func DefaultAttachFunc(o *AttachOptions, containerToAttach *corev1.Container, raw bool, sizeQueue remotecommand.TerminalSizeQueue) func() error {
    return func() error {
        restClient, err := restclient.RESTClientFor(o.Config)
        if err != nil {
            return err
        }
        req := restClient.Post().
            Resource("pods").
            Name(o.Pod.Name).
            Namespace(o.Pod.Namespace).
            SubResource("attach")
        req.VersionedParams(&corev1.PodAttachOptions{
            Container: containerToAttach.Name,
            Stdin:     o.Stdin,
            Stdout:    o.Out != nil,
            Stderr:    !o.DisableStderr,
            TTY:       raw,
        }, scheme.ParameterCodec)

        return o.Attach.Attach("POST", req.URL(), o.Config, o.In, o.Out, o.ErrOut, raw, sizeQueue)
    }
}

相关文章

网友评论

      本文标题:使用临时容器排查问题

      本文链接:https://www.haomeiwen.com/subject/lintaltx.html