美文网首页自留地
k8s watch rest api

k8s watch rest api

作者: ywhu | 来源:发表于2017-09-27 21:39 被阅读0次

    k8s rest api对rc、svc、ingress、pod、deployment等都提供的watch接口,可以实时的监听应用部署状态。

    在此之前简单先说一下http长连接

    分块传输编码(Chunked transfer encoding)

    超文本传输协议(HTTP)中的一种数据传输机制,允许HTTP由应用服务器发送给客户端应用( 通常是网页浏览器)的数据可以分成多个部分。分块传输编码只在HTTP协议1.1版本(HTTP/1.1)中提供。
    通常,HTTP应答消息中发送的数据是整个发送的,Content-Length消息头字段表示数据的长度。数据的长度很重要,因为客户端需要知道哪里是应答消息的结束,以及后续应答消息的开始。然而,使用分块传输编码,数据分解成一系列数据块,并以一个或多个块发送,这样服务器可以发送数据而不需要预先知道发送内容的总大小。通常数据块的大小是一致的,但也不总是这种情况。

    Transfer-Encoding

    消息首部指明了将 entity 安全传递给用户所采用的编码形式。

    Transfer-Encoding 是一个逐跳传输消息首部,即仅应用于两个节点之间的消息传递,而不是所请求的资源本身。一个多节点连接中的每一段都可以应用不同的Transfer-Encoding 值。如果你想要将压缩后的数据应用于整个连接,那么请使用端到端传输消息首部 Content-Encoding 。

    当这个消息首部出现在 HEAD 请求的响应中,而这样的响应没有消息体,那么它其实指的是应用在相应的 GET 请求的应答的值。

    Header type Response header
    Forbidden header name   yes
    

    语法

    Transfer-Encoding: chunked
    Transfer-Encoding: compress
    Transfer-Encoding: deflate
    Transfer-Encoding: gzip
    Transfer-Encoding: identity
    
    // Several values can be listed, separated by a comma
    Transfer-Encoding: gzip, chunked
    

    指令

    chunked

    数据以一系列分块的形式进行发送。 Content-Length 首部在这种情况下不被发送。。在每一个分块的开头需要添加当前分块的长度,以十六进制的形式表示,后面紧跟着 '\r\n' ,之后是分块本身,后面也是'\r\n' 。终止块是一个常规的分块,不同之处在于其长度为0。终止块后面是一个挂载(trailer),由一系列(或者为空)的实体消息首部构成。

    compress

    采用 Lempel-Ziv-Welch (LZW) 压缩算法。这个名称来自UNIX系统的 compress 程序,该程序实现了前述算法。
    与其同名程序已经在大部分UNIX发行版中消失一样,这种内容编码方式已经被大部分浏览器弃用,部分因为专利问题(这项专利在2003年到期)。

    deflate

    采用 zlib 结构 (在 RFC 1950 中规定),和 deflate 压缩算法(在 RFC 1951 中规定)。

    gzip

    表示采用 Lempel-Ziv coding (LZ77) 压缩算法,以及32位CRC校验的编码方式。这个编码方式最初由 UNIX 平台上的 gzip 程序采用。处于兼容性的考虑, HTTP/1.1 标准提议支持这种编码方式的服务器应该识别作为别名的 x-gzip 指令。
    identity
    用于指代自身(例如:未经过压缩和修改)。除非特别指明,这个标记始终可以被接受。
    示例

    分块编码

    分块编码主要应用于如下场景,即要传输大量的数据,但是在请求在没有被处理完之前响应的长度是无法获得的。例如,当需要用从数据库中查询获得的数据生成一个大的HTML表格的时候,或者需要传输大量的图片的时候。一个分块响应形式如下:

    HTTP/1.1 200 OK 
    Content-Type: text/plain 
    Transfer-Encoding: chunked
    
    7\r\n
    Mozilla\r\n 
    9\r\n
    Developer\r\n
    7\r\n
    Network\r\n
    0\r\n 
    \r\n
    

    HTTP 1.1引入分块传输编码提供了以下几点好处:

    • HTTP分块传输编码允许服务器为动态生成的内容维持HTTP持久连接。通常,持久链接需要服务器在开始发送消息体前发送Content-Length消息头字段,但是对于动态生成的内容来说,在内容创建完之前是不可知的。[动态内容,content-length无法预知]
    • 分块传输编码允许服务器在最后发送消息头字段。对于那些头字段值在内容被生成之前无法知道的情形非常重要,例如消息的内容要使用散列进行签名,散列的结果通过HTTP消息头字段进行传输。没有分块传输编码时,服务器必须缓冲内容直到完成后计算头字段的值并在发送内容前发送这些头字段的值。[散列签名,需缓冲完成才能计算]
    • HTTP服务器有时使用压缩 (gzip或deflate)以缩短传输花费的时间。分块传输编码可以用来分隔压缩对象的多个部分。在这种情况下,块不是分别压缩的,而是整个负载进行压缩,压缩的输出使用本文描述的方案进行分块传输。在压缩的情形中,分块编码有利于一边进行压缩一边发送数据,而不是先完成压缩过程以得知压缩后数据的大小。[gzip压缩,压缩与传输同时进行]

    一般情况HTTP的Header包含Content-Length域来指明报文体的长度。有时候服务生成HTTP回应是无法确定消息大小的,比如大文件的下载,或者后台需要复杂的逻辑才能全部处理页面的请求,这时用需要实时生成消息长度,服务器一般使用chunked编码

    原理

    k8s提供的watch功能是建立在对etcd的watch之上的,当etcd的key-value出现变化时,会通知kube-apiserver,这里的Key-vlaue其实就是k8s资源的持久化。

    早期的k8s架构中,kube-apiserver、kube-controller-manager、kube-scheduler、kubelet、kube-proxy,都是直接去watch etcd的,这样就造成etcd的连接数太大(节点成千上万时),对etcd压力太大,浪费资源,因此到了后面,只有kube-apiserver去watch etcd,而kube-apiserver对外提供watch api,也就是kube-controller-manager、kube-scheduler、kubelet、kube-proxy去watch kube-apiserver,这样大大减小了etcd的压力

    Watch API

    通过k8s 官网 rest api的描述,可以看到,Watch API实际上一个标准的HTTP GET请求,我们以Pod的Watch API为例

    HTTP Request
    
    GET /api/v1/watch/namespaces/{namespace}/pods
    Path Parameters
    
    Parameter   Description
    namespace   object name and auth scope, such as for teams and projects
    Query Parameters
    
    Parameter   Description
    fieldSelector   A selector to restrict the list of returned objects by their fields. Defaults to everything.
    labelSelector   A selector to restrict the list of returned objects by their labels. Defaults to everything.
    pretty  If ‘true’, then the output is pretty printed.
    resourceVersion When specified with a watch call, shows changes that occur after that particular version of a resource. Defaults to changes from the beginning of history. When specified for list: - if unset, then the result is returned from remote storage based on quorum-read flag; - if it’s 0, then we simply return what we currently have in cache, no guarantee; - if set to non zero, then the result is at least as fresh as given rv.
    timeoutSeconds  Timeout for the list/watch call.
    watch   Watch for changes to the described resources and return them as a stream of add, update, and remove notifications. Specify resourceVersion.
    Response
    
    Code    Description
    200 WatchEvent  OK
    

    从上面可以看出Watch其实就是一个GET请求,和一般请求不同的是,它有一个watch的query parameter,也就是kube-apiserver接到这个请求,当发现query parameter里面包含watch,就知道这是一个Watch API,watch参数默认为true。

    ==返回值是200和WatchEvent。apiserver首先会返回一个200的状态码,建立长连接,然后不断的返回watch event==

    服务器端机制

    通过watch api涉及到的http源码分析,可以看到,watch支持http长连接和websocket两种方式

    func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        w = httplog.Unlogged(w)
    
        if wsstream.IsWebSocketRequest(req) {
            w.Header().Set("Content-Type", s.MediaType)
            websocket.Handler(s.HandleWS).ServeHTTP(w, req)
            return
        }
        ...
        framer := s.Framer.NewFrameWriter(w)
        ...
        e := streaming.NewEncoder(framer, s.Encoder)
        ...
        // begin the stream
        w.Header().Set("Content-Type", s.MediaType)
        w.Header().Set("Transfer-Encoding", "chunked")
        w.WriteHeader(http.StatusOK)
        flusher.Flush()
    
        var unknown runtime.Unknown
        internalEvent := &metav1.InternalEvent{}
        buf := &bytes.Buffer{}
        ch := s.Watching.ResultChan()
        for {
            select {
            case <-cn.CloseNotify():
                return
            case <-timeoutCh:
                return
            case event, ok := <-ch:
                if !ok {
                    // End of results.
                    return
                }
    
                obj := event.Object
                s.Fixup(obj)
                if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
                    // unexpected error
                    utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
                    return
                }
    
                // ContentType is not required here because we are defaulting to the serializer
                // type
                unknown.Raw = buf.Bytes()
                event.Object = &unknown
    
                // the internal event will be versioned by the encoder
                *internalEvent = metav1.InternalEvent(event)
                if err := e.Encode(internalEvent); err != nil {
                    utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e))
                    // client disconnect.
                    return
                }
                if len(ch) == 0 {
                    flusher.Flush()
                }
    
                buf.Reset()
            }
        }
    }
    

    每次调用watch API,kube-apiserver都会建立一个WatchServer,WatchServer通过channel会从etcd里面获取资源的watch event,中间经过一系列的处理(事件的广播)。然后WatchServer通过ServeHTTP将事件发送给client,我们就详细看看ServerHTTP的处理逻辑

    首先会查看发送来的请求是不是要求使用websockt,即wsstream.IsWebSocketRequest(req),假如是的话就通过websocket向client发送watch event,也就是说kube-apiserver是支持通过websocket向客户端发送watch event的。

    假如不是的话,则首先设置http返回头,Content-Type设置为s.MediaType,一般为json,同时设置Transfer-Encoding为chunked,设置返回码为200(StatusOK),和我们从API分析那一节获取的信息一样,首先会返回一个200的状态吗。

    这里比较有意思的是,将Transfer-Encoding设置为chunked,这是http1.1中支持的协议,它会建立一个长连接,同时可以不停的发送数据块,发送数据块的格式是,它首先会发送一个数据块的长度,加上回车符(/r/n),接着发送相应的数据块内容。假如数据块长度为0,则代表数据发送完成,连接断开。显然这里的watch event就是一个个数据块。

    w.Header().Set("Content-Type", s.MediaType)
    w.Header().Set("Transfer-Encoding", "chunked")
    w.WriteHeader(http.StatusOK)
    

    看看接下里的for循环,首先从channel里面获取event

    event, ok := <-ch
    

    然后序列化数据

    obj := event.Object
    s.EmbeddedEncoder.Encode(obj, buf)
    unknown.Raw = buf.Bytes()
    event.Object = &unknown
    

    最后将数据发送出去

    *internalEvent = metav1.InternalEvent(event)
    e.Encode(internalEvent)
    

    具体实现

    考虑的http长连接的资源消耗与性能问题,实现pod监听是采用的ws协议

    同时在服务多实例时涉及到pod状态合并的问题

    例如:通过deployment部署3个pod示例,在3个pod都起来时,deployment才算正常,在3个pod都删除是,deployment才算下线。并且一些java应用启动的时间可能较长,采用livenessprobe探针健康检查是需要探测多次才可探测到启动成功,这个会收到多条modify的消息,诸如此类的情况都要考虑,可根据监控需求具体实现

    //定义自己msg消息结构体
    type msg struct {
        
    }
    
    //存储pod状态
    var cache *CacheStatus
    
    var msgChan = make(chan msg, 1024)
    
    //ws协议监听
    func WatchPod() {
        
        //ReconnWs为自己封装websocket工具包
        reconnWs := new(ReconnWs)
    
        u, _ := url.Parse("ws://" + k8surl + "/api/v1/watch/pods?watch=true&pretty=true")
    
        reconnWs.Dial(u, http.Header{
            "Origin": []string{"http://" + k8surl + "/"},
        })
    
        done := make(chan struct{})
    
        //监听信息处理函数
        go func() {
            for {
                m := <-msgChan
                //消息处理函数
                go handler()
            }
        }()
    
        //pod监听
        go func() {
    
            if cache == nil {
                cache = NewCache()
            }
    
            defer func() {
                reconnWs.Close()
                close(done)
            }()
            for {
    
                var event PodStatus
    
                _, message, err := reconnWs.ReadMessage()
                if err != nil {
                    log.Println("read:", err)
                    continue
                }
    
                if err := jsoniter.Unmarshal(message, &event); err != nil {
                    log.Println(err)
                    continue
                }
    
                switch event.Type {
    
                case Added:
                
    
                case Deleted:
    
                    
    
                case Modified:
    
                    
    
                case Error:
    
    
                }
    
                m := msg{
                    //填入获取的状态信息
                }
    
                msgChan <- m
    
            }
        }()
    
        interrupt := make(chan os.Signal, 1)
        signal.Notify(interrupt, os.Interrupt)
        for {
    
            defer func() {
                Close()
            }()
    
            select {
            case <-interrupt:
                select {
                case <-done:
                case <-time.After(time.Second):
                }
                return
            }
        }
    }
    
    
    //资源回收
    func Close() {
        
    }
    
    

    cache采用的是map结构存储pod状态,getStatuInfo()函数是监控程序启动的时候初始化服务状态数据的函数,可以选择每次启动都从k8s查询一遍,根据自己定义的监听规则填入pod状态,也可以在每次接受到消息时都存入持久化存储(mysql、redis等),每次启动再从持久化存储中查询以及初始化已存在的cache数据

    type CacheStatus struct {
        lock *sync.RWMutex
        bm   map[string]Status
    }
    
    type Status struct {
        ReadyReplicas    int
        Replicas         int
        Event            string
        Phase            string
        Errmsg           string
        CreatingReplicas int
    }
    
    func NewCache() *CacheStatus {
    
        return &CacheStatus{
            lock: new(sync.RWMutex),
            bm:   getStatuInfo(),
        }
    
    }
    
    func (m *CacheStatus) IsExist(k string) (isExist bool) {
    
        m.lock.Lock()
        defer m.lock.Unlock()
    
        if _, ok := m.bm[k]; ok {
            isExist = true
        }
    
        return
    }
    
    func (m *CacheStatus) Get(k string) (status Status) {
        m.lock.RLock()
        defer m.lock.RUnlock()
        if status, ok := m.bm[k]; ok {
            return status
        }
        return
    }
    
    func (m *CacheStatus) Set(k string, v Status) {
        m.lock.Lock()
        defer m.lock.Unlock()
    
        m.bm[k] = v
    
    }
    
    func (m *CacheStatus) Check(k string) bool {
        m.lock.RLock()
        defer m.lock.RUnlock()
        if _, ok := m.bm[k]; !ok {
            return false
        }
        return true
    }
    func (m *CacheStatus) Delete(k string) {
        m.lock.Lock()
        defer m.lock.Unlock()
        delete(m.bm, k)
    }
    
    //range map
    func (m *CacheStatus) Each(cb func(string, Status)) {
        m.lock.RLock()
        defer m.lock.RUnlock()
        for k, v := range m.bm {
            cb(k, v)
        }
    }
    
    func (m *CacheStatus) String() string {
        str, _ := jsoniter.MarshalToString(m.bm)
        return str
    }
    
    
    func getStatuInfo() (cacheInfo map[string]Status) {
    
        ....
        
        return
    }
    
    

    相关文章

      网友评论

        本文标题:k8s watch rest api

        本文链接:https://www.haomeiwen.com/subject/alrpextx.html