美文网首页GO
grpc-go源码阅读(1) grpc server的启动

grpc-go源码阅读(1) grpc server的启动

作者: one_zheng | 来源:发表于2018-06-07 18:06 被阅读474次

    正常启动一个grpcServer demo如下:

    // main starts the demo gRPC server: it opens a TCP listener on port,
    // registers the Greeter service and the reflection service on a new
    // grpc.Server, and then blocks in Serve. Any listen or serve error is fatal.
    func main() {
        listener, listenErr := net.Listen("tcp", port)
        if listenErr != nil {
            log.Fatalf("failed to listen: %v", listenErr)
        }

        srv := grpc.NewServer()
        pb.RegisterGreeterServer(srv, &server{})
        // Expose the reflection service on the same gRPC server.
        reflection.Register(srv)

        serveErr := srv.Serve(listener)
        if serveErr != nil {
            log.Fatalf("failed to serve: %v", serveErr)
        }
    }
    

    1.grpc.NewServer

    // NewServer builds a gRPC server that has no service registered and has not
    // started accepting requests. Each option in opt is applied, in order, on top
    // of the defaults.
    func NewServer(opt ...ServerOption) *Server {
        // Start from the default maximum message size (4MB per the original note).
        opts := options{maxMsgSize: defaultMaxMsgSize}
        for i := range opt {
            opt[i](&opts)
        }
        // Fall back to the default codec when no option supplied one.
        if opts.codec == nil {
            opts.codec = protoCodec{}
        }

        srv := &Server{
            opts:  opts,
            lis:   make(map[net.Listener]bool),
            conns: make(map[io.Closer]bool),
            m:     make(map[string]*service),
        }
        // The condition variable uses the server mutex as its locker; it wakes
        // goroutines that are waiting on the condition (via cv.Wait).
        srv.cv = sync.NewCond(&srv.mu)
        srv.ctx, srv.cancel = context.WithCancel(context.Background())

        // EnableTracing controls whether RPCs are traced with the
        // golang.org/x/net/trace package; without it there is nothing more to set up.
        if !EnableTracing {
            return srv
        }
        // runtime.Caller(1) reports the file and line of NewServer's caller, which
        // is used as the title of the server's event log.
        _, file, line, _ := runtime.Caller(1)
        srv.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
        return srv
    }
    
    
    • ServerOption参数(Server的配置选项),启动时可不传,结构如下:
    // options holds the server settings that ServerOption values fill in;
    // NewServer starts from defaults and applies each ServerOption to it.
    type options struct {
        creds                credentials.TransportCredentials  // transport credentials used to authenticate connections
        codec                Codec       // Codec defines the interface gRPC uses to encode and decode messages; NewServer defaults it to protoCodec
        cp                   Compressor  // Compressor defines the interface gRPC uses to compress a message.
        dc                   Decompressor  // Decompressor defines the interface gRPC uses to decompress a message.
    
        maxMsgSize           int  // the max message size in bytes the server can receive; NewServer defaults it to defaultMaxMsgSize (4MB per the original note)
        unaryInt             UnaryServerInterceptor  // provides a hook to intercept the execution of a unary RPC on the server
        streamInt            StreamServerInterceptor  // provides a hook to intercept the execution of a streaming RPC on the server
        inTapHandle          tap.ServerInHandle  // sets the tap handle for all the server transport to be created
        statsHandler         stats.Handler  // NOTE(review): usage is not visible in this excerpt
        maxConcurrentStreams uint32 // maximum number of concurrent streams on one connection
        useHandlerImpl       bool   // use the http.Handler-based server; handleRawConn checks this flag to pick the serving path
    }
    注释来源:https://godoc.org/google.golang.org/grpc#UnaryInterceptor
    

    credentials.TransportCredentials 是 gRPC 用于传输层安全认证(如 TLS 证书)的凭证接口

    gRPC 默认提供了两种认证方式:

    • 基于SSL/TLS的认证方式

    • 远程调用认证方式

    这两种方式可以混合使用。

    具体详情参考:https://segmentfault.com/a/1190000007933303

    通过grpc.Creds(creds)设置该参数

    options.useHandlerImpl 是 handleRawConn 选择处理方式的判别标志:为 true 时调用 serveUsingHandler(基于 http.Handler 的实现),为 false(默认值)时调用 serveHTTP2Transport

    g-.png

    UnaryServerInterceptor/StreamServerInterceptor

    详情介绍:https://github.com/grpc-ecosystem/go-grpc-middleware / https://www.colabug.com/2496579.html

    2.RegisterService

    // RegisterService registers a service description and its implementation
    // with the gRPC server. It is called from the IDL-generated code and must be
    // called before Serve is invoked.
    func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
        // sd.HandlerType is expected to be a pointer to the service interface, so
        // Elem yields the interface type that ss has to implement.
        wantIface := reflect.TypeOf(sd.HandlerType).Elem()
        implType := reflect.TypeOf(ss)
        if ok := implType.Implements(wantIface); !ok {
            grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", implType, wantIface)
        }
        s.register(sd, ss)
    }
    
    // register stores the service described by sd, backed by the implementation
    // ss, in the server's service map under sd.ServiceName. It holds s.mu for the
    // whole call and reports a duplicate service name through grpclog.Fatalf.
    func (s *Server) register(sd *ServiceDesc, ss interface{}) {
        s.mu.Lock()
        defer s.mu.Unlock()

        name := sd.ServiceName
        s.printf("RegisterService(%q)", name)
        if _, dup := s.m[name]; dup {
            // A service with this name has already been registered.
            grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", name)
        }

        // Build the internal service record: the implementation, its metadata,
        // and lookup tables for unary methods and streams.
        svc := &service{
            server: ss,
            md:     make(map[string]*MethodDesc),
            sd:     make(map[string]*StreamDesc),
            mdata:  sd.Metadata,
        }
        // Index each unary method by name, pointing at the element inside
        // sd.Methods rather than at a copy.
        for idx := range sd.Methods {
            method := &sd.Methods[idx]
            svc.md[method.MethodName] = method
        }
        // Index each stream by name in the same way.
        for idx := range sd.Streams {
            stream := &sd.Streams[idx]
            svc.sd[stream.StreamName] = stream
        }
        s.m[name] = svc
    }
    

    3.Serve(lis net.Listener)

    net.Error接口
    // Error reproduces the net.Error interface from the standard library: an
    // error that can additionally report whether it is a timeout and whether it
    // is temporary. Serve relies on the Temporary method to decide whether a
    // failed Accept should be retried.
    type Error interface {
        error
        Timeout() bool   // Is the error a timeout?
        Temporary() bool // Is the error temporary?
    }
    
    // Serve accepts incoming connections on the listener lis and starts a new
    // goroutine running handleRawConn for each accepted connection.
    // Temporary Accept errors are retried with an exponential backoff (5ms,
    // doubling, capped at 1s); any other Accept error ends the loop and is
    // returned. lis is closed when this method returns, unless it has already
    // been removed from the server's listener set.
    // Serve always returns a non-nil error.
    func (s *Server) Serve(lis net.Listener) error {
        s.mu.Lock()
        s.printf("serving")
        // A nil listener map is treated as "server already stopped": close the
        // caller's listener and refuse to serve.
        if s.lis == nil {
            s.mu.Unlock()
            lis.Close()
            return ErrServerStopped
        }
        // Record lis in the server's set of active listeners.
        s.lis[lis] = true
        s.mu.Unlock()
        // On exit, close lis and drop it from the set, but only if it is still
        // registered there.
        defer func() {
            s.mu.Lock()
            if s.lis != nil && s.lis[lis] {
                lis.Close()
                delete(s.lis, lis)
            }
            s.mu.Unlock()
        }()
    
        var tempDelay time.Duration // how long to sleep on accept failure
    
        for {
            rawConn, err := lis.Accept()
            if err != nil {
                // Only errors that expose Temporary() and report true are retried.
                if ne, ok := err.(interface {
                    Temporary() bool
                }); ok && ne.Temporary() {
                    if tempDelay == 0 {
                        // First consecutive failure: wait 5 milliseconds.
                        tempDelay = 5 * time.Millisecond
                    } else {
                        // Each further consecutive failure doubles the wait.
                        tempDelay *= 2
                    }
                    // The wait is capped at one second.
                    if max := 1 * time.Second; tempDelay > max {
                        tempDelay = max
                    }
                    s.mu.Lock()
                    s.printf("Accept error: %v; retrying in %v", err, tempDelay)
                    s.mu.Unlock()
                    // Block until the backoff delay elapses or the server context is
                    // done, whichever comes first; in both cases the loop then
                    // retries Accept.
                    select {
                    case <-time.After(tempDelay):  
                    case <-s.ctx.Done():
                    }
                    continue
                }
                // Non-temporary error: log it and stop serving.
                s.mu.Lock()
                s.printf("done serving; Accept = %v", err)
                s.mu.Unlock()
                return err
            }
            // A successful Accept resets the backoff.
            tempDelay = 0
            // Start a new goroutine to deal with rawConn
            // so we don't stall this Accept loop goroutine.
            go s.handleRawConn(rawConn)
        }
    }
    
    

    4.handleRawConn

    // handleRawConn runs in its own goroutine and handles a just-accepted
    // connection on which no I/O has been performed yet. It runs the transport
    // authentication step and, on success, hands the resulting connection to one
    // of the two HTTP/2 serving paths.
    func (s *Server) handleRawConn(rawConn net.Conn) {
        authedConn, authInfo, hsErr := s.useTransportAuthenticator(rawConn)
        if hsErr != nil {
            s.mu.Lock()
            s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), hsErr)
            s.mu.Unlock()
            grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), hsErr)
            // credentials.ErrConnDispatched is the one error for which rawConn
            // must stay open; every other handshake failure closes it.
            if hsErr == credentials.ErrConnDispatched {
                return
            }
            rawConn.Close()
            return
        }

        // A nil conns map presumably means the server has been stopped; drop the
        // freshly authenticated connection in that case.
        s.mu.Lock()
        stopped := s.conns == nil
        s.mu.Unlock()
        if stopped {
            authedConn.Close()
            return
        }

        // useHandlerImpl is false unless an option sets it, so the HTTP/2
        // transport path is the default; the http.Handler path is opt-in.
        if !s.opts.useHandlerImpl {
            s.serveHTTP2Transport(authedConn, authInfo)
            return
        }
        s.serveUsingHandler(authedConn)
    }
    
    // serveUsingHandler is invoked by handleRawConn when the server is set up to
    // handle requests through the http.Handler interface. It registers conn with
    // the server, then runs an http2.Server on it with the gRPC server itself as
    // the handler, so every incoming request is routed to the server's ServeHTTP.
    // The connection is removed from the server again when this function returns.
    //
    // Per the upstream note, this code path is only used when
    // Server.TestingUseHandlerImpl has been configured, which lets the end2end
    // tests exercise ServeHTTP as one of the environment types.
    //
    // conn is expected to be the already-authenticated connection (a *tls.Conn
    // according to the upstream note).
    func (s *Server) serveUsingHandler(conn net.Conn) {
        // Register conn with the server; if that is refused, just close it.
        if ok := s.addConn(conn); !ok {
            conn.Close()
            return
        }
        defer s.removeConn(conn)

        h2Server := &http2.Server{
            // Maximum number of concurrent streams on one connection.
            MaxConcurrentStreams: s.opts.maxConcurrentStreams,
        }
        serveOpts := &http2.ServeConnOpts{Handler: s}
        h2Server.ServeConn(conn, serveOpts)
    }
    
    

    相关文章

      网友评论

        本文标题:grpc-go源码阅读(1) grpc server的启动

        本文链接:https://www.haomeiwen.com/subject/wxzgsftx.html