线性一致读
实现api
//NodeImpl
readIndex(final byte[] requestContext, final ReadIndexClosure done)
调用readIndex后
-
把请求添加到队列readIndexQueue (disruptor队列)
-
添加后被ReadIndexEventHandler处理 (如果生产速度过快32个一批)
-
构造请求ReadIndexRequest, 调用NodeImpl.handleReadIndexRequest处理,根据Node是leader还是follwer执行不同的操作
-
leader
-
记录lastCommittedIndex
-
向所有的peer发送心跳,过半节点心跳反馈后,触发ReadIndexResponseClosure的run方法。
-
如果lastCommittedInde未提交则批量请求都等待
-
如果lastCommittedInde已提交,则批量的请求都释放,见ReadIndexClosure (核心代码ReadIndexResponseClosure.notifySuccess)
private void notifySuccess(final ReadIndexStatus status) { final long nowMs = Utils.monotonicMs(); final List<ReadIndexState> states = status.getStates(); final int taskCount = states.size(); for (int i = 0; i < taskCount; i++) { final ReadIndexState task = states.get(i); final ReadIndexClosure done = task.getDone(); // stack copy if (done != null) { this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs()); done.setResult(task.getIndex(), task.getRequestContext().get()); done.run(Status.OK()); } } }
ReadIndexClosure.run核心代码会执行用户定义的run方法
public void run(final Status status) { run(status, this.index, this.requestContext); }
-
-
follower
- 向leader发送readIndex rpc请求
- leader执行一次readIndex, 返回leader的readIndex给follower
- 等待收到的readIndex已经apply,执行donw
网友评论