这篇是关于 snapshot(快照)优化的。正如链接《Raft 算法原理及其在 CMQ 中的应用(上)》2.6 小节"快照管理"中描述的那样,在 floyd 中的实现也比较简单:
313 } else if (res.append_entries_res().success() == true) {
314 if (num_entries > 0) {
315 match_index_ = prev_log_index + num_entries;
316 // only log entries from the leader's current term are committed
317 // by counting replicas
318 if (append_entries->entries(num_entries - 1).term() == context_->current_term) {
319 AdvanceLeaderCommitIndex();
320 apply_->ScheduleApply();
321 }
322 next_index_ = prev_log_index + num_entries + 1;
323 }
194 // Only the leader calls this: it raises context_->commit_index to QuorumMatchIndex() when that value is larger.
195 // A follower only needs to set its commit index to the leader's.
196 void Peer::AdvanceLeaderCommitIndex() {
197 Entry entry;  // NOTE(review): not used anywhere in this function as shown
198 uint64_t new_commit_index = QuorumMatchIndex();
199 if (context_->commit_index < new_commit_index) {  // the commit index is only ever moved forward here
200 context_->commit_index = new_commit_index;
201 raft_meta_->SetCommitIndex(context_->commit_index);  // presumably persists the commit index; confirm in raft_meta
202 }
203 return;
204 }
// Applies each log entry in (context_->last_applied, context_->commit_index] via Apply(), then records the new last_applied.
62 void FloydApply::ApplyStateMachine() {
63 uint64_t last_applied = context_->last_applied;
64 // Apply as many entries as possible
65 uint64_t commit_index;
66 commit_index = context_->commit_index;
67
70 Entry log_entry;
71 if (last_applied >= commit_index) {
72 return;
73 }
74 // TODO: use batch commit as an optimization
75 while (last_applied < commit_index) {
76 last_applied++;
77 raft_log_->GetEntry(last_applied, &log_entry);  // NOTE(review): any status returned by GetEntry is not inspected here
78 // TODO: we need to change the type of s,
79 // since Apply may not operate on rocksdb
80 rocksdb::Status s = Apply(log_entry);
81 if (!s.ok()) {
84 usleep(1000000);  // sleep for one second (1,000,000 microseconds) before rescheduling
85 ScheduleApply(); // try once more
86 return;  // NOTE(review): as shown, context_->last_applied is not updated on this path, so the retry starts again from the old value
87 }
88 }
89 context_->apply_mu.Lock();
90 context_->last_applied = last_applied;
91 raft_meta_->SetLastApplied(last_applied); /// persist the applied index
92 context_->apply_mu.Unlock();
93 context_->apply_cond.SignalAll();
94 }
网友评论