Baidu File System (bfs) Source Code Analysis, Part 3

Author: fooboo | Published 2019-02-16 14:52

    Write Flow (Part 2)

    Building on the analysis in the previous two posts: before a file is actually written, the nameserver first does some preprocessing based on the file's path. Each step can fail or leave inconsistent state; how those cases are resolved will be answered collectively later.

    First, a recap of what has happened so far when putting a file; note that none of the file's content has been uploaded to bfs yet:

    1. Perform the "block encoding" for every directory level along the file's absolute path (but not for the file itself) and write the entries into leveldb;
    2. Check whether the file already exists;
    3. Possibly delete the block metadata of a pre-existing file (on the nameserver/chunkserver);
    4. Store the file's own encoded entry into leveldb;
    5. Possibly delete the block metadata from step 4;
    6. If HA is enabled, persist a log entry; once raft has replicated the log to the followers, log_callback_ is executed, i.e. steps 1 and 4 are replayed. A follower applies a received log entry at some later point:
    602 void RaftNodeImpl::Init(std::function<void (const std::string& log)> callback,
    603                         std::function<void (int32_t, std::string*)> /*snapshot_callback*/) {
    604     log_callback_ = callback; //NameSpace::TailLog
    605     ApplyLog();
    606 }
    
    494 void RaftNodeImpl::ApplyLog() {
    495     MutexLock lock(&mu_);
    496     if (applying_ || !log_callback_) {
    497         return;
    498     }
    499     applying_ = true;
    500     for (int64_t i = last_applied_ + 1; i <= commit_index_; i++) {
    501         std::string log;
    502         StatusCode s = log_db_->Read(i, &log);
    503         if (s != kOK) {
    505         }
    506         LogEntry entry;
    507         bool ret = entry.ParseFromString(log);
    508         assert(ret);
    509         if (entry.type() == kUserLog) {
    510             mu_.Unlock();
    513                 log_callback_(entry.log_data());///NameSpace::TailLog
    514             mu_.Lock();
    515         }
    516         last_applied_ = entry.index();
    517     }
    519     StoreContext("last_applied", last_applied_);
    520     applying_ = false;
    521 }
    
    756 void NameSpace::TailLog(const std::string& logstr) {
    757     NameServerLog log;
    758     if(!log.ParseFromString(logstr)) {
    760     }
    761     for (int i = 0; i < log.entries_size(); i++) {
    762         const NsLogEntry& entry = log.entries(i);
    763         int type = entry.type();
    764         leveldb::Status s;
    765         if (type == kSyncWrite) {
    766             s = db_->Put(leveldb::WriteOptions(), entry.key(), entry.value());
    767         } else if (type == kSyncDelete) {
    768             s = db_->Delete(leveldb::WriteOptions(), entry.key());
    769         }
    770         if (!s.ok()) {
    772         }
    773     }
    774 }
    
    593     if (leader_commit > commit_index_) {//apply log
    594         commit_index_ = leader_commit;
    595     }
    596     if (commit_index_ > last_applied_) {
    597         thread_pool_->AddTask(std::bind(&RaftNodeImpl::ApplyLog, this));
    598     }
    

    The "block encoding" mentioned above is just a key formed from parent_id plus the file (or directory) name, with a value holding the file's (directory's) basic metadata; it is not the same thing as the blocks on the chunkservers. Quoting the Q&A at the end: "BlockMapping's contents are indeed not persisted. They can be rebuilt from the Namespace contents plus the ChunkServers' block reports. Every NameServer restart rebuilds the BlockMapping; the time is spent mainly in a sequential scan of leveldb.":

    328     EncodingStoreKey(parent_id, fname, &file_key);
    
    115 void NameSpace::EncodingStoreKey(int64_t entry_id,
    116                                  const std::string& path,
    117                                  std::string* key_str) {
    118     key_str->resize(8);
    119     common::util::EncodeBigEndian(&(*key_str)[0], (uint64_t)entry_id);
    120     key_str->append(path);
    121 }
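
    For intuition: the key is simply the parent's entry_id in 8-byte big-endian form followed by the raw name, so all children of one directory sit contiguously in leveldb. A minimal self-contained sketch of the same layout (EncodeBigEndian re-implemented by hand here; MakeStoreKey is a hypothetical helper, not bfs code):

        #include <cstdint>
        #include <string>

        // Toy stand-in for common::util::EncodeBigEndian.
        static void EncodeBigEndian(char* buf, uint64_t value) {
            for (int i = 0; i < 8; i++) {
                buf[i] = static_cast<char>((value >> (56 - 8 * i)) & 0xFF);
            }
        }

        // Mirrors NameSpace::EncodingStoreKey: 8-byte big-endian entry_id + name.
        static std::string MakeStoreKey(int64_t parent_id, const std::string& name) {
            std::string key(8, '\0');
            EncodeBigEndian(&key[0], static_cast<uint64_t>(parent_id));
            key.append(name);  // e.g. parent_id=1, "home" -> "\0\0\0\0\0\0\0\x01home"
            return key;
        }

    Big-endian is what makes this work: byte order then matches numeric order, so a leveldb prefix scan over one parent_id enumerates exactly that directory's children.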
    

    After nameserver_client_->SendRequest returns success, a FileImplWrapper object is instantiated via *file = new FileImplWrapper(this, rpc_client_, path, flags, write_option):

     62 class File {
     63 public:     
     64     File() {}
     65     virtual ~File() {}
     66     virtual int32_t Pread(char* buf, int32_t read_size, int64_t offset, bool reada = false) = 0;
     67     //for files opened with O_WRONLY, only support Seek(0, SEEK_CUR)
     68     virtual int64_t Seek(int64_t offset, int32_t whence) = 0;
     69     virtual int32_t Read(char* buf, int32_t read_size) = 0;
     70     virtual int32_t Write(const char* buf, int32_t write_size) = 0;
     71     virtual int32_t Flush() = 0;
     72     virtual int32_t Sync() = 0;
     73     virtual int32_t Close() = 0;
     74 private:
     75     // No copying allowed
     76     File(const File&);
     77     void operator=(const File&);
     78 };
    
     20 class FileImplWrapper : public File {
     21 public:
     22     FileImplWrapper(FSImpl* fs, RpcClient* rpc_client,
     23             const std::string& name, int32_t flags, const WriteOptions& options);
     26     FileImplWrapper(FileImpl* file_impl);
     27     virtual ~FileImplWrapper();
     28     // same interface declarations as File
     35 private:
     36     std::shared_ptr<FileImpl> impl_; 
     37     // No copying allowed
     38     FileImplWrapper(const FileImplWrapper&);
     39     void operator=(const FileImplWrapper&);
     40 };
    
     12 FileImplWrapper::FileImplWrapper(FSImpl* fs, RpcClient* rpc_client,
     13            const std::string& name, int32_t flags, const WriteOptions& options) :
     14            impl_(new FileImpl(fs, rpc_client, name, flags, options)) {}
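
    Each wrapper method presumably just forwards to impl_; since FileImpl (shown next) inherits std::enable_shared_from_this, keeping it behind a shared_ptr lets background write callbacks extend its lifetime safely. A hedged sketch of what the forwarding likely looks like, with Write as the example (this body is not in the excerpt):

        int32_t FileImplWrapper::Write(const char* buf, int32_t write_size) {
            return impl_->Write(buf, write_size);  // delegate to the shared FileImpl
        }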
    

    The class above is only a wrapper around file operations; in the end it calls into FileImpl. FileImpl is too large to cover at once, so only the open/write/close path used when writing to bfs is listed here; the path that reads from bfs to write a local file will be analyzed later:

     69 class FileImpl : public File, public std::enable_shared_from_this<FileImpl> {
     70 public:
     71     FileImpl(FSImpl* fs, RpcClient* rpc_client, const std::string& name,
     72              int32_t flags, const WriteOptions& options);
     75     ~FileImpl ();
     79     int32_t Write(const char* buf, int32_t write_size);
     99     int32_t Flush();
    100     int32_t Sync();
    101     int32_t Close();
    126 private:
    150     std::map<std::string, ChunkServer_Stub*> chunkservers_; ///< located chunkservers
    

    Below is part of the BfsPut implementation:

    253     char buf[10240];
    255     int32_t bytes = 0;
    256     while ( (bytes = fread(buf, 1, sizeof(buf), fp)) > 0) {  // read from the source file
    257         int32_t write_bytes = file->Write(buf, bytes);  // write to the file object
    258         if (write_bytes < bytes) {  // check for a failed (short) write
    260             ret = 2;
    261             break;
    262         }
    264     }
    265     fclose(fp);  // close the local source file
    266     if (file->Close() != 0) {  // close the bfs file
    268         ret = 1;
    269     }
    

    Now the real write to the file object begins:

    388 int32_t FileImpl::Write(const char* buf, int32_t len) {
    391     //more code...
    402     if (open_flags_ & O_WRONLY) {
    403         // Add block
    404         MutexLock lock(&mu_, "Write AddBlock", 1000);
    405         if (chunkservers_.empty()) {
    406             int ret = 0;
    407             for (int i = 0; i < FLAGS_sdk_createblock_retry; i++) {
    408                 ret = AddBlock();       
    409                 if (ret == kOK) break;
    410                 sleep(10);              
    411             }
    412             if (ret != kOK) {           
    414                 common::atomic_dec(&back_writing_);
    415                 return ret;
    416             }
    417         }
    418     }
    419     //more code...
    

    chunkservers_ holds what are essentially connections to the chunkserver services the content will be written to; it starts out empty and is filled in later. Note that this implementation uses one block per file: a file is not cut into multiple fixed-size blocks stored on different chunkservers. Below are the protocol messages and the implementation for requesting block creation:

      9 message ChunkServerInfo {
     10     optional int32 id = 1;
     11     optional string address = 2;
     13     optional string start_time = 17;
     14     optional int32 last_heartbeat = 3;
     15     optional int64 data_size = 4;
     17     optional int32 block_num = 7;
     18     optional bool is_dead = 8;
     19     optional ChunkServerStatus status = 9;
     24     //more
     38 }
    
     54 message LocatedBlock {
     55     optional int64 block_id = 1;
     56     optional int64 block_size = 2;
     57     repeated ChunkServerInfo chains = 3;
     58     optional int32 status = 4;
     59 }
    
    105 message AddBlockRequest {
    106     optional int64 sequence_id = 1;
    107     optional string file_name = 2;
    108     optional string client_address = 3;
    109 }
    
    110 message AddBlockResponse {
    111     optional int64 sequence_id = 1;
    112     optional StatusCode status = 2;
    113     optional LocatedBlock block = 3;
    114 }
    
    314 int32_t FileImpl::AddBlock() {
    315     AddBlockRequest request;
    316     AddBlockResponse response;
    317     request.set_sequence_id(0);
    318     request.set_file_name(name_);
    319     const std::string& local_host_name = fs_->local_host_name_;
    320     request.set_client_address(local_host_name);
    321     bool ret = fs_->nameserver_client_->SendRequest(&NameServer_Stub::AddBlock,
    322                                                     &request, &response, 15, 1);
    323     // assume AddBlock succeeded (analyzed later)
    332     block_for_write_ = new LocatedBlock(response.block());
    333     bool chains_write = IsChainsWrite();
    334     int cs_size = chains_write ? 1 : block_for_write_->chains_size();
    335     for (int i = 0; i < cs_size; i++) {
    336         const std::string& addr = block_for_write_->chains(i).address();
    337         rpc_client_->GetStub(addr, &chunkservers_[addr]);
    338         write_windows_[addr] = new common::SlidingWindow<int>(100,
    339                                std::bind(&FileImpl::OnWriteCommit,
    340                                 std::placeholders::_1, std::placeholders::_2));
    341         cs_write_queue_[addr] = new WriteBufferQueue;
    342         cs_errors_[addr] = false;
    343         WriteBlockRequest create_request;
    344         int64_t seq = common::timer::get_micros();
    345         create_request.set_sequence_id(seq);
    346         create_request.set_block_id(block_for_write_->block_id());
    347         create_request.set_databuf("", 0);  // first write: empty payload
    348         create_request.set_offset(0);       // first write
    349         create_request.set_is_last(false);  // first write
    350         create_request.set_packet_seq(0);   // first write
    351         WriteBlockResponse create_response;
    352         if (chains_write) {
    353             for (int i = 0; i < block_for_write_->chains_size(); i++) {
    354                 const std::string& cs_addr = block_for_write_->chains(i).address();
    355                 create_request.add_chunkservers(cs_addr);
    356             }
    357         }
    358         bool ret = rpc_client_->SendRequest(chunkservers_[addr],
    359                                             &ChunkServer_Stub::WriteBlock,
    360                                             &create_request, &create_response,
    361                                             25, 1);
    362         if (!ret || create_response.status() != 0) {
    365             // failure handling
    382         }
    383         write_windows_[addr]->Add(0, 0);
    384     }
    385     last_seq_ = 0;
    386     return OK;
    387 }
    

    The above is the AddBlock implementation. Roughly, the sdk asks the nameserver to create the block-management metadata (analyzed later; assume for now that creation succeeds and the result is returned to the sdk). IsChainsWrite() then determines whether this is a chain write or a fan-out write. The code loops over the number of replicas to write, talks to each chunkserver over protobuf RPC, sets up a sliding-window mechanism (SlidingWindow, whose commit callback is OnWriteCommit; its implementation is analyzed later), and maintains a write queue per chunkserver. The WriteBlockRequest built here only initializes the block; no data has been written yet:

    138     std::map<std::string, common::SlidingWindow<int>* > write_windows_;
    139     struct WriteBufferQueue {
    140         Mutex mu;
    141         std::queue<WriteBuffer*> buffers;
    142     };
    143     std::map<std::string, WriteBufferQueue*> cs_write_queue_;
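
    The sliding window is what turns per-packet acknowledgements into an in-order commit stream for each chunkserver. The real common::SlidingWindow is not listed here; purely to illustrate the mechanism, a toy version might look like this (ToySlidingWindow is hypothetical):

        #include <cstdint>
        #include <functional>
        #include <map>

        // Toy in-order commit window; the real common::SlidingWindow additionally
        // bounds how far senders may run ahead of the commit point (the 100 above).
        class ToySlidingWindow {
        public:
            explicit ToySlidingWindow(std::function<void (int32_t, int)> on_commit)
                : next_seq_(0), on_commit_(on_commit) {}
            // Record the ack for packet `seq`; commits fire strictly in order.
            void Add(int32_t seq, int item) {
                pending_[seq] = item;
                while (!pending_.empty() && pending_.begin()->first == next_seq_) {
                    on_commit_(next_seq_, pending_.begin()->second);  // cf. OnWriteCommit
                    pending_.erase(pending_.begin());
                    ++next_seq_;
                }
            }
        private:
            int32_t next_seq_;                    // next sequence expected to commit
            std::map<int32_t, int> pending_;      // out-of-order acks parked here
            std::function<void (int32_t, int)> on_commit_;
        };

    Read this way, the write_windows_[addr]->Add(0, 0) at the end of AddBlock also makes sense: the zero-length create request occupies packet_seq 0, so real data packets can commit starting from sequence 1.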
    

    Next, carrying the block_id allocated by the nameserver, the sdk asks the chunkserver to write the block, i.e. sends WriteBlockRequest. The chunkserver checks whether this is a chain write: if so, each chunkserver writes and then hands the request on to the next chunkserver, and the result finally propagates back to the sdk; otherwise the chunkservers are written asynchronously. The success and failure handling comes later, so it is skipped here. At this point no file content has actually been written to bfs: all that has happened is that the sdk talked to the nameserver (allocating the block id and so on) and then to the chunkservers (the block-create write request). Since this can be a bit fuzzy, here is a concrete walk-through.

    1) The sdk-to-nameserver communication uses the AddBlockRequest/AddBlockResponse protocol. When the nameserver receives an AddBlock request:

     449 void NameServerImpl::AddBlock(...) {
     453     if (!is_leader_) {
     454         // not the leader
     456         return;
     457     }
     458     response->set_sequence_id(request->sequence_id());
     459     if (readonly_) {  // defaults to true; set to false in LeaveReadOnly
     460         // read-only: the write is rejected
     463         return;
     464     }
     466     std::string path = NameSpace::NormalizePath(request->file_name());
     467     FileLockGuard file_lock_guard(new WriteLock(path));
     468     FileInfo file_info;
     469     if (!namespace_->GetFileInfo(path, &file_info)) {
     470         // failed to get the path info that the earlier createfile encoded
     473         return;
     474     }
     476     if (file_info.blocks_size() > 0) {  // remove previous block info; a brand-new file normally has none
     477         std::map<int64_t, std::set<int32_t> > block_cs;
     478         block_mapping_manager_->RemoveBlocksForFile(file_info, &block_cs);
     479         for (std::map<int64_t, std::set<int32_t> >::iterator it = block_cs.begin();
     480                 it != block_cs.end(); ++it) {
     481             const std::set<int32_t>& cs = it->second;
     482             for (std::set<int32_t>::iterator cs_it = cs.begin(); cs_it != cs.end(); ++cs_it) {
     483                 chunkserver_manager_->RemoveBlock(*cs_it, it->first);
     484             }                  
     485         }
     486         file_info.clear_blocks();  // this step is absent in createfile
     487     }
    

    Lines 476 to 487 continue what createfile started. In createfile, if the file exists and the truncate flag is set, block_mapping_manager_->RemoveBlock(block_id) is performed, which can be thought of as deleting the (dynamically built) block_id list; there is no chunkserver_manager_->RemoveBlock there. Here an additional RemoveBlocksForFile pass is done (the exact reason probably requires the later code), plus the real file_info.clear_blocks().

    Then ChunkServerManager::GetChunkServerChains selects chunkserver addresses by the replica count according to certain placement rules; the sdk will communicate with these servers afterwards:

     489     int replica_num = file_info.replicas();
     490     /// check lease for write 
     491     std::vector<std::pair<int32_t, std::string> > chains;
     492     common::timer::TimeChecker add_block_timer;
     493     if (chunkserver_manager_->GetChunkServerChains(replica_num, &chains, request->client_address())) {
     495         int64_t new_block_id = namespace_->GetNewBlockId(); 
     498         file_info.add_blocks(new_block_id);
     499         file_info.set_version(-1); 
     500         ///TODO: Lost update? Get&Update not atomic. 
     501         for (int i = 0; i < replica_num; i++) {
     502             file_info.add_cs_addrs(chunkserver_manager_->GetChunkServerAddr(chains[i].first));
     503         }  
     504         NameServerLog log;
     505         if (!namespace_->UpdateFileInfo(file_info, &log)) {
     507             response->set_status(kUpdateError);
     508         }
     509         LocatedBlock* block = response->mutable_block();
     510         std::vector<int32_t> replicas;
     511         for (int i = 0; i < replica_num; i++) {
     512             ChunkServerInfo* info = block->add_chains();
     513             int32_t cs_id = chains[i].first;
     514             info->set_address(chains[i].second);
     517             replicas.push_back(cs_id);
     518             // update cs -> block
     519             add_block_timer.Reset();
     520             chunkserver_manager_->AddBlock(cs_id, new_block_id);
     521             add_block_timer.Check(50 * 1000, "AddBlock");
     522         }
     523         block_mapping_manager_->AddBlock(new_block_id, replica_num, replicas);
     525         block->set_block_id(new_block_id);
     526         response->set_status(kOK);
     527         LogRemote(log, std::bind(&NameServerImpl::SyncLogCallback, this,
     528                                    controller, request, response, done,
     529                                    (std::vector<FileInfo>*)NULL,
     530                                    file_lock_guard, std::placeholders::_1));
     531     }  // failure handling
    

    It allocates the next new_block_id, fills in the file_info data, formats a LocatedBlock for the sdk according to the replica count, and records chunkserver_manager_->AddBlock(cs_id, new_block_id), i.e. which chunkserver manages which blocks:

     22 class Blocks {
     23 public:
     24     Blocks(int32_t cs_id) : report_id_(-1), cs_id_(cs_id) {}
     25     int64_t GetReportId();
     26     void Insert(int64_t block_id);
     27     void Remove(int64_t block_id);
     28     void CleanUp(std::set<int64_t>* blocks);
     29     void MoveNew();
     30     int64_t CheckLost(int64_t report_id, const std::set<int64_t>& blocks,
     31                       int64_t start, int64_t end, std::vector<int64_t>* lost);
     32 private:
     33     Mutex block_mu_;
     34     std::set<int64_t> blocks_;  // why new_blocks_ exists is analyzed later
     35     Mutex new_blocks_mu_;
     36     std::set<int64_t> new_blocks_;
     37     int64_t report_id_;
     38     int32_t cs_id_;                 // for debug msg
     39 };
    
     62 void Blocks::MoveNew() {
     63     MutexLock blocks_lock(&block_mu_);
     64     MutexLock new_block_lock(&new_blocks_mu_);
     65     std::set<int64_t> tmp;
     66     blocks_.insert(new_blocks_.begin(), new_blocks_.end());
     67     std::swap(tmp, new_blocks_);
     68 }
    

    Continuing from above, block_mapping_manager_->AddBlock is called next, and then raft performs the synchronization, with the same logic as in createfile:

     86 void BlockMapping::AddBlock(int64_t block_id, int32_t replica,
     87                             const std::vector<int32_t>& init_replicas) {
     88     NSBlock* nsblock = NULL;
     89     nsblock = new NSBlock(block_id, replica, -1, 0);
     90     if (nsblock->recover_stat == kNotInRecover) {
     91         nsblock->replica.insert(init_replicas.begin(), init_replicas.end());
     92     } else {                        
     93         nsblock->incomplete_replica.insert(init_replicas.begin(), init_replicas.end());
     94     }
     95     //more code...
    

    2) The sdk-to-chunkserver communication uses the WriteBlockRequest/WriteBlockResponse protocol. When the chunkserver service receives the request:

    360 void ChunkServerImpl::WriteBlock(...) {
    364     int64_t block_id = request->block_id();
    365     const std::string& databuf = request->databuf();
    366     int64_t offset = request->offset();
    367     int32_t packet_seq = request->packet_seq();
    368 
    369     if (!response->has_sequence_id() &&
    370             g_unfinished_bytes.Add(databuf.size()) > FLAGS_chunkserver_max_unfinished_bytes) {
    371         response->set_sequence_id(request->sequence_id());
    372         // too many unfinished writes
    375         response->set_status(kCsTooMuchUnfinishedWrite);
    376         g_unfinished_bytes.Sub(databuf.size());
    377         done->Run();
    378         return;
    379     }
    380     if (!response->has_sequence_id()) {
    381         response->set_sequence_id(request->sequence_id());
    382         /// Flow control
    383         if (g_block_buffers.Get() > FLAGS_chunkserver_max_pending_buffers) {
    384             response->set_status(kCsTooMuchPendingBuffer);
    388             g_unfinished_bytes.Sub(databuf.size());
    389             done->Run();
    390             g_refuse_ops.Inc();
    391             return;
    392         }
    395         response->add_timestamp(common::timer::get_micros());
    396         std::function<void ()> task =
    397             std::bind(&ChunkServerImpl::WriteBlock, this, controller, request, response, done);
    398         work_thread_pool_->AddTask(task);
    399         return;
    400     }
    

    For a request that has just arrived from the sdk, the response's sequence_id is unset (has_sequence_id() is false by default), so execution always reaches line 381, which sets it and then hands this very function, ChunkServerImpl::WriteBlock, to a worker thread to continue; on re-entry the response does have a sequence_id:

    1615 inline void WriteBlockResponse::set_has_sequence_id() {
    1616   _has_bits_[0] |= 0x00000002u;
    1617 }
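
    In effect, response->has_sequence_id() serves as a "have I been here before" flag: the first pass runs the flow-control checks on the RPC thread, marks the response, and re-enqueues the same handler; the second pass does the real work on a worker thread. A toy illustration of the pattern (not the literal bfs code):

        #include <functional>
        #include <queue>

        struct Resp { bool has_sequence_id = false; };

        std::queue<std::function<void ()>> worker_queue;  // stand-in for work_thread_pool_

        void Handle(Resp* resp) {
            if (!resp->has_sequence_id) {                     // first entry: RPC thread
                resp->has_sequence_id = true;                 // mark "seen once"
                worker_queue.push([resp] { Handle(resp); });  // hand self to a worker
                return;                                       // RPC thread is freed quickly
            }
            // second entry: worker thread, do the actual (potentially slow) write
        }

    This keeps the RPC threads from blocking on disk I/O; the heavy lifting all happens on the worker pool.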
    
    360 void ChunkServerImpl::WriteBlock(...) {
    361     //more code...
    406     int next_cs_offset = -1;
    407     for (int i = 0; i < request->chunkservers_size(); i++) {
    408         if (request->chunkservers(i) == data_server_addr_) {
    409             next_cs_offset = i + 1;
    410             break;
    411         }
    412     }
    413     if (next_cs_offset >= 0 && next_cs_offset < request->chunkservers_size()) {  // chain write
    414         // share same write request
    415         const WriteBlockRequest* next_request = request;
    416         WriteBlockResponse* next_response = new WriteBlockResponse();
    417         ChunkServer_Stub* stub = NULL;
    418         const std::string& next_server = request->chunkservers(next_cs_offset);  // address of the next chunkserver
    419         rpc_client_->GetStub(next_server, &stub);
    420         WriteNext(next_server, stub, next_request, next_response, request, response, done);
    421     } else {  // fan-out write
    422         std::function<void ()> callback =
    423             std::bind(&ChunkServerImpl::LocalWriteBlock, this, request, response, done);
    424         work_thread_pool_->AddTask(callback);
    425     }
    426 }
    

    The code above decides between a chain write and a fan-out write: it is a chain write when the request carries more than one chunkserver (more precisely, when this server finds itself in the request's chunkserver list with a successor after it).

    For a chain write, the write request is forwarded to the next chunkserver (lines 413-420), i.e. WriteNext(next_server, stub, next_request, next_response, request, response, done):

    428 void ChunkServerImpl::WriteNext(...) {
    439     std::function<void (const WriteBlockRequest*, WriteBlockResponse*, bool, int)> callback =
    440         std::bind(&ChunkServerImpl::WriteNextCallback,
    441             this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
    442             std::placeholders::_4, next_server, std::make_pair(request, response), done, stub);
    443     rpc_client_->AsyncRequest(stub, &ChunkServer_Stub::WriteBlock,
    444         next_request, next_response, callback, 30, 3);
    445 }
    
     75     template <class Stub, class Request, class Response, class Callback>
     76     void AsyncRequest(Stub* stub, void(Stub::*func)(
     77                     google::protobuf::RpcController*,
     78                     const Request*, Response*, Callback*),
     79                     const Request* request, Response* response,
     80                     std::function<void (const Request*, Response*, bool, int)> callback,
     81                     int32_t rpc_timeout, int retry_times) {
     82         sofa::pbrpc::RpcController* controller = new sofa::pbrpc::RpcController();
     83         controller->SetTimeout(rpc_timeout * 1000L);
     84         google::protobuf::Closure* done =
     85             sofa::pbrpc::NewClosure(&RpcClient::template RpcCallback<Request, Response, Callback>,
     86                                           controller, request, response, callback);//todo 
     87         (stub->*func)(controller, request, response, done);
     88     }
    

    That is, WriteBlock is executed on the next chunkserver, with WriteNextCallback as the callback; once the next chunkserver finishes, the callback fires and decides whether a retry is needed:

    447 void ChunkServerImpl::WriteNextCallback(const WriteBlockRequest* next_request,
    448                         WriteBlockResponse* next_response,
    449                         bool failed, int error,
    450                         const std::string& next_server,
    451                         std::pair<const WriteBlockRequest*, WriteBlockResponse*> origin, ...) {
    454     const WriteBlockRequest* request = origin.first;
    455     WriteBlockResponse* response = origin.second;
    456     /// If RPC_ERROR_SEND_BUFFER_FULL retry send.
    457     if (failed && error == sofa::pbrpc::RPC_ERROR_SEND_BUFFER_FULL) {  // retry
    458         std::function<void ()> callback =
    459             std::bind(&ChunkServerImpl::WriteNext, this, next_server,
    460                         stub, next_request, next_response, request, response, done);
    461         work_thread_pool_->DelayTask(10, callback);
    462         return;
    463     }
    464     delete stub;
    467     const std::string& databuf = request->databuf();
    470     if (failed || next_response->status() != kOK) {
    471         //failed
    480         delete next_response;
    481         g_unfinished_bytes.Sub(databuf.size());
    482         done->Run();
    483         return;
    484     } else {
    485         //success
    486         delete next_response;
    487     }
    488 
    489     std::function<void ()> callback =
    490         std::bind(&ChunkServerImpl::LocalWriteBlock, this, request, response, done);
    491     work_thread_pool_->AddTask(callback);
    492 }
    

    Ultimately LocalWriteBlock performs the local write; the fan-out path arrives here the same way:

    494 void ChunkServerImpl::LocalWriteBlock(const WriteBlockRequest* request,
    495                         WriteBlockResponse* response,
    496                         ::google::protobuf::Closure* done) {
    497     int64_t block_id = request->block_id();
    498     const std::string& databuf = request->databuf();
    499     int64_t offset = request->offset();
    500     int32_t packet_seq = request->packet_seq();
    503     //set response->set_status(kOK);
    506     int64_t find_start = common::timer::get_micros();
    507     /// search;
    508     Block* block = NULL;
    509 
    510     if (packet_seq == 0) {  // packet_seq is 0 in the create-write from FileImpl::AddBlock
    511         StatusCode s;
    512         block = block_manager_->CreateBlock(block_id, &s);
    513         if (s != kOK) {
    516             if (s == kBlockExist) {
    517                 int64_t expected_size = block->GetExpectedSize();
    518                 if (expected_size != request->total_size()) {
    522                     s = kWriteError;
    523                 } else {
    524                     response->set_current_size(block->Size());
    525                     response->set_current_seq(block->GetLastSeq());
    528                 }
    529             }
    530             response->set_status(s);
    531             g_unfinished_bytes.Sub(databuf.size());
    532             done->Run();
    533             return;
    

    Assume for now that the block does not already exist; then block_manager_->CreateBlock proceeds:

      4 message BlockMeta {
      5     optional int64 block_id = 1;
      6     optional int64 block_size = 2;
      7     optional int64 checksum = 3;
      8     optional int64 version = 4 [default = -1];
      9     optional string store_path = 5;
     10 }
    
    236 Block* BlockManager::CreateBlock(int64_t block_id, StatusCode* status) {
    237     BlockMeta meta;
    238     meta.set_block_id(block_id);
    239     Disk* disk = PickDisk(block_id);  // pick the least-loaded disk
    240     if (!disk) {
    241         *status = kNotEnoughQuota;
    243         return NULL;
    244     }
    245     meta.set_store_path(disk->Path());
    246     Block* block = new Block(meta, disk, file_cache_); //disk_file_ = meta.store_path() + BuildFilePath(meta_.block_id());
    
    247     //more code...
    276     return block;
    277 }
    
    152 bool Disk::SyncBlockMeta(const BlockMeta& meta) {
    153     std::string idstr = BlockId2Str(meta.block_id());
    154     leveldb::WriteOptions options;
    155     // options.sync = true;
    156     std::string meta_buf;
    157     meta.SerializeToString(&meta_buf);
    158     leveldb::Status s = metadb_->Put(options, idstr, meta_buf);
    159     if (!s.ok()) {
    161         return false;
    162     }   
    163     return true;
    164 }
    

    At long last, here is the Block that actually stores data; it will be analyzed further when the writing of actual content is covered:

     39 class Block { 
     40 public:
     41     Block(const BlockMeta& meta, Disk* disk, FileCache* file_cache);
     42     ~Block();
     83 private:
     84     enum Type {
     85         InDisk,
     86         InMem,
     87     };
    101     std::string disk_file_;
    114 };
    

    When packet_seq is non-zero, the block is looked up instead, and then the real content is written:

    494 void ChunkServerImpl::LocalWriteBlock(...) {
    565     if (!block->Write(packet_seq, offset, databuf.data(),
    566                       databuf.size(), &add_used)) {
    567         block->DecRef();
    568         response->set_status(kWriteError);
    569         g_unfinished_bytes.Sub(databuf.size());
    570         done->Run();
    571         return;
    572     }
    574     if (request->is_last()) {
    575         if (request->has_recover_version()) {
    576             block->SetVersion(request->recover_version());
    579         }
    580         block->SetSliceNum(packet_seq + 1);
    581     }
    
    583     // If complete, close block, and report only once(close block return true).
    584     int64_t report_start = write_end;
    585     if (block->IsComplete() &&
    586             block_manager_->CloseBlock(block, request->sync_on_close())) {
    590         ReportFinish(block);
    591     }
    
    340 bool ChunkServerImpl::ReportFinish(Block* block) {
    341     BlockReceivedRequest request;
    342     request.set_chunkserver_id(chunkserver_id_);
    343     request.set_chunkserver_addr(data_server_addr_);
    344 
    345     ReportBlockInfo* info = request.add_blocks();
    346     info->set_block_id(block->Id());
    347     info->set_block_size(block->Size());
    348     info->set_version(block->GetVersion());
    349     info->set_is_recover(block->IsRecover());
    350     BlockReceivedResponse response;
    351     if (!nameserver_->SendRequest(&NameServer_Stub::BlockReceived, &request, &response, 20)) {
    353         return false;
    354     }
    357     return true;
    358 }
    

    The write itself is analyzed later. Note that putting a file first produces a zero-sized block (in the actual implementation, during AddBlock the sdk first sends a zero-length write request so the ChunkServer creates the block's backing file ahead of time; at that moment the block's size is 0). The chunkserver then reports completion to the nameserver, which updates its state:

     202 void NameServerImpl::BlockReceived(...) {
     206     if (!is_leader_) {
     207         response->set_status(kIsFollower);
     208         done->Run();
     209         return;
     210     }
     211     g_block_report.Inc();
     212     response->set_sequence_id(request->sequence_id());
     213     int32_t cs_id = request->chunkserver_id();
     216     const ::google::protobuf::RepeatedPtrField<ReportBlockInfo>& blocks = request->blocks();
     217 
     219     int old_id = chunkserver_manager_->GetChunkServerId(request->chunkserver_addr());
     221     if (cs_id != old_id) {
     224         response->set_status(kUnknownCs);
     225         done->Run();
     226         return;
     227     }
     228     for (int i = 0; i < blocks.size(); i++) {
     230         const ReportBlockInfo& block =  blocks.Get(i);
     231         int64_t block_id = block.block_id();
     232         int64_t block_size = block.block_size();
     233         int64_t block_version = block.version();
     236         // update block -> cs;
     238         if (block_mapping_manager_->UpdateBlockInfo(block_id, cs_id, block_size, block_version)     ) {
     240             // update cs -> block
     241             chunkserver_manager_->AddBlock(cs_id, block_id);
     243         } else {
     246         }
     247     }
     248     response->set_status(kOK);
     249     done->Run();
     250 }
    

    The above is roughly the entire write flow across the nameserver and chunkserver. Some details were not analyzed in depth and will be covered in the next post.

    Some data on the nameserver is maintained dynamically and never persisted to the db. If a deletion leaves things inconsistent, say the sdk talks to the nameserver, the nameserver gets halfway through its processing, and then the sdk side returns an error, the chunkservers will later repair the inconsistent data through heartbeats, block reports, and similar messages. Once the analysis is mostly done, the overall system design will be summarized.

    Some inconsistencies hardly matter, though. For example, when putting a file the path is encoded into leveldb first; if the content is then never uploaded, or the sdk errors out, or the connection drops, the entries waste some space on the nameserver, but empty directories and the like can be cleaned up by a periodic job later.

    With the setup above in place, the actual writing of file content to bfs, i.e. transferring the bytes to the chunkservers, is left for the next post; there is a lot of material there too.

    Judging from the implementations and the interface names, AsyncRequest and SendRequest should be the asynchronous and synchronous call paths respectively; confirming this may require digging into the sofa::pbrpc implementation.

    The raft log used here is a home-grown implementation; it will be analyzed when time permits.

    Later posts will analyze what each server does at startup, which data structures it initializes, and how the servers communicate.

    Some bfs Q&A
    Q: Is the content of BlockMapping not persisted? Does a restart have to reload it, and how long does that take?
    A: BlockMapping's contents are indeed not persisted. They can be rebuilt from the Namespace contents plus the ChunkServers' block reports. Every NameServer restart rebuilds the BlockMapping; the time is spent mainly in a sequential scan of leveldb.

    Q: When is AddBlock called?
    A: AddBlock is invoked on the sdk's first Write to a newly created file. The sdk then talks to the NameServer, which allocates a block id and picks enough ChunkServers for the sdk to write to.

    Q: Why not AddBlock directly when the file is created?
    A: Because the file might end up empty; creation is done lazily, deferred to the last possible step.

    Q: When does a chunkserver end up with a block of size zero?
    A: If the sdk creates a file and never calls Write on it, only the NameServer holds the file's metadata; the ChunkServers have nothing.
    If the sdk issues a write to a newly created file but the ChunkServer dies right after creating the block's backing file, that block is left with size 0.
    And in the actual implementation, during AddBlock the sdk first sends a zero-length write request so that the ChunkServer creates the block's backing file ahead of time; at that moment the block's size is 0. (This is probably the case @世光 and @丽媛 mentioned yesterday.)
