美文网首页
etcd学习笔记(二): Linearizable Read

etcd学习笔记(二): Linearizable Read

作者: wangshanshi | 来源:发表于2021-11-26 18:56 被阅读0次

参考文章:

  1. https://zhuanlan.zhihu.com/p/27869566
  2. https://time.geekbang.org/column/article/335932

etcd有两种读模式:
一种是串行 (Serializable) 读,直接读状态机数据返回、无需通过 Raft 协议与集群进行交互。它具有低延时、高吞吐量的特点,适合对数据一致性要求不高的场景。
一种是线性(Linearizable)读,这也是 etcd 的默认读模式。它需要经过 Raft 协议模块,反映的是集群共识,因此在延时和吞吐量上相比串行读略差一点,适用于对数据一致性要求高的场景。

1. 入参

// RangeRequest is the input of a range (read) request. The struct tags show
// it is a protobuf message whose enums live in package etcdserverpb.
type RangeRequest struct {
    // key is the first key for the range. If range_end is not given, the request only looks up key.
    Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
    // range_end is the upper bound on the requested range [key, range_end).
    // If range_end is '\0', the range is all keys >= key.
    // If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
    // then the range request gets all keys prefixed with key.
    // If both key and range_end are '\0', then the range request returns all keys.
    RangeEnd []byte `protobuf:"bytes,2,opt,name=range_end,json=rangeEnd,proto3" json:"range_end,omitempty"`
    // limit is a limit on the number of keys returned for the request. When limit is set to 0,
    // it is treated as no limit.
    Limit int64 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"`
    // revision is the point-in-time of the key-value store to use for the range.
    // If revision is less or equal to zero, the range is over the newest key-value store.
    // If the revision has been compacted, ErrCompacted is returned as a response.
    Revision int64 `protobuf:"varint,4,opt,name=revision,proto3" json:"revision,omitempty"`
    // sort_order is the order for returned sorted results.
    SortOrder RangeRequest_SortOrder `protobuf:"varint,5,opt,name=sort_order,json=sortOrder,proto3,enum=etcdserverpb.RangeRequest_SortOrder" json:"sort_order,omitempty"`
    // sort_target is the key-value field to use for sorting.
    SortTarget RangeRequest_SortTarget `protobuf:"varint,6,opt,name=sort_target,json=sortTarget,proto3,enum=etcdserverpb.RangeRequest_SortTarget" json:"sort_target,omitempty"`
    // serializable sets the range request to use serializable member-local reads.
    // Range requests are linearizable by default; linearizable requests have higher
    // latency and lower throughput than serializable requests but reflect the current
    // consensus of the cluster. For better performance, in exchange for possible stale reads,
    // a serializable range request is served locally without needing to reach consensus
    // with other nodes in the cluster.
    Serializable bool `protobuf:"varint,7,opt,name=serializable,proto3" json:"serializable,omitempty"`
    // keys_only when set returns only the keys and not the values.
    // NOTE(review): unlike the fields above, no struct tag is shown for this
    // field and the ones below; the tags look elided from this excerpt — confirm.
    KeysOnly bool 
    // count_only when set returns only the count of the keys in the range.
    CountOnly bool 


    // NOTE(review): the four revision bounds below carry no comment in this
    // excerpt. Judging by the names only, they presumably bound the mod/create
    // revision of the keys that are returned — confirm against etcd's rpc.proto.
    MinModRevision int64
    MaxModRevision int64
    MinCreateRevision int64
    MaxCreateRevision    int64   
    // The XXX_ fields are tagged json:"-", so they are left out of JSON output.
    // NOTE(review): presumably bookkeeping emitted by the protobuf code
    // generator rather than request parameters — confirm.
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

当etcd server接收到read请求:
如果是linearizable read的话,会调用linearizableReadNotify,然后再去read kv。
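在linearizableReadNotify中,通过channel与linearizableReadLoop协作:先取出当前的s.readNotifier,再向s.readwaitc发信号,然后等待这个notifier的通知。注意向s.readwaitc的发送放在带default分支的select里,是非阻塞的:如果信号没能发出去,调用方不会卡住,而是直接去等待自己取到的那个notifier。因此同时到达的多个读请求可能拿到同一个notifier,由同一轮ReadIndex一起唤醒,请求与ReadIndex之间并不是严格的1-1对应。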

// Range serves a range (read) request.
//
// Unless r.Serializable is set, it first waits in linearizableReadNotify and
// returns that error if the wait fails. The read itself then goes through
// doSerialize, which is handed a permission check (authStore.IsRangePermitted
// on r.Key and r.RangeEnd) and the lookup (applyV3Base.Range).
//
// It returns the response and error produced by the lookup, or a nil response
// together with the error from the linearizable wait or from doSerialize.
//
// NOTE(review): trace is used below but is not declared in this excerpt; it is
// assumed to be set up by code elided from the snippet — confirm.
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
    var (
        resp *pb.RangeResponse
        // err is written by the get closure below, so it has to be declared
        // in the function scope; the excerpt assigned it without declaring it.
        err error
    )

    if !r.Serializable {
        err = s.linearizableReadNotify(ctx)
        trace.Step("agreement among raft nodes before linearized reading")
        if err != nil {
            return nil, err
        }
    }

    chk := func(ai *auth.AuthInfo) error {
        return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
    }
    get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }

    if serr := s.doSerialize(ctx, chk, get); serr != nil {
        return nil, serr
    }
    return resp, err
}

// linearizableReadNotify blocks the caller until the notifier it picked up is
// signalled, the context is done, or the server is done.
//
// It returns nc.err once nc.c is readable, ctx.Err() when ctx is done, and
// ErrStopped when s.done is readable.
func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
    // Snapshot the current notifier under the read lock; the wait below is on
    // this snapshot, not on whatever s.readNotifier holds later.
    s.readMu.RLock()
    nc := s.readNotifier
    s.readMu.RUnlock()

    // signal linearizable loop for current notify if it hasn't been already
    // The default branch makes this send non-blocking: if it cannot proceed
    // right away, the caller simply moves on to the wait.
    select {
    case s.readwaitc <- struct{}{}:
    default:
    }

    // wait for read state notification
    select {
    case <-nc.c:
        return nc.err
    case <-ctx.Done():
        return ctx.Err()
    case <-s.done:
        return ErrStopped
    }
}

linearizableReadLoop接收到channel中的信号之后,会调用s.requestCurrentIndex获取leader的committed index(即confirmedIndex),等到此etcd-node的applied index >= confirmedIndex的时候,上面提到的linearizableReadNotify就会收到通知返回。

// linearizableReadLoop serves the signals sent on s.readwaitc, one round per
// signal:
//
//  1. swap in a fresh notifier, keeping the previous one (nr) for this round;
//  2. ask requestCurrentIndex for the index to wait for (confirmedIndex);
//  3. if the applied index is still behind it, wait on s.applyWait;
//  4. call nr.notify with nil, or with the error from step 2.
//
// The loop returns when requestCurrentIndex reports a stopped error or when
// s.stopping becomes readable.
//
// NOTE(review): leaderChangedNotifier is not declared in this excerpt; it is
// assumed to come from code elided from the snippet — confirm.
func (s *EtcdServer) linearizableReadLoop() {
    for {
        requestId := s.reqIDGen.Next()

        // Wait for a reader to ask for a round. s.stopping is watched here as
        // well, so an idle loop can exit on shutdown instead of blocking on
        // s.readwaitc forever.
        select {
        case <-s.readwaitc:
        case <-s.stopping:
            return
        }

        // Readers arriving from now on pick up nextnr; nr belongs to the
        // readers that were already waiting for this round.
        nextnr := newNotifier()
        s.readMu.Lock()
        nr := s.readNotifier
        s.readNotifier = nextnr
        s.readMu.Unlock()

        confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
        if isStopped(err) {
            return
        }
        if err != nil {
            nr.notify(err)
            continue
        }

        // Only wait when the applied index has not yet reached confirmedIndex.
        appliedIndex := s.getAppliedIndex()
        if appliedIndex < confirmedIndex {
            select {
            case <-s.applyWait.Wait(confirmedIndex):
            case <-s.stopping:
                return
            }
        }
        nr.notify(nil)
    }
}

关键是s.requestCurrentIndex

2. requestCurrentIndex

首先,上面提到的linearizableReadLoop会为每一个read index请求生成一个唯一的requestID,然后requestCurrentIndex调用s.sendReadIndex,这会调用raft-node的ReadIndex: return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}),其中Data是requestID。

// requestCurrentIndex sends a read-index request tagged with requestId and
// waits on s.r.readStateC for the read state whose RequestCtx matches it.
//
// It returns the Index of the matching read state, or (0, err) when
// sendReadIndex fails. Read states carrying a different RequestCtx are logged,
// counted in slowReadIndex and skipped.
//
// NOTE(review): in this excerpt leaderChangedNotifier is never read and lg is
// not declared; the wait loop also has no timeout or shutdown case. These look
// elided from the snippet — confirm against the full source.
func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
    if err := s.sendReadIndex(requestId); err != nil {
        return 0, err
    }

    // The encoded request id does not change between iterations, so build it
    // once instead of once per received read state.
    requestIdBytes := uint64ToBigEndianBytes(requestId)

    for {
        rs := <-s.r.readStateC
        if bytes.Equal(rs.RequestCtx, requestIdBytes) {
            return rs.Index, nil
        }

        // a previous request might time out. now we should ignore the response of it and
        // continue waiting for the response of the current requests.
        responseId := uint64(0)
        if len(rs.RequestCtx) == 8 {
            responseId = binary.BigEndian.Uint64(rs.RequestCtx)
        }
        lg.Warn(
            "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
            zap.Uint64("sent-request-id", requestId),
            zap.Uint64("received-request-id", responseId),
        )
        slowReadIndex.Inc()
    }
}

2.1 leader处理MsgReadIndex

leader需要做的事情是,返回当前的commit index。这里有两个点:
1)确定leader确实是leader;这是为了处理网络分区等情况
2)leader必须在当前任期内提交过index。为什么呢?以下图中的(e)为例,当S1没有committed 4的情况下,很有可能获取的结果是1,而不是2,而2是在(c)中的提案

image.png

先确定是否有committedEntryInCurrentTerm,否则的话,先pending;如果已经有committedEntryInCurrentTerm,则使用requestID发送心跳

        // Postpone read only request when this leader has not committed
        // any log entry at its term.
        if !r.committedEntryInCurrentTerm() {
            // Queue the message on pendingReadIndexMessages instead of
            // answering it now.
            r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m)
            return nil
        }

        // An entry has been committed in the current term, so the request
        // is answered right away.
        sendMsgReadIndexResponse(r, m)


// sendMsgReadIndexResponse answers the read-index request m in the way
// selected by r.readOnly.option.
//
// Design note kept from the original: the request context could be an
// internally defined value expressed through term and index rather than the
// user-supplied one, which would let several reads piggyback on one message.
func sendMsgReadIndexResponse(r *raft, m pb.Message) {
    option := r.readOnly.option

    if option == ReadOnlySafe {
        // More than the local vote is needed: register the request at the
        // current commit index, count the local node's own ack, and run a
        // full heartbeat broadcast that carries the request context.
        r.readOnly.addRequest(r.raftLog.committed, m)
        r.readOnly.recvAck(r.id, m.Entries[0].Data)
        r.bcastHeartbeatWithCtx(m.Entries[0].Data)
        return
    }

    if option == ReadOnlyLeaseBased {
        // No broadcast on this path: answer straight from the commit index.
        // A response whose To is None is not sent.
        resp := r.responseToReadIndexReq(m, r.raftLog.committed)
        if resp.To != None {
            r.send(resp)
        }
    }
}

leader处理对应的MsgHeartbeatResp消息:检查该requestID收到的MsgHeartbeatResp是否已经达到quorum.VoteWon。如果达到quorum.VoteWon:

  1. 如果是leader直接收到ReadIndex,则readStates中添加requestID对应的ReadState;
  2. 如果MsgReadIndex是follower转发过来的,则leader回应给对应的follower MsgReadIndexResp
    // Record the ack carried by this message (m.From, m.Context) and stop
    // here unless the acks collected so far win the vote among the voters.
    if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
            return nil
        }

        // NOTE(review): advance presumably returns the read requests that
        // this ack releases — confirm. Each one is answered with its
        // recorded index; a response whose To is None is not sent.
        rss := r.readOnly.advance(m)
        for _, rs := range rss {
            if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
                r.send(resp)
            }
        }
// responseToReadIndexReq produces the answer to the read-index request req
// for the given readIndex.
//
// When req has no sender (From is None) or was sent by this node, a ReadState
// holding readIndex and the request's context is appended to r.readStates and
// an empty message is returned. Otherwise it returns a MsgReadIndexResp
// addressed to req.From that carries readIndex and the request's entries.
func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
    fromRemote := req.From != None && req.From != r.id
    if fromRemote {
        return pb.Message{
            Type:    pb.MsgReadIndexResp,
            To:      req.From,
            Index:   readIndex,
            Entries: req.Entries,
        }
    }

    // Local request: publish the result through readStates instead of a message.
    state := ReadState{
        Index:      readIndex,
        RequestCtx: req.Entries[0].Data,
    }
    r.readStates = append(r.readStates, state)
    return pb.Message{}
}

2.2 follower处理MsgReadIndex

首先会将Msg转发到leader,leader处理完之后会转发MsgReadIndexResp到follower。
会根据requestID生成readStates,这样接下来的处理就跟leader一样了。

    case pb.MsgReadIndexResp:
        // The response must carry exactly one entry; anything else is logged
        // and dropped.
        if len(m.Entries) != 1 {
            r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
            return nil
        }
        // Store the result as a ReadState: the index from the message plus the
        // data of its single entry as the request context.
        r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})

2.3 leader/follower处理readStates

继续回到requestCurrentIndex,根据readState中的requestID进行匹配,如果匹配到则返回index。

// Keep receiving read states until one carries this request's id.
for {
        select {
        case rs := <-s.r.readStateC:
            // A read state belongs to this request when its RequestCtx equals
            // the big-endian encoding of requestId.
            requestIdBytes := uint64ToBigEndianBytes(requestId)
            gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
            if !gotOwnResponse {
                // a previous request might time out. now we should ignore the response of it and
                // continue waiting for the response of the current requests.
                // responseId stays 0 when RequestCtx is not exactly 8 bytes long.
                responseId := uint64(0)
                if len(rs.RequestCtx) == 8 {
                    responseId = binary.BigEndian.Uint64(rs.RequestCtx)
                }
                lg.Warn(
                    "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
                    zap.Uint64("sent-request-id", requestId),
                    zap.Uint64("received-request-id", responseId),
                )
                slowReadIndex.Inc()
                continue
            }
            // Matching response: its Index is the result.
            return rs.Index, nil
        }
    }

相关文章

网友评论

      本文标题:etcd学习笔记(二): Linearizable Read

      本文链接:https://www.haomeiwen.com/subject/mxlxxrtx.html