美文网首页etcdGo
golang http2长链接问题

golang http2长链接问题

作者: 凹大猫的鱼 | 来源:发表于2020-06-28 19:47 被阅读0次

    前几天项目中遇到了一个长链接假死问题,服务端和client端采用的是h2c长连接。服务端作为sidecar部署在k8s的pod里面,当滚动升级pod的时候,client端和老的pod的连接一直存在,即使老的pod已经被删除了。(client和网关是一个东西)

    发现问题

    突然有一天前端同事说调用全部503(内部服务不可用),赶紧去环境上查看log,发现网关发送到后段的请求全部超时。用netstat查看连接状态也是没问题的。

    网关端状态图

    去新建的pod上查看,发现并没有连接,因此网关端的连接并不是和新建的pod建立的。

    服务端状态图

    解决问题

    1.查看网关端代码

    首先看一下网关测的代码:

        func NewH2cClient() *http.Client {
        client := &http.Client{
            Transport: &http2.Transport{
                AllowHTTP: true,
                DialTLS:   dialH2cTimeout,
            },
            Timeout: 10 * time.Second,
            }
            return client
        }
    
        func dialH2cTimeout(network, addr string, cfg *tls.Config) (net.Conn, error) {
           tylog.Infof("connect to remote address: %+v:", addr)
           return net.Dial(network, addr)
        }
    

    client端的实现很简单,复用http.Client对象。在这里设置了TimeOut为10s,这个是发送超时时间。通过log也可以看到10s超时后返回了error,但连接并没有终端,查阅了一些net/http的文档服务端对于超时的控制还是比较细腻的。

    服务端超时控制

    但client端的控制就比较简陋(其实应该是net/http封装的东西太多,导致用户可直接操作的东西太少)

    client端超时控制

    从图中可以看出,client端不像服务端有读写超时设置,client端就一个Timeout,是从建立链接到接收到body的时间段。因此我们在client端可以做的东西貌似并不多。

    2.回归问题

    查阅了部分资料后,回到问题的所在点:

    为什么服务端已经被删除,但长链接还存在??

    因此我们需要看的就是client端的conn什么时候应该断开。http包中的读和写是分开的,分别对应的是两个goroutine,我们主要应该看的是读操作。

          func (cc *ClientConn) readLoop() {
             rl := &clientConnReadLoop{cc: cc}
             defer rl.cleanup() //这个会清理链接
             cc.readerErr = rl.run()
             if ce, ok := cc.readerErr.(ConnectionError); ok {
                  cc.wmu.Lock()
                  cc.fr.WriteGoAway(0, ErrCode(ce), nil)
                  cc.wmu.Unlock()
             }
         }
    

    上述代码是read的入口,我们看一下run这个函数的实现

    func (rl *clientConnReadLoop) run() error {
        cc := rl.cc
        rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
        gotReply := false // ever saw a HEADERS reply
        gotSettings := false
        for {
            f, err := cc.fr.ReadFrame()
            if err != nil {
                cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
            }
            if se, ok := err.(StreamError); ok {
                if cs := cc.streamByID(se.StreamID, false); cs != nil {
                    cs.cc.writeStreamReset(cs.ID, se.Code, err)
                    cs.cc.forgetStreamID(cs.ID)
                    if se.Cause == nil {
                        se.Cause = cc.fr.errDetail
                    }
                    rl.endStreamError(cs, se)
                }
                continue
            } else if err != nil {
                return err
            }
            if VerboseLogs {
                cc.vlogf("http2: Transport received %s", summarizeFrame(f))
            }
            if !gotSettings {
                if _, ok := f.(*SettingsFrame); !ok {
                    cc.logf("protocol error: received %T before a SETTINGS frame", f)
                    return ConnectionError(ErrCodeProtocol)
                }
                gotSettings = true
            }
            maybeIdle := false // whether frame might transition us to idle
    
            switch f := f.(type) {
            case *MetaHeadersFrame:
                err = rl.processHeaders(f)
                maybeIdle = true
                gotReply = true
            case *DataFrame:
                err = rl.processData(f)
                maybeIdle = true
            case *GoAwayFrame:
                err = rl.processGoAway(f)
                maybeIdle = true
            case *RSTStreamFrame:
                err = rl.processResetStream(f)
                maybeIdle = true
            case *SettingsFrame:
                err = rl.processSettings(f)
            case *PushPromiseFrame:
                err = rl.processPushPromise(f)
            case *WindowUpdateFrame:
                err = rl.processWindowUpdate(f)
            case *PingFrame:
                err = rl.processPing(f)
            default:
                cc.logf("Transport: unhandled response frame type %T", f)
            }
            if err != nil {
                if VerboseLogs {
                    cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
                }
                return err
            }
            if rl.closeWhenIdle && gotReply && maybeIdle {
                cc.closeIfIdle()
            }
        }
    }
    

    再看一下ReadFrame函数:

    // ReadFrame reads a single frame. The returned Frame is only valid
    // until the next call to ReadFrame.
    //
    // If the frame is larger than previously set with SetMaxReadFrameSize, the
    // returned error is ErrFrameTooLarge. Other errors may be of type
    // ConnectionError, StreamError, or anything else from the underlying
    // reader.
    func (fr *Framer) ReadFrame() (Frame, error) {
        fr.errDetail = nil
        if fr.lastFrame != nil {
            fr.lastFrame.invalidate()
        }
        fh, err := readFrameHeader(fr.headerBuf[:], fr.r)
        if err != nil {
            return nil, err
        }
        if fh.Length > fr.maxReadSize {
            return nil, ErrFrameTooLarge
        }
        payload := fr.getReadBuf(fh.Length)
        if _, err := io.ReadFull(fr.r, payload); err != nil {
            return nil, err
        }
        f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload)
        if err != nil {
            if ce, ok := err.(connError); ok {
                return nil, fr.connError(ce.Code, ce.Reason)
            }
            return nil, err
        }
        if err := fr.checkFrameOrder(f); err != nil {
            return nil, err
        }
        if fr.logReads {
            fr.debugReadLoggerf("http2: Framer %p: read %v", fr, summarizeFrame(f))
        }
        if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil {
            return fr.readMetaFrame(f.(*HeadersFrame))
        }
        return f, nil
    }
    

    其实就是一直阻塞读取数据,只有在收到reset或者fin包(EOF)的时候才会退出,然后会执行前面的清理函数:

     defer rl.cleanup()
    

    该函数会调用MarkDead函数清理conn关联的map

    func (p *clientConnPool) MarkDead(cc *ClientConn) {
        p.mu.Lock()
        defer p.mu.Unlock()
        for _, key := range p.keys[cc] {
            vv, ok := p.conns[key]
            if !ok {
                continue
            }
            newList := filterOutClientConn(vv, cc)
            if len(newList) > 0 {
                p.conns[key] = newList
            } else {
                delete(p.conns, key)
            }
        }
        delete(p.keys, cc)
    }
    

    看到这基本就知道问题的所在了:

    1. 删除pod的时候服务端没有发送Fin包
    2. 服务端发送了Fin包,但网关没收到

    3.验证问题所在

    使用了tcpdump抓包,发现client端的确没有收到Fin包

    client端抓包

    从抓包信息看的确没有收到Fin包,服务端抓也没抓到,于是感觉像k8s升级的时候直接删掉了老的pod(类似直接delete),并没有发送kill命令。但集群权限是拿不到的,不太好认证这个猜想。

    4.解决方案

    因为服务端也是自己写的,因此可以设置deployment来设置pod更新时的动作,比如:

    "lifecycle": {
                  "preStop": {
                    "httpGet": {
                      "path": "/hc.do?a=offline",
                      "scheme": "HTTP",
                      "port": 8809
                    }
                  }
                },
    

    让服务监听8809端口,k8s在删除pod前会访问 http127.0.0.1:8809/hc.do?a=offline, 这时候在服务里面做优雅下线,关掉tcp链接,这个问题就不会发生了。

    5.总结

    net/http包虽然很容易实现http服务,但对于client端的确不够友好,至少我没找到什么方式可以直接去控制超时时间,或者管理conn连接池,当然我也可以在它基础上实现一个自己的连接池(ClientConnPool接口)但成本不低。所以很多人都会自己去实现。

    对于pod删除时信号量的问题,因为没有权限,所以没法看pod滚动升级时,kublet是怎么删除老的节点的,按照我的理解应该是先给容器发kill -15, 如果没退出再kill -9, 但就目前情况来看服务端并没有捕捉到信号量(我加了捕捉信号量,但并未打印出来log),后面有机会再去深入研究一下k8s的那块代码。

    相关文章

      网友评论

        本文标题:golang http2长链接问题

        本文链接:https://www.haomeiwen.com/subject/qnbwfktx.html