
Some Follow-up Thoughts on RAFT (Part 1)

Author: fooboo | Published 2019-02-11 23:53

    Lately I have been consolidating the source code I analyzed earlier and the notes I wrote along the way. To deepen that understanding, it helps to bring in concrete requirements and business scenarios and think through more of the possible cases.

    I previously walked through a basic Raft implementation in my source-code analysis of Floyd & Raft, together with the relevant parts of the paper; Floyd follows the paper fairly closely, though some aspects are not implemented in its code. Later, planning to analyze braft and bfs over the Spring Festival holiday, I read the braft RAFT introduction on GitHub and found that it describes several situations I had not considered before. So here I revisit those points against the source code studied earlier; TiKV's pingcap/raft-rs is also worth consulting.

    In the Floyd source, a node in the follower state, after a randomized timeout, transitions from follower to candidate, increments its own term, and requests votes from the other nodes, carrying its term, ip/port (optional), last_log_term, and last_log_index. A node receiving the request compares terms: if request_term > my_term, it turns itself into a follower and aligns to the requesting node's term, no matter what state it was in:

    int FloydImpl::ReplyRequestVote(const CmdRequest& request, CmdResponse* response) {
      slash::MutexLock l(&context_->global_mu);
      bool granted = false;
      CmdRequest_RequestVote request_vote = request.request_vote();
      /*
       * If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (5.1)
       */
      if (request_vote.term() > context_->current_term) {
        context_->BecomeFollower(request_vote.term());
        raft_meta_->SetCurrentTerm(context_->current_term);
      }
      // if caller's term smaller than my term, then I will notice him
      if (request_vote.term() < context_->current_term) {
        BuildRequestVoteResponse(context_->current_term, granted, response);
        return -1;
      }
      // ... (rest of the function elided)
    

    The raft implementation in bfs does the same:

    void RaftNodeImpl::Vote(::google::protobuf::RpcController* controller,
                        const ::baidu::bfs::VoteRequest* request,
                        ::baidu::bfs::VoteResponse* response,
                        ::google::protobuf::Closure* done) {
        int64_t term = request->term();
        const std::string& candidate = request->candidate();
        int64_t last_log_index = request->last_log_index();
        int64_t last_log_term = request->last_log_term();
        LOG(INFO, "Recv vote request: %s %ld %ld / (%s %ld %ld)",
            candidate.c_str(), term, last_log_term,
            voted_for_.c_str(), current_term_, log_term_);
        MutexLock lock(&mu_);
        CheckTerm(term);
        if (term >= current_term_
            && (voted_for_ == "" || voted_for_ == candidate)
            && (last_log_term > log_term_ ||
            (last_log_term == log_term_ && last_log_index >= log_index_))) {
            voted_for_ = candidate;
            if (!StoreContext("current_term", current_term_) || !StoreContext("voted_for", voted_for_)) {
                LOG(FATAL, "Store term & vote_for fail %s %ld", voted_for_.c_str(), current_term_);
            } else {
                LOG(INFO, "Granted %s %ld %ld", candidate.c_str(), term, last_log_index);
            }
            response->set_vote_granted(true);
            response->set_term(term);
            done->Run();
            return;
        }

        response->set_vote_granted(false);
        response->set_term(current_term_);
        done->Run();
    }
    

    Quoting the braft RAFT introduction: "The original RAFT paper does not handle asymmetric network partitions well. Suppose S1, S2, and S3 sit in three different IDCs, the link between S1 and S2 is down, while all other links work. Once S1 or S2 grabs leadership, the other side will keep timing out and triggering elections. For example, with S1 as leader, S2 keeps timing out and starting elections, S3 raises its term and interrupts the current lease, and thus rejects the leader's updates." And: "For symmetric network partitions, the original RAFT paper's handling is that once a node comes back online, the leader steps down as soon as it receives a RequestVote with a term higher than currentTerm. So even a node that has already been removed via RemovePeer can still interrupt the current lease and render the replication group unavailable."

    Asymmetric network partitioning
    The cited document also gives the corresponding fixes (a minimal sketch of the second rule follows):
    1. For nodes that belong to the PeerSet, the leader will StepDown when a retried AppendEntries runs into a higher term;
    2. For nodes that do not belong to the PeerSet, the leader ignores them forever.
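
    To make the second rule concrete, here is a minimal sketch, assuming a simple membership set; the names are hypothetical (this is not braft's actual API). The idea is just that a leader drops vote traffic from nodes outside the current configuration:

    #include <cstdint>
    #include <set>
    #include <string>

    // Hypothetical, simplified types; braft's real PeerId/NodeImpl carry much more.
    struct VoteRequest {
        std::string server_id;  // the candidate requesting a vote
        int64_t term;
    };

    class LeaderNode {
    public:
        // Rule 2: vote requests from nodes outside the current PeerSet are
        // dropped outright, so a removed node can never disturb the lease.
        bool should_handle(const VoteRequest& req) const {
            return peer_set_.count(req.server_id) > 0;
        }
    private:
        std::set<std::string> peer_set_;  // current configuration members
    };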

    StepDown
    In the original RAFT protocol, a leader performs StepDown whenever it receives any request whose term is higher than currentTerm. In actual development, StepDown should happen at the following moments (a sketch of the second check follows this list):

    1. The leader receives a failed AppendEntries response whose term is larger than currentTerm;
    2. The leader has not written to a majority within one ElectionTimeout, detected via a logic-clock check (there are 10 HeartBeats per ElectionTimeout);
    3. When the LogEntry of a RemovePeer is committed and the leader is no longer in the peer list, it performs StepDown, usually followed by Shutdown.
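
    A minimal sketch of the second check, assuming the leader records when a majority last acknowledged replication; the names are hypothetical (braft drives the equivalent check from a periodic step-down timer, the _stepdown_timer started in become_leader below):

    #include <chrono>

    // Hedged sketch: a leader lease via a logic clock. The leader calls
    // on_majority_ack() whenever a heartbeat/AppendEntries round reaches a
    // majority; a periodic timer polls should_step_down().
    class LeaderLease {
    public:
        explicit LeaderLease(std::chrono::milliseconds election_timeout)
            : election_timeout_(election_timeout),
              last_majority_ack_(std::chrono::steady_clock::now()) {}

        void on_majority_ack() {
            last_majority_ack_ = std::chrono::steady_clock::now();
        }

        // No majority acknowledged within one ElectionTimeout (i.e. roughly
        // 10 missed heartbeats): the leader must step down.
        bool should_step_down() const {
            return std::chrono::steady_clock::now() - last_majority_ack_
                   > election_timeout_;
        }

    private:
        std::chrono::milliseconds election_timeout_;
        std::chrono::steady_clock::time_point last_majority_ack_;
    };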

    Now let's look at the implementation in braft.
    A node initiates pre_vote; the key is line 1377:

    1372         OnPreVoteRPCDone* done = new OnPreVoteRPCDone(*iter, _current_term, this);
    1373         done->cntl.set_timeout_ms(_options.election_timeout_ms);
    1374         done->request.set_group_id(_group_id);
    1375         done->request.set_server_id(_server_id.to_string());
    1376         done->request.set_peer_id(iter->to_string());
    1377         done->request.set_term(_current_term + 1); // next term
    1378         done->request.set_last_log_index(last_log_id.index);
    1379         done->request.set_last_log_term(last_log_id.term);
    

    When a node receives the `pre_vote` request:

    void RaftServiceImpl::pre_vote(google::protobuf::RpcController* cntl_base,
                              const RequestVoteRequest* request,
                              RequestVoteResponse* response,
                              google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl =
            static_cast<brpc::Controller*>(cntl_base);

        PeerId peer_id;
        if (0 != peer_id.parse(request->peer_id())) {
            cntl->SetFailed(EINVAL, "peer_id invalid");
            return;
        }

        scoped_refptr<NodeImpl> node_ptr = NodeManager::GetInstance()->get(request->group_id(),
                                                                           peer_id);
        NodeImpl* node = node_ptr.get(); // check whether the node exists
        if (!node) {
            cntl->SetFailed(ENOENT, "peer_id not exist");
            return;
        }

        // TODO: should return butil::Status
        int rc = node->handle_pre_vote_request(request, response);
        if (rc != 0) {
            cntl->SetFailed(rc, "%s", berror(rc));
            return;
        }
    }
    
    inline bool is_active_state(State s) {
        // This should be as fast as possible
        return s < STATE_ERROR;
    }
    
    enum State { // the possible node states
        STATE_LEADER = 1,
        STATE_TRANSFERRING = 2,
        STATE_CANDIDATE = 3,
        STATE_FOLLOWER = 4,
        STATE_ERROR = 5,
        STATE_UNINITIALIZED = 6,
        STATE_SHUTTING = 7,
        STATE_SHUTDOWN = 8,
        STATE_END,
    };
    
    int NodeImpl::handle_pre_vote_request(const RequestVoteRequest* request,
                                          RequestVoteResponse* response) {
        std::unique_lock<raft_mutex_t> lck(_mutex);

        if (!is_active_state(_state)) { // check this node's state
            lck.unlock();
            // ... (logging elided)
            return EINVAL;
        }

        PeerId candidate_id;
        if (0 != candidate_id.parse(request->server_id())) { // parse failed
            // ... (logging elided)
            return EINVAL;
        }
        bool granted = false;
        do {
            if (request->term() < _current_term) { // the request's term is older than ours
                // ignore older term
                break;
            }

            // get last_log_id outof node mutex
            lck.unlock();
            LogId last_log_id = _log_manager->last_log_id(true);
            lck.lock();
            // pre_vote not need ABA check after unlock&lock

            granted = (LogId(request->last_log_index(), request->last_log_term())
                            >= last_log_id); // compare last_log_index and last_log_term; see the earlier floyd analysis
        } while (0);
        response->set_term(_current_term);
        response->set_granted(granted);
        return 0;
    }
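
    The LogId comparison above asks whether the candidate's last log entry is at least as up to date as ours: terms are compared first, then indexes. A simplified stand-in (not braft's actual LogId definition):

    #include <cstdint>

    // Simplified stand-in for braft's LogId, constructed as (index, term).
    struct LogId {
        int64_t index;
        int64_t term;
    };

    // "a >= b" means log a is at least as up to date as log b:
    // a later term always wins; within the same term, the longer log wins.
    inline bool operator>=(const LogId& a, const LogId& b) {
        return a.term > b.term || (a.term == b.term && a.index >= b.index);
    }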
    

    Before actually voting for itself, the node first runs pre_vote; once the initiating node receives the pre_vote responses, it decides whether to step_down or to continue with a real election:

    struct OnPreVoteRPCDone : public google::protobuf::Closure {
        OnPreVoteRPCDone(const PeerId& peer_id_, const int64_t term_, NodeImpl* node_)
            : peer(peer_id_), term(term_), node(node_) {
                node->AddRef();
        }
        virtual ~OnPreVoteRPCDone() {
            node->Release();
        }

        void Run() {
            do {
                if (cntl.ErrorCode() != 0) {
                    // ... (logging elided)
                    break;
                }
                node->handle_pre_vote_response(peer, term, response);
            } while (0);
            delete this;
        }

        PeerId peer;
        int64_t term;
        RequestVoteRequest request;
        RequestVoteResponse response;
        brpc::Controller cntl;
        NodeImpl* node;
    };
    
    void NodeImpl::handle_pre_vote_response(const PeerId& peer_id, const int64_t term,
                                                const RequestVoteResponse& response) {
        std::unique_lock<raft_mutex_t> lck(_mutex);

        // check state
        if (_state != STATE_FOLLOWER) {
            return;
        }
        // check stale response
        if (term != _current_term) {
            return;
        }
        // check response term
        if (response.term() > _current_term) {
            butil::Status status;
            status.set_error(EHIGHERTERMRESPONSE, "Raft node receives higher term "
                    "pre_vote_response.");
            step_down(response.term(), false, status);
            return;
        }
        // check granted quorum?
        if (response.granted()) {
            _pre_vote_ctx.grant(peer_id);
            if (_pre_vote_ctx.granted()) {
                elect_self(&lck);
            }
        }
    }
    
    void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck) {
        if (!_conf.contains(_server_id)) {
            return;
        }
        // cancel follower election timer
        if (_state == STATE_FOLLOWER) {
            _election_timer.stop();
        }

        _state = STATE_CANDIDATE;
        _current_term++;
        _voted_id = _server_id;
        // ... (elided: start the vote timer and fetch last_log_id from
        //      _log_manager, which is used below)

        std::set<PeerId> peers;
        _conf.list_peers(&peers);

        for (std::set<PeerId>::const_iterator
            iter = peers.begin(); iter != peers.end(); ++iter) {
            if (*iter == _server_id) {
                continue;
            }
            brpc::ChannelOptions options;
            options.connection_type = brpc::CONNECTION_TYPE_SINGLE;
            options.max_retry = 0;
            brpc::Channel channel;
            if (0 != channel.Init(iter->addr, &options)) {
                continue;
            }
            // request a vote from each peer
            OnRequestVoteRPCDone* done = new OnRequestVoteRPCDone(*iter, _current_term, this);
            done->cntl.set_timeout_ms(_options.election_timeout_ms);
            done->request.set_group_id(_group_id);
            done->request.set_server_id(_server_id.to_string());
            done->request.set_peer_id(iter->to_string());
            done->request.set_term(_current_term);
            done->request.set_last_log_index(last_log_id.index);
            done->request.set_last_log_term(last_log_id.term);

            RaftService_Stub stub(&channel);
            stub.request_vote(&done->cntl, &done->request, &done->response, done);
        }

        //TODO: outof lock
        _meta_storage->set_term_and_votedfor(_current_term, _server_id);
        _vote_ctx.grant(_server_id);
        if (_vote_ctx.granted()) {
            become_leader();
        }
    }
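
    The _vote_ctx.grant / _vote_ctx.granted() pair above is essentially a majority counter. A minimal sketch of such a vote-counting context, with hypothetical names rather than braft's actual Ballot code:

    #include <set>
    #include <string>

    // Hedged sketch: init with the current voting members, grant() as
    // responses arrive, granted() flips once a strict majority has voted.
    class VoteCtx {
    public:
        void init(const std::set<std::string>& peers) {
            peers_ = peers;
            granted_.clear();
        }
        void grant(const std::string& peer) {
            if (peers_.count(peer) > 0) {
                granted_.insert(peer);
            }
        }
        bool granted() const {
            return granted_.size() * 2 > peers_.size();
        }
    private:
        std::set<std::string> peers_;    // voting members of the configuration
        std::set<std::string> granted_;  // peers that granted their vote
    };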
    

    Handling the real vote request:

    void RaftServiceImpl::request_vote(google::protobuf::RpcController* cntl_base,
                              const RequestVoteRequest* request,
                              RequestVoteResponse* response,
                              google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl =
            static_cast<brpc::Controller*>(cntl_base);

        PeerId peer_id;
        if (0 != peer_id.parse(request->peer_id())) {
            cntl->SetFailed(EINVAL, "peer_id invalid");
            return;
        }

        scoped_refptr<NodeImpl> node_ptr = NodeManager::GetInstance()->get(request->group_id(),
                                                                           peer_id);
        NodeImpl* node = node_ptr.get();
        if (!node) {
            cntl->SetFailed(ENOENT, "peer_id not exist");
            return;
        }

        int rc = node->handle_request_vote_request(request, response);
        if (rc != 0) {
            cntl->SetFailed(rc, "%s", berror(rc));
            return;
        }
    }
    
    int NodeImpl::handle_request_vote_request(const RequestVoteRequest* request,
                                              RequestVoteResponse* response) {
        std::unique_lock<raft_mutex_t> lck(_mutex);

        if (!is_active_state(_state)) {
            lck.unlock();
            // ... (logging elided)
            return EINVAL;
        }

        PeerId candidate_id;
        if (0 != candidate_id.parse(request->server_id())) {
            // ... (logging elided)
            return EINVAL;
        }
        do {
            // check term
            if (request->term() >= _current_term) {
                // increase current term, change state to follower
                if (request->term() > _current_term) {
                    butil::Status status;
                    status.set_error(EHIGHERTERMREQUEST, "Raft node receives higher term "
                            "request_vote_request.");
                    step_down(request->term(), false, status);
                }
            } else {
                // ignore older term
                break;
            }

            // get last_log_id outof node mutex
            lck.unlock();
            LogId last_log_id = _log_manager->last_log_id(true);
            lck.lock();
            // vote need ABA check after unlock&lock
            if (request->term() != _current_term) {
                break;
            }
            bool log_is_ok = (LogId(request->last_log_index(), request->last_log_term())
                              >= last_log_id);
            // save
            if (log_is_ok && _voted_id.is_empty()) {
                butil::Status status;
                status.set_error(EVOTEFORCANDIDATE, "Raft node votes for some candidate, "
                        "step down to restart election_timer.");
                step_down(request->term(), false, status);
                _voted_id = candidate_id;
                //TODO: outof lock
                _meta_storage->set_votedfor(candidate_id);
            }
        } while (0);

        response->set_term(_current_term);
        response->set_granted(request->term() == _current_term && _voted_id == candidate_id);
        return 0;
    }
    

    After the node receives the vote responses and wins a majority, it becomes leader:

    void NodeImpl::become_leader() {
        CHECK(_state == STATE_CANDIDATE);
        // cancel candidate vote timer
        _vote_timer.stop();

        _state = STATE_LEADER;
        _leader_id = _server_id;

        _replicator_group.reset_term(_current_term);

        std::set<PeerId> peers;
        _conf.list_peers(&peers);
        for (std::set<PeerId>::const_iterator
                iter = peers.begin(); iter != peers.end(); ++iter) {
            if (*iter == _server_id) {
                continue;
            }
            //TODO: check return code
            _replicator_group.add_replicator(*iter);
        }
        // init commit manager
        _ballot_box->reset_pending_index(_log_manager->last_log_index() + 1);

        // Register _conf_ctx to reject configuration changing before the first log
        // is committed.
        CHECK(!_conf_ctx.is_busy());
        _conf_ctx.flush(_conf.conf, _conf.old_conf);
        _stepdown_timer.start();
    }
    

    Quoting a write-up on Raft's PreVote mechanism:
    Section 9.6, "Preventing disruptions when a server rejoins the cluster", of the Raft author's doctoral dissertation CONSENSUS: BRIDGING THEORY AND PRACTICE outlines the rough idea of the PreVote algorithm.
    In PreVote, a candidate first has to confirm that it can win the votes of a majority of the cluster; only then does it increment its term and start a real election. The other voting nodes agree to the election only if both of the following conditions hold:

    1. They have not received a heartbeat from a valid leader for at least one election timeout.
    2. The candidate's log is sufficiently up to date (a larger term, or the same term with a raft index at least as large).

    The PreVote algorithm solves the problem of a partitioned node disrupting the cluster when it rejoins. Under PreVote, the partitioned node cannot win the approval of a majority, so it cannot increment its term; when it rejoins the cluster it still cannot increment its term, because the other servers keep receiving periodic heartbeats from the leader. Once that server receives a heartbeat from the leader, it returns to the follower state, with its term aligned with the leader's.

    Most of this matches the paper and the other raft implementations, but braft does not implement the first condition quoted above; adding it yourself is fairly straightforward, as the sketch below shows. There are plenty of other designs in braft worth learning from; next I plan to analyze the snapshot implementation.
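
    For example, here is a minimal sketch of that first condition, assuming the follower tracks when it last heard from a valid leader; the names are hypothetical, and conceptually the check would sit at the top of handle_pre_vote_request:

    #include <chrono>

    // Hedged sketch: a follower that is still inside the leader's lease
    // window refuses to grant pre_vote, so a rejoining node cannot disrupt
    // a healthy leader.
    struct FollowerLease {
        std::chrono::steady_clock::time_point last_leader_contact;
        std::chrono::milliseconds election_timeout;

        bool reject_pre_vote() const {
            return std::chrono::steady_clock::now() - last_leader_contact
                   < election_timeout;
        }
    };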

    Some of braft's code is fairly involved, and I have not studied it closely yet.
