美文网首页Golang程序员
gRPC-transport包源码分析

gRPC-transport包源码分析

作者: 月光夕颜 | 来源:发表于2017-08-04 18:36 被阅读656次

    gRPC是基于HTTP/2标准和proto协议开发的,gRPC的很多特性都依赖于HTTP/2标准提供。gRPC设计的四种模式是基于底层HTTP/2的流的概念。transport包是基于HTTP/2标准的实现,提供了流控等特性。

    流控

    transport提供基于connection和stream的两级流控。

    -------------------------------------gRPC流控默认值----------------------------------------------
        defaultWindowSize = 65535 //64K
        initialWindowSize     = defaultWindowSize      // for an RPC
        initialConnWindowSize = defaultWindowSize * 16 // for a connection
    -------------------------------------流控数据结构------------------------------------------------
    type inFlow struct {
        //流控限制未处理的数据的数量
        limit uint32
        mu sync.Mutex
        //pendingData包含所有收到但未被应用消费的数据
        pendingData uint32
        //pendingUpdate包含被消费但为发送更新窗口的数量,减少窗口更新的频率
        pendingUpdate uint32
    }
    //真实的流控处理函数,server在接收到client的请求后会先
    //检查pendingData+pendingUpdate是否超过limit限制
    func (f *inFlow) onData(n uint32) error {
        f.mu.Lock()
        defer f.mu.Unlock()
        f.pendingData += n
        if f.pendingData+f.pendingUpdate > f.limit {
            return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
        }
        return nil
    }
    //http2标准中规定:针对控制类的frame,为了确保能够得到高优先级的处理不做流控。DataFrame的流控处理在如下的函数中进行处理。
    ----------------------------------server端处理流------------------------------------------------
    //server端handleData用于接收dataFrame
    func (t *http2Server) handleData(f *http2.DataFrame) {
        size := len(f.Data())
        //针对connection的流控,如果client和server在该connection的负载大于16 * 64K,server会主动断开与client之间的连接。
        if err := t.fc.onData(uint32(size)); err != nil {
            //onData函数实现见流控的数据结构
            grpclog.Printf("transport: http2Server %v", err)
            //超过负载,直接关闭connection
            t.Close()
            return
        }
        // 选择正确的流进行处理
        s, ok := t.getStream(f)
        if !ok {
            if w := t.fc.onRead(uint32(size)); w > 0 {
              //更新流控窗口的大小
                t.controlBuf.put(&windowUpdate{0, w})
            }
            return
        }
        if size > 0 {
            s.mu.Lock()
            if s.state == streamDone {
                s.mu.Unlock()
                // stream已经被关闭,需要更新流控窗口
                if w := t.fc.onRead(uint32(size)); w > 0 {
                    t.controlBuf.put(&windowUpdate{0, w})
                }
                return
            }
          //同一连接上的不同stream具有竞争关系,提供了strean级的流控
            if err := s.fc.onData(uint32(size)); err != nil {
                //onData()函数实现见流控数据结构
                s.mu.Unlock()
                //关闭超过流控限制的stream
                t.closeStream(s)
                //通知client再建立streamID相同的stream
                t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
                return
            }
            s.mu.Unlock()
            data := make([]byte, size)
            copy(data, f.Data())
            s.write(recvMsg{data: data})
        }
        if f.Header().Flags.Has(http2.FlagDataEndStream) {
            s.mu.Lock()
            if s.state != streamDone {
                s.state = streamReadDone
            }
            s.mu.Unlock()
            s.write(recvMsg{err: io.EOF})
        }
    }
    

    RPC调用的执行过程

    以unary模式的rpc调用为例分析一次RPC请求在gRPC中的流转过程,其他三种模式底层调用的函数与unary模式相同(四种模式从底层的HTTP/2分析都是stream,并且仍然是一套request和response的实现)。

    : 以下源码分析部分均是以grpc/example/route_guide为例进行分析。对其他模式感兴趣的读者可自行分析。

    unary模式的RPC请求在gRPC中的执行过程
    ------------------------------------------proto的声明-------------------------------------------
    service RouteGuide {
      rpc GetFeature(Point) returns (Feature) {}
    }
    ------------------------------------------pb.go源码---------------------------------------------
    func (c *routeGuideClient) GetFeature(ctx context.Context, in *Point, opts ...grpc.CallOption) (*Feature, error) {
        out := new(Feature)
      // -->/routeguide.RouteGuide/GetFeature ->/package/server/method
        err := grpc.Invoke(ctx, "/routeguide.RouteGuide/GetFeature", in, out, c.cc, opts...)
        if err != nil {
            return nil, err
        }
        return out, nil
    }
    //以下代码去掉错误处理和非关键函数的调用
    //以下代码分析的是grpc client端如何发送request到server
    -----------------------------------------grpc-client代码----------------------------------------
    func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
        c := defaultCallInfo //构造rpc调用的defaultCallInfo并根据用户传入的信息进行填充
        topts := &transport.Options{
            Last:  true,
            Delay: false,
        }
        for {
            var (
                err    error
                t      transport.ClientTransport
                stream *transport.Stream
                put func()
            )
            //callHdr携带详细的RPC调用信息,如Method->/routeguide.RouteGuide/GetFeature
            callHdr := &transport.CallHdr{
                Host:   cc.authority,
                Method: method,
            }
            gopts := BalancerGetOptions{
                BlockingWait: !c.failFast,
            }
            t, put, err = cc.getTransport(ctx, gopts)
            if err != nil {
                if _, ok := err.(*rpcError); ok {
                    return err
                }
                //非failFast情况下,err为以下两种情况会重试
                if err == errConnClosing || err == errConnUnavailable {
                    if c.failFast {
                        return Errorf(codes.Unavailable, "%v", err)
                    }
                    continue
                }
                return Errorf(codes.Internal, "%v", err)
            }
            //将client请求信息发送,并等待server返回
            stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
            if err != nil {
                if put != nil {
                    put()
                    put = nil
                }
                if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
                    if c.failFast {
                        return toRPCErr(err)
                    }
                    continue
                }
                return toRPCErr(err)
            }
            //在sendRequest创建的stream上等待server返回response
            err = recvResponse(cc.dopts, t, &c, stream, reply)
            if err != nil {
                if put != nil {
                    put()
                    put = nil
                }
                if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
                    if c.failFast {
                        return toRPCErr(err)
                    }
                    continue
                }
                return toRPCErr(err)
            }
            //关闭创建的stream
            t.CloseStream(stream, nil)
            if put != nil {
                put()
                put = nil
            }
            return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
        }
    }
    ----------------------------------------------sendRequest()说明--------------------------------
    func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {  
         //根据callHdr中包含的host和method信息创建对应的stream
        //函数具体实现-transport/http2_client.go/http2Client.NewStream()
        stream, err := t.NewStream(ctx, callHdr)
        //序列化消息并定义消息头
        //消息头=5yte=1byte(msg是否压缩) + 4byte(msg长度)
        //函数具体实现-rpc_util.go
        outBuf, err := encode(codec, args, compressor, cbuf)
        //将outBuf按照http2帧的大小分帧并发送到对端,下面会对该函数具体分析
        err = t.Write(stream, outBuf, opts)
        //发送成功,返回该stream,用于接收response
        return stream, nil
    }
    ------------------------------------ClientTransport.Write()说明---------------------------------
    //真正将message分帧在指定的stream上传输的函数如下,将对该函数进行详细分析
    func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
        r := bytes.NewBuffer(data)
        for {
            var p []byte
            if r.Len() > 0 {
                size := http2MaxFrameLen
                s.sendQuotaPool.add(0)
                // 等待stream的流控上有配额发送数据,stream.sendQuotaPool=65535
                sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
                if err != nil {
                    return err
                }
                t.sendQuotaPool.add(0)
                // 等待connection的流控有配额去发送数据,t.sendQuotaPool= 65535 * 16
                tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
                if err != nil {
                    if _, ok := err.(StreamError); ok || err == io.EOF {
                        t.sendQuotaPool.cancel()
                    }
                    return err
                }
                if sq < size {
                    size = sq
                }
                if tq < size {
                    size = tq
                }
                p = r.Next(size)
                ps := len(p)
                if ps < sq {
                    // 返回stream预留超额的配额数量
                    s.sendQuotaPool.add(sq - ps)
                }
                if ps < tq {
                    // 返回connection预留超额的配额数量
                    t.sendQuotaPool.add(tq - ps)
                }
            }
            var (
                endStream  bool
                forceFlush bool
            )
            //判断是否为最后一帧l
            if opts.Last && r.Len() == 0 {
                endStream = true
            }
            // 表明这将有一个writer将要去写data frame
            t.framer.adjustNumWriters(1)
            // 释放t.writableChan上加的锁,获得在该transport上写的权利,确保只有一个调用者可以调用t.framer.writeData()函数。
            if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
                if _, ok := err.(StreamError); ok || err == io.EOF {
                    // 释放connection上预留的配额数量
                    t.sendQuotaPool.add(len(p))
                }
                if t.framer.adjustNumWriters(-1) == 0 {
                    // 如果该Writer是这一批的最后一个有责任去刷新http2.frames的缓存区
                    //将刷新的请求排入一个队列而不是直接刷新合一避免和其他的Writer或者刷新请求的竞争
                    t.controlBuf.put(&flushIO{})
                }
                return err
            }
            select {
            case <-s.ctx.Done():
                t.sendQuotaPool.add(len(p))
                if t.framer.adjustNumWriters(-1) == 0 {
                    t.controlBuf.put(&flushIO{})
                }
               //再次为该transport加锁
                t.writableChan <- 0
                return ContextErr(s.ctx.Err())
            default:
            }
            if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
                // 强制刷新因为这是grpc message的最后一个数据帧
                //对于调用者来说此刻仅仅只有一个writer
                forceFlush = true
            }
            //如果t.framer.writeData失败,所有等待处理的stream将会在http2Clinet.Close()函数中进行处理,此处不必显示调用CloseStream()
          //writeData()不会并发被调用,确保server端收到的frame不会乱序(不会出现dataframe早于headerframe先到)
            if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
              //writeData()增加二进制帧的头部,函数实现-net/http2/frame.go
                t.notifyError(err)
                return connectionErrorf(true, err, "transport: %v", err)
            }
            if t.framer.adjustNumWriters(-1) == 0 {
                t.framer.flushWrite()
            }
          //再次为该transport加锁
            t.writableChan <- 0
            if r.Len() == 0 {
                break
            }
        }
        if !opts.Last {
            return nil
        }
        s.mu.Lock()
        if s.state != streamDone {
          //更新stream的状态
            s.state = streamWriteDone
        }
        s.mu.Unlock()
        return nil
    }
    //以下代码是分析grpc-server接收client的请求后内部的处理流程
    ---------------------------------------grpc-server代码------------------------------------------
    //serve函数在net.Listener接收客户端的连接,创建一个新的ServerTransport和service goroutine为每个连接,服务goroutine读取gRPC请求,然后调用server中注册的函数。
    func (s *Server) Serve(lis net.Listener) error {
        
        s.lis[lis] = true
    
        for {
            rawConn, err := lis.Accept()
            if err != nil {
                s.mu.Lock()
                s.printf("done serving; Accept = %v", err)
                s.mu.Unlock()
                return err
            }
            //开始一个单独的goroutine处理client的连接-rawConn
            //继续for循环等待其他client的到来
            go s.handleRawConn(rawConn)
        }
    }
    //handleRawConn运行在独立的goroutine,并且处理已经接收连接但未执行任何I/O操作的连接
    func (s *Server) handleRawConn(rawConn net.Conn) {
        conn, authInfo, err := s.useTransportAuthenticator(rawConn)
        if err != credentials.ErrConnDispatched {
                rawConn.Close()
            }
            return
        }
        if s.opts.useHandlerImpl {
            s.serveUsingHandler(conn)
        } else {
            s.serveNewHTTP2Transport(conn, authInfo)
        }
    }
    //serveNewHTTP2Transport建立一个新的HTTP/2 tranport并且为在该transport上的流提供服务
    func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
        //调用transport/http2_server.go
        st, err := transport.NewServerTransport("http2", c, 2, authInfo)
        if !s.addConn(st) {
            st.Close()
            return
        }
       //在transport上接收client发送stream并进行处理的函数
        s.serveStreams(st)
    }
    
    func (s *Server) serveStreams(st transport.ServerTransport) {
        defer s.removeConn(st)
        defer st.Close()
        var wg sync.WaitGroup
       //transport.ServerTranport下的st.HandleStreams处理client发送的stream
        st.HandleStreams(func(stream *transport.Stream) {
            wg.Add(1)
            go func() {
                defer wg.Done()
                s.handleStream(st, stream, s.traceInfo(st, stream))
            }()
        })
        wg.Wait()
    }
    ----------------------------transport/http2Server.HanleStreams()分析----------------------------
    func (t *http2Server) HandleStreams(handle func(*Stream)) {
        // 检查client 发送的preface是否合法
        preface := make([]byte, len(clientPreface))
        if _, err := io.ReadFull(t.conn, preface); err != nil {
            grpclog.Printf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
            t.Close()
            return
        }
        if !bytes.Equal(preface, clientPreface) {
            grpclog.Printf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
            t.Close()
            return
        }
    
        frame, err := t.framer.readFrame()
        if err == io.EOF || err == io.ErrUnexpectedEOF {
            t.Close()
            return
        }
        if err != nil {
            grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
            t.Close()
            return
        }
        //读取client发送的SettingFrame
        sf, ok := frame.(*http2.SettingsFrame)
        if !ok {
            grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
            t.Close()
            return
        }
        //根据SettingFrame的内容进行设置
        t.handleSettings(sf)
        //读取client发送的request内容
        for {
            frame, err := t.framer.readFrame()
            if err != nil {
                if se, ok := err.(http2.StreamError); ok {
                    t.mu.Lock()
                    s := t.activeStreams[se.StreamID]
                    t.mu.Unlock()
                    if s != nil {
                        t.closeStream(s)
                    }
                    t.controlBuf.put(&resetStream{se.StreamID, se.Code})
                    continue
                }
                if err == io.EOF || err == io.ErrUnexpectedEOF {
                    t.Close()
                    return
                }
                grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
                t.Close()
                return
            }
            switch frame := frame.(type) {
            case *http2.MetaHeadersFrame:
                //t.operateHeaders函数解码headers内容,并将传输该frame的stream进行记录
                //函数实现包括根据stream携带的callHdr信息,如何路由到grpc.Server中注册server具体实现method的过程
                //函数实现-transport/http2_server.go operateHeader()函数
                if t.operateHeaders(frame, handle) {
                    t.Close()
                    break
                }
            case *http2.DataFrame:
                t.handleData(frame)
            case *http2.RSTStreamFrame:
                t.handleRSTStream(frame)
            case *http2.SettingsFrame:
                t.handleSettings(frame)
            case *http2.PingFrame:
                t.handlePing(frame)
            case *http2.WindowUpdateFrame:
                t.handleWindowUpdate(frame)
            case *http2.GoAwayFrame:
            default:
                grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
            }
        }
    }
    
    func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
        buf := newRecvBuffer()
        //保存client传输的stream信息
        s := &Stream{
            id:  frame.Header().StreamID,
            st:  t,
            buf: buf,
            fc:  &inFlow{limit: initialWindowSize},
        }
    
        var state decodeState
        for _, hf := range frame.Fields {
            state.processHeaderField(hf)
        }
        if err := state.err; err != nil {
            if se, ok := err.(StreamError); ok {
                t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
            }
            return
        }
    
        if frame.StreamEnded() {
            s.state = streamReadDone
        }
        s.recvCompress = state.encoding
        if state.timeoutSet {
            s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
        } else {
            s.ctx, s.cancel = context.WithCancel(context.TODO())
        }
      
        if uint32(len(t.activeStreams)) >= t.maxStreams {
            t.mu.Unlock()
            t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
            return
        }
        //对stream的合法性进行检查
        if s.id%2 != 1 || s.id <= t.maxStreamID {
            t.mu.Unlock()
            grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", s.id)
            return true
        }
        t.maxStreamID = s.id
        s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
        t.activeStreams[s.id] = s
        t.mu.Unlock()
        s.windowHandler = func(n int) {
            t.updateWindow(s, uint32(n))
        }
        //调用server.go serveStreams()传入的handle去处理server端接收的stream
        //handle()会调用server.go handleStream()路由到server端真正实现的函数
        handle(s)
        return
    }
    //handleData处理server端接收到数据帧
    func (t *http2Server) handleData(f *http2.DataFrame) {
        size := len(f.Data())
        //检查transport的流控
        if err := t.fc.onData(uint32(size)); err != nil {
            grpclog.Printf("transport: http2Server %v", err)
            t.Close()
            return
        }
        s, ok := t.getStream(f)
        if !ok {
            if w := t.fc.onRead(uint32(size)); w > 0 {
                t.controlBuf.put(&windowUpdate{0, w})
            }
            return
        }
        if size > 0 {
            s.mu.Lock()
            if s.state == streamDone {
                s.mu.Unlock()
                //检查stream的流控
                if w := t.fc.onRead(uint32(size)); w > 0 {
                    t.controlBuf.put(&windowUpdate{0, w})
                }
                return
            }
            if err := s.fc.onData(uint32(size)); err != nil {
                s.mu.Unlock()
                t.closeStream(s)
                t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
                return
            }
            s.mu.Unlock()
            data := make([]byte, size)
            copy(data, f.Data())
            s.write(recvMsg{data: data})
        }
        if f.Header().Flags.Has(http2.FlagDataEndStream) {
            s.mu.Lock()
            if s.state != streamDone {
                s.state = streamReadDone
            }
            s.mu.Unlock()
            s.write(recvMsg{err: io.EOF})
        }
    }
    

    以上源码分析一次gRPC调用,从client端如何发送请求到grpc.server端如何路由到server端注册函数的所有过程。

    问题总结:

    1.grpc的http/2的stream流是如何变化的?

    答:unary模式的stream的创建、删除都是由gRPC控制的,剩下的三种模式是将stream的很多操作暴露给用户层,由用户自行控制,但sendRequset和recvResponse的流程和unary模式处理相同。笔者测试发现grpc用到的都是client端的stream,server端的stream在gRPC中并未使用。client端发起的stream都是基数开始的,并且最大值为2^31-1,如果client的streamID超过限制,server端会断开与client的连接。测试结果如下:

    //2^31的最大取值2147483648
    client stream id 2147483649
    2017/08/04 10:44:17 transport: http2Client.notifyError got notified that the client transport was broken invalid stream ID.
    2017/08/04 10:44:17 &{0xc4201787e0}.RouteChat(_) = _, rpc error: code = 13 desc = transport: invalid stream ID
    exit status 1
    

    相关文章

      网友评论

        本文标题:gRPC-transport包源码分析

        本文链接:https://www.haomeiwen.com/subject/xgdblxtx.html