参考文章:
etcd有两种读模式:
一种是串行 (Serializable) 读,直接读状态机数据返回、无需通过 Raft 协议与集群进行交互。它具有低延时、高吞吐量的特点,适合对数据一致性要求不高的场景。
一种是线性(Linearizable)读,etcd 默认读模式是线性读,因为它需要经过 Raft 协议模块,反映的是集群共识,因此在延时和吞吐量上相比串行读略差一点,适用于对数据一致性要求高的场景。
1. 入参
// RangeRequest is the request message for a range (read) query. The enum
// names in the struct tags place it in the etcdserverpb protobuf package.
type RangeRequest struct {
// key is the first key for the range. If range_end is not given, the request only looks up key.
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
// range_end is the upper bound on the requested range [key, range_end).
// If range_end is '\0', the range is all keys >= key.
// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
// then the range request gets all keys prefixed with key.
// If both key and range_end are '\0', then the range request returns all keys.
RangeEnd []byte `protobuf:"bytes,2,opt,name=range_end,json=rangeEnd,proto3" json:"range_end,omitempty"`
// limit is a limit on the number of keys returned for the request. When limit is set to 0,
// it is treated as no limit.
Limit int64 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"`
// revision is the point-in-time of the key-value store to use for the range.
// If revision is less or equal to zero, the range is over the newest key-value store.
// If the revision has been compacted, ErrCompacted is returned as a response.
Revision int64 `protobuf:"varint,4,opt,name=revision,proto3" json:"revision,omitempty"`
// sort_order is the order for returned sorted results.
SortOrder RangeRequest_SortOrder `protobuf:"varint,5,opt,name=sort_order,json=sortOrder,proto3,enum=etcdserverpb.RangeRequest_SortOrder" json:"sort_order,omitempty"`
// sort_target is the key-value field to use for sorting.
SortTarget RangeRequest_SortTarget `protobuf:"varint,6,opt,name=sort_target,json=sortTarget,proto3,enum=etcdserverpb.RangeRequest_SortTarget" json:"sort_target,omitempty"`
// serializable sets the range request to use serializable member-local reads.
// Range requests are linearizable by default; linearizable requests have higher
// latency and lower throughput than serializable requests but reflect the current
// consensus of the cluster. For better performance, in exchange for possible stale reads,
// a serializable range request is served locally without needing to reach consensus
// with other nodes in the cluster.
Serializable bool `protobuf:"varint,7,opt,name=serializable,proto3" json:"serializable,omitempty"`
// keys_only when set returns only the keys and not the values.
// NOTE(review): unlike the fields above, this field and the ones below carry
// no protobuf/json struct tags in this listing; presumably they were trimmed
// for brevity. Confirm against the generated source.
KeysOnly bool
// count_only when set returns only the count of the keys in the range.
CountOnly bool
// NOTE(review): the four fields below have no comments in this listing.
// Judging by the names only, they look like lower/upper bounds on the
// modification and creation revisions of the returned keys. Confirm
// against the .proto definition before relying on this.
MinModRevision int64
MaxModRevision int64
MinCreateRevision int64
MaxCreateRevision int64
// Bookkeeping fields (presumably emitted by the protobuf code generator).
// All three are excluded from JSON encoding via the `json:"-"` tag.
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
当etcd server接收到read请求:
如果是linearizable read的话,会调用linearizableReadNotify,然后再去read kv。
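在linearizableReadNotify中,会通过channel来协调:先在读锁保护下取得当前的s.readNotifier,再给s.readwaitc发信号,然后等待该notifier被通知。需要注意,发信号使用的是带default分支的select(非阻塞发送),因此多个请求并发调用时并不是严格一一对应的:如果信号暂时发不出去,调用方不会阻塞,而是直接去等待自己拿到的notifier;拿到同一个notifier的多个请求会在linearizableReadLoop的同一轮处理中被一并通知。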
// Range serves a range (read) request.
//
// Unless r.Serializable is set, it first calls linearizableReadNotify and
// returns that call's error, if any, before touching the key-value store.
// It then runs the read through doSerialize, which is handed a permission
// check (authStore.IsRangePermitted on r.Key / r.RangeEnd) and a closure that
// performs the actual applyV3Base.Range call.
//
// It returns the range response, or a nil response together with the first
// error encountered.
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
	var resp *pb.RangeResponse
	// err is written both here and inside the get closure below, so it has to
	// be declared at function scope.
	var err error
	if !r.Serializable {
		err = s.linearizableReadNotify(ctx)
		// NOTE(review): trace is not declared anywhere in this excerpt;
		// presumably it is created earlier in the full function. Confirm.
		trace.Step("agreement among raft nodes before linearized reading")
		if err != nil {
			return nil, err
		}
	}
	// chk verifies that the caller may read the requested key range.
	chk := func(ai *auth.AuthInfo) error {
		return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
	}
	// get performs the read and stores its outcome in resp and err.
	get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }
	if serr := s.doSerialize(ctx, chk, get); serr != nil {
		err = serr
		return nil, err
	}
	return resp, err
}
// linearizableReadNotify asks the linearizable read loop for a notification
// and blocks until one of three things happens: the notifier captured at
// entry fires (its err is returned), ctx is done (ctx.Err() is returned), or
// s.done fires (ErrStopped is returned).
func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
// Capture the current notifier under the read lock; the wait below uses
// this captured value, not whatever s.readNotifier holds later.
s.readMu.RLock()
nc := s.readNotifier
s.readMu.RUnlock()
// signal linearizable loop for current notify if it hasn't been already
// The default branch makes this send non-blocking: if the signal cannot be
// delivered right now, the caller simply moves on to waiting.
select {
case s.readwaitc <- struct{}{}:
default:
}
// wait for read state notification
select {
case <-nc.c:
return nc.err
case <-ctx.Done():
return ctx.Err()
case <-s.done:
return ErrStopped
}
}
linearizableReadLoop接收到channel中的信号之后,会调用s.requestCurrentIndex获取leader的committed index(即confirmedIndex),等到此etcd-node的applied index >= confirmedIndex的时候,上面提到的linearizableReadNotify就会收到通知返回。
// linearizableReadLoop runs forever, serving one batch of waiting readers per
// iteration: it generates a request ID, blocks until a signal arrives on
// s.readwaitc, swaps a fresh notifier into s.readNotifier, obtains a confirmed
// index via requestCurrentIndex, waits until the applied index has reached
// that confirmed index, and finally notifies the notifier it swapped out.
// The loop exits when requestCurrentIndex reports a stopped server or when
// s.stopping fires during the apply wait.
func (s *EtcdServer) linearizableReadLoop() {
for {
requestId := s.reqIDGen.Next()
// Block until a reader signals that it wants a linearizable read.
select {
case <-s.readwaitc:
}
// Swap in a fresh notifier under the write lock. Readers that captured
// the old one (nr) are the ones this iteration will wake up.
nextnr := newNotifier()
s.readMu.Lock()
nr := s.readNotifier
s.readNotifier = nextnr
s.readMu.Unlock()
// NOTE(review): leaderChangedNotifier is not declared in this excerpt;
// presumably it is obtained earlier in the full loop body. Confirm.
confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
if isStopped(err) {
return
}
if err != nil {
// Hand the error to the waiting readers and serve the next batch.
nr.notify(err)
continue
}
appliedIndex := s.getAppliedIndex()
// Only wait when the local applied index is still behind the confirmed
// index; an equal or greater applied index proceeds immediately.
if appliedIndex < confirmedIndex {
select {
case <-s.applyWait.Wait(confirmedIndex):
case <-s.stopping:
return
}
}
// Release the readers waiting on the swapped-out notifier.
nr.notify(nil)
}
}
关键是s.requestCurrentIndex
2. requestCurrentIndex
首先,上面提到的linearizableReadLoop会为每一个read index请求生成一个唯一的requestID(s.reqIDGen.Next()),并把它传给requestCurrentIndex;requestCurrentIndex随后调用s.sendReadIndex,这会调用raft-node的ReadIndex: return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
其中Data(rctx)就是requestID。
// requestCurrentIndex sends a read-index request tagged with requestId and
// then waits on s.r.readStateC for the read state whose RequestCtx equals the
// big-endian encoding of requestId. It returns that read state's Index.
// If sendReadIndex fails, it returns 0 and the error.
//
// NOTE(review): in this excerpt leaderChangedNotifier is never read and lg is
// not declared; presumably the full function declares lg and has further
// select cases. Confirm against the complete source.
func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
err := s.sendReadIndex(requestId)
if err != nil {
return 0, err
}
for {
select {
case rs := <-s.r.readStateC:
// A response belongs to this call only if its context bytes match the
// encoded request ID exactly.
requestIdBytes := uint64ToBigEndianBytes(requestId)
gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
if !gotOwnResponse {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
// Decode the foreign ID for logging only; it stays 0 when the context
// is not exactly 8 bytes long.
responseId := uint64(0)
if len(rs.RequestCtx) == 8 {
responseId = binary.BigEndian.Uint64(rs.RequestCtx)
}
lg.Warn(
"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
zap.Uint64("sent-request-id", requestId),
zap.Uint64("received-request-id", responseId),
)
slowReadIndex.Inc()
continue
}
return rs.Index, nil
}
}
}
2.1 leader处理MsgReadIndex
leader需要做的事情是,返回当前的commit index。这里有两个点:
1)确定leader确实是leader;这是为了处理网络分区等情况
2)leader必须在当前任期内提交过index。为什么呢?以下图中的(e)为例,当S1没有committed 4的情况下,很有可能获取的结果是1,而不是2,而2是在(c)中的提案
先判断committedEntryInCurrentTerm:如果leader在当前任期内还没有提交过日志,就先把该请求追加到pendingReadIndexMessages中挂起;如果已经提交过,则调用sendMsgReadIndexResponse进行处理(在ReadOnlySafe模式下会携带requestID广播心跳)
// Postpone read only request when this leader has not committed
// any log entry at its term.
if !r.committedEntryInCurrentTerm() {
// Queue the message in pendingReadIndexMessages instead of answering it.
r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m)
return nil
}
// An entry has been committed in the current term, so handle the request now.
sendMsgReadIndexResponse(r, m)
// sendMsgReadIndexResponse handles the read-index request m on node r,
// according to the configured read-only option.
//
// With ReadOnlySafe the request is registered against the current committed
// index, acknowledged on behalf of the local node, and a heartbeat carrying
// the request context (m.Entries[0].Data) is broadcast.
//
// With ReadOnlyLeaseBased the response is built immediately from the current
// committed index and sent only when it is addressed to some node
// (resp.To != None).
//
// Any other option value is a no-op.
//
// Design note carried over from the original: an internally defined context
// (expressed via term and index) could replace the user-supplied one, which
// would let multiple reads piggyback on the same message.
func sendMsgReadIndexResponse(r *raft, m pb.Message) {
	mode := r.readOnly.option

	if mode == ReadOnlySafe {
		// More than the local vote is needed, so go through a full broadcast.
		r.readOnly.addRequest(r.raftLog.committed, m)
		// The local node acks its own request automatically.
		r.readOnly.recvAck(r.id, m.Entries[0].Data)
		r.bcastHeartbeatWithCtx(m.Entries[0].Data)
		return
	}

	if mode == ReadOnlyLeaseBased {
		reply := r.responseToReadIndexReq(m, r.raftLog.committed)
		if reply.To != None {
			r.send(reply)
		}
	}
}
leader处理对应的MsgHeartbeatResp消息:记录该requestID收到的心跳响应(ack),并检查是否已经得到多数派确认(quorum.VoteWon)。如果结果是quorum.VoteWon:
- 如果是leader直接收到ReadIndex,则readStates中添加requestID对应的ReadState;
- 如果MsgReadIndex是follower转发过来的,则leader回应给对应的follower MsgReadIndexResp
// Record the ack from m.From for the request identified by m.Context, and
// stop here unless the voters' result for that request is quorum.VoteWon.
if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
// NOTE(review): advance presumably returns the read-only requests released
// by this ack; each carries the original request (req) and its read index.
rss := r.readOnly.advance(m)
for _, rs := range rss {
// A response with To == None needs no message to be sent.
if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
r.send(resp)
}
}
// responseToReadIndexReq produces the answer to the read-index request req
// for the given readIndex.
//
// When the request came from another node (req.From is neither None nor
// r.id), it returns a MsgReadIndexResp addressed to that node, carrying
// readIndex and the request's entries.
//
// Otherwise the request is local: a ReadState holding readIndex and the
// request context (req.Entries[0].Data) is appended to r.readStates and an
// empty message is returned.
func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
	fromRemote := req.From != None && req.From != r.id
	if fromRemote {
		return pb.Message{
			Type:    pb.MsgReadIndexResp,
			To:      req.From,
			Index:   readIndex,
			Entries: req.Entries,
		}
	}

	state := ReadState{
		Index:      readIndex,
		RequestCtx: req.Entries[0].Data,
	}
	r.readStates = append(r.readStates, state)
	return pb.Message{}
}
2.2 follower处理MsgReadIndex
follower首先会将MsgReadIndex转发给leader,leader处理完之后会回复MsgReadIndexResp给该follower。
follower收到MsgReadIndexResp后,会根据其中的Index和requestID生成ReadState并追加到readStates,这样接下来的处理就跟leader一样了。
case pb.MsgReadIndexResp:
// The response must carry exactly one entry; anything else is logged and dropped.
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return nil
}
// Record the read state: the index from the message plus the request
// context taken from the single entry's data.
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
2.3 leader/follower处理readStates
继续回到requestCurrentIndex,根据readState中的requestID进行匹配,如果匹配到则返回index。
// Wait loop: keep receiving read states until one matches requestId.
for {
select {
case rs := <-s.r.readStateC:
// A response belongs to this call only if its context bytes match the
// encoded request ID exactly.
requestIdBytes := uint64ToBigEndianBytes(requestId)
gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
if !gotOwnResponse {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
// Decode the foreign ID for logging only; it stays 0 when the context
// is not exactly 8 bytes long.
responseId := uint64(0)
if len(rs.RequestCtx) == 8 {
responseId = binary.BigEndian.Uint64(rs.RequestCtx)
}
lg.Warn(
"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
zap.Uint64("sent-request-id", requestId),
zap.Uint64("received-request-id", responseId),
)
slowReadIndex.Inc()
continue
}
return rs.Index, nil
}
}
网友评论