// Excerpt of what appears to be the metaserver entry point: it builds a sofa-pbrpc server,
// registers the MetaServer service on it, starts listening and then calls Run().
45 // rpc_server: built from default-constructed RpcServerOptions
46 sofa::pbrpc::RpcServerOptions options;
48 sofa::pbrpc::RpcServer rpc_server(options);
50 // Server: the service implementation handed to the RPC server
51 baidu::bfs::metaserver::MetaServer* metaserver_service = new baidu::bfs::metaserver::MetaServerImpl();
53 // Register: give up with EXIT_FAILURE if the service cannot be registered
54 if (!rpc_server.RegisterService(metaserver_service)) {
55 return EXIT_FAILURE;
56 }
58 // Start: pick this node's address out of the "," separated FLAGS_metaserver_nodes list
59 std::vector<std::string> metaserver_nodes;
60 baidu::common::SplitString(FLAGS_metaserver_nodes, ",", &metaserver_nodes);
// NOTE(review): FLAGS_node_index is used without a bounds check against metaserver_nodes.size().
61 std::string server_addr = metaserver_nodes[FLAGS_node_index];
// Keep only the part of the address from the last ':' onwards (the ":port" suffix).
// NOTE(review): if the address contains no ':', rfind returns npos and substr throws std::out_of_range.
62 std::string listen_addr = std::string("") + server_addr.substr(server_addr.rfind(':'));
63 if (!rpc_server.Start(listen_addr)) {
64 return EXIT_FAILURE;
65 }
68 rpc_server.Run();
// RPC interface of the metaserver: four block-related calls, each taking its own
// request message and returning the matching response message.
54 service MetaServer {
55 rpc AddBlock(AddBlockRequest) returns(AddBlockResponse);
56 rpc SyncBlock(SyncBlockRequest) returns(SyncBlockResponse);
57 rpc FinishBlock(FinishBlockRequest) returns(FinishBlockResponse);
58 rpc RemoveBlock(RemoveBlockRequest) returns(RemoveBlockResponse);
59 }
// Excerpt of the C++ service base class for MetaServer, derived from
// ::google::protobuf::Service. Each rpc is a virtual method taking an RpcController,
// the request, the response and a Closure; the remaining methods are elided.
1175 class MetaServer : public ::google::protobuf::Service {
1176 protected:
1177 // This class should be treated as an abstract interface.
1178 inline MetaServer() {};
1179 public:
1180 virtual ~MetaServer();
1186 virtual void AddBlock(::google::protobuf::RpcController* controller,
1187 const ::baidu::bfs::metaserver::AddBlockRequest* request,
1188 ::baidu::bfs::metaserver::AddBlockResponse* response,
1189 ::google::protobuf::Closure* done);
1190 virtual void SyncBlock(::google::protobuf::RpcController* controller,
1191 const ::baidu::bfs::metaserver::SyncBlockRequest* request,
1192 ::baidu::bfs::metaserver::SyncBlockResponse* response,
1193 ::google::protobuf::Closure* done);
1194 //more code...
// Excerpt of the concrete MetaServer service class. The public rpc overrides and part of
// the private section are elided; the visible data members are four ThreadPool pointers,
// the chunkserver and block-mapping managers, and read-only / recovery / start-time state.
42 class MetaServerImpl : public MetaServer {
43 public:
44 MetaServerImpl();
45 virtual ~MetaServerImpl();
79 private:
80 //more code...
90 private:
91 ThreadPool* read_thread_pool_;
92 ThreadPool* work_thread_pool_;
93 ThreadPool* report_thread_pool_;
94 ThreadPool* heartbeat_thread_pool_;
95 ChunkServerManager* chunkserver_manager_;
96 BlockMappingManager* block_mapping_manager_;
// NOTE(review): the two volatile members below are presumably shared between threads;
// volatile by itself does not provide synchronization - confirm how they are accessed.
98 volatile bool readonly_;
99 volatile int recover_timeout_;
100 RecoverMode recover_mode_;
101 int64_t start_time_;
102 };
但在实现类似的 virtual rpc 方法时,发现了一些问题:
// Handles a HeartBeat rpc.
// When is_leader_ is false the response status is set to kIsFollower and the rpc is completed
// at once. Otherwise g_heart_beat is incremented and the request's namespace_version is compared
// with namespace_->Version(): on a match the request is passed to
// chunkserver_manager_->HandleHeartBeat, on a mismatch the status becomes kVersionError.
// The current namespace version is written into the response before done->Run() is called.
88 void MetaServerImpl::HeartBeat(::google::protobuf::RpcController* controller,
89 const HeartBeatRequest* request,
90 HeartBeatResponse* response,
91 ::google::protobuf::Closure* done) {
92 if (!is_leader_) { // why is there an is_leader_ check here???
93 response->set_status(kIsFollower);
94 done->Run();
95 return;
96 }
97 g_heart_beat.Inc();
99 int64_t version = request->namespace_version();
100 if (version == namespace_->Version()) {
101 chunkserver_manager_->HandleHeartBeat(request, response);
102 } else {
103 response->set_status(kVersionError);
104 }
105 response->set_namespace_version(namespace_->Version());
106 done->Run();
107 }
类似的这几个函数与 NameServerImpl 类中的实现相同,但后者的类声明中定义了相应的数据成员;后者之所以有 is_leader_ 之类的成员,是因为它使用了 raft,而 metaserver 没有这个功能,不知道这里是不是直接拷贝了那边的代码?没有用到的功能我这边不会分析,比如上面所说的这些。
在 bfs_client.cc 中,put 命令用于把本地文件写到 bfs 中,正如 usage 中所述:put <localfile> <bfsfile> : copy file from local to bfs
// Excerpt of the client "put" command, which writes a local file into bfs.
// Argument checking and the setup of target, st, fp and ret are in elided lines.
211 int BfsPut(baidu::bfs::FS* fs, int argc, char* argv[]) {
212 //check argv...
247 baidu::bfs::File* file;
// Open the bfs target write-only with truncation, passing st.st_mode as the mode.
248 if (fs->OpenFile(target.c_str(), O_WRONLY | O_TRUNC, st.st_mode, &file, baidu::bfs::WriteOptions()) != 0) {
249 //do error logic...
252 }
// Copy fp to the bfs file in chunks of at most sizeof(buf) bytes; len accumulates the bytes
// written. A short write sets ret to 2 and stops the copy.
253 char buf[10240];
254 int64_t len = 0;
255 int32_t bytes = 0;
256 while ( (bytes = fread(buf, 1, sizeof(buf), fp)) > 0) {
257 int32_t write_bytes = file->Write(buf, bytes);
258 if (write_bytes < bytes) {
260 ret = 2;
261 break;
262 }
263 len += bytes;
264 }
265 fclose(fp);
// The handling of a failed Close() is elided in this excerpt.
266 if (file->Close() != 0) {
269 }
270 //other code...
273 }
// Excerpt of FSImpl::OpenFile: sends a CreateFile request to the nameserver and, on success,
// returns a new FileImplWrapper through *file.
// In the visible code it returns TIMEOUT when the rpc itself fails, GetErrorCode(status) when
// the response status is not kOK, and OK otherwise; *file stays NULL on both failure paths.
322 int32_t FSImpl::OpenFile(const char* path, int32_t flags, int32_t mode,
323 File** file, const WriteOptions& options) {
324 *file = NULL;
328 WriteOptions write_option = options;
329 //set options.write_mode
341 CreateFileRequest request;
342 CreateFileResponse response;
343 //set request params
// NOTE(review): the trailing 15 and 1 are presumably a timeout and a retry count - confirm
// against the SendRequest declaration.
348 bool rpc_ret = nameserver_client_->SendRequest(&NameServer_Stub::CreateFile,
349 &request, &response, 15, 1);
350 if (!rpc_ret || response.status() != kOK) {
353 if (!rpc_ret) {
354 return TIMEOUT;
355 } else {
356 return GetErrorCode(response.status());
357 }
358 } else {
359 *file = new FileImplWrapper(this, rpc_client_, path, flags, write_option);
360 }
361 return OK;
362 }
// Request of the CreateFile rpc. All fields are optional; field number 6 is not used here.
40 message CreateFileRequest {
41 optional int64 sequence_id = 1;////TODO 0
42 optional string file_name = 2;
43 optional int32 mode = 3;
44 optional int32 flags = 4;
45 optional int32 replica_num = 5;
46 optional string user = 7;
47 }
// Response of the CreateFile rpc: a sequence_id and a StatusCode.
49 message CreateFileResponse {
50 optional int64 sequence_id = 1;
51 optional StatusCode status = 2;
52 }
对象;关于 protobuf rpc 这一块,后期有时间再分析其原理。
// Excerpt of the CreateFile rpc handler on the nameserver (the lines after 390 are elided).
// When is_leader_ is false the response status is set to kIsFollower and the rpc is completed
// at once. Otherwise the handler increments g_create_file, echoes the request's sequence_id,
// normalizes the path, replaces a mode of 0 by 0644, and calls NameSpace::CreateFile while a
// FileLockGuard built from a WriteLock on the path is in scope.
369 void NameServerImpl::CreateFile(::google::protobuf::RpcController* controller,
370 const CreateFileRequest* request,
371 CreateFileResponse* response,
372 ::google::protobuf::Closure* done) {
373 if (!is_leader_) {
374 response->set_status(kIsFollower);
375 done->Run();
376 return;
377 }
378 g_create_file.Inc();
379 response->set_sequence_id(request->sequence_id());
380 std::string path = NameSpace::NormalizePath(request->file_name());
381 int flags = request->flags();
382 int mode = request->mode();
383 if (mode == 0) {
384 mode = 0644; // default mode
385 }
386 int replica_num = request->replica_num();/// number of replicas
387 NameServerLog log;
// Filled by NameSpace::CreateFile with the blocks of a file that is being truncated.
388 std::vector<int64_t> blocks_to_remove;
389 FileLockGuard file_lock(new WriteLock(path));//TODO 1
390 StatusCode status = namespace_->CreateFile(path, flags, mode, replica_num, &blocks_to_remove, &log);
391 //more code...
406 }
// First excerpt of NameSpace::CreateFile (the lines after 299 are elided).
// Rejects the root path "/" with kBadParameter, then calls BuildPath, which fills file_info
// and fname for the given file_name.
292 StatusCode NameSpace::CreateFile(const std::string& file_name, int flags, int mode, int replica_num,
293 std::vector<int64_t>* blocks_to_remove, NameServerLog* log) {
294 if (file_name == "/") {
295 return kBadParameter;
296 }
297 FileInfo file_info;
298 std::string fname, info_value;
299 StatusCode status = BuildPath(file_name, &file_info, &fname, log);
300 //more code...
338 }
处理每一级目录,对每一层目录名所在的深度进行大端编码:EncodingStoreKey(parent_id, name, &key_str)。
// Walks every component of path except the last one, creating missing directories on the way.
// On success (kOK) file_info holds the entry found or created for the last component walked
// (only entry_id = kRootEntryid is set when there is none) and *fname holds the last component.
// Returns kBadParameter when SplitPath fails or when an existing component is not a directory.
// NOTE(review): paths[depth - 1] assumes SplitPath never succeeds with an empty vector - confirm.
254 StatusCode NameSpace::BuildPath(const std::string& path, FileInfo* file_info, std::string* fname ,
255 NameServerLog* log) {
256 std::vector<std::string> paths;
257 if (!common::util::SplitPath(path, &paths)) {
259 return kBadParameter;
260 }
262 /// Find parent directory, create if not exist.
263 int64_t parent_id = kRootEntryid;
264 int depth = paths.size();
265 /// if parent is root, set "file_info"
266 file_info->set_entry_id(kRootEntryid);
267 std::string info_value;
268 for (int i = 0; i < depth - 1; ++i) {
269 if (!LookUp(parent_id, paths[i], file_info)) {
270 file_info->set_type((1 << 9) | 01755);//drwxr-xr-x
271 file_info->set_ctime(time(NULL));
272 file_info->set_entry_id(common::atomic_add64(&last_entry_id_, 1) + 1);
273 file_info->SerializeToString(&info_value);
274 std::string key_str;
275 EncodingStoreKey(parent_id, paths[i], &key_str);
276 leveldb::Status s = db_->Put(leveldb::WriteOptions(), key_str, info_value);// write the serialized metadata to leveldb
277 assert(s.ok());
278 EncodeLog(log, kSyncWrite, key_str, info_value); // record the write in the log
280 } else {
281 if (GetFileType(file_info->type()) != kDir) {// the existing entry must be a directory
283 return kBadParameter;
284 }
285 }
286 parent_id = file_info->entry_id();
287 }
288 *fname = paths[depth - 1]; // the file name
289 return kOK;
290 }
// Looks up the entry called name under parent_id: builds the store key with EncodingStoreKey
// and reads it with GetFromStore. Returns true and fills *info when the read succeeds,
// false otherwise.
194 bool NameSpace::LookUp(int64_t parent_id, const std::string& name, FileInfo* info) {
195 std::string key_str;
196 EncodingStoreKey(parent_id, name, &key_str);
197 if (!GetFromStore(key_str, info)) {
199 return false;
200 }
202 return true;
203 }
// Reads the value stored under key from db_ and parses it into *info.
// Returns false when the Get status is not ok or when the value cannot be parsed as a
// FileInfo; returns true otherwise.
135 bool NameSpace::GetFromStore(const std::string& key, FileInfo* info) {
136 std::string value;
137 leveldb::Status s = db_->Get(leveldb::ReadOptions(), key, &value);
138 if (!s.ok()) {
141 return false;
142 }
143 if (!info->ParseFromString(value)) {
145 return false;
146 }
147 return true;
148 }
,则一共执行 2 次循环:home 对应 parent_id=1、entry_id=2;dirx 对应 parent_id=2、entry_id=3;filex 作为文件名(目录)不在循环中处理[因为 client 端会对 source 进行检查,必须是 xxx 或 xxx/xxx,而不能是 xxx/xxx/;而 target 可以是诸如 xxx/xxxx/ 或 xxx/xxxx 的形式]。至于为什么要这样编码,后续会分析。以上以 1home 为 key,其他数据为 value。
为文件名的上一级目录;然后调用 LookUp(parent_id, fname, &file_info) 检查该文件是否已经存在。
// Second excerpt of NameSpace::CreateFile: the steps after BuildPath (the rest is elided).
// A BuildPath failure is returned unchanged. Otherwise fname is looked up under the entry id
// that BuildPath left in file_info. An existing entry yields kFileExists unless O_TRUNC is
// set and the entry is not a directory; in that case its block ids are appended to
// *blocks_to_remove.
292 StatusCode NameSpace::CreateFile(const std::string& file_name, int flags, int mode, int replica_num,
293 std::vector<int64_t>* blocks_to_remove, NameServerLog* log) {
294 //more code...
299 StatusCode status = BuildPath(file_name, &file_info, &fname, log);
300 if (status != kOK) {
301 return status;
302 }
303 int64_t parent_id = file_info.entry_id();
304 bool exist = LookUp(parent_id, fname, &file_info);
306 if (exist) {
307 if ((flags & O_TRUNC) == 0) {// an existing file (or directory) without O_TRUNC is an error
309 return kFileExists;
310 } else {
311 if (GetFileType(file_info.type()) == kDir) {
313 return kFileExists;// the name refers to a directory
314 }
315 for (int i = 0; i < file_info.blocks_size(); i++) {
316 blocks_to_remove->push_back(file_info.blocks(i));// blocks of the file that is about to be truncated
317 }
318 }
319 }
320 // fill in file_info and write it to leveldb
处理的是要截断的文件,需要删除block元数据;其中NameServerLog* log
// Pass every id collected in blocks_to_remove to block_mapping_manager_->RemoveBlock.
391 for (size_t i = 0; i < blocks_to_remove.size(); i++) {
392 block_mapping_manager_->RemoveBlock(blocks_to_remove[i]);
393 }
// Hands a NameServerLog entry to sync_.
// When sync_ is NULL, the callback (if any) is scheduled with true on
// sync_callback_thread_pool_ and the function returns true.
// Otherwise the log is serialized to a string and passed to sync_->Log. With a callback the
// function returns true right after the call; without one it returns the result of
// sync_->Log(logstr, FLAGS_log_replicate_timeout * 1000).
408 bool NameServerImpl::LogRemote(const NameServerLog& log,
409 std::function<void (bool)> callback) {
410 if (sync_ == NULL) {
411 if (callback) {
412 sync_callback_thread_pool_->AddTask(std::bind(callback, true));
413 }
414 return true;
415 }
416 std::string logstr;
// The handling of a serialization failure (listing line 418) is elided in this excerpt.
417 if (!log.SerializeToString(&logstr)) {
419 }
420 if (callback) {
421 sync_->Log(logstr, callback);
422 return true;
423 } else {
424 return sync_->Log(logstr, FLAGS_log_replicate_timeout * 1000);
425 }
426 }
// Forwards the entry and its callback to raft_node_->AppendLog.
42 void RaftImpl::Log(const std::string& entry, std::function<void (bool)> callback) {
43 raft_node_->AppendLog(entry, callback);
44 }
// Stores one log entry at index log_index_ + 1 for current_term_ while holding mu_.
// If StoreLog fails, callback(false) is scheduled on thread_pool_ and nothing else is changed.
// Otherwise the callback is saved in callback_map_ under the new index, log_index_ is
// incremented, and the condition of every non-null follower_context_ entry is signalled.
450 void RaftNodeImpl::AppendLog(const std::string& log, std::function<void (bool)> callback) {
451 MutexLock lock(&mu_);
452 int64_t index = log_index_ + 1;
453 ///TODO: optimize lock
454 if (!StoreLog(current_term_, index, log)) {
455 thread_pool_->AddTask(std::bind(callback,false));
456 return;
457 }
458 callback_map_.insert(std::make_pair(index, callback));
459 log_index_ ++;
460 for (uint32_t i = 0; i < nodes_.size(); i++) {
461 if (follower_context_[i]) {
462 follower_context_[i]->condition.Signal();
463 }
464 }
465 }
367 while (last_applied_ < commit_index) {
368 last_applied_ ++;
370 std::map<int64_t, std::function<void (bool)> >::iterator cb_it =
371 callback_map_.find(last_applied_);
372 if (cb_it != callback_map_.end()) {
373 std::function<void (bool)> callback = cb_it->second;
374 callback_map_.erase(cb_it);
375 mu_.Unlock();
377 callback(true);//执行回调
378 mu_.Lock();
379 } else {
381 }