1 收到client请求
先是rpc的请求处理,最后会到NodeImpl的apply方法
第一步自然是写log,到LogEntryAndClosureHandler来处理
就来到LogManagerImpl的executeApplyingTasks方法,
再到appendEntries的方法
image.png
调用wakeupAllWaiter发送给follower
2 发送给follower
LogManagerImpl类的wakeupAllWaiter
image.png image.png image.png
会调用sendEntries,就会具体发送请求给follower了
image.png
注意一下,LogManagerImpl的waitmap的填充是在上一个请求的返回。
image.png image.png
这样就是串行的发送请求了。
3 超过半数以上的follower同意
在 Replicator类的 onAppendEntriesReturned方法
final int entriesSize = request.getEntriesCount();
if (entriesSize > 0) {
r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
BallotBox的commitAt方法,超过半数成功返回的时候:
this.waiter.onCommitted(lastCommittedIndex);
然后FSMCallerImpl类:
private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final boolean endOfBatch) {
CountDownLatch shutdown = null;
if (task.type == TaskType.COMMITTED) {
if (task.committedIndex > maxCommittedIndex) {
maxCommittedIndex = task.committedIndex;
}
} else {
if (maxCommittedIndex >= 0) {
this.currTask = TaskType.COMMITTED;
doCommitted(maxCommittedIndex); //这里提交
private void doApplyTasks(final IteratorImpl iterImpl) {
final IteratorWrapper iter = new IteratorWrapper(iterImpl);
final long startApplyMs = Utils.monotonicMs();
final long startIndex = iter.getIndex();
try {
this.fsm.onApply(iter);
this.fsm.onApply(iter)就是应用到状态机了。主流程就结束了。
网友评论