美文网首页
k8s watch源码简介

k8s watch源码简介

作者: wwq2020 | 来源:发表于2022-03-19 12:28 被阅读0次

简单总结

通过Chunked transfer encoding来做到服务端给客户端推送数据,达到watch的效果
client发送watch这个请求后,服务端返回响应,带上Header Transfer-Encoding:chunked,客户端和服务端的连接仍会保持,接着服务端就可以推送相应的事件给客户端了
server的resp内的writer是经过chunkWriter包装的,如果是chunk模式,会在数据前添加十六进制表示的数据长度加\r\n,并在数据后再追加\r\n(例如发送hello,那么实际发送是5\r\nhello\r\n)
客户端读取chunkline,也就是5\r\n,知道长度是5后,读取长度+2的数据,也就是hello\r\n,实际数据是hello

顺道提一下informer,拿pod举例
Informer 先通过 Reflector 进行 List,获得所有的 Pods
Reflector 拿到全部 Pod 后放到 Store 中(如果调用Informer的List则从Store中取)
List完后,Reflector 开始 Watch Pod相关 的所有事件
收到后Reflector将事件发送到 DeltaFIFO,
Controller 收到这个事件,会触发 Processor 的回调函数,先记录到一个clientState的cache结构中,然后调用用户传入的ResourceEventHandler
Reflector会周期性地resync,从clientState这个cache结构中取出对象,然后重新塞入DeltaFIFO

相关源码

拿pod watch举例
client-go相对路径
kubernetes/typed/core/v1/pod.go中

// Watch starts a watch on the "pods" resource in the client's namespace and
// returns the resulting watch.Interface.
//
// opts.TimeoutSeconds, when set, is converted to a time.Duration and applied
// as the request timeout; when nil the timeout stays at its zero value.
func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
    var timeout time.Duration
    if secs := opts.TimeoutSeconds; secs != nil {
        timeout = time.Duration(*secs) * time.Second
    }

    // Mark the options as a watch before they are encoded into the request.
    opts.Watch = true

    req := c.client.Get().
        Namespace(c.ns).
        Resource("pods").
        VersionedParams(&opts, scheme.ParameterCodec).
        Timeout(timeout)
    return req.Watch(ctx)
}

rest/request.go中

// Watch (excerpt; "..." marks code elided by the article) executes the
// request and, when the server answers 200 OK, hands the still-open response
// to newStreamWatcher so events can be decoded from its body.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
  ...
  resp, err := client.Do(req)
        updateURLMetrics(ctx, r, resp, err)
        // Report the outcome to the backoff manager: status code 0 when the
        // round trip itself failed, otherwise the response's status code.
        if r.c.base != nil {
            if err != nil {
                r.backoff.UpdateBackoff(r.c.base, err, 0)
            } else {
                r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
            }
        }
        // Only a successful round trip with status 200 becomes a watch stream.
        if err == nil && resp.StatusCode == http.StatusOK {
            return r.newStreamWatcher(resp)
  }
  ...
}

// newStreamWatcher wraps the body of resp in a watch.Interface.
//
// The response's Content-Type selects the stream decoder via the client's
// negotiator. A Content-Type that fails to parse is only logged; negotiation
// still runs with whatever mime.ParseMediaType returned. An error from the
// negotiator is returned to the caller.
func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
    rawContentType := resp.Header.Get("Content-Type")
    mediaType, mediaParams, parseErr := mime.ParseMediaType(rawContentType)
    if parseErr != nil {
        klog.V(4).Infof("Unexpected content type from the server: %q: %v", rawContentType, parseErr)
    }

    objDecoder, serializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, mediaParams)
    if err != nil {
        return nil, err
    }

    handleWarnings(resp.Header, r.warningHandler)

    // Split the raw body into frames, then decode each frame as a watch event.
    frames := framer.NewFrameReader(resp.Body)
    eventDecoder := streaming.NewDecoder(frames, serializer)
    decoder := restclientwatch.NewDecoder(eventDecoder, objDecoder)

    // Decoding problems are reported with status 500 because their cause is
    // unknown; other status codes are tied to specific HTTP interactions.
    reporter := errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding")

    return watch.NewStreamWatcher(decoder, reporter), nil
}

// VersionedParams encodes obj into query parameters using the client's
// configured group/version. It is shorthand for SpecificallyVersionedParams
// with that version.
func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
    version := r.c.content.GroupVersion
    return r.SpecificallyVersionedParams(obj, codec, version)
}

// SpecificallyVersionedParams encodes obj with codec at the given version and
// appends the resulting values to the request's query parameters.
//
// The request is returned unchanged when it already carries an error. An
// encoding failure is stored in r.err rather than returned directly.
func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
    if r.err != nil {
        return r
    }

    encoded, err := codec.EncodeParameters(obj, version)
    if err != nil {
        r.err = err
        return r
    }

    // Allocate the parameter map lazily, and only if there is something to add.
    if r.params == nil && len(encoded) > 0 {
        r.params = make(url.Values)
    }
    for key, values := range encoded {
        r.params[key] = append(r.params[key], values...)
    }
    return r
}

kubernetes相对路径
vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go中

// ListResource (excerpt; "..." marks code elided by the article) returns an
// HTTP handler that serves a watch when one is requested or forced, and a
// plain list otherwise.
func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
  ...
  // Watch path: taken when the options ask for a watch or forceWatch is set.
  if opts.Watch || forceWatch {
  watcher, err := rw.Watch(ctx, &opts)
            if err != nil {
                scope.err(err, w, req)
                return
            }
            requestInfo, _ := request.RequestInfoFrom(ctx)
            // serveWatch runs inside the callback passed to RecordLongRunning.
            metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
                serveWatch(watcher, scope, outputMediaType, req, w, timeout)
            })
            return
  }  
// List path: reached only when the watch branch above did not return.
result, err := r.List(ctx, &opts)
        if err != nil {
            scope.err(err, w, req)
            return
        }
...
}

vendor/k8s.io/apiserver/pkg/endpoints/handlers/watch.go中

// serveWatch (excerpt; "..." marks code elided by the article) assembles a
// WatchServer around watcher and uses it to serve the request.
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
  ...
server := &WatchServer{
        Watching: watcher,
        Scope:    scope,

        UseTextFraming:  useTextFraming,
        MediaType:       mediaType,
        Framer:          framer,
        Encoder:         encoder,
        EmbeddedEncoder: embeddedEncoder,

        // Fixup transforms an object via transformObject; if that fails the
        // error is reported and the untransformed object is returned instead.
        Fixup: func(obj runtime.Object) runtime.Object {
            result, err := transformObject(ctx, obj, options, mediaTypeOptions, scope, req)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
                return obj
            }
            // When we are transformed to a table, use the table options as the state for whether we
            // should print headers - on watch, we only want to print table headers on the first object
            // and omit them on subsequent events.
            if tableOptions, ok := options.(*metav1.TableOptions); ok {
                tableOptions.NoHeaders = true
            }
            return result
        },

        TimeoutFactory: &realTimeoutFactory{timeout},
    }

    // The WatchServer is invoked directly as an HTTP handler.
    server.ServeHTTP(w, req)  
...
}

// ServeHTTP (excerpt; "..." marks code elided by the article) announces a
// chunked response and then loops until the request is done, the timeout
// fires, or an event arrives on the watch channel.
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
...

    w.Header().Set("Content-Type", s.MediaType)
    w.Header().Set("Transfer-Encoding", "chunked")
    w.WriteHeader(http.StatusOK)
    // Flush right after writing the header, before waiting for any event.
    flusher.Flush()

    for {
        select {
        case <-done:
            return
        case <-timeoutCh:
            return
        case event, ok := <-ch:
...
        }
    }
    ...
}

标准库相对路径
net/http/transfer.go中

// readTransfer (excerpt; "..." marks code elided by the article) shows how the
// body of a chunked message is set up on the reading side.
func readTransfer(msg any, r *bufio.Reader) (err error) {
        ...
    switch {
    case t.Chunked:
        // No body is exposed when the request method or status code rules one
        // out; otherwise the connection reader is wrapped in a chunked reader
        // so callers of Body.Read get de-chunked data.
        if noResponseBodyExpected(t.RequestMethod) || !bodyAllowedForStatus(t.StatusCode) {
            t.Body = NoBody
        } else {
            t.Body = &body{src: internal.NewChunkedReader(r), hdr: msg, r: r, closing: t.Close}
        }
      }
     ...
}


net/http/server.go中

// readRequest (excerpt; "..." marks code elided by the article) shows how the
// response value for a request is built and how its writer is wired up.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
...
w = &response{
        conn:          c,
        cancelCtx:     cancelCtx,
        req:           req,
        reqBody:       req.Body,
        handlerHeader: make(Header),
        contentLength: -1,
        closeNotifyCh: make(chan bool, 1),

        // We populate these ahead of time so we're not
        // reading from req.Header after their Handler starts
        // and maybe mutates it (Issue 14940)
        wants10KeepAlive: req.wantsHttp10KeepAlive(),
        wantsClose:       req.wantsClose(),
    }
    if isH2Upgrade {
        w.closeAfterReply = true
    }
    // Point the chunk writer back at its response, then layer the buffered
    // writer w.w on top of it: data written to w.w is passed on to w.cw.
    w.cw.res = w
    w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
...

}

// Write sends data as part of the response body. It delegates to write,
// passing the payload as a byte slice and leaving the string form empty.
func (w *response) Write(data []byte) (n int, err error) {
    n, err = w.write(len(data), data, "")
    return n, err
}

// WriteString sends data as part of the response body. It delegates to write,
// passing the payload as a string and leaving the byte-slice form nil.
func (w *response) WriteString(data string) (n int, err error) {
    n, err = w.write(len(data), nil, data)
    return n, err
}

// write is the shared implementation behind Write and WriteString. lenData is
// the payload length; the payload itself arrives either in dataB or in dataS.
//
// It returns ErrHijacked on a hijacked connection, ErrBodyNotAllowed when the
// response may not carry a body, and ErrContentLength when the running total
// exceeds a declared content length.
func (w *response) write(lenData int, dataB []byte, dataS string) (n int, err error) {
    if w.conn.hijacked() {
        // Only log when the caller actually tried to send something.
        if lenData > 0 {
            from := relevantCaller()
            w.conn.server.logf("http: response.Write on hijacked connection from %s (%s:%d)", from.Function, path.Base(from.File), from.Line)
        }
        return 0, ErrHijacked
    }

    if w.canWriteContinue.isSet() {
        // The body reader intends to send "100 Continue" but has not done so
        // yet; cancel that. The flag is cleared while holding the lock so
        // that no such write can be in progress at this moment.
        w.writeContinueMu.Lock()
        w.canWriteContinue.setFalse()
        w.writeContinueMu.Unlock()
    }

    // The first write implies a 200 status if none was written explicitly.
    if !w.wroteHeader {
        w.WriteHeader(StatusOK)
    }

    switch {
    case lenData == 0:
        return 0, nil
    case !w.bodyAllowed():
        return 0, ErrBodyNotAllowed
    }

    // The byte count is updated before the write, independent of its outcome.
    w.written += int64(lenData)
    if w.contentLength != -1 && w.written > w.contentLength {
        return 0, ErrContentLength
    }

    if dataB == nil {
        return w.w.WriteString(dataS)
    }
    return w.w.Write(dataB)
}


// Write sends p on the connection's buffered writer, writing the response
// header first if that has not happened yet.
//
// In chunking mode p is framed as one chunk: its length in hexadecimal, CRLF,
// the payload, CRLF. For HEAD requests nothing is written and len(p) is
// reported as written. Any write error closes the underlying connection.
func (cw *chunkWriter) Write(p []byte) (n int, err error) {
    if !cw.wroteHeader {
        cw.writeHeader(p)
    }
    if cw.res.req.Method == "HEAD" {
        // Swallow the body of a HEAD response.
        return len(p), nil
    }

    conn := cw.res.conn
    if cw.chunking {
        // Chunk-size line precedes the payload.
        if _, err = fmt.Fprintf(conn.bufw, "%x\r\n", len(p)); err != nil {
            conn.rwc.Close()
            return 0, err
        }
    }

    n, err = conn.bufw.Write(p)
    if err == nil && cw.chunking {
        // CRLF terminates the chunk payload.
        _, err = conn.bufw.Write(crlf)
    }
    if err != nil {
        conn.rwc.Close()
    }
    return n, err
}

net/http/internal/chunked.go中

// Read copies chunk payload bytes from the underlying reader into b and
// returns how many were copied together with the reader's error state.
//
// cr.n is the number of payload bytes still unread in the current chunk, and
// cr.checkEnd records that the "\r\n" after a finished chunk must still be
// consumed. Once cr.err is set the loop no longer runs, so later calls return
// that same error. An io.EOF in the middle of a chunk or its terminator is
// reported as io.ErrUnexpectedEOF.
func (cr *chunkedReader) Read(b []uint8) (n int, err error) {
    for cr.err == nil {
        if cr.checkEnd {
            if n > 0 && cr.r.Buffered() < 2 {
                // We have some data. Return early (per the io.Reader
                // contract) instead of potentially blocking while
                // reading more.
                break
            }
            // Consume the two bytes after the chunk payload and require them
            // to be exactly CRLF.
            if _, cr.err = io.ReadFull(cr.r, cr.buf[:2]); cr.err == nil {
                if string(cr.buf[:]) != "\r\n" {
                    cr.err = errors.New("malformed chunked encoding")
                    break
                }
            } else {
                if cr.err == io.EOF {
                    cr.err = io.ErrUnexpectedEOF
                }
                break
            }
            cr.checkEnd = false
        }
        if cr.n == 0 {
            if n > 0 && !cr.chunkHeaderAvailable() {
                // We've read enough. Don't potentially block
                // reading a new chunk header.
                break
            }
            // NOTE(review): beginChunk is not shown here; presumably it parses
            // the next chunk-size line and sets cr.n (or cr.err) — confirm.
            cr.beginChunk()
            continue
        }
        if len(b) == 0 {
            break
        }
        // Never read past the end of the current chunk.
        rbuf := b
        if uint64(len(rbuf)) > cr.n {
            rbuf = rbuf[:cr.n]
        }
        var n0 int
        n0, cr.err = cr.r.Read(rbuf)
        n += n0
        b = b[n0:]
        cr.n -= uint64(n0)
        // If we're at the end of a chunk, read the next two
        // bytes to verify they are "\r\n".
        if cr.n == 0 && cr.err == nil {
            cr.checkEnd = true
        } else if cr.err == io.EOF {
            cr.err = io.ErrUnexpectedEOF
        }
    }
    return n, cr.err
}


相关文章

  • k8s watch源码简介

    简单总结 通过Chunked transfer encoding[https://developer.mozill...

  • Apple Watch开发-初探

    目录: 前言 一、Apple Watch简介 二、Watch App在项目中的结构 三、 Apple Watch上...

  • 扩展Kubernetes Scheduler

    转载请注明出处即可。所使用源码k8s源码为release-1.18 如果对K8s的Scheduler不是很了解,可...

  • k8s webhook 源码简介

    相关源码 注册webhook 初始化webhook 调用链 简单介绍 先admission plugin注册进去初...

  • k8s list-watch

    k8s list-watch Background 参考kubernetes设计理念分析 | 从运行流程和list...

  • k8s watch rest api

    k8s rest api对rc、svc、ingress、pod、deployment等都提供的watch接口,可以...

  • 【vue3源码】五、watch源码解析

    【vue3源码】五、watch源码解析 参考代码版本:vue 3.2.37 官方文档:https://vuejs....

  • apple watch看小说 三目阅读watch 使用教程

    三目阅读watch 应用 使用教程 简介 三目阅读watch是在apple watch手表上独立使用的应用,在手表...

  • 每周阅读(3/4/2019)

    k8s 源码分析要深入理解 k8s,还是要对源码和运作机制有所了解,这本书还在创作中,可以跟踪看看。 技术面试的应...

  • k8s简介

    k8s简介 k8s介绍官网:https://kubernetes.iogithub: https://github...

网友评论

      本文标题:k8s watch源码简介

      本文链接:https://www.haomeiwen.com/subject/dqmgdrtx.html