这一篇会分析写文件的整个工作流程,因为在分析源码时,单独分析下src/metaserver/下的几个文件有些疑问,在metaserver_main.cc启动时,会注册rpc服务并处理相应的消息:
45 // rpc_server
46 sofa::pbrpc::RpcServerOptions options;
47
48 sofa::pbrpc::RpcServer rpc_server(options);
49
50 // Server
51 baidu::bfs::metaserver::MetaServer* metaserver_service = new baidu::bfs::metaserver::MetaServerImpl();
52
53 // Register
54 if (!rpc_server.RegisterService(metaserver_service)) {
55 return EXIT_FAILURE;
56 }
57
58 // Start
59 std::vector<std::string> metaserver_nodes;
60 baidu::common::SplitString(FLAGS_metaserver_nodes, ",", &metaserver_nodes);
61 std::string server_addr = metaserver_nodes[FLAGS_node_index];
62 std::string listen_addr = std::string("0.0.0.0") + server_addr.substr(server_addr.rfind(':'));
63 if (!rpc_server.Start(listen_addr)) {
64 return EXIT_FAILURE;
65 }
68 rpc_server.Run();
然后在metaserver_impl.h文件中有些函数定义中使用的一些成员变量并没有在相应的类声明中定义,通过metaserver.proto转换后的pb.cc文件中:
54 service MetaServer {
55 rpc AddBlock(AddBlockRequest) returns(AddBlockResponse);
56 rpc SyncBlock(SyncBlockRequest) returns(SyncBlockResponse);
57 rpc FinishBlock(FinishBlockRequest) returns(FinishBlockResponse);
58 rpc RemoveBlock(RemoveBlockRequest) returns(RemoveBlockResponse);
59 }
1175 class MetaServer : public ::google::protobuf::Service {
1176 protected:
1177 // This class should be treated as an abstract interface.
1178 inline MetaServer() {};
1179 public:
1180 virtual ~MetaServer();
1186 virtual void AddBlock(::google::protobuf::RpcController* controller,
1187 const ::baidu::bfs::metaserver::AddBlockRequest* request,
1188 ::baidu::bfs::metaserver::AddBlockResponse* response,
1189 ::google::protobuf::Closure* done);
1190 virtual void SyncBlock(::google::protobuf::RpcController* controller,
1191 const ::baidu::bfs::metaserver::SyncBlockRequest* request,
1192 ::baidu::bfs::metaserver::SyncBlockResponse* response,
1193 ::google::protobuf::Closure* done);
1194 //more code...
42 class MetaServerImpl : public MetaServer {
43 public:
44 MetaServerImpl();
45 virtual ~MetaServerImpl();
79 private:
80 //more code...
90 private:
91 ThreadPool* read_thread_pool_;
92 ThreadPool* work_thread_pool_;
93 ThreadPool* report_thread_pool_;
94 ThreadPool* heartbeat_thread_pool_;
95 ChunkServerManager* chunkserver_manager_;
96 BlockMappingManager* block_mapping_manager_;
97
98 volatile bool readonly_;
99 volatile int recover_timeout_;
100 RecoverMode recover_mode_;
101 int64_t start_time_;
102 };
但在类似实现virtual rpc method的时候,有些问题:
88 void MetaServerImpl::HeartBeat(::google::protobuf::RpcController* controller,
89 const HeartBeatRequest* request,
90 HeartBeatResponse* response,
91 ::google::protobuf::Closure* done) {
92 if (!is_leader_) { //这里有is_leader???
93 response->set_status(kIsFollower);
94 done->Run();
95 return;
96 }
97 g_heart_beat.Inc();
99 int64_t version = request->namespace_version();
100 if (version == namespace_->Version()) {
101 chunkserver_manager_->HandleHeartBeat(request, response);
102 } else {
103 response->set_status(kVersionError);
104 }
105 response->set_namespace_version(namespace_->Version());
106 done->Run();
107 }
类似的这几个函数同nameserver impl类中相同,但在后者的类声明中有定义相应的数据成员,而后者有is_leader_之类的是因为raft,而metaserver没有这个功能,不知道这里是不是拷贝那边的代码?没有用到的功能我这边不会分析,比如上面说的。
下面正式分析写流程,因为macbook上没什么办公用品,一些图就不画了。
写流程(上)
在bfs_client.cc中,put命令是把本地文件写到bfs中,如usage中那般put <localfile> <bfsfile> : copy file from local to bfs
;部分代码如下:
211 int BfsPut(baidu::bfs::FS* fs, int argc, char* argv[]) {
212 //check argv...
247 baidu::bfs::File* file;
248 if (fs->OpenFile(target.c_str(), O_WRONLY | O_TRUNC, st.st_mode, &file, baidu::bfs::WriteOptions()) != 0) {
249 //do error logic...
252 }
253 char buf[10240];
254 int64_t len = 0;
255 int32_t bytes = 0;
256 while ( (bytes = fread(buf, 1, sizeof(buf), fp)) > 0) {
257 int32_t write_bytes = file->Write(buf, bytes);
258 if (write_bytes < bytes) {
260 ret = 2;
261 break;
262 }
263 len += bytes;
264 }
265 fclose(fp);
266 if (file->Close() != 0) {
269 }
270 //other code...
273 }
以上功能对输入参数作些检查,然后OpenFile
,以同样的st_mode值,如果文件存在则截断为零,然后返回File
对象,File
实现逻辑复杂,后续分析;之后开始file->Write
;最后file->Close
。从这里分析出,貌似不支持断点续传???这个过程还是比较简单的。
这里的写有两种方式,分别是kWriteChains
和kWriteFanout
,具体点“链式写有可能存在慢节点,一旦链中某个结点速度较慢,则会拖慢整个链的写入速度,现在也支持扇出写,同时写四个副本,只要有三个成功就返回成功,可以去规避慢节点。”(bfs_qa),具体怎么实现后续在代码中分析。
322 int32_t FSImpl::OpenFile(const char* path, int32_t flags, int32_t mode,
323 File** file, const WriteOptions& options) {
324 *file = NULL;
328 WriteOptions write_option = options;
329 //set options.write_mode
341 CreateFileRequest request;
342 CreateFileResponse response;
343 //set request params
348 bool rpc_ret = nameserver_client_->SendRequest(&NameServer_Stub::CreateFile,
349 &request, &response, 15, 1);
350 if (!rpc_ret || response.status() != kOK) {
353 if (!rpc_ret) {
354 return TIMEOUT;
355 } else {
356 return GetErrorCode(response.status());
357 }
358 } else {
359 *file = new FileImplWrapper(this, rpc_client_, path, flags, write_option);
360 }
361 return OK;
362 }
///proto
40 message CreateFileRequest {
41 optional int64 sequence_id = 1;////TODO 0
42 optional string file_name = 2;
43 optional int32 mode = 3;
44 optional int32 flags = 4;
45 optional int32 replica_num = 5;
46 optional string user = 7;
47 }
48
49 message CreateFileResponse {
50 optional int64 sequence_id = 1;
51 optional StatusCode status = 2;
52 }
以上功能是设置写的模式,然后设置请求参数,调用SendRequest请求哪个service的哪个rpc,带上request和response,超时时间和重试次数;返回成功后会返回个File
对象,关于protobuf rpc这块后期有时间分析下原理。
当CreateFileRequest
请求到达nameserver
服务上时:
369 void NameServerImpl::CreateFile(::google::protobuf::RpcController* controller,
370 const CreateFileRequest* request,
371 CreateFileResponse* response,
372 ::google::protobuf::Closure* done) {
373 if (!is_leader_) {
374 response->set_status(kIsFollower);
375 done->Run();
376 return;
377 }
378 g_create_file.Inc();
379 response->set_sequence_id(request->sequence_id());
380 std::string path = NameSpace::NormalizePath(request->file_name());
381 int flags = request->flags();
382 int mode = request->mode();
383 if (mode == 0) {
384 mode = 0644; // default mode
385 }
386 int replica_num = request->replica_num();///副本数量
387 NameServerLog log;
388 std::vector<int64_t> blocks_to_remove;
389 FileLockGuard file_lock(new WriteLock(path));//TODO 1
390 StatusCode status = namespace_->CreateFile(path, flags, mode, replica_num, &blocks_to_remov e, &log);
391 //more code...
406 }
以上如果不是leader的话则返回给client并重定向到leader服务上;然后设置参数,加锁(实现后面分析)进行串行化;然后调用namespace_->CreateFile
,后者是一些元数据,用leveldb存储:
292 StatusCode NameSpace::CreateFile(const std::string& file_name, int flags, int mode, int replica_ num,
293 std::vector<int64_t>* blocks_to_remove, NameServerLog* log) {
294 if (file_name == "/") {
295 return kBadParameter;
296 }
297 FileInfo file_info;
298 std::string fname, info_value;
299 StatusCode status = BuildPath(file_name, &file_info, &fname, log);
300 //more code...
338 }
以上会根据file_name
进行split,由BuildPath
处理;BuildPath
主要做的事情是:对file_name
处理每一级目录,对每一层目录名所在的深度进行大端编码EncodingStoreKey(parent_id, name, &key_str)
,并在leveldb中查找是否存在,除了最后一个文件名:
254 StatusCode NameSpace::BuildPath(const std::string& path, FileInfo* file_info, std::string* fname ,
255 NameServerLog* log) {
256 std::vector<std::string> paths;
257 if (!common::util::SplitPath(path, &paths)) {
259 return kBadParameter;
260 }
262 /// Find parent directory, create if not exist.
263 int64_t parent_id = kRootEntryid;
264 int depth = paths.size();
265 /// if parent is root, set "file_info"
266 file_info->set_entry_id(kRootEntryid);
267 std::string info_value;
268 for (int i = 0; i < depth - 1; ++i) {
269 if (!LookUp(parent_id, paths[i], file_info)) {
270 file_info->set_type((1 << 9) | 01755);//drwxr-xr-x
271 file_info->set_ctime(time(NULL));
272 file_info->set_entry_id(common::atomic_add64(&last_entry_id_, 1) + 1);
273 file_info->SerializeToString(&info_value);
274 std::string key_str;
275 EncodingStoreKey(parent_id, paths[i], &key_str);
276 leveldb::Status s = db_->Put(leveldb::WriteOptions(), key_str, info_value);//元数据序列化到leveldb
277 assert(s.ok());
278 EncodeLog(log, kSyncWrite, key_str, info_value); //记录日志
280 } else {
281 if (GetFileType(file_info->type()) != kDir) {//判断是否为目录结构
283 return kBadParameter;
284 }
285 }
286 parent_id = file_info->entry_id();
287 }
288 *fname = paths[depth - 1]; //文件名
289 return kOK;
290 }
194 bool NameSpace::LookUp(int64_t parent_id, const std::string& name, FileInfo* info) {
195 std::string key_str;
196 EncodingStoreKey(parent_id, name, &key_str);
197 if (!GetFromStore(key_str, info)) {
199 return false;
200 }
202 return true;
203 }
135 bool NameSpace::GetFromStore(const std::string& key, FileInfo* info) {
136 std::string value;
137 leveldb::Status s = db_->Get(leveldb::ReadOptions(), key, &value);
138 if (!s.ok()) {
141 return false;
142 }
143 if (!info->ParseFromString(value)) {
145 return false;
146 }
147 return true;
148 }
其中last_entry_id_
是在NameSpace::RebuildBlockMap
过程中,从leveldb历史文件元数据中的最大值;然后对每层string一个个处理,如果不存在则编码并格式化,否则作些校验,并更新parent_id
;上面的过程其实是对path进行编码;比如一开始leveldb为空的,然后path为/home/dirx/filex
,则一共执行2次,分别是home为parent_id =1,entry_id=2,dirx为parent_id =2,entry_id=3,filex为文件名(目录)不处理[因为在client端时,会对source进行检查,必须是xxx或xxx/xxx而不能是xxx/xxx/,而target可以为诸如xxx/xxxx/或xxx/xxxx];至于为什么要这么编码,后续会分析。以上以1home为key,其他数据为value。
在NameSpace::CreateFile
中,经过NameSpace::BuildPath
返回的file_info
为文件名的上一级目录;然后LookUp(parent_id, fname, &file_info)
判断文件(目录)是否存在;
292 StatusCode NameSpace::CreateFile(const std::string& file_name, int flags, int mode, int replica_ num,
293 std::vector<int64_t>* blocks_to_remove, NameServerLog* log) {
294 //more code...
299 StatusCode status = BuildPath(file_name, &file_info, &fname, log);
300 if (status != kOK) {
301 return status;
302 }
303 int64_t parent_id = file_info.entry_id();
304 bool exist = LookUp(parent_id, fname, &file_info);
305
306 if (exist) {
307 if ((flags & O_TRUNC) == 0) {//对于存在的文件(目录)但不截断则返回错误
309 return kFileExists;
310 } else {
311 if (GetFileType(file_info.type()) == kDir) {
313 return kFileExists;//文件名是目录则返回
314 }
315 for (int i = 0; i < file_info.blocks_size(); i++) {
316 blocks_to_remove->push_back(file_info.blocks(i));//即将截断
317 }
318 }
319 }
320 //设置file_info数据并写leveldb
其中blocks_to_remove
处理的是要截断的文件,需要删除block元数据;其中NameServerLog* log
为key和value;
以上只是增加块数据和删除块数据,在leader服务上,接着:
391 for (size_t i = 0; i < blocks_to_remove.size(); i++) {
392 block_mapping_manager_->RemoveBlock(blocks_to_remove[i]);
393 }
删除block的信息,至此,并未真正分配block,会在client写数据时才分配,会在下篇分析。
LogRemote
做的事情有些复杂,会在后面分析。这里简单说一下,这里会判断是否是ha功能,如果不是则作为异步task等待工作线程执行;否则序列化NameServerLog
并记录到raft中,后续会回调SyncLogCallback
:
408 bool NameServerImpl::LogRemote(const NameServerLog& log,
409 std::function<void (bool)> callback) {
410 if (sync_ == NULL) {
411 if (callback) {
412 sync_callback_thread_pool_->AddTask(std::bind(callback, true));
413 }
414 return true;
415 }
416 std::string logstr;
417 if (!log.SerializeToString(&logstr)) {
419 }
420 if (callback) {
421 sync_->Log(logstr, callback);
422 return true;
423 } else {
424 return sync_->Log(logstr, FLAGS_log_replicate_timeout * 1000);
425 }
426 }
42 void RaftImpl::Log(const std::string& entry, std::function<void (bool)> callback) {
43 raft_node_->AppendLog(entry, callback);
44 }
450 void RaftNodeImpl::AppendLog(const std::string& log, std::function<void (bool)> callback) {
451 MutexLock lock(&mu_);
452 int64_t index = log_index_ + 1;
453 ///TODO: optimize lock
454 if (!StoreLog(current_term_, index, log)) {
455 thread_pool_->AddTask(std::bind(callback,false));
456 return;
457 }
458 callback_map_.insert(std::make_pair(index, callback));
459 log_index_ ++;
460 for (uint32_t i = 0; i < nodes_.size(); i++) {
461 if (follower_context_[i]) {
462 follower_context_[i]->condition.Signal();
463 }
464 }
465 }
在某个时候:
367 while (last_applied_ < commit_index) {
368 last_applied_ ++;
370 std::map<int64_t, std::function<void (bool)> >::iterator cb_it =
371 callback_map_.find(last_applied_);
372 if (cb_it != callback_map_.end()) {
373 std::function<void (bool)> callback = cb_it->second;
374 callback_map_.erase(cb_it);
375 mu_.Unlock();
377 callback(true);//执行回调
378 mu_.Lock();
379 } else {
381 }
TODO
某些具体的技术实现在上面分析时跳过了,比如为什么数据结构这么设计,锁的实现,还有raft的实现及log等后续补上,下一篇的分析会补上BlockMappingManager/FileLockManager这两个;还有一些异常处理也没仔细考虑。
网友评论