美文网首页我爱编程
Floyd&Raft的源码分析(三)

Floyd&Raft的源码分析(三)

作者: fooboo | 来源:发表于2018-04-15 14:04 被阅读63次

这篇是这个系列的最后一篇了,整个分析持续了两个月多,包括floyd和pink源码,差不多就每个周日会花几个小时分析一下,和查些资料,毕竟自己也是在学习过程中,可能会有些理解不正确的地方。
在学习raft的过程中,看了好些文章和别人的分析,理解起来似乎比较容易,但如果是自己从零实现一个可能要花些时间,况且不会灵活的运用到具体项目中,和结合现有的开源项目,比如leveldb和rocksdb等。
这篇结束后,准备分析下leveldb里的一些实现,可能网上已经有很多类型的文章,但看别人分析还不如自己深入源码,以前也做过类似的事情,比如redis和leveldb,但时间较久也没有沉淀下来,所以也陌生了。计划是后两个月分析下libco/pebble里的协程实现,后面大半年就分析单机版的kv存储如leveldb和消息中间件,比较倾向于rocketmq或消息队列phxqueue。

先列一下floyd框架,再分析下raft节点增加和减少的情况。
floyd是单进程多线程的框架项目,就直接上代码了,不画流程图和类图了。看了下example程序,在main中实例化一个floyd对象,如下:

368 Status Floyd::Open(const Options& options, Floyd** floyd) {
369   *floyd = NULL;
370   Status s;
371   FloydImpl *impl = new FloydImpl(options);
372   s = impl->Init();
373   if (s.ok()) {
374     *floyd = impl;
375   } else {
376     delete impl;
377   }
378   return s;
379 }

全部功能都放在Init中,部分代码如下:

274 Status FloydImpl::Init() {
275   slash::CreatePath(options_.path);
276   if (NewLogger(options_.path + "/LOG", &info_log_) != 0) {
277     return Status::Corruption("Open LOG failed, ", strerror(errno));
278   }
279 
280   // TODO(anan) set timeout and retry
281   worker_client_pool_ = new ClientPool(info_log_);
282 
283   // Create DB
284   rocksdb::Options options;
285   options.create_if_missing = true;
286   options.write_buffer_size = 1024 * 1024 * 1024;
287   options.max_background_flushes = 8;
288   rocksdb::Status s = rocksdb::DB::Open(options, options_.path + "/db/", &db_);
289   if (!s.ok()) {
291     return Status::Corruption("Open DB failed, " + s.ToString());
292   }
293 
294   s = rocksdb::DB::Open(options, options_.path + "/log/", &log_and_meta_);
295   if (!s.ok()) {  
297     return Status::Corruption("Open DB log_and_meta failed, " + s.ToString());
298   }
300   // Recover Context
301   raft_log_ = new RaftLog(log_and_meta_, info_log_);
302   raft_meta_ = new RaftMeta(log_and_meta_, info_log_);
303   raft_meta_->Init();
304   context_ = new FloydContext(options_);
305   context_->RecoverInit(raft_meta_);
306 
307   // Recover Members when exist
308   std::string mval;
309   Membership db_members;
310   s = db_->Get(rocksdb::ReadOptions(), kMemberConfigKey, &mval);
311   if (s.ok()
312       && db_members.ParseFromString(mval)) {
315     for (int i = 0; i < db_members.nodes_size(); i++) {
316       context_->members.insert(db_members.nodes(i));
317     }
318   } else {
319     BuildMembership(options_.members, &db_members);
320     if(!db_members.SerializeToString(&mval)) {
322       return Status::Corruption("Serialize Membership failed");
323     }
324     s = db_->Put(rocksdb::WriteOptions(), kMemberConfigKey, mval);
325     if (!s.ok()) {
327       return Status::Corruption("Record membership in db failed! error: " + s.ToString());
328     }
330     for (const auto& m : options_.members) {
331       context_->members.insert(m);
332     }
333   }
337   primary_ = new FloydPrimary(context_, &peers_, raft_meta_, options_, info_log_);
340   worker_ = new FloydWorker(options_.local_port, 1000, this);
341   int ret = 0;
342   if ((ret = worker_->Start()) != 0) {
344     return Status::Corruption("failed to start worker, return " + std::to_string(ret));
345   }
347   apply_ = new FloydApply(context_, db_, raft_meta_, raft_log_, this, info_log_);
348 
349   InitPeers();
354   if ((ret = primary_->Start()) != 0) {
356     return Status::Corruption("failed to start primary thread, return " + std::to_string(ret));
357   }
358   primary_->AddTask(kCheckLeader);
361   apply_->Start();
365   return Status::OK();
366 }

以上代码主要工作做了:创始日志,创建客户端对象池(里面会对每个server addr创建一条连接,源码在pink项目中)用于后续发送请求,创建两个db,用于state machine和log entry;从log中恢复状态数据比如term/voteip/voteport/commit_index/last_applied等;接着从db中或参数中恢复节点信息;创建FloydPrimary线程,此时还没启动,它的主要工作是心跳,定时检查leader,执行command,向其他节点发送vote rpc或append log entry rpc,体现在这三个函数中,由AddTask分发:

 62   static void LaunchHeartBeatWrapper(void *arg);
 63   void LaunchHeartBeat();
 64   static void LaunchCheckLeaderWrapper(void *arg);
 65   void LaunchCheckLeader();
 66   static void LaunchNewCommandWrapper(void *arg);
 67   void LaunchNewCommand();

接着创建FloydWorker线程并启动它,它的工作是处理请求,比如给节点线程发vote rpc/append log entry rpc任务,由DealMessage分发,然后根据request_type具体走不同的逻辑,调用FloydImpl类中的函数;
接着创建FloydApply线程,它的工作是执行command,即把log entry中的command apply到 state machine中,处理成员变更的情况;
接着根据节点个数,创建对应个数的节点线程并启动,主要工作是向对应server发送vote rpc/append log entry rpc请求和处理响应,维护状态等;
最后是启动FloydPrimary并立即发一个选举leader的任务(节点刚启动时都为follower),接着启动FloydApply;

以上线程,有些创建便启动,有些等其他线程启动完后再启动,这里有个顺序依赖,主要看谁驱动谁,比如FloydPrimary线程里会用到节点线程,就不能以相反的顺序start否则可能引起coredump;

大致整个框架差不多分析完了,pink中的相关源码没有在这里列出来,跳过了。

在成员变更的同时,需要保证安全必一,即“在任何时候,都不会出现双主。”
主要有两种方法:
One-Server变更:一阶段变更,要求每次成员组从G1变成G2时,G2相比G1加一个成员或者减一个成员。
Joint Consensus:支持任意的变更,即从成员组G1变成G2,不要求G1和G2有什么关联,比如可以完全没有交集。

第一种比较容易理解,实现起来也简单,floyd中也使用的第一种,这边大概说一下基本流程吧,至于为什么这种方法可行,可以参考下面链接的分析;

以增加成员为例,raft在收到增加server成员请求时,每次只增加一台server,后台程序的Leader执行流程如下:
-->AddServer-->BuildAddServerRequest-->DoCommand-->ExecuteCommand-->BuildLogEntry-->Append-->AddTask-->NoticePeerTask-->AddAppendEntriesTask

假设经过大多数的返回后,leader 把command apply到状态机中后,后续follower也推进apply id,把这条日志apply 状态机中去,此时流程如下:
-->Apply-->MembershipChange-->AddNewPeer

219 void FloydImpl::AddNewPeer(const std::string& server) {
220   if (IsSelf(server)) {
221     return;
222   } 
223   // Add Peer
224   auto peers_iter = peers_.find(server);
225   if (peers_iter == peers_.end()) {
226     LOGV(INFO_LEVEL, info_log_, "FloydImpl::ApplyAddMember server %s:%d add new peer thread %s",
227         options_.local_ip.c_str(), options_.local_port, server.c_str());
228     Peer* pt = new Peer(server, &peers_, context_, primary_, raft_meta_, raft_log_,
229         worker_client_pool_, apply_, options_, info_log_);
230     peers_.insert(std::pair<std::string, Peer*>(server, pt));
231     pt->Start();
232   }   
233 }

http://loopjump.com/raft_paper_note/
http://loopjump.com/raft_one_server_reconfiguration/

相关文章

  • Floyd&Raft的源码分析(三)

    这篇是这个系列的最后一篇了,整个分析持续了两个月多,包括floyd和pink源码,差不多就每个周日会花几个小时分析...

  • Floyd&Raft的源码分析(一)

    这篇博文是借助360开源的Floyd项目和百度开源的Braft项目来分析Raft的实现,至于Raft的理论(概念)...

  • Floyd&Raft的源码分析(二)

    这部分接着上一部分,主要分析日志的同步和安全性,以及成员变更等,在server成为leader后,会立即更新维护f...

  • etcd-raft 库源码阅读【WIP】

    Etcd 源码阅读 本文是 etcd-raft 库源码的阅读笔记。希望通过阅读 etcd-raft 库的源码,学习...

  • consul-rarf协议核心源码

    一、概要 这次将重点分析raft协议相关的核心源码 二、分析 上次看到api.go方法的最后会启动3个核心的线程。...

  • etcd-raft源码分析1-server启动

    以etcd源码中的一个kvstore的例子来分析基于raft算法的kvstore的实现,在etcd/contrib...

  • Consul源码分析——Raft实现

    背景 raft协议是维持整个consul生态中最重要的一个协议,它负责维护consul server之间的强一致性...

  • raft-example源码分析

    说明 raft-example是etcd中一个示例代码,阅读后对于理解etcd整个工作原理有很大帮助,这里面核心有...

  • Dledger 选主情况

    最近在看RocketMQ 的raft实现,名字叫Dledger。找了一篇源码分析的博客发现其中很多细节都解释的不是...

  • RocketMQ 的DLedger 选主机制

    最近在看RocketMQ 的raft实现,名字叫Dledger。找了一篇源码分析的博客发现其中很多细节都解释的不是...

网友评论

    本文标题:Floyd&Raft的源码分析(三)

    本文链接:https://www.haomeiwen.com/subject/flashftx.html