美文网首页
百度文件系统bfs源码分析系列(一)

百度文件系统bfs源码分析系列(一)

作者: fooboo | 来源:发表于2019-02-09 20:22 被阅读16次

这一篇会分析写文件的整个工作流程,因为在分析源码时,单独分析下src/metaserver/下的几个文件有些疑问,在metaserver_main.cc启动时,会注册rpc服务并处理相应的消息:

 45     // rpc_server
 46     sofa::pbrpc::RpcServerOptions options;
 47 
 48     sofa::pbrpc::RpcServer rpc_server(options);
 49 
 50     // Server
 51     baidu::bfs::metaserver::MetaServer* metaserver_service = new baidu::bfs::metaserver::MetaServerImpl();
 52 
 53     // Register
 54     if (!rpc_server.RegisterService(metaserver_service)) {
 55         return EXIT_FAILURE;
 56     }
 57 
 58     // Start
 59     std::vector<std::string> metaserver_nodes;
 60     baidu::common::SplitString(FLAGS_metaserver_nodes, ",", &metaserver_nodes);
 61     std::string server_addr = metaserver_nodes[FLAGS_node_index];
 62     std::string listen_addr = std::string("0.0.0.0") + server_addr.substr(server_addr.rfind(':'));
 63     if (!rpc_server.Start(listen_addr)) {
 64         return EXIT_FAILURE;
 65     }
 68     rpc_server.Run();

然后在metaserver_impl.h文件中有些函数定义中使用的一些成员变量并没有在相应的类声明中定义,通过metaserver.proto转换后的pb.cc文件中:

 54 service MetaServer {
 55     rpc AddBlock(AddBlockRequest) returns(AddBlockResponse);
 56     rpc SyncBlock(SyncBlockRequest) returns(SyncBlockResponse);
 57     rpc FinishBlock(FinishBlockRequest) returns(FinishBlockResponse);
 58     rpc RemoveBlock(RemoveBlockRequest) returns(RemoveBlockResponse);
 59 }
1175 class MetaServer : public ::google::protobuf::Service {
1176  protected:
1177   // This class should be treated as an abstract interface.
1178   inline MetaServer() {};
1179  public:
1180   virtual ~MetaServer();

1186   virtual void AddBlock(::google::protobuf::RpcController* controller,
1187                        const ::baidu::bfs::metaserver::AddBlockRequest* request,
1188                        ::baidu::bfs::metaserver::AddBlockResponse* response,
1189                        ::google::protobuf::Closure* done);
1190   virtual void SyncBlock(::google::protobuf::RpcController* controller,
1191                        const ::baidu::bfs::metaserver::SyncBlockRequest* request,
1192                        ::baidu::bfs::metaserver::SyncBlockResponse* response,
1193                        ::google::protobuf::Closure* done);
1194   //more code...
 42 class MetaServerImpl : public MetaServer {
 43 public:
 44     MetaServerImpl();
 45     virtual ~MetaServerImpl();
 79 private:
 80     //more code...
 90 private:
 91     ThreadPool* read_thread_pool_;
 92     ThreadPool* work_thread_pool_;
 93     ThreadPool* report_thread_pool_;
 94     ThreadPool* heartbeat_thread_pool_;
 95     ChunkServerManager* chunkserver_manager_;
 96     BlockMappingManager* block_mapping_manager_;
 97 
 98     volatile bool readonly_;
 99     volatile int recover_timeout_;
100     RecoverMode recover_mode_;
101     int64_t start_time_;
102 };

但在类似实现virtual rpc method的时候,有些问题:

 88 void MetaServerImpl::HeartBeat(::google::protobuf::RpcController* controller,
 89                          const HeartBeatRequest* request,
 90                          HeartBeatResponse* response,
 91                          ::google::protobuf::Closure* done) {
 92     if (!is_leader_) { //这里有is_leader???
 93         response->set_status(kIsFollower);
 94         done->Run();
 95         return;
 96     }
 97     g_heart_beat.Inc();
 99     int64_t version = request->namespace_version();
100     if (version == namespace_->Version()) {
101         chunkserver_manager_->HandleHeartBeat(request, response);
102     } else {
103         response->set_status(kVersionError);
104     }
105     response->set_namespace_version(namespace_->Version());
106     done->Run();
107 }

类似的这几个函数同nameserver impl类中相同,但在后者的类声明中有定义相应的数据成员,而后者有is_leader_之类的是因为raft,而metaserver没有这个功能,不知道这里是不是拷贝那边的代码?没有用到的功能我这边不会分析,比如上面说的。

下面正式分析写流程,因为macbook上没什么办公用品,一些图就不画了。

写流程(上)

在bfs_client.cc中,put命令是把本地文件写到bfs中,如usage中那般put <localfile> <bfsfile> : copy file from local to bfs;部分代码如下:

211 int BfsPut(baidu::bfs::FS* fs, int argc, char* argv[]) {
212     //check argv...
247     baidu::bfs::File* file;
248     if (fs->OpenFile(target.c_str(), O_WRONLY | O_TRUNC, st.st_mode, &file, baidu::bfs::WriteOptions()) != 0) {
249         //do error logic...
252     }

253     char buf[10240];
254     int64_t len = 0;
255     int32_t bytes = 0;
256     while ( (bytes = fread(buf, 1, sizeof(buf), fp)) > 0) {
257         int32_t write_bytes = file->Write(buf, bytes);
258         if (write_bytes < bytes) {
260             ret = 2;
261             break;
262         }
263         len += bytes;
264     }
265     fclose(fp);
266     if (file->Close() != 0) {
269     }
270     //other code...
273 }

以上功能对输入参数作些检查,然后OpenFile,以同样的st_mode值,如果文件存在则截断为零,然后返回File对象,File实现逻辑复杂,后续分析;之后开始file->Write;最后file->Close。从这里分析出,貌似不支持断点续传???这个过程还是比较简单的。

这里的写有两种方式,分别是kWriteChainskWriteFanout,具体点“链式写有可能存在慢节点,一旦链中某个结点速度较慢,则会拖慢整个链的写入速度,现在也支持扇出写,同时写四个副本,只要有三个成功就返回成功,可以去规避慢节点。”(bfs_qa),具体怎么实现后续在代码中分析。

322 int32_t FSImpl::OpenFile(const char* path, int32_t flags, int32_t mode,
323                          File** file, const WriteOptions& options) {
324     *file = NULL;
328     WriteOptions write_option = options;
329     //set options.write_mode
341     CreateFileRequest request;
342     CreateFileResponse response;
343     //set request params
348     bool rpc_ret = nameserver_client_->SendRequest(&NameServer_Stub::CreateFile,
349         &request, &response, 15, 1);
350     if (!rpc_ret || response.status() != kOK) {
353         if (!rpc_ret) {
354             return TIMEOUT;
355         } else {
356             return GetErrorCode(response.status());
357         }
358     } else {
359         *file = new FileImplWrapper(this, rpc_client_, path, flags, write_option);
360     }
361     return OK;
362 }

///proto
 40 message CreateFileRequest {
 41     optional int64 sequence_id = 1;////TODO 0
 42     optional string file_name = 2;
 43     optional int32 mode = 3;
 44     optional int32 flags = 4;
 45     optional int32 replica_num = 5;
 46     optional string user = 7;
 47 }
 48 
 49 message CreateFileResponse {
 50     optional int64 sequence_id = 1;
 51     optional StatusCode status = 2;
 52 }

以上功能是设置写的模式,然后设置请求参数,调用SendRequest请求哪个service的哪个rpc,带上request和response,超时时间和重试次数;返回成功后会返回个File对象,关于protobuf rpc这块后期有时间分析下原理。

CreateFileRequest请求到达nameserver服务上时:

 369 void NameServerImpl::CreateFile(::google::protobuf::RpcController* controller,
 370                                 const CreateFileRequest* request,
 371                                 CreateFileResponse* response,
 372                                 ::google::protobuf::Closure* done) {
 373     if (!is_leader_) {
 374         response->set_status(kIsFollower);
 375         done->Run();
 376         return;
 377     }
 378     g_create_file.Inc();
 379     response->set_sequence_id(request->sequence_id());
 380     std::string path = NameSpace::NormalizePath(request->file_name());
 381     int flags = request->flags();
 382     int mode = request->mode();
 383     if (mode == 0) {    
 384         mode = 0644;    // default mode
 385     }
 386     int replica_num = request->replica_num();///副本数量
 387     NameServerLog log;
 388     std::vector<int64_t> blocks_to_remove;
 389     FileLockGuard file_lock(new WriteLock(path));//TODO 1
 390     StatusCode status = namespace_->CreateFile(path, flags, mode, replica_num, &blocks_to_remov     e, &log);
 391     //more code...
 406 }

以上如果不是leader的话则返回给client并重定向到leader服务上;然后设置参数,加锁(实现后面分析)进行串行化;然后调用namespace_->CreateFile,后者是一些元数据,用leveldb存储:

292 StatusCode NameSpace::CreateFile(const std::string& file_name, int flags, int mode, int replica_    num,
293                                  std::vector<int64_t>* blocks_to_remove, NameServerLog* log) {
294     if (file_name == "/") {
295         return kBadParameter;
296     }
297     FileInfo file_info;
298     std::string fname, info_value;
299     StatusCode status = BuildPath(file_name, &file_info, &fname, log);
300     //more code...
338 }

以上会根据file_name进行split,由BuildPath处理;BuildPath主要做的事情是:对file_name处理每一级目录,对每一层目录名所在的深度进行大端编码EncodingStoreKey(parent_id, name, &key_str),并在leveldb中查找是否存在,除了最后一个文件名:

254 StatusCode NameSpace::BuildPath(const std::string& path, FileInfo* file_info, std::string* fname    ,
255                                 NameServerLog* log) {
256     std::vector<std::string> paths;
257     if (!common::util::SplitPath(path, &paths)) {
259         return kBadParameter;
260     }
262     /// Find parent directory, create if not exist.
263     int64_t parent_id = kRootEntryid;
264     int depth = paths.size();
265     /// if parent is root,  set "file_info"
266     file_info->set_entry_id(kRootEntryid);
267     std::string info_value;
268     for (int i = 0; i < depth - 1; ++i) {
269         if (!LookUp(parent_id, paths[i], file_info)) {
270             file_info->set_type((1 << 9) | 01755);//drwxr-xr-x
271             file_info->set_ctime(time(NULL));
272             file_info->set_entry_id(common::atomic_add64(&last_entry_id_, 1) + 1);
273             file_info->SerializeToString(&info_value);
274             std::string key_str;
275             EncodingStoreKey(parent_id, paths[i], &key_str);
276             leveldb::Status s = db_->Put(leveldb::WriteOptions(), key_str, info_value);//元数据序列化到leveldb
277             assert(s.ok());
278             EncodeLog(log, kSyncWrite, key_str, info_value); //记录日志   
280         } else {
281             if (GetFileType(file_info->type()) != kDir) {//判断是否为目录结构
283                 return kBadParameter;
284             }   
285         }       
286         parent_id = file_info->entry_id();
287     }   
288     *fname = paths[depth - 1]; //文件名
289     return kOK;
290 }

194 bool NameSpace::LookUp(int64_t parent_id, const std::string& name, FileInfo* info) {
195     std::string key_str;
196     EncodingStoreKey(parent_id, name, &key_str);
197     if (!GetFromStore(key_str, info)) {
199         return false;
200     }
202     return true;
203 }

135 bool NameSpace::GetFromStore(const std::string& key, FileInfo* info) {
136     std::string value;
137     leveldb::Status s = db_->Get(leveldb::ReadOptions(), key, &value);
138     if (!s.ok()) {
141         return false;
142     }
143     if (!info->ParseFromString(value)) {
145         return false;
146     }
147     return true;
148 }

其中last_entry_id_是在NameSpace::RebuildBlockMap过程中,从leveldb历史文件元数据中的最大值;然后对每层string一个个处理,如果不存在则编码并格式化,否则作些校验,并更新parent_id;上面的过程其实是对path进行编码;比如一开始leveldb为空的,然后path为/home/dirx/filex,则一共执行2次,分别是home为parent_id =1,entry_id=2,dirx为parent_id =2,entry_id=3,filex为文件名(目录)不处理[因为在client端时,会对source进行检查,必须是xxx或xxx/xxx而不能是xxx/xxx/,而target可以为诸如xxx/xxxx/或xxx/xxxx];至于为什么要这么编码,后续会分析。以上以1home为key,其他数据为value。

NameSpace::CreateFile中,经过NameSpace::BuildPath返回的file_info为文件名的上一级目录;然后LookUp(parent_id, fname, &file_info)判断文件(目录)是否存在;

292 StatusCode NameSpace::CreateFile(const std::string& file_name, int flags, int mode, int replica_    num,
293                                  std::vector<int64_t>* blocks_to_remove, NameServerLog* log) {
294     //more code...
299     StatusCode status = BuildPath(file_name, &file_info, &fname, log);
300     if (status != kOK) {
301         return status;
302     }
303     int64_t parent_id = file_info.entry_id();
304     bool exist = LookUp(parent_id, fname, &file_info);
305 
306     if (exist) {
307         if ((flags & O_TRUNC) == 0) {//对于存在的文件(目录)但不截断则返回错误
309             return kFileExists;
310         } else {
311             if (GetFileType(file_info.type()) == kDir) {
313                 return kFileExists;//文件名是目录则返回
314             }
315             for (int i = 0; i < file_info.blocks_size(); i++) {
316                 blocks_to_remove->push_back(file_info.blocks(i));//即将截断
317             }
318         }
319     }
320     //设置file_info数据并写leveldb

其中blocks_to_remove处理的是要截断的文件,需要删除block元数据;其中NameServerLog* log为key和value;
以上只是增加块数据和删除块数据,在leader服务上,接着:

 391     for (size_t i = 0; i < blocks_to_remove.size(); i++) {
 392         block_mapping_manager_->RemoveBlock(blocks_to_remove[i]);
 393     }

删除block的信息,至此,并未真正分配block,会在client写数据时才分配,会在下篇分析。

LogRemote做的事情有些复杂,会在后面分析。这里简单说一下,这里会判断是否是ha功能,如果不是则作为异步task等待工作线程执行;否则序列化NameServerLog并记录到raft中,后续会回调SyncLogCallback

 408 bool NameServerImpl::LogRemote(const NameServerLog& log,
 409                                std::function<void (bool)> callback) {
 410     if (sync_ == NULL) {
 411         if (callback) {
 412             sync_callback_thread_pool_->AddTask(std::bind(callback, true));
 413         }
 414         return true;
 415     }
 416     std::string logstr;
 417     if (!log.SerializeToString(&logstr)) {
 419     }
 420     if (callback) {
 421         sync_->Log(logstr, callback);
 422         return true;
 423     } else {
 424         return sync_->Log(logstr, FLAGS_log_replicate_timeout * 1000);
 425     }
 426 }

 42 void RaftImpl::Log(const std::string& entry, std::function<void (bool)> callback) {
 43     raft_node_->AppendLog(entry, callback);
 44 }

450 void RaftNodeImpl::AppendLog(const std::string& log, std::function<void (bool)> callback) {
451     MutexLock lock(&mu_);
452     int64_t index = log_index_ + 1;
453     ///TODO: optimize lock
454     if (!StoreLog(current_term_, index, log)) {
455         thread_pool_->AddTask(std::bind(callback,false));
456         return;
457     }
458     callback_map_.insert(std::make_pair(index, callback));
459     log_index_ ++;
460     for (uint32_t i = 0; i < nodes_.size(); i++) {
461         if (follower_context_[i]) {
462             follower_context_[i]->condition.Signal();
463         }
464     }
465 }

在某个时候:

367                         while (last_applied_ < commit_index) {
368                             last_applied_ ++;
370                             std::map<int64_t, std::function<void (bool)> >::iterator cb_it =
371                                 callback_map_.find(last_applied_);
372                             if (cb_it != callback_map_.end()) {
373                                 std::function<void (bool)> callback = cb_it->second;
374                                 callback_map_.erase(cb_it);
375                                 mu_.Unlock();
377                                 callback(true);//执行回调
378                                 mu_.Lock();
379                             } else {
381                             }

TODO
某些具体的技术实现在上面分析时跳过了,比如为什么数据结构这么设计,锁的实现,还有raft的实现及log等后续补上,下一篇的分析会补上BlockMappingManager/FileLockManager这两个;还有一些异常处理也没仔细考虑。

相关文章

网友评论

      本文标题:百度文件系统bfs源码分析系列(一)

      本文链接:https://www.haomeiwen.com/subject/yoqosqtx.html