Lately I have been going back over the source code I analyzed earlier and the notes I wrote about it; to deepen my understanding, I want to think through more scenarios against concrete requirements and real workloads.
I previously analyzed a basic raft implementation in the Floyd & Raft source-code analysis together with the relevant parts of the paper. That implementation follows the paper fairly closely, and some other aspects are simply not present in the code. Later, while planning to read through braft and bfs over the holiday, I came across braft's RAFT introduction on GitHub, which describes several situations I had not considered before. So here I revisit those points against the earlier source reading; tikv's pingcap/raft-rs is also worth consulting.
In the source, a node in the follower state, after a randomized timeout, transitions from follower to candidate, increments its own term, and requests votes from the other nodes, carrying its term, ip/port (optional), last_log_term, and last_log_index. Each receiving node compares terms: if request_term > my_term, it turns itself into a follower and aligns to the requester's term, regardless of what state it is currently in:
714 int FloydImpl::ReplyRequestVote(const CmdRequest& request, CmdResponse* response) {
715 slash::MutexLock l(&context_->global_mu);
716 bool granted = false;
717 CmdRequest_RequestVote request_vote = request.request_vote();
718 /*
719 * If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (5.1)
720 */
721 if (request_vote.term() > context_->current_term) {
722 context_->BecomeFollower(request_vote.term());
723 raft_meta_->SetCurrentTerm(context_->current_term);
724 }
725   // if caller's term smaller than my term, then I will notify him
726 if (request_vote.term() < context_->current_term) {
727 BuildRequestVoteResponse(context_->current_term, granted, response);
728 return -1;
729 }
The raft implementation in bfs behaves the same way:
231 void RaftNodeImpl::Vote(::google::protobuf::RpcController* controller,
232 const ::baidu::bfs::VoteRequest* request,
233 ::baidu::bfs::VoteResponse* response,
234 ::google::protobuf::Closure* done) {
235 int64_t term = request->term();
236 const std::string& candidate = request->candidate();
237 int64_t last_log_index = request->last_log_index();
238 int64_t last_log_term = request->last_log_term();
239 LOG(INFO, "Recv vote request: %s %ld %ld / (%s %ld %ld)",
240 candidate.c_str(), term, last_log_term,
241 voted_for_.c_str(), current_term_, log_term_);
242 MutexLock lock(&mu_);
243 CheckTerm(term);
244 if (term >= current_term_
245 && (voted_for_ == "" || voted_for_ == candidate)
246 && (last_log_term > log_term_ ||
247 (last_log_term == log_term_ && last_log_index >= log_index_))) {
248 voted_for_ = candidate;
249 if (!StoreContext("current_term", current_term_) || !StoreContext("voted_for", voted_for_)) {
250 LOG(FATAL, "Store term & vote_for fail %s %ld", voted_for_.c_str(), current_term_);
251 } else {
252 LOG(INFO, "Granted %s %ld %ld", candidate.c_str(), term, last_log_index);
253 }
254 response->set_vote_granted(true);
255 response->set_term(term);
256 done->Run();
257 return;
258 }
259
260 response->set_vote_granted(false);
261 response->set_term(current_term_);
262 done->Run();
263 }
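The grant condition in bfs's `Vote` (lines 244-247 above) is the "log up-to-date" comparison from §5.4.1 of the Raft paper: compare the last log term first, then the last log index. A minimal sketch of that rule in isolation (the function name is mine, not from bfs):

```cpp
#include <cstdint>

// Raft "at least as up-to-date" check (paper §5.4.1): the candidate's
// log wins on a higher last term, or on an equal last term with an
// index that is >= ours.
bool CandidateLogUpToDate(int64_t cand_last_term, int64_t cand_last_index,
                          int64_t my_last_term, int64_t my_last_index) {
    if (cand_last_term != my_last_term) {
        return cand_last_term > my_last_term;
    }
    return cand_last_index >= my_last_index;
}
```

Note that a higher last term beats a longer log: a candidate with one entry at term 3 is "more up-to-date" than a voter with a hundred entries at term 2.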
Quoting braft's RAFT introduction: "The original RAFT paper handles asymmetric network partitions poorly. Suppose S1, S2, and S3 sit in three IDCs, the link between S1 and S2 is down, but every other link works. Once S1 or S2 wins the leadership, the other side keeps timing out and triggering elections. Say S1 is the Leader: S2 keeps timing out and starting elections, S3 raises its Term and breaks the current Lease, and the Leader's updates get rejected." And: "For symmetric network partitions, the original RAFT paper's handling is that once a node comes back online, the Leader performs StepDown upon receiving a RequestVote with a term higher than currentTerm. So even a node that has already been removed via RemovePeer can still break the current Lease and make the replication group unavailable."
The same document gives the corresponding solutions:
- For nodes that belong to the PeerSet, the Leader will StepDown when a retried AppendEntries encounters the higher term;
- For nodes that do not belong to the PeerSet, the Leader ignores them forever;
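The second rule can be read as a guard the Leader applies before honoring a higher term: only members of the current configuration may force a StepDown. A hedged sketch of that guard (the types and names here are illustrative, not braft's actual API):

```cpp
#include <cstdint>
#include <set>
#include <string>

// Decide whether a higher-term response should make the leader step
// down. Only members of the current PeerSet may force a StepDown;
// responses from removed nodes are ignored entirely, so a node deleted
// via RemovePeer can no longer break the leader's Lease.
bool ShouldStepDown(const std::set<std::string>& peer_set,
                    const std::string& from_peer,
                    int64_t response_term, int64_t current_term) {
    if (peer_set.count(from_peer) == 0) {
        return false;  // not in the configuration: ignore forever
    }
    return response_term > current_term;
}
```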
StepDown
In the original RAFT protocol, the Leader performs StepDown on receiving any request whose term is higher than currentTerm. In a practical implementation, StepDown should happen at the following moments:
- The Leader receives a failed AppendEntries response carrying a Term larger than currentTerm;
- The Leader fails to write to a majority within one ElectionTimeout, detected via a logic clock (one ElectionTimeout spans 10 HeartBeats);
- When the LogEntry of a RemovePeer is committed and the Leader itself is no longer in the node list, it performs StepDown, usually followed by Shutdown;
Now let's walk through the braft implementation. A node first runs pre_vote; the key is line 1377:
1372 OnPreVoteRPCDone* done = new OnPreVoteRPCDone(*iter, _current_term, this);
1373 done->cntl.set_timeout_ms(_options.election_timeout_ms);
1374 done->request.set_group_id(_group_id);
1375 done->request.set_server_id(_server_id.to_string());
1376 done->request.set_peer_id(iter->to_string());
1377 done->request.set_term(_current_term + 1); // next term
1378 done->request.set_last_log_index(last_log_id.index);
1379 done->request.set_last_log_term(last_log_id.term);
When a node receives the `pre_vote` request:
31 void RaftServiceImpl::pre_vote(google::protobuf::RpcController* cntl_base,
32 const RequestVoteRequest* request,
33 RequestVoteResponse* response,
34 google::protobuf::Closure* done) {
35 brpc::ClosureGuard done_guard(done);
36 brpc::Controller* cntl =
37 static_cast<brpc::Controller*>(cntl_base);
38
39 PeerId peer_id;
40 if (0 != peer_id.parse(request->peer_id())) {
41 cntl->SetFailed(EINVAL, "peer_id invalid");
42 return;
43 }
44
45 scoped_refptr<NodeImpl> node_ptr = NodeManager::GetInstance()->get(request->group_id(),
46 peer_id);
47     NodeImpl* node = node_ptr.get(); // check whether the node exists
48 if (!node) {
49 cntl->SetFailed(ENOENT, "peer_id not exist");
50 return;
51 }
52
53 // TODO: should return butil::Status
54 int rc = node->handle_pre_vote_request(request, response);
55 if (rc != 0) {
56 cntl->SetFailed(rc, "%s", berror(rc));
57 return;
58 }
59 }
291 inline bool is_active_state(State s) {
292 // This should be as fast as possible
293 return s < STATE_ERROR;
294 }
267 enum State { // the possible node states
269 STATE_LEADER = 1,
270 STATE_TRANSFERRING = 2,
271 STATE_CANDIDATE = 3,
272 STATE_FOLLOWER = 4,
273 STATE_ERROR = 5,
274 STATE_UNINITIALIZED = 6,
275 STATE_SHUTTING = 7,
276 STATE_SHUTDOWN = 8,
277 STATE_END,
278 };
1748 int NodeImpl::handle_pre_vote_request(const RequestVoteRequest* request,
1749 RequestVoteResponse* response) {
1750 std::unique_lock<raft_mutex_t> lck(_mutex);
1751
1752     if (!is_active_state(_state)) { // check this node's state
1755 lck.unlock();
1759 return EINVAL;
1760 }
1761
1762 PeerId candidate_id;
1763     if (0 != candidate_id.parse(request->server_id())) { // parse failed
1767 return EINVAL;
1768 }
1770 bool granted = false;
1771 do {
1772         if (request->term() < _current_term) { // the request's term is lower than ours
1773 // ignore older term
1778 break;
1779 }
1780
1781 // get last_log_id outof node mutex
1782 lck.unlock();
1783 LogId last_log_id = _log_manager->last_log_id(true);
1784 lck.lock();
1785 // pre_vote not need ABA check after unlock&lock
1786
1787         granted = (LogId(request->last_log_index(), request->last_log_term())
1788                    >= last_log_id); // compare last_log_index and last_log_term; see the earlier floyd analysis
1796 } while (0);
1798 response->set_term(_current_term);
1799 response->set_granted(granted);
1800 return 0;
1801 }
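The `>=` on line 1787 relies on LogId being ordered by (term, index) lexicographically, which is again the "log up-to-date" rule. A minimal equivalent of that comparison (this struct is my own reduction of braft's LogId, not the real definition):

```cpp
#include <cstdint>

// Reduced LogId: ordered by term first, then index. This ordering is
// exactly what pre_vote uses to decide whether the candidate's log is
// at least as up-to-date as ours.
struct LogId {
    int64_t index;
    int64_t term;
};

bool operator>=(const LogId& a, const LogId& b) {
    if (a.term != b.term) {
        return a.term > b.term;
    }
    return a.index >= b.index;
}
```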
Before actually voting for itself, a node first runs pre_vote; once the node that initiated the pre_vote receives the responses, it decides whether to step_down or to go ahead with the real vote request:
1296 struct OnPreVoteRPCDone : public google::protobuf::Closure {
1297 OnPreVoteRPCDone(const PeerId& peer_id_, const int64_t term_, NodeImpl* node_)
1298 : peer(peer_id_), term(term_), node(node_) {
1299 node->AddRef();
1300 }
1301 virtual ~OnPreVoteRPCDone() {
1302 node->Release();
1303 }
1304
1305 void Run() {
1306 do {
1307 if (cntl.ErrorCode() != 0) {
1310 break;
1311 }
1312 node->handle_pre_vote_response(peer, term, response);
1313 } while (0);
1314 delete this;
1315 }
1316
1317 PeerId peer;
1318 int64_t term;
1319 RequestVoteRequest request;
1320 RequestVoteResponse response;
1321 brpc::Controller cntl;
1322 NodeImpl* node;
1323 };
1254 void NodeImpl::handle_pre_vote_response(const PeerId& peer_id, const int64_t term,
1255 const RequestVoteResponse& response) {
1256 std::unique_lock<raft_mutex_t> lck(_mutex);
1257
1258 // check state
1259 if (_state != STATE_FOLLOWER) {
1263 return;
1264 }
1265 // check stale response
1266 if (term != _current_term) {
1270 return;
1271 }
1272 // check response term
1273 if (response.term() > _current_term) {
1277 butil::Status status;
1278 status.set_error(EHIGHERTERMRESPONSE, "Raft node receives higher term "
1279 "pre_vote_response.");
1280 step_down(response.term(), false, status);
1281 return;
1282 }
1287 // check granted quorum?
1288 if (response.granted()) {
1289 _pre_vote_ctx.grant(peer_id);
1290 if (_pre_vote_ctx.granted()) {
1291 elect_self(&lck);
1292 }
1293 }
1294 }
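`_pre_vote_ctx` above is a ballot: `grant()` records a peer's vote and `granted()` reports whether a majority has been reached, at which point `elect_self` fires. A simplified stand-in for that counter (modeled on braft's Ballot semantics but written from scratch):

```cpp
#include <cstddef>
#include <set>
#include <string>

// Minimal ballot: counts distinct granting peers and reports whether
// they form a strict majority of the configured cluster size.
class Ballot {
public:
    explicit Ballot(size_t cluster_size) : cluster_size_(cluster_size) {}

    // std::set deduplicates, so repeated grants from the same peer
    // (e.g. a retried RPC) cannot double-count.
    void grant(const std::string& peer) { granted_.insert(peer); }

    bool granted() const { return granted_.size() * 2 > cluster_size_; }

private:
    size_t cluster_size_;
    std::set<std::string> granted_;
};
```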
1392 void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck) {
1396 if (!_conf.contains(_server_id)) {
1399 return;
1400 }
1401 // cancel follower election timer
1402 if (_state == STATE_FOLLOWER) {
1405 _election_timer.stop();
1406 }
1415 _state = STATE_CANDIDATE;
1416 _current_term++;
1417 _voted_id = _server_id;
1437 std::set<PeerId> peers;
1438 _conf.list_peers(&peers);
1439
1440 for (std::set<PeerId>::const_iterator
1441 iter = peers.begin(); iter != peers.end(); ++iter) {
1442 if (*iter == _server_id) {
1443 continue;
1444 }
1445 brpc::ChannelOptions options;
1446 options.connection_type = brpc::CONNECTION_TYPE_SINGLE;
1447 options.max_retry = 0;
1448 brpc::Channel channel;
1449 if (0 != channel.Init(iter->addr, &options)) {
1452 continue;
1453 }
1454         // request a vote from each peer
1455 OnRequestVoteRPCDone* done = new OnRequestVoteRPCDone(*iter, _current_term, this);
1456 done->cntl.set_timeout_ms(_options.election_timeout_ms);
1457 done->request.set_group_id(_group_id);
1458 done->request.set_server_id(_server_id.to_string());
1459 done->request.set_peer_id(iter->to_string());
1460 done->request.set_term(_current_term);
1461 done->request.set_last_log_index(last_log_id.index);
1462 done->request.set_last_log_term(last_log_id.term);
1463
1464 RaftService_Stub stub(&channel);
1465 stub.request_vote(&done->cntl, &done->request, &done->response, done);
1466 }
1468 //TODO: outof lock
1469 _meta_storage->set_term_and_votedfor(_current_term, _server_id);
1470 _vote_ctx.grant(_server_id);
1471 if (_vote_ctx.granted()) {
1472 become_leader();
1473 }
1474 }
Handling the vote request:
61 void RaftServiceImpl::request_vote(google::protobuf::RpcController* cntl_base,
62 const RequestVoteRequest* request,
63 RequestVoteResponse* response,
64 google::protobuf::Closure* done) {
65 brpc::ClosureGuard done_guard(done);
66 brpc::Controller* cntl =
67 static_cast<brpc::Controller*>(cntl_base);
68
69 PeerId peer_id;
70 if (0 != peer_id.parse(request->peer_id())) {
71 cntl->SetFailed(EINVAL, "peer_id invalid");
72 return;
73 }
74
75 scoped_refptr<NodeImpl> node_ptr = NodeManager::GetInstance()->get(request->group_id(),
76 peer_id);
77 NodeImpl* node = node_ptr.get();
78 if (!node) {
79 cntl->SetFailed(ENOENT, "peer_id not exist");
80 return;
81 }
82
83 int rc = node->handle_request_vote_request(request, response);
84 if (rc != 0) {
85 cntl->SetFailed(rc, "%s", berror(rc));
86 return;
87 }
88 }
1803 int NodeImpl::handle_request_vote_request(const RequestVoteRequest* request,
1804 RequestVoteResponse* response) {
1805 std::unique_lock<raft_mutex_t> lck(_mutex);
1806
1807 if (!is_active_state(_state)) {
1810 lck.unlock();
1814 return EINVAL;
1815 }
1816
1817 PeerId candidate_id;
1818 if (0 != candidate_id.parse(request->server_id())) {
1822 return EINVAL;
1823 }
1825 do {
1826 // check term
1827 if (request->term() >= _current_term) {
1832             // increase current term, change state to follower
1833 if (request->term() > _current_term) {
1834 butil::Status status;
1835 status.set_error(EHIGHERTERMREQUEST, "Raft node receives higher term "
1836 "request_vote_request.");
1837 step_down(request->term(), false, status);
1838 }
1839 } else {
1840 // ignore older term
1845 break;
1846 }
1847
1848 // get last_log_id outof node mutex
1849 lck.unlock();
1850 LogId last_log_id = _log_manager->last_log_id(true);
1851 lck.lock();
1852 // vote need ABA check after unlock&lock
1853 if (request->term() != _current_term) {
1856 break;
1857 }
1859 bool log_is_ok = (LogId(request->last_log_index(), request->last_log_term())
1860 >= last_log_id);
1861 // save
1862 if (log_is_ok && _voted_id.is_empty()) {
1863 butil::Status status;
1864 status.set_error(EVOTEFORCANDIDATE, "Raft node votes for some candidate, "
1865 "step down to restart election_timer.");
1866 step_down(request->term(), false, status);
1867 _voted_id = candidate_id;
1868 //TODO: outof lock
1869 _meta_storage->set_votedfor(candidate_id);
1870 }
1871 } while (0);
1872
1873 response->set_term(_current_term);
1874 response->set_granted(request->term() == _current_term && _voted_id == candidate_id);
1875 return 0;
1876 }
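Note the pattern around lines 1849-1856: fetching the last log id may block on disk, so the node mutex is released first, and after reacquiring it the term must be re-verified (the "ABA check") before the decision is allowed to stand. That pattern in isolation (illustrative types only, not braft's):

```cpp
#include <cstdint>
#include <mutex>

// The unlock -> slow operation -> relock -> recheck pattern from
// handle_request_vote_request: node state may change while the lock is
// released, so the decision is only valid if the term is unchanged.
struct Node {
    std::mutex mu;
    int64_t current_term = 1;

    // Stands in for _log_manager->last_log_id(true), which may hit disk.
    int64_t SlowLastLogTerm() { return 1; }

    bool TryGrant(int64_t request_term) {
        std::unique_lock<std::mutex> lck(mu);
        if (request_term < current_term) return false;
        lck.unlock();                      // never hold the node mutex across I/O
        int64_t last_term = SlowLastLogTerm();
        lck.lock();
        if (request_term != current_term)  // ABA check: term moved on meanwhile
            return false;
        return request_term >= last_term;
    }
};
```

pre_vote skips the recheck (line 1785 above) because it never mutates state; the real vote cannot, because it persists voted_for.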
After the node has received the vote responses and won a majority, it becomes leader:
1592 void NodeImpl::become_leader() {
1593 CHECK(_state == STATE_CANDIDATE);
1600 // cancel candidate vote timer
1601 _vote_timer.stop();
1602
1603 _state = STATE_LEADER;
1604 _leader_id = _server_id;
1605
1606 _replicator_group.reset_term(_current_term);
1607
1608 std::set<PeerId> peers;
1609 _conf.list_peers(&peers);
1610 for (std::set<PeerId>::const_iterator
1611 iter = peers.begin(); iter != peers.end(); ++iter) {
1612 if (*iter == _server_id) {
1613 continue;
1614 }
1619 //TODO: check return code
1620 _replicator_group.add_replicator(*iter);
1621 }
1623 // init commit manager
1624 _ballot_box->reset_pending_index(_log_manager->last_log_index() + 1);
1625
1626 // Register _conf_ctx to reject configuration changing before the first log
1627 // is committed.
1628 CHECK(!_conf_ctx.is_busy());
1629 _conf_ctx.flush(_conf.conf, _conf.old_conf);
1630 _stepdown_timer.start();
1631 }
Quoting a write-up of Raft's PreVote mechanism: "Section 9.6, 'Preventing disruptions when a server rejoins the cluster', of the Raft author's PhD thesis CONSENSUS: BRIDGING THEORY AND PRACTICE outlines the PreVote algorithm.
In PreVote, a Candidate must first confirm that it can win votes from a majority of the cluster; only then does it increment its term and start the real election. A voting node agrees to the election only when both of the following hold:
- It has not received a heartbeat from a valid leader for at least one election timeout.
- The Candidate's log is sufficiently up to date (a higher Term, or the same Term with a larger raft index).
PreVote solves the problem of a partitioned node disrupting the cluster when it rejoins. Under PreVote, the partitioned node cannot obtain permission from a majority, so it cannot increment its Term. When it rejoins the cluster it still cannot increment its Term, because the other servers keep receiving periodic heartbeats from the Leader. Once that server itself receives a heartbeat from the Leader, it returns to the Follower state, with its Term matching the Leader's."
Most of this matches the paper and the other raft implementations, but braft does not implement the first condition quoted above; adding it yourself would be straightforward. There is plenty more in this codebase worth studying; next I plan to analyze the snapshot implementation.
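The missing first condition, grant a pre_vote only if we have not heard from a valid leader for at least one election timeout, could be bolted onto handle_pre_vote_request as an extra timestamp guard. A sketch under that assumption (all names here are mine, not braft's):

```cpp
#include <chrono>

using Clock = std::chrono::steady_clock;

// Extra pre_vote guard: the leader's lease is still considered valid
// if we heard from it within the last election timeout. A granting
// node would additionally require !LeaderLeaseStillValid(...) before
// setting granted = true, so a rejoining partitioned node cannot
// disrupt a healthy cluster.
bool LeaderLeaseStillValid(Clock::time_point last_leader_contact,
                           Clock::time_point now,
                           std::chrono::milliseconds election_timeout) {
    return now - last_leader_contact < election_timeout;
}
```

The node would record `last_leader_contact` whenever an AppendEntries (heartbeat or data) from the current leader is accepted.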