美文网首页程序员Go
Go发起HTTP2.0请求流程分析(前篇)

Go发起HTTP2.0请求流程分析(前篇)

作者: Gopher指北 | 来源:发表于2020-09-07 09:09 被阅读0次

    来自公众号: 新世界杂货铺

    前言

    Go中的HTTP请求之——HTTP1.1请求流程分析之后,中间断断续续,历时近一月,终于才敢开始码字写下本文。

    阅读建议

    HTTP2.0在建立TCP连接和安全的TLS传输通道与HTTP1.1的流程基本一致。所以笔者建议没有看过Go中的HTTP请求之——HTTP1.1请求流程分析这篇文章的先去补一下课,本文会基于前一篇文章仅介绍和HTTP2.0相关的逻辑。

    (*Transport).roundTrip

    (*Transport).roundTrip方法会调用t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)初始化TLSClientConfig以及h2transport,而这两者都和HTTP2.0有着紧密的联系。

    TLSClientConfig: 初始化client支持的http协议, 并在tls握手时告知server。

    h2transport: 如果本次请求是http2,那么h2transport会接管连接,请求和响应的处理逻辑。

    下面看看源码:

    func (t *Transport) onceSetNextProtoDefaults() {
        // ...此处省略代码...
        t2, err := http2configureTransport(t)
        if err != nil {
            log.Printf("Error enabling Transport HTTP/2 support: %v", err)
            return
        }
        t.h2transport = t2
    
        // ...此处省略代码...
    }
    func http2configureTransport(t1 *Transport) (*http2Transport, error) {
        connPool := new(http2clientConnPool)
        t2 := &http2Transport{
            ConnPool: http2noDialClientConnPool{connPool},
            t1:       t1,
        }
        connPool.t = t2
        if err := http2registerHTTPSProtocol(t1, http2noDialH2RoundTripper{t2}); err != nil {
            return nil, err
        }
        if t1.TLSClientConfig == nil {
            t1.TLSClientConfig = new(tls.Config)
        }
        if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
            t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
        }
        if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
            t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
        }
        upgradeFn := func(authority string, c *tls.Conn) RoundTripper {
            addr := http2authorityAddr("https", authority)
            if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
                go c.Close()
                return http2erringRoundTripper{err}
            } else if !used {
                // Turns out we don't need this c.
                // For example, two goroutines made requests to the same host
                // at the same time, both kicking off TCP dials. (since protocol
                // was unknown)
                go c.Close()
            }
            return t2
        }
        if m := t1.TLSNextProto; len(m) == 0 {
            t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{
                "h2": upgradeFn,
            }
        } else {
            m["h2"] = upgradeFn
        }
        return t2, nil
    }
    

    笔者将上述的源码简单拆解为以下几个步骤:

    1. 新建一个http2clientConnPool并复制给t2,以后http2的请求会优先从该连接池中获取连接。
    2. 初始化TLSClientConfig,并将支持的h2http1.1协议添加到TLSClientConfig.NextProtos中。
    3. 定义一个h2upgradeFn存储到t1.TLSNextProto里。

    鉴于前一篇文章对新建连接前的步骤有了较为详细的介绍,所以这里直接看和server建立连接的部分源码,即(*Transport).dialConn方法:

    func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
        // ...此处省略代码...
        if cm.scheme() == "https" && t.hasCustomTLSDialer() {
            // ...此处省略代码...
        } else {
            conn, err := t.dial(ctx, "tcp", cm.addr())
            if err != nil {
                return nil, wrapErr(err)
            }
            pconn.conn = conn
            if cm.scheme() == "https" {
                var firstTLSHost string
                if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
                    return nil, wrapErr(err)
                }
                if err = pconn.addTLS(firstTLSHost, trace); err != nil {
                    return nil, wrapErr(err)
                }
            }
        }
    
        // Proxy setup.
        // ...此处省略代码...
    
        if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
            if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
                return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
            }
        }
    
        // ...此处省略代码...
    }
    

    笔者对上述的源码描述如下:

    1. 调用t.dial(ctx, "tcp", cm.addr())创建TCP连接。
    2. 如果是https的请求, 则对请求建立安全的tls传输通道。
    3. 检查tls的握手状态,如果和server协商的NegotiatedProtocol协议不为空,且client的t.TLSNextProto有该协议,则返回alt不为空的持久连接(HTTP1.1不会进入if条件里)。

    笔者对上述的第三点进行展开。经笔者在本地debug验证,当client和server都支持http2时,s.NegotiatedProtocol的值为h2s.NegotiatedProtocolIsMutual的值为true

    在上面分析http2configureTransport函数时,我们知道TLSNextProto注册了一个key为h2的函数,所以调用next实际就是调用前面的upgradeFn函数。

    upgradeFn会调用connPool.addConnIfNeeded向http2的连接池添加一个tls传输通道,并最终返回前面已经创建好的t2http2Transport

    func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) {
        p.mu.Lock()
        // ...此处省略代码...
        // 主要用于判断是否有必要像连接池添加新的连接
        // 判断连接池中是否已有同host连接,如果有且该链接能够处理新的请求则直接返回
        call, dup := p.addConnCalls[key]
        if !dup {
            // ...此处省略代码...
            call = &http2addConnCall{
                p:    p,
                done: make(chan struct{}),
            }
            p.addConnCalls[key] = call
            go call.run(t, key, c)
        }
        p.mu.Unlock()
    
        <-call.done
        if call.err != nil {
            return false, call.err
        }
        return !dup, nil
    }
    func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) {
        cc, err := t.NewClientConn(tc)
    
        p := c.p
        p.mu.Lock()
        if err != nil {
            c.err = err
        } else {
            p.addConnLocked(key, cc)
        }
        delete(p.addConnCalls, key)
        p.mu.Unlock()
        close(c.done)
    }
    
    

    分析上述的源码我们能够得到两点结论:

    1. 执行完upgradeFn之后,(*Transport).dialConn返回的持久化连接中alt字段已经不是nil了。
    2. t.NewClientConn(tc)新建出来的连接会保存在http2的连接池即http2clientConnPool中,下一小结将对NewClientConn展开分析。

    最后我们回到(*Transport).roundTrip方法并分析其中的关键源码:

    func (t *Transport) roundTrip(req *Request) (*Response, error) {
        t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
        // ...此处省略代码...
        for {
            select {
            case <-ctx.Done():
                req.closeBody()
                return nil, ctx.Err()
            default:
            }
    
            // ...此处省略代码...
            pconn, err := t.getConn(treq, cm)
            if err != nil {
                t.setReqCanceler(req, nil)
                req.closeBody()
                return nil, err
            }
    
            var resp *Response
            if pconn.alt != nil {
                // HTTP/2 path.
                t.setReqCanceler(req, nil) // not cancelable with CancelRequest
                resp, err = pconn.alt.RoundTrip(req)
            } else {
                resp, err = pconn.roundTrip(treq)
            }
            if err == nil {
                return resp, nil
            }
    
            // ...此处省略代码...
        }
    }
    

    结合前面的分析,pconn.alt在server和client都支持http2协议的情况下是不为nil的。所以,http2的请求会走pconn.alt.RoundTrip(req)分支,也就是说http2的请求流程就被http2Transport接管啦。

    (*http2Transport).NewClientConn

    (*http2Transport).NewClientConn内部会调用t.newClientConn(c, t.disableKeepAlives())

    因为本节内容较多,所以笔者不再一次性贴出源码,而是按关键步骤分析并分块儿贴出源码。

    1、初始化一个http2ClientConn

    cc := &http2ClientConn{
        t:                     t,
        tconn:                 c,
        readerDone:            make(chan struct{}),
        nextStreamID:          1,
        maxFrameSize:          16 << 10,           // spec default
        initialWindowSize:     65535,              // spec default
        maxConcurrentStreams:  1000,               // "infinite", per spec. 1000 seems good enough.
        peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
        streams:               make(map[uint32]*http2clientStream),
        singleUse:             singleUse,
        wantSettingsAck:       true,
        pings:                 make(map[[8]byte]chan struct{}),
    }
    

    上面的源码新建了一个默认的http2ClientConn。

    initialWindowSize:初始化窗口大小为65535,这个值之后会初始化每一个数据流可发送的数据窗口大小。

    maxConcurrentStreams:表示每个连接上允许最多有多少个数据流同时传输数据。

    streams:当前连接上的数据流。

    singleUse: 控制http2的连接是否允许多个数据流共享,其值由t.disableKeepAlives()控制。

    2、创建一个条件锁并且新建Writer&Reader。

    cc.cond = sync.NewCond(&cc.mu)
    cc.flow.add(int32(http2initialWindowSize))
    cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})
    cc.br = bufio.NewReader(c)
    

    新建Writer&Reader没什么好说的,需要注意的是cc.flow.add(int32(http2initialWindowSize))

    cc.flow.add将当前连接的可写流控制窗口大小设置为http2initialWindowSize,即65535。

    3、新建一个读写数据帧的Framer。

    cc.fr = http2NewFramer(cc.bw, cc.br)
    cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil)
    cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
    

    4、向server发送开场白,并发送一些初始化数据帧。

    initialSettings := []http2Setting{
        {ID: http2SettingEnablePush, Val: 0},
        {ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow},
    }
    if max := t.maxHeaderListSize(); max != 0 {
        initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max})
    }
    
    cc.bw.Write(http2clientPreface)
    cc.fr.WriteSettings(initialSettings...)
    cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow)
    cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize)
    cc.bw.Flush()
    

    client向server发送的开场白内容如下:

    const (
        // client首先想server发送以PRI开头的一串字符串。
        http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
    )
    var (
        http2clientPreface = []byte(http2ClientPreface)
    )
    

    发送完开场白后,client向server发送SETTINGS数据帧。

    http2SettingEnablePush: 告知server客户端是否开启push功能。

    http2SettingInitialWindowSize:告知server客户端可接受的最大数据窗口是http2transportDefaultStreamFlow(4M)。

    发送完SETTINGS数据帧后,发送WINDOW_UPDATE数据帧, 因为第一个参数为0即streamID为0,则是告知server此连接可接受的最大数据窗口为http2transportDefaultConnFlow(1G)。

    发送完WINDOW_UPDATE数据帧后,将client的可读流控制窗口大小设置为http2transportDefaultConnFlow + http2initialWindowSize

    5、开启读循环并返回

    go cc.readLoop()
    

    (*http2Transport).RoundTrip

    (*http2Transport).RoundTrip只是一个入口函数,它会调用(*http2Transport). RoundTripOpt方法。

    (*http2Transport). RoundTripOpt有两个步骤比较关键:

    t.connPool().GetClientConn(req, addr): 在http2的连接池里面获取一个可用连接,其中连接池的类型为http2noDialClientConnPool,参考http2configureTransport函数。

    cc.roundTrip(req): 通过获取到的可用连接发送请求并返回响应。

    (http2noDialClientConnPool).GetClientConn

    根据实际的debug结果(http2noDialClientConnPool).GetClientConn最终会调用(*http2clientConnPool).getClientConn(req *Request, addr string, dialOnMiss bool)

    通过(http2noDialClientConnPool).GetClientConn获取连接时传递给(*http2clientConnPool).getClientConn方法的第三个参数始终为false,该参数为false时代表着即使无法正常获取可用连接,也不在这个环节重新发起拨号流程。

    在(*http2clientConnPool).getClientConn中会遍历同地址的连接,并判断连接的状态从而获取一个可以处理请求的连接。

    for _, cc := range p.conns[addr] {
        if st := cc.idleState(); st.canTakeNewRequest {
            if p.shouldTraceGetConn(st) {
                http2traceGetConn(req, addr)
            }
            p.mu.Unlock()
            return cc, nil
        }
    }
    

    cc.idleState()判断当前连接池中的连接能否处理新的请求:

    1、当前连接是否能被多个请求共享,如果仅单个请求使用且已经有一个数据流,则当前连接不能处理新的请求。

    if cc.singleUse && cc.nextStreamID > 1 {
        return
    }
    

    2、以下几点均为true时,才代表当前连接能够处理新的请求:

    • 连接状态正常,即未关闭并且不处于正在关闭的状态。
    • 当前连接正在处理的数据流小于maxConcurrentStreams
    • 下一个要处理的数据流 + 当前连接处于等待状态的请求*2 < math.MaxInt32。
    • 当前连接没有长时间处于空闲状态(主要通过cc.tooIdleLocked()判断)。
    st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
            int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
            !cc.tooIdleLocked()
    

    当从链接池成功获取到一个可以处理请求的连接,就可以和server进行数据交互,即(*http2ClientConn).roundTrip流程。

    (*http2ClientConn).roundTrip

    1、在真正开始处理请求前,还要进行header检查,http2对http1.1的某些header是不支持的,笔者就不对这个逻辑进行分析了,直接上源码:

    func http2checkConnHeaders(req *Request) error {
        if v := req.Header.Get("Upgrade"); v != "" {
            return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
        }
        if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
            return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
        }
        if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) {
            return fmt.Errorf("http2: invalid Connection request header: %q", vv)
        }
        return nil
    }
    func http2commaSeparatedTrailers(req *Request) (string, error) {
        keys := make([]string, 0, len(req.Trailer))
        for k := range req.Trailer {
            k = CanonicalHeaderKey(k)
            switch k {
            case "Transfer-Encoding", "Trailer", "Content-Length":
                return "", &http2badStringError{"invalid Trailer key", k}
            }
            keys = append(keys, k)
        }
        if len(keys) > 0 {
            sort.Strings(keys)
            return strings.Join(keys, ","), nil
        }
        return "", nil
    }
    

    2、调用(*http2ClientConn).awaitOpenSlotForRequest,一直等到当前连接处理的数据流小于maxConcurrentStreams, 如果此函数返回错误,则本次请求失败。

    2.1、double check当前连接可用。

    if cc.closed || !cc.canTakeNewRequestLocked() {
        if waitingForConn != nil {
            close(waitingForConn)
        }
        return http2errClientConnUnusable
    }
    

    2.2、如果当前连接处理的数据流小于maxConcurrentStreams则直接返回nil。笔者相信大部分逻辑走到这儿就返回了。

    if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
        if waitingForConn != nil {
            close(waitingForConn)
        }
        return nil
    }
    

    2.3、如果当前连接处理的数据流确实已经达到上限,则开始进入等待流程。

    if waitingForConn == nil {
        waitingForConn = make(chan struct{})
        go func() {
            if err := http2awaitRequestCancel(req, waitingForConn); err != nil {
                cc.mu.Lock()
                waitingForConnErr = err
                cc.cond.Broadcast()
                cc.mu.Unlock()
            }
        }()
    }
    cc.pendingRequests++
    cc.cond.Wait()
    cc.pendingRequests--
    

    通过上面的逻辑知道,当前连接处理的数据流达到上限后有两种情况,一是等待请求被取消,二是等待其他请求结束。如果有其他数据流结束并唤醒当前等待的请求,则重复2.1、2.2和2.3的步骤。

    3、调用cc.newStream()在连接上创建一个数据流(创建数据流是线程安全的,因为源码中在调用awaitOpenSlotForRequest之前先加锁,直到写入请求的header之后才释放锁)。

    func (cc *http2ClientConn) newStream() *http2clientStream {
        cs := &http2clientStream{
            cc:        cc,
            ID:        cc.nextStreamID,
            resc:      make(chan http2resAndError, 1),
            peerReset: make(chan struct{}),
            done:      make(chan struct{}),
        }
        cs.flow.add(int32(cc.initialWindowSize))
        cs.flow.setConnFlow(&cc.flow)
        cs.inflow.add(http2transportDefaultStreamFlow)
        cs.inflow.setConnFlow(&cc.inflow)
        cc.nextStreamID += 2
        cc.streams[cs.ID] = cs
        return cs
    }
    

    笔者对上述代码简单描述如下:

    • 新建一个http2clientStream,数据流ID为cc.nextStreamID,新建数据流后,cc.nextStreamID +=2
    • 数据流通过http2resAndError管道接收请求的响应。
    • 初始化当前数据流的可写流控制窗口大小为cc.initialWindowSize,并保存连接的可写流控制指针。
    • 初始化当前数据流的可读流控制窗口大小为http2transportDefaultStreamFlow,并保存连接的可读流控制指针。
    • 最后将新建的数据流注册到当前连接中。

    4、调用cc.t.getBodyWriterState(cs, body)会返回一个http2bodyWriterState结构体。通过该结构体可以知道请求body是否发送成功。

    func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) {
        s.cs = cs
        if body == nil {
            return
        }
        resc := make(chan error, 1)
        s.resc = resc
        s.fn = func() {
            cs.cc.mu.Lock()
            cs.startedWrite = true
            cs.cc.mu.Unlock()
            resc <- cs.writeRequestBody(body, cs.req.Body)
        }
        s.delay = t.expectContinueTimeout()
        if s.delay == 0 ||
            !httpguts.HeaderValuesContainsToken(
                cs.req.Header["Expect"],
                "100-continue") {
            return
        }
        // 此处省略代码,因为绝大部分请求都不会设置100-continue的标头
        return
    }
    

    s.fn: 标记当前数据流开始写入数据,并且将请求body的发送结果写入s.resc管道(本文暂不对writeRequestBody展开分析,下篇文章会对其进行分析)。

    5、因为是多个请求共享一个连接,那么向连接写入数据帧时需要加锁,比如加锁写入请求头。

    cc.wmu.Lock()
    endStream := !hasBody && !hasTrailers
    werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
    cc.wmu.Unlock()
    

    6、如果有请求body,则开始写入请求body,没有请求body则设置响应header的超时时间(有请求body时,响应header的超时时间需要在请求body写完之后设置)。

    if hasBody {
        bodyWriter.scheduleBodyWrite()
    } else {
        http2traceWroteRequest(cs.trace, nil)
        if d := cc.responseHeaderTimeout(); d != 0 {
            timer := time.NewTimer(d)
            defer timer.Stop()
            respHeaderTimer = timer.C
        }
    }
    

    scheduleBodyWrite的内容如下:

    func (s http2bodyWriterState) scheduleBodyWrite() {
        if s.timer == nil {
            // We're not doing a delayed write (see
            // getBodyWriterState), so just start the writing
            // goroutine immediately.
            go s.fn()
            return
        }
        http2traceWait100Continue(s.cs.trace)
        if s.timer.Stop() {
            s.timer.Reset(s.delay)
        }
    }
    

    因为笔者的请求header中没有携带100-continue标头,所以在前面的getBodyWriterState函数中初始化的s.timer为nil即调用scheduleBodyWrite会立即开始发送请求body。

    7、轮询管道获取响应结果。

    在看轮询源码之前,先看一个简单的函数:

    handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) {
        res := re.res
        if re.err != nil || res.StatusCode > 299 {
            bodyWriter.cancel()
            cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
        }
        if re.err != nil {
            cc.forgetStreamID(cs.ID)
            return nil, cs.getStartedWrite(), re.err
        }
        res.Request = req
        res.TLS = cc.tlsState
        return res, false, nil
    }
    

    该函数主要就是判断读到的响应是否正常,并根据响应的结果构造(*http2ClientConn).roundTrip的返回值。

    了解了handleReadLoopResponse之后,下面就看看轮询的逻辑:

    for {
        select {
        case re := <-readLoopResCh:
            return handleReadLoopResponse(re)
        // 此处省略代码(包含请求取消,请求超时等管道的轮询)
        case err := <-bodyWriter.resc:
            // Prefer the read loop's response, if available. Issue 16102.
            select {
            case re := <-readLoopResCh:
                return handleReadLoopResponse(re)
            default:
            }
            if err != nil {
                cc.forgetStreamID(cs.ID)
                return nil, cs.getStartedWrite(), err
            }
            bodyWritten = true
            if d := cc.responseHeaderTimeout(); d != 0 {
                timer := time.NewTimer(d)
                defer timer.Stop()
                respHeaderTimer = timer.C
            }
        }
    }
    

    笔者仅对上面的第二种情况即请求body发送完成进行描述:

    • 能否读到响应,如果能够读取响应则直接返回。
    • 判断请求body是否发送成功,如果发送失败,直接返回。
    • 如果请求body发送成功,则设置响应header的超时时间。

    总结

    本文主要描述了两个方面的内容:

    1. 确认client和server都支持http2协议,并构建一个http2的连接,同时开启该连接的读循环。
    2. 通过http2连接池获取一个http2连接,并发送请求和读取响应。

    预告

    鉴于HTTTP2.0的内容较多,且文章篇幅过长时不易阅读,笔者将后续要分析的内容拆为两个部分:

    1. 描述数据帧和流控制以及读循环读到响应并发送给readLoopResCh管道。
    2. http2.0标头压缩逻辑。

    最后,衷心希望本文能够对各位读者有一定的帮助。

    :

    1. 写本文时, 笔者所用go版本为: go1.14.2。
    2. 本文对h2c的情况不予以考虑。
    3. 因为笔者分析的是请求流程,所以没有在本地搭建server,而是使用了一个支持http2连接的图片一步步的debug。eg: https://dss0.bdstatic.com/5aV1bjqh_Q23odCf/static/superman/img/topnav/baiduyun@2x-e0be79e69e.png

    参考

    https://developers.google.com/web/fundamentals/performance/http2?hl=zh-cn

    相关文章

      网友评论

        本文标题:Go发起HTTP2.0请求流程分析(前篇)

        本文链接:https://www.haomeiwen.com/subject/ghpcektx.html