美文网首页
gRPC 源码详解(一)配置化的结构体

gRPC 源码详解(一)配置化的结构体

作者: 阿碎Abser | 来源:发表于2019-07-25 21:54 被阅读0次

    grpc 源码结构详解

    <a name="KMqFG"></a>

    DialOptions

    DialOptions 是最重要的一环,负责配置每一次 rpc 请求的时候的一应选择。
    <a name="frTx3"></a>

    结构

    先来看看这个的结构<br />链接

    // dialOptions configure a Dial call. dialOptions are set by the DialOption
    // values passed to Dial.
    type dialOptions struct {
        unaryInt  UnaryClientInterceptor
        streamInt StreamClientInterceptor
    
        chainUnaryInts  []UnaryClientInterceptor
        chainStreamInts []StreamClientInterceptor
    
        cp          Compressor
        dc          Decompressor
        bs          backoff.Strategy
        block       bool
        insecure    bool
        timeout     time.Duration
        scChan      <-chan ServiceConfig
        authority   string
        copts       transport.ConnectOptions
        callOptions []CallOption
        // This is used by v1 balancer dial option WithBalancer to support v1
        // balancer, and also by WithBalancerName dial option.
        balancerBuilder balancer.Builder
        // This is to support grpclb.
        resolverBuilder             resolver.Builder
        channelzParentID            int64
        disableServiceConfig        bool
        disableRetry                bool
        disableHealthCheck          bool
        healthCheckFunc             internal.HealthChecker
        minConnectTimeout           func() time.Duration
        defaultServiceConfig        *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
        defaultServiceConfigRawJSON *string
    }
    

    由于命名非常规范,加上注释很容易看懂每一个 field 配置的哪一条属性。如果掠过看的 大概有 压缩解压器,超时阻塞设置,认证安全转发,负载均衡,服务持久化的信息存储 ,配置,心跳检测等。

    其一应函数方法都是设置 其中字段的。
    <a name="Dutuq"></a>

    如何设置

    这里是 grpc 设计较好的地方,通过函数设置,同时设有生成函数的函数。什么意思呢?首先结合图来理解,这也是整个 grpc 设置的精华部分

    image.png
    <br />这里的意思是 , DialOptions 是一个导出接口,实现函数是 apply 同时接受参数 dialOptions 来修改它。<br />而实际上,是使用 newFuncDialOption 函数包装一个 修改 dialOptions 的方法给 funcDialOption 结构体,在实际 Dial 调用的时候 是使用闭包 调用 funcDialOption 结构体的 apply 方法。<br />可以在这里看一下 Dial 方法的源码(Dial 调用的是 DialContext<br />起作用的就是 opt.apply()
    func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
        cc := &ClientConn{
            target:            target,
            csMgr:             &connectivityStateManager{},
            conns:             make(map[*addrConn]struct{}),
            dopts:             defaultDialOptions(),
            blockingpicker:    newPickerWrapper(),
            czData:            new(channelzData),
            firstResolveEvent: grpcsync.NewEvent(),
        }
        ···
        for _, opt := range opts {
            opt.apply(&cc.dopts)
        }
        ···
    }
    

    这里的 options 可以说是 client 发起 rpc 请求的核心中转站。<br />另一个重要的接口,同时也集中在 dialOptions 结构体中初始化处理的是 <br />callOptions []CallOption

    <a name="O3rgQ"></a>

    CallOption

    CallOption 是一个接口,定义在 rpc_util 包内
    <a name="yiHEU"></a>

    结构

    // CallOption configures a Call before it starts or extracts information from
    // a Call after it completes.
    type CallOption interface {
        // before is called before the call is sent to any server.  If before
        // returns a non-nil error, the RPC fails with that error.
        before(*callInfo) error
    
        // after is called after the call has completed.  after cannot return an
        // error, so any failures should be reported via output parameters.
        after(*callInfo)
    }
    

    操作的是 callInfo 结构里的数据,其被包含在 dialOptions 结构体中,<br />即每一次 dial 的时候进行调用。

    <a name="HaUUg"></a>

    callInfo

    同时它自身定义很有意思,操作的是 callInfo 结构体

    // callInfo contains all related configuration and information about an RPC.
    type callInfo struct {
        compressorType        string
        failFast              bool
        stream                ClientStream
        maxReceiveMessageSize *int
        maxSendMessageSize    *int
        creds                 credentials.PerRPCCredentials
        contentSubtype        string
        codec                 baseCodec
        maxRetryRPCBufferSize int
    }
    

    可以看到 callInfo 中字段用来表示 单次调用中独有的自定义选项如 压缩,流控,认证,编解码器等。

    <a name="m6Q2v"></a>

    一个实现

    简单看一个 CallOption 接口的实现

    // Header returns a CallOptions that retrieves the header metadata
    // for a unary RPC.
    func Header(md *metadata.MD) CallOption {
        return HeaderCallOption{HeaderAddr: md}
    }
    
    // HeaderCallOption is a CallOption for collecting response header metadata.
    // The metadata field will be populated *after* the RPC completes.
    // This is an EXPERIMENTAL API.
    type HeaderCallOption struct {
        HeaderAddr *metadata.MD
    }
    
    func (o HeaderCallOption) before(c *callInfo) error { return nil }
    func (o HeaderCallOption) after(c *callInfo) {
        if c.stream != nil {
            *o.HeaderAddr, _ = c.stream.Header()
        }
    }
    

    重点看到,实际操作是在 before 和 after 方法中执行,它们会在 Client 发起请求的时候自动执行,顾名思义,一个在调用前执行,一个在调用后执行。

    <a name="Io1nW"></a>

    实现注意

    这里可以看出,这里也是通过函数返回一个拥有这两个方法的结构体,注意这一个设计,可以作为你自己的 Option 设计的时候的参考。

    <a name="k6iNP"></a>

    两种方法

    有两种方法让 Client 接受你的 CallOption 设置

    1. 在 Client 使用方法的时候直接作为 参数传递,将刚才所说的函数-返回一个实现了 CallOption 接口的结构体。
    2. 在 生成 Client 的时候就传递设置。具体如下
    3. 通过 dialOptions.go 中的 函数 grpc.WithDefaultCallOptions()
    4. 这个函数会将 CallOption 设置到 dialOptions 中的字段 []CallOption 中。
    // WithDefaultCallOptions returns a DialOption which sets the default
    // CallOptions for calls over the connection.
    func WithDefaultCallOptions(cos ...CallOption) DialOption {
        return newFuncDialOption(func(o *dialOptions) {
            o.callOptions = append(o.callOptions, cos...)
        })
    }
    

    有没有感觉有点不好理解?给你们一个实例

    1. response, err := myclient.MyCall(ctx, request, grpc.CallContentSubtype("mycodec"))
    2. myclient := grpc.Dial(ctx, target, grpc.WithDefaultCallOptions(grpc.CallContentSubtype("mycodec")))

    这里假设 我们设置了一个 mycodec 的译码器。马上下面解释它的设计。

    <a name="Inqny"></a>

    值得注意的是, 我好像只提到了在 Client 调用时设置,callOption 只在客户端设置的情况是不是让大家感到困惑。<br />实际上 gRPC server 端会自动检测 callOption 的设置,并检测自己是否支持此项选择,如果不支持则会返回失败。也就是说,在 Server 端注册的所有 Codec 译码器之后,Client 直接使用相应的设置就好了。

    <a name="r8ZNv"></a>

    Codec

    在 gRPC 中 Codec 有两个接口定义,一个是 baseCodec 包含正常的 Marshal 和 Unmarshal 方法,另一个是拥有名字的 Codec 定义在 encoding 包内,这是由于在注册 registry 的时候会使用到这个方法。
    <a name="mHBlU"></a>

    接口

    type Codec interface {
        // Marshal returns the wire format of v.
        Marshal(v interface{}) ([]byte, error)
        // Unmarshal parses the wire format into v.
        Unmarshal(data []byte, v interface{}) error
        // String returns the name of the Codec implementation.  This is unused by
        // gRPC.
        String() string
    }
    

    就是这个方法

    // RegisterCodec registers the provided Codec for use with all gRPC clients and
    // servers.
    //
    // The Codec will be stored and looked up by result of its Name() method, which
    // should match the content-subtype of the encoding handled by the Codec.  This
    // is case-insensitive, and is stored and looked up as lowercase.  If the
    // result of calling Name() is an empty string, RegisterCodec will panic. See
    // Content-Type on
    // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
    // more details.
    //
    // NOTE: this function must only be called during initialization time (i.e. in
    // an init() function), and is not thread-safe.  If multiple Compressors are
    // registered with the same name, the one registered last will take effect.
    func RegisterCodec(codec Codec) {
        if codec == nil {
            panic("cannot register a nil Codec")
        }
        if codec.Name() == "" {
            panic("cannot register Codec with empty string result for Name()")
        }
        contentSubtype := strings.ToLower(codec.Name())
        registeredCodecs[contentSubtype] = codec
    }
    

    <a name="7otw4"></a>

    Compressor

    同时 encoding 包中还定义了 Compressor 接口,参照 Codec 理解即可。

    // Compressor is used for compressing and decompressing when sending or
    // receiving messages.
    type Compressor interface {
        // Compress writes the data written to wc to w after compressing it.  If an
        // error occurs while initializing the compressor, that error is returned
        // instead.
        Compress(w io.Writer) (io.WriteCloser, error)
        // Decompress reads data from r, decompresses it, and provides the
        // uncompressed data via the returned io.Reader.  If an error occurs while
        // initializing the decompressor, that error is returned instead.
        Decompress(r io.Reader) (io.Reader, error)
        // Name is the name of the compression codec and is used to set the content
        // coding header.  The result must be static; the result cannot change
        // between calls.
        Name() string
    }
    
    

    <a name="4enZe"></a>

    MetaData

    这个包对应 context 中的 Value field 也就是 key-value 形式的存储

    在其他包中简写是 MD
    <a name="2oNn3"></a>

    结构

    type MD map[string][]string
    

    <a name="kUMPq"></a>

    函数

    实现了完善的存储功能,从单一读写到批量(采用 pair 模式,...string 作为参数,len(string)%2==1 时会报错,由于会有孤立的没有配对的元信息。

    另外几个函数是实现了从 context 中的读取和写入(这里的写入是 使用 context.WithValue 方法,即生成 parent context 的 copy。

    <a name="dpAWo"></a>

    注意⚠️

    • 值得注意的是,在 MetaData 结构体中, value 的结构是 []string 。
    • 同时 key 不可以以 "grpc-" 开头,这是因为在 grpc 的 internal 包中已经保留了。
    • 更为重要的是 在 context 中的读取方式,其实是 MetaData 结构对应的是 context Value 中的 value 值,而 key 值设为 一个空结构体同时区分输入输入
      • type mdIncomingKey struct{}<br />
      • type mdOutgoingKey struct{}

    <a name="MPZam"></a>

    相关文章

      网友评论

          本文标题:gRPC 源码详解(一)配置化的结构体

          本文链接:https://www.haomeiwen.com/subject/xrokrctx.html