etcd的使用
一、安装配置
1、服务端
2、客户端
二、etcd的基础知识、原理分析
三、etcd的API使用
1.1、连接
想要访问etcd,我们必须实例化一个client:
cli,err := clientv3.New(clientv3.Config{
Endpoints:[]string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
这里需要传入的两个参数:
- Endpoints:etcd的多个节点服务地址,因为我是单点本机测试,所以只传1个。
- DialTimeout:创建client的首次连接超时,这里传了5秒,如果5秒都没有连接成功就会返回err;值得注意的是,一旦client创建成功,我们就不用再关心后续底层连接的状态了,client内部会重连。
接着我们来看一下client的定义:
type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
Password string
}
注意,这里显示的都是可导出的模块结构字段,代表了客户端能够使用的几大核心模块,其具体功能介绍如下:
- Cluster:向集群里增加etcd服务端节点之类,属于管理员操作。
- KV:我们主要使用的功能,即操作K-V。
- Lease:租约相关操作,比如申请一个TTL=10秒的租约。
- Watcher:观察订阅,从而监听最新的数据变化。
- Auth:管理etcd的用户和权限,属于管理员操作。
- Maintenance:维护etcd,比如主动迁移etcd的leader节点,属于管理员操作。
下面我们来分别具体介绍一下这几大核心模块:
1.2、k-v存取
1.2.1. kv对象的实例获取
kv := clientev3.NewKV(client)
我们来看一下这个kv对象具体长什么样子:
type KV interface {
// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn creates a transaction.
Txn(ctx context.Context) Txn
}
从KV对象的定义我们可知,它就是一个接口对象,包含几个主要的kv操作方法
1.2.2. k-v 存储 put
案例:
putResp, err := kv.Put(context.TODO(),"/data-dir/example", "hello-world!")
put方法的原型如下:
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
参数介绍:
ctx: Context包对象,是用来跟踪上下文的,列如超时控制
key: 存储对象的key
val: 存储对象的value
opts: 可变参数,额外选项
1.2.3 k-v查询 get
现在可以对存储的数据进行取值了:
getResp, err := kv.Get(context.TODO(), "/data-dir/example")
get方法原型如下:
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
从以上数据的存储和取值,我们知道put 返回PutResponse, get返回GetResponse,注意:不同的KV操作对应不同的response结构,定义如下:
type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)
我们分别来看一看PutResponse和GetResponse映射的RangeResponse结构的定义:
type PutResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// if prev_kv is set in the request, the previous key-value pair will be returned.
PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
//Header里保存的主要是本次更新的revision信息
type RangeResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
// more indicates if there are more keys to return in the requested range.
More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
// count is set to the number of keys within the range when requested.
Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}
Kvs字段,保存了本次Get查询到的所有k-v对,我们继续看一下mvccpb.KeyValue对象长什么样子:
type KeyValue struct {
// key is the key in bytes. An empty key is not allowed.
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
// create_revision is the revision of last creation on this key.
CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"`
// mod_revision is the revision of last modification on this key.
ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"`
// version is the version of the key. A deletion resets
// the version to zero and any modification of the key
// increases its version.
Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`
// value is the value held by the key, in bytes.
Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`
// lease is the ID of the lease that attached to key.
// When the attached lease expires, the key will be deleted.
// If lease is 0, then no lease is attached to the key.
Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"`
}
至于RangeResponse.More和Count,当我们使用withLimit()选项进行Get时会发挥作用,相当于翻页查询。
接下来,我们通过一个特别的Get选项,获取/data-dir/example目录下的所有孩子:
rangeResp, err := kv.Get(context.TODO(), "/data-dir/example", clientv3.WithPrefix())
WithPrefix()是指查找以/data-dir/example为前缀的所有key,因此可以模拟出查找子目录的效果。
我们知道etcd是一个有序的k-v存储,因此/data-dir/example为前缀的key总是顺序排列在一起。
withPrefix实际上会转化为范围查询,它根据前缀/data-dir/example生成了一个key range,[“/test/”, “/test0”),为什么呢?因为比/大的字符是’0’,所以以/test0作为范围的末尾,就可以扫描到所有的/test/打头的key了。
1.3、租约lease
我们先来获取一个lease对象:
lease := clientv3.NewLease(client)
接着我们来看一下lease对象长什么样子:
type Lease interface {
// Grant creates a new lease.
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// Revoke revokes the given lease.
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// TimeToLive retrieves the lease information of the given lease ID.
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// KeepAlive keeps the given lease alive forever.
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
// should be used instead of KeepAliveOnce.
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
}
Lease提供了几个功能:
- Grant:分配一个租约。
- Revoke:释放一个租约。
- TimeToLive:获取剩余TTL时间。
- Leases:列举所有etcd中的租约。
- KeepAlive:自动定时的续约某个租约。
- KeepAliveOnce:为某个租约续约一次。
- Close:貌似是关闭当前客户端建立的所有租约。
要想实现key自动过期,首先得创建一个租约,它有10秒的TTL:
grantResp, err := lease.Grant(context.TODO(), 10)
grantResp中主要使用到了ID,也就是租约ID:
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
*pb.ResponseHeader
ID LeaseID
TTL int64
Error string
}
接下来,我们用这个租约来Put一个会自动过期的Key:
kv.Put(context.TODO(), "/example/expireme", "lease-go", clientv3.WithLease(grantResp.ID))
这里特别需要注意,有一种情况是在Put之前Lease已经过期了,那么这个Put操作会返回error,此时你需要重新分配Lease。
当我们实现服务注册时,需要主动给Lease进行续约,这需要调用KeepAlive/KeepAliveOnce,你可以在一个循环中定时的调用:
keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)
// sleep一会...
keepResp结构如下:
// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
type LeaseKeepAliveResponse struct {
*pb.ResponseHeader
ID LeaseID
TTL int64
}
KeepAlive和Put一样,如果在执行之前Lease就已经过期了,那么需要重新分配Lease。Etcd并没有提供API来实现原子的Put with Lease。
1.4 Op
Op字面意思就是”操作”,Get和Put都属于Op,只是为了简化用户开发而开放的特殊API。
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
其参数Op是一个抽象的操作,可以是Put/Get/Delete…;而OpResponse是一个抽象的结果,可以是PutResponse/GetResponse…
可以通过一些函数来分配Op:
func OpDelete(key string, opts …OpOption) Op
func OpGet(key string, opts …OpOption) Op
func OpPut(key, val string, opts …OpOption) Op
func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op
其实和直接调用KV.Put,KV.GET没什么区别。请看如下案例:
op1 := clientv3.OpPut("/hi", "hello", clientv3.WithPrevKV())
opResp, err := kv.Do(context.TODO(), op1)
这里设置一个key=/hi,value=hello,希望结果中返回覆盖之前的value。
把这个op交给Do方法执行,返回的opResp结构如下:
type OpResponse struct {
put *PutResponse
get *GetResponse
del *DeleteResponse
txn *TxnResponse
}
1.5、事务Tnx
etcd中事务是原子执行的,只支持if … then … else …这种表达,能实现一些有意思的场景。
首先,我们需要开启一个事务,这是通过KV对象的方法实现的:
txn := kv.Txn(context.TODO())
我写了如下的测试代码,Then和Else还比较好理解,If是比较陌生的。
txnResp, err := txn.If(clientv3.Compare(clientv3.Value("/hi"), "=", "hello")).
Then(clientv3.OpGet("/hi")).
Else(clientv3.OpGet("/test/", clientv3.WithPrefix())).
Commit()
我们先看下Txn支持的方法:
type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
}
我们来看一下value方法:
func Value(key string) Cmp {
return Cmp{Key: []byte(key), Target: pb.Compare_VALUE}
}
这个Value(“/hi”)返回的Cmp表达了:”/hi这个key对应的value”。
接下来,利用Compare函数来继续为”主语”增加描述,形成了一个完整条件语句,即”/hi这个key对应的value”必须等于”hello”。
Compare函数实际上是对Value返回的Cmp对象进一步修饰,增加了”=”与”hello”两个描述信息:
func Compare(cmp Cmp, result string, v interface{}) Cmp {
var r pb.Compare_CompareResult
switch result {
case "=":
r = pb.Compare_EQUAL
case "!=":
r = pb.Compare_NOT_EQUAL
case ">":
r = pb.Compare_GREATER
case "<":
r = pb.Compare_LESS
default:
panic("Unknown result op")
}
cmp.Result = r
switch cmp.Target {
case pb.Compare_VALUE:
val, ok := v.(string)
if !ok {
panic("bad compare value")
}
cmp.TargetUnion = &pb.Compare_Value{Value: []byte(val)}
case pb.Compare_VERSION:
cmp.TargetUnion = &pb.Compare_Version{Version: mustInt64(v)}
case pb.Compare_CREATE:
cmp.TargetUnion = &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)}
case pb.Compare_MOD:
cmp.TargetUnion = &pb.Compare_ModRevision{ModRevision: mustInt64(v)}
case pb.Compare_LEASE:
cmp.TargetUnion = &pb.Compare_Lease{Lease: mustInt64orLeaseID(v)}
default:
panic("Unknown compare type")
}
return cmp
}
Cmp可以用于描述”key=xxx的yyy属性,必须=、!=、<、>,kkk值”,比如:
- key=xxx的value,必须!=,hello。
- key=xxx的create版本号,必须=,11233。
- key=xxx的lease id,必须=,12319231231238。
经过Compare函数修饰的Cmp对象,内部包含了完整的条件信息,传递给If函数即可。
类似于Value的函数用于指定yyy属性,有这么几个方法:
- func CreateRevision(key string) Cmp:key=xxx的创建版本必须满足…
- func LeaseValue(key string) Cmp:key=xxx的Lease ID必须满足…
- func ModRevision(key string) Cmp:key=xxx的最后修改版本必须满足…
- func Value(key string) Cmp:key=xxx的创建值必须满足…
- func Version(key string) Cmp:key=xxx的累计更新次数必须满足…
最后Commit提交整个Txn事务,我们需要判断txnResp获知If条件是否成立:
if txnResp.Succeeded { // If = true
fmt.Println("~~~", txnResp.Responses[0].GetResponseRange().Kvs)
} else { // If =false
fmt.Println("!!!", txnResp.Responses[0].GetResponseRange().Kvs)
}
Succeed=true表示If条件成立,接下来我们需要获取Then或者Else中的OpResponse列表(因为可以传多个Op),可以看一下txnResp的结构:
type TxnResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// succeeded is set to true if the compare evaluated to true or false otherwise.
Succeeded bool `protobuf:"varint,2,opt,name=succeeded,proto3" json:"succeeded,omitempty"`
// responses is a list of responses corresponding to the results from applying
// success if succeeded is true or failure if succeeded is false.
Responses []*ResponseOp `protobuf:"bytes,3,rep,name=responses" json:"responses,omitempty"`
}
1.6、监听watch
在分析watch之前,我们先来罗列以下watch的整个流程:
-
a、我们先往etcd写入一对K-V
-
b、我们使用watch监听这对K-V,如果一切正常, 这时候请求会被阻塞住.
-
c、现在我们修改这一对K-V
-
d、阻塞的那个请求返回watch到的结果
{ "action":"set", "node":{ "key":"/name", "value":"神蛋使者1号", "modifiedIndex":25, "createdIndex":25 }, "prevNode": { "key":"/name", "value":"神蛋使者", "modifiedIndex":24, "createdIndex":24 } }
1.6.1. watch接口定义
type Watcher interface {
// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
// 'opts' can be: 'WithRev' and/or 'WithPrefix'.
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// Close closes the watcher and cancels all watch requests.
Close() error
}
该接口定义了两个方法, Watch 和 Close
Watch 方法返回一个WatchChan 类似的变量, WatchChan是一个channel, 定义如下:
type WatchChan <-chan WatchResponse
该通道传递WatchResponse类型
type WatchResponse struct {
Header pb.ResponseHeader
Events []*Event
// CompactRevision is the minimum revision the watcher may receive.
CompactRevision int64
// Canceled is used to indicate watch failure.
// If the watch failed and the stream was about to close, before the channel is closed,
// the channel sends a final response that has Canceled set to true with a non-nil Err().
Canceled bool
// Created is used to indicate the creation of the watcher.
Created bool
closeErr error
}
其中Event类型是一个gRPC生成的消息对象
type Event struct {
// type is the kind of event. If type is a PUT, it indicates
// new data has been stored to the key. If type is a DELETE,
// it indicates the key was deleted.
Type Event_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=mvccpb.Event_EventType" json:"type,omitempty"`
// kv holds the KeyValue for the event.
// A PUT event contains current kv pair.
// A PUT event with kv.Version=1 indicates the creation of a key.
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
// prev_kv holds the key-value pair before the event happens.
PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
接下来看实现了Watcher接口的watcher类型
// watcher implements the Watcher interface
type watcher struct {
remote pb.WatchClient
// mu protects the grpc streams map
mu sync.RWMutex
// streams holds all the active grpc streams keyed by ctx value.
streams map[string]*watchGrpcStream
}
watcher结构很简单, 只有3个字段. remote抽象了发起watch请求的客户端, streams是一个map, 这个map映射了交互的数据流.还有一个保护并发环境下数据流读写安全的读写锁.
streams所属的watchGrpcStream类型抽象了所有交互的数据, 它的结构定义如下
type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
// ctx controls internal remote.Watch requests
ctx context.Context
// ctxKey is the key used when looking up this stream's context
ctxKey string
cancel context.CancelFunc
// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
// resuming holds all resuming watchers on this grpc stream
resuming []*watcherStream
// reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest
// respc receives data from the watch client
respc chan *pb.WatchResponse
// donec closes to broadcast shutdown
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconn logic
errc chan error
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// wg is Done when all substream goroutines have exited
wg sync.WaitGroup
// resumec closes to signal that all substreams should begin resuming
resumec chan struct{}
// closeErr is the error that closed the watch stream
closeErr error
}
比较有意思的是, watchGrpcStream也包含了一个watcher类型的owner字段, watcher和watchGrpcStream可以互相引用到对方.同时又定义了watcher类型中已经定义过的remote,而且还不是指针类型, 这点不大明白作用是啥.
还有几个字段值得关注, 一个是substreams, 看下它的定义和注释:
// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
再看看watcherStream类型的定义:
// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
initReq watchRequest
// outc publishes watch responses to subscriber
outc chan WatchResponse
// recvc buffers watch responses before publishing
recvc chan *WatchResponse
// donec closes when the watcherStream goroutine stops.
donec chan struct{}
// closing is set to true when stream should be scheduled to shutdown.
closing bool
// id is the registered watch id on the grpc stream
id int64
// buf holds all events received from etcd but not yet consumed by the client
buf []*WatchResponse
}
画个图整理下他们之间的关系:
1244770-8d56f4f0d90de613.png
接下来轮到watcher是如何watch方法的了:
// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
// 应用配置
ow := opWatch(key, opts...)
var filters []pb.WatchCreateRequest_FilterType
if ow.filterPut {
filters = append(filters, pb.WatchCreateRequest_NOPUT)
}
if ow.filterDelete {
filters = append(filters, pb.WatchCreateRequest_NODELETE)
}
// 根据传入的参数构造watch请求
wr := &watchRequest{
ctx: ctx,
createdNotify: ow.createdNotify,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
filters: filters,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
}
ok := false
// 将请求上下文格式化为字符串
ctxKey := fmt.Sprintf("%v", ctx)
// find or allocate appropriate grpc watch stream
// 接下来配置对应的输出流, 注意得加锁
w.mu.Lock()
// 如果stream为空, 返回一个已经关闭的channel.
// 这种情况应该是防止streams为空的情况
if w.streams == nil {
// closed
w.mu.Unlock()
ch := make(chan WatchResponse)
close(ch)
return ch
}
// 注意这里, 前面我们提到streams是一个map,该map的key是请求上下文
// 如果该请求对应的流为空,则新建
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()
// couldn't create channel; return closed channel
// couldn't create channel; return closed channel
// 这里要设置为缓冲的原因可能与下面的两个
// closeCh <- WatchResponse{closeErr: wgs.closeErr}
// 语句有关,这里不理解
closeCh := make(chan WatchResponse, 1)
// submit request
select {
// 发送上面构造好的watch请求给对应的流
case reqc <- wr:
ok = true
// 请求断开(这里应该囊括了客户端请求断开的所有情况)
case <-wr.ctx.Done():
// watch完成
// 这里应该是处理非正常完成的情况
// 注意下面的重试逻辑
case <-donec:
if wgs.closeErr != nil {
// 如果不是空上下文导致流被丢弃的情况
// 则不应该重试
closeCh <- WatchResponse{closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
}
// receive channel
// 如果是初始请求顺利发送才会执行这里
if ok {
select {
case ret := <-wr.retc:
return ret
case <-ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
}
}
close(closeCh)
return closeCh
}
还有Watcher接口的另一个方法Close:
func (w *watcher) Close() (err error) {
// 在锁内先将streams字段置为空
// 在锁外再将一个个流都关闭
// 这样做的意义在于不管哪个流关闭失败了
// 都能先保证streams与这些流的关系被切断
w.mu.Lock()
streams := w.streams
w.streams = nil
w.mu.Unlock()
for _, wgs := range streams {
if werr := wgs.Close(); werr != nil {
err = werr
}
}
// etcd竟然也只是返回一个error
// 虽然上面的for循环可能产生多个error
return err
}
网友评论