美文网首页
etcd的使用

etcd的使用

作者: yongfutian | 来源:发表于2019-03-04 12:26 被阅读0次

    etcd的使用

    一、安装配置

    1、服务端
    2、客户端

    二、etcd的基础知识、原理分析

    三、etcd的API使用

    1.1、连接

    想要访问etcd,我们必须实例化一个client:

        cli,err := clientv3.New(clientv3.Config{
            Endpoints:[]string{"localhost:2379"},
            DialTimeout: 5 * time.Second,
        })
    

    这里需要传入的两个参数:

    • Endpoints:etcd的多个节点服务地址,因为我是单点本机测试,所以只传1个。
    • DialTimeout:创建client的首次连接超时,这里传了5秒,如果5秒都没有连接成功就会返回err;值得注意的是,一旦client创建成功,我们就不用再关心后续底层连接的状态了,client内部会重连。

    接着我们来看一下client的定义:

    type Client struct {
        Cluster
        KV
        Lease
        Watcher
        Auth
        Maintenance
    
        // Username is a user name for authentication.
        Username string
        // Password is a password for authentication.
        Password string
    }
    

    注意,这里显示的都是可导出的模块结构字段,代表了客户端能够使用的几大核心模块,其具体功能介绍如下:

    • Cluster:向集群里增加etcd服务端节点之类,属于管理员操作。
    • KV:我们主要使用的功能,即操作K-V。
    • Lease:租约相关操作,比如申请一个TTL=10秒的租约。
    • Watcher:观察订阅,从而监听最新的数据变化。
    • Auth:管理etcd的用户和权限,属于管理员操作。
    • Maintenance:维护etcd,比如主动迁移etcd的leader节点,属于管理员操作。

    下面我们来分别具体介绍一下这几大核心模块:

    1.2、k-v存取

    1.2.1. kv对象的实例获取

    kv  := clientev3.NewKV(client)
    

    我们来看一下这个kv对象具体长什么样子:

    type KV interface {
        // Put puts a key-value pair into etcd.
        // Note that key,value can be plain bytes array and string is
        // an immutable representation of that bytes array.
        // To get a string of bytes, do string([]byte{0x10, 0x20}).
        Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
    
        // Get retrieves keys.
        // By default, Get will return the value for "key", if any.
        // When passed WithRange(end), Get will return the keys in the range [key, end).
        // When passed WithFromKey(), Get returns keys greater than or equal to key.
        // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
        // if the required revision is compacted, the request will fail with ErrCompacted .
        // When passed WithLimit(limit), the number of returned keys is bounded by limit.
        // When passed WithSort(), the keys will be sorted.
        Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
    
        // Delete deletes a key, or optionally using WithRange(end), [key, end).
        Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
    
        // Compact compacts etcd KV history before the given rev.
        Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
    
        // Do applies a single Op on KV without a transaction.
        // Do is useful when creating arbitrary operations to be issued at a
        // later time; the user can range over the operations, calling Do to
        // execute them. Get/Put/Delete, on the other hand, are best suited
        // for when the operation should be issued at the time of declaration.
        Do(ctx context.Context, op Op) (OpResponse, error)
    
        // Txn creates a transaction.
        Txn(ctx context.Context) Txn
    }
    

    从KV对象的定义我们可知,它就是一个接口对象,包含几个主要的kv操作方法

    1.2.2. k-v 存储 put
    案例:

    putResp, err := kv.Put(context.TODO(),"/data-dir/example", "hello-world!")
    

    put方法的原型如下:

    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
    

    参数介绍:
    ctx: Context包对象,是用来跟踪上下文的,列如超时控制
    key: 存储对象的key
    val: 存储对象的value
    opts:  可变参数,额外选项

    1.2.3 k-v查询 get
    现在可以对存储的数据进行取值了:

    getResp, err := kv.Get(context.TODO(), "/data-dir/example")
    

    get方法原型如下:

    // By default, Get will return the value for "key", if any.
    // When passed WithRange(end), Get will return the keys in the range [key, end).
    // When passed WithFromKey(), Get returns keys greater than or equal to key.
    // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
    // if the required revision is compacted, the request will fail with ErrCompacted .
    // When passed WithLimit(limit), the number of returned keys is bounded by limit.
    // When passed WithSort(), the keys will be sorted.
    Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
    

    从以上数据的存储和取值,我们知道put 返回PutResponse, get返回GetResponse,注意:不同的KV操作对应不同的response结构,定义如下:

    type (
        CompactResponse pb.CompactionResponse
        PutResponse     pb.PutResponse
        GetResponse     pb.RangeResponse
        DeleteResponse  pb.DeleteRangeResponse
        TxnResponse     pb.TxnResponse
    )
    

    我们分别来看一看PutResponse和GetResponse映射的RangeResponse结构的定义:

    type PutResponse struct {
        Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
        // if prev_kv is set in the request, the previous key-value pair will be returned.
        PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
    }
    //Header里保存的主要是本次更新的revision信息
    
    
    type RangeResponse struct {
        Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
        // kvs is the list of key-value pairs matched by the range request.
        // kvs is empty when count is requested.
        Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
        // more indicates if there are more keys to return in the requested range.
        More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
        // count is set to the number of keys within the range when requested.
        Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
    }
    

    Kvs字段,保存了本次Get查询到的所有k-v对,我们继续看一下mvccpb.KeyValue对象长什么样子:

    type KeyValue struct {
        // key is the key in bytes. An empty key is not allowed.
        Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
        // create_revision is the revision of last creation on this key.
        CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"`
        // mod_revision is the revision of last modification on this key.
        ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"`
        // version is the version of the key. A deletion resets
        // the version to zero and any modification of the key
        // increases its version.
        Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`
        // value is the value held by the key, in bytes.
        Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`
        // lease is the ID of the lease that attached to key.
        // When the attached lease expires, the key will be deleted.
        // If lease is 0, then no lease is attached to the key.
        Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"`
    }
    

    至于RangeResponse.More和Count,当我们使用withLimit()选项进行Get时会发挥作用,相当于翻页查询。
    接下来,我们通过一个特别的Get选项,获取/data-dir/example目录下的所有孩子:

    rangeResp, err := kv.Get(context.TODO(), "/data-dir/example", clientv3.WithPrefix())
    

    WithPrefix()是指查找以/data-dir/example为前缀的所有key,因此可以模拟出查找子目录的效果。

    我们知道etcd是一个有序的k-v存储,因此/data-dir/example为前缀的key总是顺序排列在一起。

    withPrefix实际上会转化为范围查询,它根据前缀/data-dir/example生成了一个key range,[“/test/”, “/test0”),为什么呢?因为比/大的字符是’0’,所以以/test0作为范围的末尾,就可以扫描到所有的/test/打头的key了。

    1.3、租约lease

    我们先来获取一个lease对象:

    lease := clientv3.NewLease(client)
    

    接着我们来看一下lease对象长什么样子:

    type Lease interface {
        // Grant creates a new lease.
        Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
     
        // Revoke revokes the given lease.
        Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
     
        // TimeToLive retrieves the lease information of the given lease ID.
        TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
     
        // Leases retrieves all leases.
        Leases(ctx context.Context) (*LeaseLeasesResponse, error)
     
        // KeepAlive keeps the given lease alive forever.
        KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
     
        // KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
        // should be used instead of KeepAliveOnce.
        KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
     
        // Close releases all resources Lease keeps for efficient communication
        // with the etcd server.
        Close() error
    }
    

    Lease提供了几个功能:

    • Grant:分配一个租约。
    • Revoke:释放一个租约。
    • TimeToLive:获取剩余TTL时间。
    • Leases:列举所有etcd中的租约。
    • KeepAlive:自动定时的续约某个租约。
    • KeepAliveOnce:为某个租约续约一次。
    • Close:貌似是关闭当前客户端建立的所有租约。

    要想实现key自动过期,首先得创建一个租约,它有10秒的TTL:

    grantResp, err := lease.Grant(context.TODO(), 10)
    

    grantResp中主要使用到了ID,也就是租约ID:

    // LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
    type LeaseGrantResponse struct {
        *pb.ResponseHeader
        ID    LeaseID
        TTL   int64
        Error string
    }
    

    接下来,我们用这个租约来Put一个会自动过期的Key:

    kv.Put(context.TODO(), "/example/expireme", "lease-go", clientv3.WithLease(grantResp.ID))
    

    这里特别需要注意,有一种情况是在Put之前Lease已经过期了,那么这个Put操作会返回error,此时你需要重新分配Lease。

    当我们实现服务注册时,需要主动给Lease进行续约,这需要调用KeepAlive/KeepAliveOnce,你可以在一个循环中定时的调用:

    keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)
    // sleep一会...
    

    keepResp结构如下:

    // LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
    type LeaseKeepAliveResponse struct {
        *pb.ResponseHeader
        ID  LeaseID
        TTL int64
    }
    

    KeepAlive和Put一样,如果在执行之前Lease就已经过期了,那么需要重新分配Lease。Etcd并没有提供API来实现原子的Put with Lease。

    1.4 Op

    Op字面意思就是”操作”,Get和Put都属于Op,只是为了简化用户开发而开放的特殊API。

    // Do is useful when creating arbitrary operations to be issued at a
    // later time; the user can range over the operations, calling Do to
    // execute them. Get/Put/Delete, on the other hand, are best suited
    // for when the operation should be issued at the time of declaration.
    Do(ctx context.Context, op Op) (OpResponse, error)
    

    其参数Op是一个抽象的操作,可以是Put/Get/Delete…;而OpResponse是一个抽象的结果,可以是PutResponse/GetResponse…

    可以通过一些函数来分配Op:

    func OpDelete(key string, opts …OpOption) Op
    func OpGet(key string, opts …OpOption) Op
    func OpPut(key, val string, opts …OpOption) Op
    func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op
    

    其实和直接调用KV.Put,KV.GET没什么区别。请看如下案例:

    op1 := clientv3.OpPut("/hi", "hello", clientv3.WithPrevKV())
    opResp, err := kv.Do(context.TODO(), op1)
    

    这里设置一个key=/hi,value=hello,希望结果中返回覆盖之前的value。

    把这个op交给Do方法执行,返回的opResp结构如下:

    type OpResponse struct {
        put *PutResponse
        get *GetResponse
        del *DeleteResponse
        txn *TxnResponse
    }
    

    1.5、事务Tnx

    etcd中事务是原子执行的,只支持if … then … else …这种表达,能实现一些有意思的场景。
    首先,我们需要开启一个事务,这是通过KV对象的方法实现的:

    txn := kv.Txn(context.TODO())
    

    我写了如下的测试代码,Then和Else还比较好理解,If是比较陌生的。

    txnResp, err := txn.If(clientv3.Compare(clientv3.Value("/hi"), "=", "hello")).
            Then(clientv3.OpGet("/hi")).
            Else(clientv3.OpGet("/test/", clientv3.WithPrefix())).
            Commit()
    

    我们先看下Txn支持的方法:

    type Txn interface {
        // If takes a list of comparison. If all comparisons passed in succeed,
        // the operations passed into Then() will be executed. Or the operations
        // passed into Else() will be executed.
        If(cs ...Cmp) Txn
    
        // Then takes a list of operations. The Ops list will be executed, if the
        // comparisons passed in If() succeed.
        Then(ops ...Op) Txn
    
        // Else takes a list of operations. The Ops list will be executed, if the
        // comparisons passed in If() fail.
        Else(ops ...Op) Txn
    
        // Commit tries to commit the transaction.
        Commit() (*TxnResponse, error)
    }
    

    我们来看一下value方法:

    func Value(key string) Cmp {
        return Cmp{Key: []byte(key), Target: pb.Compare_VALUE}
    }
    

    这个Value(“/hi”)返回的Cmp表达了:”/hi这个key对应的value”。

    接下来,利用Compare函数来继续为”主语”增加描述,形成了一个完整条件语句,即”/hi这个key对应的value”必须等于”hello”。

    Compare函数实际上是对Value返回的Cmp对象进一步修饰,增加了”=”与”hello”两个描述信息:

    func Compare(cmp Cmp, result string, v interface{}) Cmp {
        var r pb.Compare_CompareResult
    
        switch result {
        case "=":
            r = pb.Compare_EQUAL
        case "!=":
            r = pb.Compare_NOT_EQUAL
        case ">":
            r = pb.Compare_GREATER
        case "<":
            r = pb.Compare_LESS
        default:
            panic("Unknown result op")
        }
    
        cmp.Result = r
        switch cmp.Target {
        case pb.Compare_VALUE:
            val, ok := v.(string)
            if !ok {
                panic("bad compare value")
            }
            cmp.TargetUnion = &pb.Compare_Value{Value: []byte(val)}
        case pb.Compare_VERSION:
            cmp.TargetUnion = &pb.Compare_Version{Version: mustInt64(v)}
        case pb.Compare_CREATE:
            cmp.TargetUnion = &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)}
        case pb.Compare_MOD:
            cmp.TargetUnion = &pb.Compare_ModRevision{ModRevision: mustInt64(v)}
        case pb.Compare_LEASE:
            cmp.TargetUnion = &pb.Compare_Lease{Lease: mustInt64orLeaseID(v)}
        default:
            panic("Unknown compare type")
        }
        return cmp
    }
    

    Cmp可以用于描述”key=xxx的yyy属性,必须=、!=、<、>,kkk值”,比如:

    • key=xxx的value,必须!=,hello。
    • key=xxx的create版本号,必须=,11233。
    • key=xxx的lease id,必须=,12319231231238。

    经过Compare函数修饰的Cmp对象,内部包含了完整的条件信息,传递给If函数即可。

    类似于Value的函数用于指定yyy属性,有这么几个方法:

    • func CreateRevision(key string) Cmp:key=xxx的创建版本必须满足…
    • func LeaseValue(key string) Cmp:key=xxx的Lease ID必须满足…
    • func ModRevision(key string) Cmp:key=xxx的最后修改版本必须满足…
    • func Value(key string) Cmp:key=xxx的创建值必须满足…
    • func Version(key string) Cmp:key=xxx的累计更新次数必须满足…

    最后Commit提交整个Txn事务,我们需要判断txnResp获知If条件是否成立:

    if txnResp.Succeeded { // If = true
            fmt.Println("~~~", txnResp.Responses[0].GetResponseRange().Kvs)
    } else { // If =false
            fmt.Println("!!!", txnResp.Responses[0].GetResponseRange().Kvs)
    }
    

    Succeed=true表示If条件成立,接下来我们需要获取Then或者Else中的OpResponse列表(因为可以传多个Op),可以看一下txnResp的结构:

    type TxnResponse struct {
        Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
        // succeeded is set to true if the compare evaluated to true or false otherwise.
        Succeeded bool `protobuf:"varint,2,opt,name=succeeded,proto3" json:"succeeded,omitempty"`
        // responses is a list of responses corresponding to the results from applying
        // success if succeeded is true or failure if succeeded is false.
        Responses []*ResponseOp `protobuf:"bytes,3,rep,name=responses" json:"responses,omitempty"`
    }
    

    1.6、监听watch

    在分析watch之前,我们先来罗列以下watch的整个流程:

    • a、我们先往etcd写入一对K-V

    • b、我们使用watch监听这对K-V,如果一切正常, 这时候请求会被阻塞住.

    • c、现在我们修改这一对K-V

    • d、阻塞的那个请求返回watch到的结果

        {
          "action":"set",
          "node":{ 
              "key":"/name",
              "value":"神蛋使者1号",
              "modifiedIndex":25,
             "createdIndex":25
          },
           "prevNode": {
             "key":"/name",
             "value":"神蛋使者",
             "modifiedIndex":24,
             "createdIndex":24
           }
        }
      

    1.6.1. watch接口定义

    type Watcher interface {
        // Watch watches on a key or prefix. The watched events will be returned
        // through the returned channel.
        // If the watch is slow or the required rev is compacted, the watch request
        // might be canceled from the server-side and the chan will be closed.
        // 'opts' can be: 'WithRev' and/or 'WithPrefix'.
        Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
    
        // Close closes the watcher and cancels all watch requests.
        Close() error
    }
    

    该接口定义了两个方法, Watch 和 Close

    Watch 方法返回一个WatchChan 类似的变量, WatchChan是一个channel, 定义如下:

    type WatchChan <-chan WatchResponse
    

    该通道传递WatchResponse类型

    type WatchResponse struct {
        Header pb.ResponseHeader
        Events []*Event
    
        // CompactRevision is the minimum revision the watcher may receive.
        CompactRevision int64
    
        // Canceled is used to indicate watch failure.
        // If the watch failed and the stream was about to close, before the channel is closed,
        // the channel sends a final response that has Canceled set to true with a non-nil Err().
        Canceled bool
    
        // Created is used to indicate the creation of the watcher.
        Created bool
    
        closeErr error
    }
    

    其中Event类型是一个gRPC生成的消息对象

    type Event struct {
        // type is the kind of event. If type is a PUT, it indicates
        // new data has been stored to the key. If type is a DELETE,
        // it indicates the key was deleted.
        Type Event_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=mvccpb.Event_EventType" json:"type,omitempty"`
        // kv holds the KeyValue for the event.
        // A PUT event contains current kv pair.
        // A PUT event with kv.Version=1 indicates the creation of a key.
        // A DELETE/EXPIRE event contains the deleted key with
        // its modification revision set to the revision of deletion.
        Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
        // prev_kv holds the key-value pair before the event happens.
        PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
    }
    

    接下来看实现了Watcher接口的watcher类型

    // watcher implements the Watcher interface
    type watcher struct {
        remote pb.WatchClient
    
        // mu protects the grpc streams map
        mu sync.RWMutex
    
        // streams holds all the active grpc streams keyed by ctx value.
        streams map[string]*watchGrpcStream
    }
    

    watcher结构很简单, 只有3个字段. remote抽象了发起watch请求的客户端, streams是一个map, 这个map映射了交互的数据流.还有一个保护并发环境下数据流读写安全的读写锁.

    streams所属的watchGrpcStream类型抽象了所有交互的数据, 它的结构定义如下

    type watchGrpcStream struct {
        owner  *watcher
        remote pb.WatchClient
    
        // ctx controls internal remote.Watch requests
        ctx context.Context
        // ctxKey is the key used when looking up this stream's context
        ctxKey string
        cancel context.CancelFunc
    
        // substreams holds all active watchers on this grpc stream
        substreams map[int64]*watcherStream
        // resuming holds all resuming watchers on this grpc stream
        resuming []*watcherStream
    
        // reqc sends a watch request from Watch() to the main goroutine
        reqc chan *watchRequest
        // respc receives data from the watch client
        respc chan *pb.WatchResponse
        // donec closes to broadcast shutdown
        donec chan struct{}
        // errc transmits errors from grpc Recv to the watch stream reconn logic
        errc chan error
        // closingc gets the watcherStream of closing watchers
        closingc chan *watcherStream
        // wg is Done when all substream goroutines have exited
        wg sync.WaitGroup
    
        // resumec closes to signal that all substreams should begin resuming
        resumec chan struct{}
        // closeErr is the error that closed the watch stream
        closeErr error
    }
    

    比较有意思的是, watchGrpcStream也包含了一个watcher类型的owner字段, watcher和watchGrpcStream可以互相引用到对方.同时又定义了watcher类型中已经定义过的remote,而且还不是指针类型, 这点不大明白作用是啥.

    还有几个字段值得关注, 一个是substreams, 看下它的定义和注释:

    // substreams holds all active watchers on this grpc stream
    substreams map[int64]*watcherStream
    

    再看看watcherStream类型的定义:

    // watcherStream represents a registered watcher
    type watcherStream struct {
        // initReq is the request that initiated this request
        initReq watchRequest
    
        // outc publishes watch responses to subscriber
        outc chan WatchResponse
        // recvc buffers watch responses before publishing
        recvc chan *WatchResponse
        // donec closes when the watcherStream goroutine stops.
        donec chan struct{}
        // closing is set to true when stream should be scheduled to shutdown.
        closing bool
        // id is the registered watch id on the grpc stream
        id int64
    
        // buf holds all events received from etcd but not yet consumed by the client
        buf []*WatchResponse
    }
    

    画个图整理下他们之间的关系:


    1244770-8d56f4f0d90de613.png

    接下来轮到watcher是如何watch方法的了:

    // Watch posts a watch request to run() and waits for a new watcher channel
    func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
        // 应用配置
        ow := opWatch(key, opts...)
    
        var filters []pb.WatchCreateRequest_FilterType
        if ow.filterPut {
        filters = append(filters, pb.WatchCreateRequest_NOPUT)
        }
        if ow.filterDelete {
        filters = append(filters, pb.WatchCreateRequest_NODELETE)
        }
    
        // 根据传入的参数构造watch请求
        wr := &watchRequest{
        ctx:            ctx,
        createdNotify:  ow.createdNotify,
        key:            string(ow.key),
        end:            string(ow.end),
        rev:            ow.rev,
        progressNotify: ow.progressNotify,
        filters:        filters,
        prevKV:         ow.prevKV,
        retc:           make(chan chan WatchResponse, 1),
        }
    
        ok := false
        // 将请求上下文格式化为字符串
        ctxKey := fmt.Sprintf("%v", ctx)
    
        // find or allocate appropriate grpc watch stream
        // 接下来配置对应的输出流, 注意得加锁
        w.mu.Lock()
    
        // 如果stream为空, 返回一个已经关闭的channel.
        // 这种情况应该是防止streams为空的情况
        if w.streams == nil {
        // closed
        w.mu.Unlock()
        ch := make(chan WatchResponse)
        close(ch)
        return ch
        }
    
        // 注意这里, 前面我们提到streams是一个map,该map的key是请求上下文
        // 如果该请求对应的流为空,则新建
        wgs := w.streams[ctxKey]
        if wgs == nil {
        wgs = w.newWatcherGrpcStream(ctx)
        w.streams[ctxKey] = wgs
        }
        donec := wgs.donec
        reqc := wgs.reqc
        w.mu.Unlock()
    
        // couldn't create channel; return closed channel
        // couldn't create channel; return closed channel
        // 这里要设置为缓冲的原因可能与下面的两个
        // closeCh <- WatchResponse{closeErr: wgs.closeErr}
        // 语句有关,这里不理解
        closeCh := make(chan WatchResponse, 1)
    
        // submit request
        select {
        // 发送上面构造好的watch请求给对应的流
        case reqc <- wr:
        ok = true
        // 请求断开(这里应该囊括了客户端请求断开的所有情况)
        case <-wr.ctx.Done():
        // watch完成
        // 这里应该是处理非正常完成的情况
        // 注意下面的重试逻辑
        case <-donec:
        if wgs.closeErr != nil {
            // 如果不是空上下文导致流被丢弃的情况
            // 则不应该重试
            closeCh <- WatchResponse{closeErr: wgs.closeErr}
            break
        }
        // retry; may have dropped stream from no ctxs
        return w.Watch(ctx, key, opts...)
        }
    
        // receive channel
        // 如果是初始请求顺利发送才会执行这里
        if ok {
        select {
        case ret := <-wr.retc:
            return ret
        case <-ctx.Done():
        case <-donec:
            if wgs.closeErr != nil {
                closeCh <- WatchResponse{closeErr: wgs.closeErr}
                break
            }
            // retry; may have dropped stream from no ctxs
            return w.Watch(ctx, key, opts...)
        }
        }
    
        close(closeCh)
        return closeCh
    }
    

    还有Watcher接口的另一个方法Close:

    func (w *watcher) Close() (err error) {
        // 在锁内先将streams字段置为空
        // 在锁外再将一个个流都关闭
        // 这样做的意义在于不管哪个流关闭失败了
        // 都能先保证streams与这些流的关系被切断
        w.mu.Lock()
        streams := w.streams
        w.streams = nil
        w.mu.Unlock()
        for _, wgs := range streams {
        if werr := wgs.Close(); werr != nil {
            err = werr
        }
        }
        // etcd竟然也只是返回一个error
        // 虽然上面的for循环可能产生多个error
        return err
    }
    

    相关文章

      网友评论

          本文标题:etcd的使用

          本文链接:https://www.haomeiwen.com/subject/tdgbuqtx.html