这篇是这个系列的最后一篇了,整个分析持续了两个月多,包括floyd和pink源码,差不多就每个周日会花几个小时分析一下,和查些资料,毕竟自己也是在学习过程中,可能会有些理解不正确的地方。
在学习raft的过程中,看了好些文章和别人的分析,理解起来似乎比较容易,但如果是自己从零实现一个可能要花些时间,况且不会灵活的运用到具体项目中,和结合现有的开源项目,比如leveldb和rocksdb等。
这篇结束后,准备分析下leveldb里的一些实现,可能网上已经有很多类型的文章,但看别人分析还不如自己深入源码,以前也做过类似的事情,比如redis和leveldb,但时间较久也没有沉淀下来,所以也陌生了。计划是后两个月分析下libco/pebble里的协程实现,后面大半年就分析单机版的kv存储如leveldb和消息中间件,比较倾向于rocketmq或消息队列phxqueue。
先列一下floyd框架,再分析下raft节点增加和减少的情况。
floyd是单进程多线程的框架项目,就直接上代码了,不画流程图和类图了。看了下example程序,在main中实例化一个floyd对象,如下:
368 Status Floyd::Open(const Options& options, Floyd** floyd) {
369 *floyd = NULL;
370 Status s;
371 FloydImpl *impl = new FloydImpl(options);
372 s = impl->Init();
373 if (s.ok()) {
374 *floyd = impl;
375 } else {
376 delete impl;
377 }
378 return s;
379 }
全部功能都放在Init中,部分代码如下:
274 Status FloydImpl::Init() {
275 slash::CreatePath(options_.path);
276 if (NewLogger(options_.path + "/LOG", &info_log_) != 0) {
277 return Status::Corruption("Open LOG failed, ", strerror(errno));
278 }
279
280 // TODO(anan) set timeout and retry
281 worker_client_pool_ = new ClientPool(info_log_);
282
283 // Create DB
284 rocksdb::Options options;
285 options.create_if_missing = true;
286 options.write_buffer_size = 1024 * 1024 * 1024;
287 options.max_background_flushes = 8;
288 rocksdb::Status s = rocksdb::DB::Open(options, options_.path + "/db/", &db_);
289 if (!s.ok()) {
291 return Status::Corruption("Open DB failed, " + s.ToString());
292 }
293
294 s = rocksdb::DB::Open(options, options_.path + "/log/", &log_and_meta_);
295 if (!s.ok()) {
297 return Status::Corruption("Open DB log_and_meta failed, " + s.ToString());
298 }
300 // Recover Context
301 raft_log_ = new RaftLog(log_and_meta_, info_log_);
302 raft_meta_ = new RaftMeta(log_and_meta_, info_log_);
303 raft_meta_->Init();
304 context_ = new FloydContext(options_);
305 context_->RecoverInit(raft_meta_);
306
307 // Recover Members when exist
308 std::string mval;
309 Membership db_members;
310 s = db_->Get(rocksdb::ReadOptions(), kMemberConfigKey, &mval);
311 if (s.ok()
312 && db_members.ParseFromString(mval)) {
315 for (int i = 0; i < db_members.nodes_size(); i++) {
316 context_->members.insert(db_members.nodes(i));
317 }
318 } else {
319 BuildMembership(options_.members, &db_members);
320 if(!db_members.SerializeToString(&mval)) {
322 return Status::Corruption("Serialize Membership failed");
323 }
324 s = db_->Put(rocksdb::WriteOptions(), kMemberConfigKey, mval);
325 if (!s.ok()) {
327 return Status::Corruption("Record membership in db failed! error: " + s.ToString());
328 }
330 for (const auto& m : options_.members) {
331 context_->members.insert(m);
332 }
333 }
337 primary_ = new FloydPrimary(context_, &peers_, raft_meta_, options_, info_log_);
340 worker_ = new FloydWorker(options_.local_port, 1000, this);
341 int ret = 0;
342 if ((ret = worker_->Start()) != 0) {
344 return Status::Corruption("failed to start worker, return " + std::to_string(ret));
345 }
347 apply_ = new FloydApply(context_, db_, raft_meta_, raft_log_, this, info_log_);
348
349 InitPeers();
354 if ((ret = primary_->Start()) != 0) {
356 return Status::Corruption("failed to start primary thread, return " + std::to_string(ret));
357 }
358 primary_->AddTask(kCheckLeader);
361 apply_->Start();
365 return Status::OK();
366 }
以上代码主要工作做了:创始日志,创建客户端对象池(里面会对每个server addr创建一条连接,源码在pink项目中)用于后续发送请求,创建两个db,用于state machine和log entry;从log中恢复状态数据比如term/voteip/voteport/commit_index/last_applied等;接着从db中或参数中恢复节点信息;创建FloydPrimary线程,此时还没启动,它的主要工作是心跳,定时检查leader,执行command,向其他节点发送vote rpc或append log entry rpc,体现在这三个函数中,由AddTask分发:
62 static void LaunchHeartBeatWrapper(void *arg);
63 void LaunchHeartBeat();
64 static void LaunchCheckLeaderWrapper(void *arg);
65 void LaunchCheckLeader();
66 static void LaunchNewCommandWrapper(void *arg);
67 void LaunchNewCommand();
接着创建FloydWorker线程并启动它,它的工作是处理请求,比如给节点线程发vote rpc/append log entry rpc任务,由DealMessage分发,然后根据request_type具体走不同的逻辑,调用FloydImpl类中的函数;
接着创建FloydApply线程,它的工作是执行command,即把log entry中的command apply到 state machine中,处理成员变更的情况;
接着根据节点个数,创建对应个数的节点线程并启动,主要工作是向对应server发送vote rpc/append log entry rpc请求和处理响应,维护状态等;
最后是启动FloydPrimary并立即发一个选举leader的任务(节点刚启动时都为follower),接着启动FloydApply;
以上线程,有些创建便启动,有些等其他线程启动完后再启动,这里有个顺序依赖,主要看谁驱动谁,比如FloydPrimary线程里会用到节点线程,就不能以相反的顺序start否则可能引起coredump;
大致整个框架差不多分析完了,pink中的相关源码没有在这里列出来,跳过了。
在成员变更的同时,需要保证安全必一,即“在任何时候,都不会出现双主。”
主要有两种方法:
One-Server变更:一阶段变更,要求每次成员组从G1变成G2时,G2相比G1加一个成员或者减一个成员。
Joint Consensus:支持任意的变更,即从成员组G1变成G2,不要求G1和G2有什么关联,比如可以完全没有交集。
第一种比较容易理解,实现起来也简单,floyd中也使用的第一种,这边大概说一下基本流程吧,至于为什么这种方法可行,可以参考下面链接的分析;
以增加成员为例,raft在收到增加server成员请求时,每次只增加一台server,后台程序的Leader执行流程如下:
-->AddServer-->BuildAddServerRequest-->DoCommand-->ExecuteCommand-->BuildLogEntry-->Append-->AddTask-->NoticePeerTask-->AddAppendEntriesTask
假设经过大多数的返回后,leader 把command apply到状态机中后,后续follower也推进apply id,把这条日志apply 状态机中去,此时流程如下:
-->Apply-->MembershipChange-->AddNewPeer
219 void FloydImpl::AddNewPeer(const std::string& server) {
220 if (IsSelf(server)) {
221 return;
222 }
223 // Add Peer
224 auto peers_iter = peers_.find(server);
225 if (peers_iter == peers_.end()) {
226 LOGV(INFO_LEVEL, info_log_, "FloydImpl::ApplyAddMember server %s:%d add new peer thread %s",
227 options_.local_ip.c_str(), options_.local_port, server.c_str());
228 Peer* pt = new Peer(server, &peers_, context_, primary_, raft_meta_, raft_log_,
229 worker_client_pool_, apply_, options_, info_log_);
230 peers_.insert(std::pair<std::string, Peer*>(server, pt));
231 pt->Start();
232 }
233 }
http://loopjump.com/raft_paper_note/
http://loopjump.com/raft_one_server_reconfiguration/
网友评论