美文网首页
百度文件系统bfs源码分析系列(四)

百度文件系统bfs源码分析系列(四)

作者: fooboo | 来源:发表于2019-02-19 19:40 被阅读2次

    写流程(最终写)

    这里考虑扇出写(即非链式写,sdk直接向每个chunkserver发送数据),假设写三个server,分别是a/b/c。AddBlock时会为每个chunkserver创建一个滑动窗口和一个写队列,并发送一个不带数据的初始写请求,初始数据如下:

    // Excerpt from AddBlock: per-chunkserver setup done before any data is written.
    // Each chunkserver address gets its own sliding window (size 100, callback
    // FileImpl::OnWriteCommit) and its own WriteBufferQueue.
    338         write_windows_[addr] = new common::SlidingWindow<int>(100,
    339                                std::bind(&FileImpl::OnWriteCommit,
    340                                 std::placeholders::_1, std::placeholders::_2));
    341         cs_write_queue_[addr] = new WriteBufferQueue;
    
    // The initial request carries no payload: empty databuf, offset 0, packet_seq 0.
    343         WriteBlockRequest create_request;
    344         int64_t seq = common::timer::get_micros();  // sequence_id taken from common::timer::get_micros()
    345         create_request.set_sequence_id(seq);
    346         create_request.set_block_id(block_for_write_->block_id());
    347         create_request.set_databuf("", 0);
    348         create_request.set_offset(0);
    349         create_request.set_is_last(false);
    350         create_request.set_packet_seq(0);
    

    当前面的工作都准备好之后,开始写:

    // Client-side write loop (excerpt): copy the local file into the BFS file in
    // chunks of sizeof(buf) == 10240 bytes, then close the BFS file.
    253     char buf[10240];
    254     int64_t len = 0;  // total bytes accepted by file->Write
    255     int32_t bytes = 0;
    256     while ( (bytes = fread(buf, 1, sizeof(buf), fp)) > 0) {
    257         int32_t write_bytes = file->Write(buf, bytes);
    258         if (write_bytes < bytes) { // write failed (short write)
    260             ret = 2;
    261             break;
    262         }
    263         len += bytes;
    264     }
    265     fclose(fp);
    // NOTE(review): a Close failure overwrites ret, so ret == 2 from a failed
    // write becomes 1 if Close also fails.
    266     if (file->Close() != 0) {// failed to close the file
    268         ret = 1;
    269     }
    
    // Copies len bytes from buf into WriteBuffers of 256*1024 bytes. Every buffer
    // that becomes full is handed to StartWrite; a partially filled buffer stays in
    // write_buf_ until a later Write fills it or Close flushes it.
    // Returns w, the number of bytes copied.
    388 int32_t FileImpl::Write(const char* buf, int32_t len) {
    389     //more code... (elided in this excerpt)
    419     int32_t w = 0;
    420     while (w < len) {
    421         MutexLock lock(&mu_, "WriteInternal", 1000);  // scoped to one loop iteration
    422         if (write_buf_ == NULL) {
    423             write_buf_ = new WriteBuffer(++last_seq_, 256*1024,  // next packet sequence number
    424                                          block_for_write_->block_id(),
    425                                          block_for_write_->block_size());  // offset within the block
    426         }
    427         if ( (len - w) < write_buf_->Available()) {  // remainder fits without filling the buffer
    428             write_buf_->Append(buf+w, len-w);
    429             w = len;
    430             break;
    431         } else {  // fill the buffer to capacity (also taken when the remainder fits exactly)
    432             int n = write_buf_->Available();
    433             write_buf_->Append(buf+w, n);
    434             w += n;
    435         }
    436         if (write_buf_->Available() == 0) {
    437             StartWrite();  // buffer full: send it to every chunkserver
    438         }
    439     }
    441     common::atomic_add64(&write_offset_, w);
    // NOTE(review): the matching increment of back_writing_ is presumably in the
    // elided lines 390-418 -- confirm.
    442     common::atomic_dec(&back_writing_);
    443     return w;
    444 }
    

    FileImpl::Write在需要时创建写缓冲区WriteBuffer(每个256KB),并把buf中的数据拷贝到WriteBuffer中,直到缓冲区写满或者要求的字节数全部拷贝完;每当缓冲区填满(Available()为0)就调用StartWrite发出去。最后一个缓冲区可能没有填满,这种情况会在后面的Close中处理:

     // WriteBuffer: one fixed-capacity packet of file data. seq_id_ is the packet
     // sequence number, offset_ the position of its first byte within the block,
     // and refs_ a reference count (AddRefBy/DecRef are declared in elided lines).
     28 class WriteBuffer {
     29 public:     
            // e.g. for the first buffer: seq=xxx buf_size=256KB block_id=xxx offset=0
     30     WriteBuffer(int32_t seq, int32_t buf_size, int64_t block_id, int64_t offset);
     31     ~WriteBuffer();
     45 private:
     46     int32_t buf_size_;   // capacity in bytes
     47     int32_t data_size_;  // bytes appended so far
     48     char*   buf_;
     49     int64_t block_id_;
     50     int64_t offset_;
     51     int32_t seq_id_;
     52     bool    is_last_;
     53     volatile int refs_;
     54 };
    
     // Allocates buf_size bytes of storage; the buffer starts empty with refs_ == 0.
     27 WriteBuffer::WriteBuffer(int32_t seq, int32_t buf_size, int64_t block_id, int64_t offset)
     28     : buf_size_(buf_size), data_size_(0),
     29       block_id_(block_id), offset_(offset),
     30       seq_id_(seq), is_last_(false), refs_(0) {
     31     buf_= new char[buf_size];
     32 }
    
     // Free space left in the buffer, in bytes.
     37 int WriteBuffer::Available() const {
     38     return buf_size_ - data_size_;
     39 }
    
     // Appends len bytes and returns the new data size.
     // NOTE(review): the capacity check is only an assert, so it disappears under
     // NDEBUG; FileImpl::Write never passes more than Available().
     40 int WriteBuffer::Append(const char* buf, int len) {
     41     assert(len + data_size_ <= buf_size_);
     42     memcpy(buf_ + data_size_, buf, len);
     43     data_size_ += len;
     44     return data_size_;
     45 }
    
    // Hands the pending write_buf_ to every chunkserver. The buffer is pushed onto
    // each WriteBufferQueue, and one BackgroundWrite task per chunkserver is queued
    // on thread_pool_. The caller must hold mu_; it is released while the queues
    // are filled and re-acquired before returning.
    446 void FileImpl::StartWrite() {
    448     mu_.AssertHeld();
    449     WriteBuffer* cur_buf = write_buf_;
    450     write_buf_ = NULL;  // the next Write allocates a fresh buffer
    451     // Take one reference per chunkserver queue up front, so the buffer
            // cannot be destroyed while another queue still holds it.
    452     cur_buf->AddRefBy(cs_write_queue_.size());
    453     block_for_write_->set_block_size(block_for_write_->block_size() +
    454                                      cur_buf->Size());  // the next buffer's offset starts here
    455     mu_.Unlock();
    456     for (auto it = cs_write_queue_.begin(); it != cs_write_queue_.end(); ++it) {
    457         const std::string& cs_addr = it->first;
    458         WriteBufferQueue* buffer_queue = it->second;
    459         {
    460             MutexLock lock(&(buffer_queue->mu));
    461             buffer_queue->buffers.push(cur_buf);
    462         }
    463         std::function<void ()> task =
    464             std::bind(&FileImpl::BackgroundWrite,
    465                     std::weak_ptr<FileImpl>(shared_from_this()), cs_addr);
    466         thread_pool_->AddTask(task);
            // NOTE(review): back_writing_ is incremented after AddTask, so the task
            // may run its atomic_dec (in BackgroundWriteInternal) before this
            // increment, leaving the counter temporarily below the number of
            // outstanding tasks. Incrementing before AddTask would close that window.
    467         common::atomic_inc(&back_writing_);
    468     }
    469     mu_.Lock("StartWrite relock", 1000);
    470 }
    

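    以上实现先取出write_buf_并将其置空,为异步写做准备;再按要写的server个数增加cur_buf的引用计数,防止缓冲区被提前释放;然后把当前块的大小加上该缓冲区的字节数;最后释放锁,把cur_buf压入每个chunkserver各自的写队列,并为每个chunkserver提交一个BackgroundWrite任务交给线程池处理。这里std::function<void ()> task = std::bind(&FileImpl::BackgroundWrite, std::weak_ptr<FileImpl>(shared_from_this()), cs_addr)传入的是weak_ptr:任务执行时如果FileImpl已被销毁,lock()会失败,任务直接返回。具体用法可以参考之前的博客;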

    // Thread-pool entry point for one chunkserver. It holds only a weak_ptr, so a
    // queued task does not keep the FileImpl alive. If the FileImpl has already
    // been destroyed, lock() returns null and the task does nothing.
    517 void FileImpl::BackgroundWrite(std::weak_ptr<FileImpl> wk_fp,
    518                                const std::string& cs_addr) {
    519     std::shared_ptr<FileImpl> fp(wk_fp.lock());
    520     if (!fp) {
    522         return;
    523     }
    524     fp->BackgroundWriteInternal(cs_addr);
    525 }
    
    // Drains cs_addr's buffer queue. Every queued buffer whose sequence number is
    // within the chunkserver's sliding window is sent as an async WriteBlock RPC
    // (each RPC increments back_writing_). Sending stops at the first buffer that
    // is beyond the window's upper bound. At the end, the count taken when this
    // task was scheduled is released, and sync_signal_ is broadcast if
    // back_writing_ reached 0.
    527 void FileImpl::BackgroundWriteInternal(const std::string& cs_addr) {
    528     {   
    529         MutexLock lock(&mu_);
    530         if (cs_errors_[cs_addr] == true) {// this chunkserver is already marked as failed
    531             common::atomic_dec(&back_writing_);    // for AddTask
    532             return;
    533         }
    534     }
    535     WriteBufferQueue* buffer_queue = cs_write_queue_[cs_addr];
    536     MutexLock lock(&(buffer_queue->mu), "BackgroundWrite", 1000);
    537     while(!buffer_queue->buffers.empty()) {
    538         WriteBuffer* buffer = buffer_queue->buffers.front();
    539         if (write_windows_[cs_addr]->UpBound() < buffer->Sequence()) {// beyond the window's upper bound: stop until acks slide it
    540             break;
    541         }
    542         buffer_queue->buffers.pop();
    543         buffer_queue->mu.Unlock();  // build and send the RPC without holding the queue lock
    544 
    545         WriteBlockRequest* request = new WriteBlockRequest;
    546         int64_t offset = buffer->offset();
    547         int64_t seq = common::timer::get_micros();
    548         request->set_sequence_id(seq);
    549         request->set_block_id(buffer->block_id());
    550         request->set_databuf(buffer->Data(), buffer->Size());// payload
    551         request->set_offset(offset);// offset of the buffer's first byte within the block
    552         request->set_is_last(buffer->IsLast());// whether this is the last packet (set by Close)
    553         request->set_packet_seq(buffer->Sequence()); // packet sequence number
    554         request->set_sync_on_close(w_options_.sync_on_close);
    555         //request->add_desc("start");
    556         //request->add_timestamp(common::timer::get_micros());
    557         if (IsChainsWrite()) {
    558             //skip chain write
    562         }
    563         const int max_retry_times = FLAGS_sdk_write_retry_times;
    564         ChunkServer_Stub* stub = chunkservers_[cs_addr];
    565         std::function<void (const WriteBlockRequest*, WriteBlockResponse*, bool, int)> callback
    566             = std::bind(&FileImpl::WriteBlockCallback,
    567                     std::weak_ptr<FileImpl>(shared_from_this()),
    568                     std::placeholders::_1, std::placeholders::_2,
    569                     std::placeholders::_3, std::placeholders::_4,
    570                     max_retry_times, buffer, cs_addr);
    571 
    574         common::atomic_inc(&back_writing_);  // one count per in-flight RPC
    575         WriteBlockResponse* response = new WriteBlockResponse;
    576         rpc_client_->AsyncRequest(stub, &ChunkServer_Stub::WriteBlock,
    577                 request, response, callback, 60, 1);
    578         buffer_queue->mu.Lock("BackgroundWriteRelock", 1000);
    579     }
    580     common::atomic_dec(&back_writing_);    // for AddTask
    581     if (back_writing_ == 0) {
    582         sync_signal_.Broadcast();  // wakes Close, which waits on sync_signal_
    583     }
    584 }
    

    每个WriteBuffer都有一个自增的seq_id(即请求中的packet_seq),在创建写缓冲区时通过++last_seq_指定,表示这是第几个数据包。也就是说,要写的数据被切成包1/2/3...并依次编号,再由SlidingWindow实现滑动窗口机制,窗口大小为100;
    处理时,如果队首缓冲区的序列号超过了当前窗口的上界(UpBound(),即base_offset_ + 100 - 1),就暂停发送,见if (write_windows_[cs_addr]->UpBound() < buffer->Sequence());剩余的缓冲区留在队列中,等收到响应后由回调重新调度。否则创建WriteBlockRequest(字段与AddBlock初始化时相同),并异步请求ChunkServerImpl::WriteBlock,这里姑且认为返回成功,后面再分析;
    收到响应后:

    // RPC completion entry point. If the FileImpl has already been destroyed,
    // release this RPC's reference on the buffer and free the request/response
    // here; otherwise forward to WriteBlockCallbackInternal.
    621 void FileImpl::WriteBlockCallback(std::weak_ptr<FileImpl> wk_fp,
    622                                   const WriteBlockRequest* request,
    623                                   WriteBlockResponse* response,
    624                                   bool failed, int error,
    625                                   int retry_times,
    626                                   WriteBuffer* buffer,
    627                                   const std::string& cs_addr) {
    628     std::shared_ptr<FileImpl> fp(wk_fp.lock());
    629     if (!fp) {
    631         buffer->DecRef();
    632         delete request;
    633         delete response;
    634         return;
    635     }
    636     fp->WriteBlockCallbackInternal(request, response, failed, error,
    637                                    retry_times, buffer, cs_addr);
    638 }
    
    // Handles a WriteBlock response. On success, the packet's sequence number is
    // added to the chunkserver's sliding window and the buffer reference is
    // released. In every case BackgroundWrite is scheduled again, so queued packets
    // that were held back by the window get sent.
    640 void FileImpl::WriteBlockCallbackInternal(const WriteBlockRequest* request,
    641                                      WriteBlockResponse* response,
    642                                      bool failed, int error,
    643                                      int retry_times,
    644                                      WriteBuffer* buffer,
    645                                      const std::string& cs_addr) {
    646     if (failed || response->status() != kOK) {
    647         // failure handling (the various error cases) is omitted in this excerpt
    685     } else {
    694         int r = write_windows_[cs_addr]->Add(buffer->Sequence(), 0); // mark the packet acknowledged; may slide the window
    695         assert(r == 0);  // Add returns 1 for a duplicate and -1 for an out-of-window sequence
    696         buffer->DecRef();
    697         delete request;
            // NOTE(review): response is not deleted in the lines shown; presumably
            // this happens in the elided lines 699-709 -- confirm.
    698     }
    710     std::function<void ()> task =
    711         std::bind(&FileImpl::BackgroundWrite,
    712                   std::weak_ptr<FileImpl>(shared_from_this()),
    713                   cs_addr);
    714     thread_pool_->AddTask(task); // continue with the next queued write
    715 }
    

    下面分析下滑动窗口的设计。它在sdk这边实现,作用是限制已发出但未确认的数据包数量,防止后面的包超前太多:只有序列号不超过base_offset_ + size_ - 1的包才会被发送;只有窗口最前面的包被确认后,窗口才会向前滑动:

     // SlidingWindow (excerpt): a ring buffer of size_ slots that accepts items by
     // sequence number (Add) and invokes callback_ for them strictly in sequence
     // order, advancing base_offset_ as the front of the window is filled.
     20 template <typename Item>
     21 class SlidingWindow {
     22 public:
     23     typedef boost::function<void (int32_t, Item)> SlidingCallback;
     24     SlidingWindow(int32_t size, SlidingCallback callback)
     25       : bitmap_(NULL), items_(NULL), item_count_(0),
     26         callback_(callback), size_(size),
     27         base_offset_(0), max_offset_(-1), ready_(0), notifying_(false) {
     28         bitmap_ = new char[size];
     29         memset(bitmap_, 0, size);
     30         items_ = new Item[size];
     31         size_ = size;  // redundant: size_ is already set in the initializer list
     32     }
    
     // Delivers consecutive filled slots starting at ready_. mu_ is released while
     // callback_ runs; notifying_ stops a concurrent Add from starting a second
     // Notify loop, and this loop picks up the slots such an Add fills.
     // Requires mu_ to be held.
     51     void Notify() {
     52         mu_.AssertHeld();
     53         notifying_ = true;
     54         while (bitmap_[ready_] == 1) {
     55             mu_.Unlock();
     56             callback_(base_offset_, items_[ready_]);
     57             mu_.Lock("SlidingWindow::Notify relock");
     58             bitmap_[ready_] = 0;
     59             ++ready_;
     60             ++base_offset_;
     61             --item_count_;
     62             if (ready_ >= size_) {
     63                 ready_ = 0;
     64             }
     65         }
     66         notifying_ = false;
     67     }
    
     // Highest sequence number that Add currently accepts.
     68     int32_t UpBound() const {
     69         MutexLock lock(&mu_);
     70         return base_offset_ + size_ - 1;
     71     }
    
     // Records item under sequence number offset and delivers any items that are
     // now contiguous. Returns 0 if accepted, -1 if offset is beyond the window,
     // and 1 if offset was already received.
     80     int Add(int32_t offset, Item item) {
     81         MutexLock lock(&mu_, "Slinding Add", 50000);
     82         int32_t pos = offset - base_offset_;
     83         if (pos >= size_) {// sequence number is beyond the window
     84             return -1; 
     85         } else if (pos < 0) {// sequence number already slid past (received earlier)
     86             return 1;
     87         }
     88         pos = (pos + ready_) % size_;// map to the ring-buffer slot relative to ready_
     89         if (bitmap_[pos]) {// duplicate: slot already filled
     90             return 1;
     91         }
     92         bitmap_[pos] = 1;
     93         items_[pos] = item;
     94         ++item_count_;
     95         if (offset > max_offset_) {
     96             max_offset_ = offset;
     97         }
     98         if (!notifying_) Notify(); // unless a Notify loop is already running, deliver contiguous items now
     99         return 0;
    100     }
    105 private:
    106     char* bitmap_;
    107     Item* items_;
    108     int32_t item_count_;
    109     SlidingCallback callback_;// called for each item, in sequence order, as the window slides
    110     int32_t size_;
    111     int32_t base_offset_; // sequence number of the slot at ready_; may exceed size_ (slots are indexed modulo size_)
    112     int32_t max_offset_;
    113     int32_t ready_;
    114     bool notifying_;
    115     mutable Mutex mu_;
    116 };
    
    // Sliding-window callback registered in AddBlock. The body is empty: the sdk
    // reads progress through the window's base offset (see FinishedNum) instead.
    717 void FileImpl::OnWriteCommit(int32_t, int) {
    718 }
    

    下面是判断写是否完成、副本数是否足够的逻辑:FinishedNum统计窗口起点(GetBaseOffset())已经到达last_seq_ + 1(即之前的序列号都已确认)的chunkserver个数;EnoughReplica再根据是链式写还是扇出写来判断副本数是否足够:

    // Counts the chunkservers whose sliding window has advanced to last_seq_ + 1,
    // i.e. every sequence number before it has been delivered by the window.
    // Requires mu_ to be held.
    472 int32_t FileImpl::FinishedNum() {
    473     mu_.AssertHeld();
    474     std::map<std::string, common::SlidingWindow<int>* >::iterator it;
    475     int count = 0;
    476     for (it = write_windows_.begin(); it != write_windows_.end(); ++it) {
    477         if (it->second->GetBaseOffset() == last_seq_ + 1) {
    478             count++;
    479         }
    480     }
    481     return count;
    482 }
    
    // Whether enough replicas have finished the write. For a chain write, exactly
    // one finished window counts as enough. Otherwise, at most one replica may
    // still be lagging. FinishedNum asserts mu_, so callers must hold mu_.
    866 bool FileImpl::EnoughReplica() {
    867     int32_t last_write_finish_num = FinishedNum();
    868     int32_t replica_num = write_windows_.size();
    869     bool is_chains = IsChainsWrite();
    870     return is_chains ? last_write_finish_num == 1 :
    871                        last_write_finish_num >= replica_num - 1;
    872 }
    

    全部写完后调用file->Close()。由于可能还有未满256KB的缓冲区没有发出,Close会把它(没有则新建一个空缓冲区)标记为最后一个包(SetLast)并调用StartWrite写出,然后等待各chunkserver确认:

    // Flushes the final packet, waits for the replicas to acknowledge it, then
    // reports FinishBlock to the nameserver. If no buffer is pending, a new empty
    // buffer (32-byte capacity) is sent, so there is always a packet marked last.
    // NOTE(review): `ret`, returned at the end, is declared in the elided
    // lines 831-837.
    787 int32_t FileImpl::Close() {
    788     common::timer::AutoTimer at(500, "Close", name_.c_str());
    789     MutexLock lock(&mu_, "Close", 1000);
    790     if (closed_) {
    791         return OK;
    792     }
    793     bool need_report_finish = false;
    794     int64_t block_id = -1;
    795     int32_t finished_num = 0;
    796     bool chains_write = IsChainsWrite();
    797     int32_t replica_num = write_windows_.size();
    798     if (block_for_write_ && (open_flags_ & O_WRONLY)) {
    799         need_report_finish = true;
    800         block_id = block_for_write_->block_id();
    801         if (!write_buf_) {
    802             write_buf_ = new WriteBuffer(++last_seq_, 32, block_id,
    803                                          block_for_write_->block_size());
    804         }
    805         write_buf_->SetLast();// mark it as the last packet
    806         StartWrite();
    809         int wait_time = 0;
    810         finished_num = FinishedNum();
    811         while (!bg_error_) {
    812             if (finished_num == replica_num) {
    813                 break;  // every window has reached last_seq_ + 1
    814             }
    815             // TODO flag for wait_time?
                // Fan-out write with exactly one replica lagging after 3 timeouts:
                // give up on it and close with an error (sent below as
                // close_with_error).
    816             if (!chains_write && finished_num == replica_num - 1 && wait_time >= 3) {
    817                 std::string slow_cs = GetSlowChunkserver();// chunkserver whose window has not reached last_seq_ + 1
    820                 bg_error_ = true;
    821                 break;
    822             }
    823             bool finish = sync_signal_.TimeWait(1000, (name_ + " Close wait").c_str());
                // NOTE(review): && short-circuits, so ++wait_time runs only when
                // TimeWait returned false; wait_time counts timeouts, not iterations.
    824             if (!finish && ++wait_time > 30 && (wait_time % 10 == 0)) {
    825                 //timeout
    827             }
    828             finished_num = FinishedNum();
    829         }
    830     }
    838     if (need_report_finish) { // report to the nameserver
    839         FinishBlockRequest request;
    840         FinishBlockResponse response;
    841         request.set_sequence_id(0);
    842         request.set_file_name(name_);
    843         request.set_block_id(block_id);
    844         request.set_block_version(last_seq_);  // last packet sequence number
    845         request.set_block_size(write_offset_);// total bytes accepted by Write
    846         request.set_close_with_error(bg_error_);
    847         bool rpc_ret = fs_->nameserver_client_->SendRequest(&NameServer_Stub::FinishBlock,
    848                                                    &request, &response, 15, 1);
    849         if (!(rpc_ret && response.status() == kOK))  {
    852             if (!rpc_ret) {
    853                 return TIMEOUT;
    854             } else {
    855                 return GetErrorCode(response.status());
    856             }
    857         }
    858     }
    859     return ret;
    860 }
    
    // Returns the address of the first chunkserver (in write_windows_ key order)
    // whose window has not reached last_seq_ + 1, or "" if all have.
    // Requires mu_ to be held.
    484 std::string FileImpl::GetSlowChunkserver() {
    485     mu_.AssertHeld();
    486     auto it = write_windows_.begin();
    487     for (; it != write_windows_.end(); ++it) {
    488         common::SlidingWindow<int>* sld = it->second;
    489         if (sld->GetBaseOffset() != last_seq_ + 1) {
    490             break;
    491         }
    492     }
    493     if (it == write_windows_.end()) {
    494         return "";
    495     } else {
    496         return it->first;
    497     }   
    498 }
    

    总结一下写流程:从源文件中读取内容到buf,再切成多个包分别写出,为每个包设置序列号,并用滑动窗口限制已发出但未确认的包的数量;不足一个缓冲区的剩余数据会在Close时写出。写操作由线程池中的多个线程并发完成,主要依靠异步回调和引用计数来保证安全;

    chunkserver收到写请求后的具体处理细节,请参考百度文件系统bfs源码分析系列(三)。

    回到sdk这边,写完后会请求NameServerImpl::FinishBlock报告写完成:

     // Handles the sdk's end-of-write report. With close_with_error, the block is
     // marked incomplete and kOK is returned without touching the file info.
     // Otherwise the file must contain the block. The file's version and size are
     // updated from the request, the block version is checked, and on kOK the log
     // entry is replicated through LogRemote before the reply.
     595 void NameServerImpl::FinishBlock(::google::protobuf::RpcController* controller,
     596                                  const FinishBlockRequest* request,
     597                                  FinishBlockResponse* response,
     598                                  ::google::protobuf::Closure* done) {
     599     if (!is_leader_) {// not the leader: redirect (elided lines 600-601)
     602         return;
     603     }
     604     int64_t block_id = request->block_id();
     605     int64_t block_version = request->block_version();
     606     response->set_sequence_id(request->sequence_id());
     607     std::string file_name = NameSpace::NormalizePath(request->file_name());
     611     if (request->close_with_error()) {// the client hit a write error
     613         block_mapping_manager_->MarkIncomplete(block_id);
     614         response->set_status(kOK);
     615         done->Run();
     616         return;
     617     }
     618     FileLockGuard file_lock_guard(new WriteLock(file_name));
     619     FileInfo file_info;
     620     if (!namespace_->GetFileInfo(file_name, &file_info)) {
     622         response->set_status(kNsNotFound);
     623         done->Run();
     624         return;
     625     }
     626 
     627     if (!CheckFileHasBlock(file_info, file_name, block_id)) {
     628         response->set_status(kNoPermission);
     629         done->Run();
     630         return;
     631     }
     632     file_info.set_version(block_version);
     633     file_info.set_size(request->block_size());
     634     NameServerLog log;
     635     if (!namespace_->UpdateFileInfo(file_info, &log)) {                      
     637         response->set_status(kUpdateError);
     638         done->Run();
     639         return;
     640     }
     // NOTE(review): UpdateFileInfo has already run when CheckBlockVersion rejects
     // the version; in that case the log entry is simply not sent via LogRemote.
     // Confirm this ordering is intended.
     641     StatusCode ret = block_mapping_manager_->CheckBlockVersion(block_id, block_version);
     642     response->set_status(ret);
     643     if (ret != kOK) {
     645         done->Run();
     646     } else {
     // Replicate the log entry; SyncLogCallback receives done and the file lock guard.
     648         LogRemote(log, std::bind(&NameServerImpl::SyncLogCallback, this,
     649                                    controller, request, response, done,
     650                                    (std::vector<FileInfo>*)NULL,
     651                                    file_lock_guard,
     652                                    std::placeholders::_1));
     653     }
     654 }
    
     // Returns true if block_id is one of file_info's blocks (a linear scan).
     // file_name is not used in the lines shown; presumably it is used for logging
     // in the elided lines 590-591.
     582 bool NameServerImpl::CheckFileHasBlock(const FileInfo& file_info,
     583                                        const std::string& file_name,
     584                                        int64_t block_id) {
     585     for (int i = 0; i < file_info.blocks_size(); i++) {
     586         if (file_info.blocks(i) == block_id) {
     587             return true;
     588         }
     589     }   
     592     return false;
     593 }
    
    // Marks a block that is still being written as kIncomplete, and records it
    // under every chunkserver in its incomplete_replica set. Unknown blocks and
    // blocks in any other state are left unchanged.
    955 void BlockMapping::MarkIncomplete(int64_t block_id) {
    956     MutexLock lock(&mu_);
    957     NSBlock* block = NULL;
    958     if (!GetBlockPtr(block_id, &block)) {
    959         return;
    960     }       
    961     // Only act while still kBlockWriting: a chunkserver may be down and the block already marked incomplete.
    962     if (block->recover_stat == kBlockWriting) {
    963         for (std::set<int32_t>::iterator it = block->incomplete_replica.begin();
    964                 it != block->incomplete_replica.end(); ++it) {
    965             incomplete_[*it].insert(block_id);// record block_id as incomplete for this chunkserver
    967         }
    968         SetState(block, kIncomplete);
    969     }
    970 }
    

    以上功能以检查为主:FinishBlock先确认文件包含该block,再用sdk上报的版本号和大小更新file_info,namespace_->UpdateFileInfo时会更新blockid上限并保存;最后检查block版本号。sdk上报的版本号是它的last_seq_,即实际发出的包数(包括Close时可能追加的最后一个包)。而在chunkserver端,sdk向它写内容时,每收到一个包,滑动窗口就会进行callback回调,并更新last_seq_:

    // Chunkserver-side sliding-window callback: appends the packet to the block,
    // frees its payload, and subtracts its length from writing_bytes.
    // NOTE(review): the StatusCode returned by Append is ignored here.
    341 void Block::WriteCallback(int32_t seq, Buffer buffer) {
    342     Append(seq, buffer.data_, buffer.len_);
    343     delete[] buffer.data_;
    344     disk_->counters_.writing_bytes.Sub(buffer.len_);
    345 }
    
    // Appends len bytes for packet seq (body mostly elided), then records seq as
    // the block's last_seq_.
    425 StatusCode Block::Append(int32_t seq, const char* buf, int64_t len) {
    426     //more code.. (elided in this excerpt)
    455     last_seq_ = seq;
    456     return kOK;
    457 }
    
    // Excerpt: once the block is complete and CloseBlock succeeds, the chunkserver
    // reports the finished block to the nameserver (the rest of the function is
    // elided).
    494 void ChunkServerImpl::LocalWriteBlock(...) {
    495     //more code.. (elided in this excerpt)
    585     if (block->IsComplete() &&
    586             block_manager_->CloseBlock(block, request->sync_on_close())) {
    590         ReportFinish(block);
    591     }
    
    // Sends a BlockReceived request describing one finished block (id, size,
    // version, recover flag) to the nameserver. Returns false if the RPC fails.
    // NOTE(review): response.status() is not checked in the lines shown
    // (355-356 are elided).
    340 bool ChunkServerImpl::ReportFinish(Block* block) {
    341     BlockReceivedRequest request;
    342     request.set_chunkserver_id(chunkserver_id_);
    343     request.set_chunkserver_addr(data_server_addr_);
    344         
    345     ReportBlockInfo* info = request.add_blocks();
    346     info->set_block_id(block->Id());
    347     info->set_block_size(block->Size());
    348     info->set_version(block->GetVersion());// block version
    349     info->set_is_recover(block->IsRecover());
    350     BlockReceivedResponse response;
    351     if (!nameserver_->SendRequest(&NameServer_Stub::BlockReceived, &request, &response, 20)) {
    353         return false; 
    354     }   
    357     return true;
    358 }
    

    当nameserver 收到该请求时,会更新blockinfo信息:

     // Excerpt: for each reported block, update the block -> chunkserver mapping.
     // When UpdateBlockInfo returns true, the block is also added to that
     // chunkserver's block list. blocks and cs_id are defined in the elided lines.
     202 void NameServerImpl::BlockReceived(...) {
     203     //more code... (elided in this excerpt)
     228     for (int i = 0; i < blocks.size(); i++) {
     230         const ReportBlockInfo& block =  blocks.Get(i);
     231         int64_t block_id = block.block_id();
     232         int64_t block_size = block.block_size();
     233         int64_t block_version = block.version();
     236         // update block -> cs;
     238         if (block_mapping_manager_->UpdateBlockInfo(block_id, cs_id, block_size, block_version)     ) {// per the article, UpdateBlockInfo handles each block state
     241             chunkserver_manager_->AddBlock(cs_id, block_id);
     243         }
     247     }
    

    上面部分后续再分析。如果block_mapping_manager_->CheckBlockVersion返回kOK,则会调用LogRemote把这条日志通过raft同步到其他nameserver,并在回调SyncLogCallback中继续处理;整个过程差不多分析完成。

    还有具体的块修复,各状态处理,数据异常处理,和一些类在每个服务上的作用等细节会在下下一篇具体分析,下篇分析读过程。

    相关文章

      网友评论

          本文标题:百度文件系统bfs源码分析系列(四)

          本文链接:https://www.haomeiwen.com/subject/oypjeqtx.html