写流程(最终写)
这里考虑扇出写(fan-out,区别于链式写,下文也称“扇区写”),假设写三个chunkserver,分别是a/b/c;在AddBlock的时候会给每个chunkserver发送一个初始写请求,初始数据如下:
// Per-chunkserver setup (excerpt): a sliding window of size 100 whose callback is
// FileImpl::OnWriteCommit, plus a dedicated queue of pending write buffers.
338 write_windows_[addr] = new common::SlidingWindow<int>(100,
339 std::bind(&FileImpl::OnWriteCommit,
340 std::placeholders::_1, std::placeholders::_2));
341 cs_write_queue_[addr] = new WriteBufferQueue;
// Initial request for the block: empty payload, offset 0, packet_seq 0, not the last packet.
343 WriteBlockRequest create_request;
// The request's sequence_id is taken from the current microsecond timestamp.
344 int64_t seq = common::timer::get_micros();
345 create_request.set_sequence_id(seq);
346 create_request.set_block_id(block_for_write_->block_id());
347 create_request.set_databuf("", 0);
348 create_request.set_offset(0);
349 create_request.set_is_last(false);
350 create_request.set_packet_seq(0);
当前面的工作都准备好之后,开始写:
// Client-side copy loop (excerpt): read the local file in chunks of up to 10240 bytes
// and pass each chunk to file->Write(); ret records which step failed.
253 char buf[10240];
254 int64_t len = 0;
255 int32_t bytes = 0;
256 while ( (bytes = fread(buf, 1, sizeof(buf), fp)) > 0) {
257 int32_t write_bytes = file->Write(buf, bytes);
258 if (write_bytes < bytes) { // short write: treated as a write failure
260 ret = 2;
261 break;
262 }
263 len += bytes; // running total of bytes accepted by Write()
264 }
265 fclose(fp);
266 if (file->Close() != 0) {// closing the file failed
268 ret = 1;
269 }
// Copies len bytes from buf into the current WriteBuffer, flushing the buffer through
// StartWrite() each time it becomes full. Returns the number of bytes consumed (w).
// Lines 389-418 of the original function are elided in this excerpt.
388 int32_t FileImpl::Write(const char* buf, int32_t len) {
389 //more code...
419 int32_t w = 0;
420 while (w < len) {
421 MutexLock lock(&mu_, "WriteInternal", 1000);
// Lazily create a 256 KiB buffer with the next sequence number; its offset is the
// block's current size.
422 if (write_buf_ == NULL) {
423 write_buf_ = new WriteBuffer(++last_seq_, 256*1024,
424 block_for_write_->block_id(),
425 block_for_write_->block_size());
426 }
// Remaining data is strictly smaller than the free space: append it and stop without
// flushing; the partially filled buffer stays in write_buf_.
427 if ( (len - w) < write_buf_->Available()) {
428 write_buf_->Append(buf+w, len-w);
429 w = len;
430 break;
431 } else {
// Otherwise fill the buffer exactly to capacity.
432 int n = write_buf_->Available();
433 write_buf_->Append(buf+w, n);
434 w += n;
435 }
// A full buffer is handed off to the chunkserver queues.
436 if (write_buf_->Available() == 0) {
437 StartWrite();
438 }
439 }
441 common::atomic_add64(&write_offset_, w);
// NOTE(review): the matching increment of back_writing_ is presumably in the elided
// part of this function -- confirm against the full source.
442 common::atomic_dec(&back_writing_);
443 return w;
444 }
在`FileImpl::Write`中创建写缓冲区`WriteBuffer`,并把buf中的数据填到`WriteBuffer`中,直到缓冲区塞满或者写够要求的字节数;每当缓冲区被填满(`Available() == 0`)时就调用`StartWrite`把它发送出去。函数返回时缓冲区有可能没满,这部分数据会留在`write_buf_`里,在后面的代码(`Close`)中处理:
// One unit of data queued for sending to the chunkservers (excerpt: the member
// functions declared on original lines 32-44 are not shown).
28 class WriteBuffer {
29 public:
//first: seq=xxx buf_size=256kb block_id=xxx offset=0
30 WriteBuffer(int32_t seq, int32_t buf_size, int64_t block_id, int64_t offset);
31 ~WriteBuffer();
45 private:
46 int32_t buf_size_;   // capacity of buf_ in bytes
47 int32_t data_size_;  // bytes appended so far
48 char* buf_;          // storage allocated in the constructor
49 int64_t block_id_;   // block this buffer belongs to
50 int64_t offset_;     // offset value passed in at construction
51 int32_t seq_id_;     // sequence number passed in at construction
52 bool is_last_;       // initialised to false by the constructor
53 volatile int refs_;  // reference count, initialised to 0 by the constructor
54 };
// Builds an empty buffer (data_size_ 0, not last, refs_ 0) and allocates buf_size bytes.
27 WriteBuffer::WriteBuffer(int32_t seq, int32_t buf_size, int64_t block_id, int64_t offset)
28 : buf_size_(buf_size), data_size_(0),
29 block_id_(block_id), offset_(offset),
30 seq_id_(seq), is_last_(false), refs_(0) {
31 buf_= new char[buf_size];
32 }
// Returns the number of bytes that can still be appended before the buffer is full.
37 int WriteBuffer::Available() const {
38 return buf_size_ - data_size_;
39 }
// Copies len bytes from buf to the end of the stored data and returns the new data size.
// The caller must not exceed the remaining capacity; this is only checked by the assert.
40 int WriteBuffer::Append(const char* buf, int len) {
41 assert(len + data_size_ <= buf_size_);
42 memcpy(buf_ + data_size_, buf, len);
43 data_size_ += len;
44 return data_size_;
45 }
// Hands the current write buffer to every chunkserver queue and schedules one
// BackgroundWrite task per chunkserver. Must be called with mu_ held; mu_ is released
// while queueing and re-acquired before returning.
446 void FileImpl::StartWrite() {
448 mu_.AssertHeld();
// Detach the buffer so that the next Write() creates a fresh one.
449 WriteBuffer* cur_buf = write_buf_;
450 write_buf_ = NULL;
451 // add ref first to prevent deconstruct
// One reference per chunkserver queue.
452 cur_buf->AddRefBy(cs_write_queue_.size());
// Grow the recorded block size by the number of bytes in this buffer.
453 block_for_write_->set_block_size(block_for_write_->block_size() +
454 cur_buf->Size());
455 mu_.Unlock();
456 for (auto it = cs_write_queue_.begin(); it != cs_write_queue_.end(); ++it) {
457 const std::string& cs_addr = it->first;
458 WriteBufferQueue* buffer_queue = it->second;
459 {
// Each queue has its own mutex, held only for the push.
460 MutexLock lock(&(buffer_queue->mu));
461 buffer_queue->buffers.push(cur_buf);
462 }
// The task binds a weak_ptr, so it does not keep the FileImpl alive by itself.
463 std::function<void ()> task =
464 std::bind(&FileImpl::BackgroundWrite,
465 std::weak_ptr<FileImpl>(shared_from_this()), cs_addr);
466 thread_pool_->AddTask(task);
// Counts the scheduled task; BackgroundWriteInternal decrements it ("for AddTask").
467 common::atomic_inc(&back_writing_);
468 }
469 mu_.Lock("StartWrite relock", 1000);
470 }
以上实现先保存`write_buf_`并把它重置为NULL,为异步写做准备;然后按要写的chunkserver个数增加`cur_buf`的引用计数,防止提前释放;再把本次缓冲区的字节数累加到当前块的大小上;接着把`cur_buf`压入每个chunkserver各自的写队列,并为每个chunkserver提交一个task交给工作线程处理。这里的用法`std::function<void ()> task = std::bind(&FileImpl::BackgroundWrite, std::weak_ptr<FileImpl>(shared_from_this()), cs_addr)`可以参考之前的博客;
// Thread-pool entry point: upgrades the weak_ptr and forwards to
// BackgroundWriteInternal; does nothing if the FileImpl no longer exists.
517 void FileImpl::BackgroundWrite(std::weak_ptr<FileImpl> wk_fp,
518 const std::string& cs_addr) {
519 std::shared_ptr<FileImpl> fp(wk_fp.lock());
520 if (!fp) {
522 return;
523 }
524 fp->BackgroundWriteInternal(cs_addr);
525 }
// Drains the pending-buffer queue of one chunkserver: every buffer whose sequence number
// is within the chunkserver's sliding window is turned into an asynchronous WriteBlock RPC.
527 void FileImpl::BackgroundWriteInternal(const std::string& cs_addr) {
528 {
529 MutexLock lock(&mu_);
530 if (cs_errors_[cs_addr] == true) {// this chunkserver is flagged in cs_errors_: send nothing
531 common::atomic_dec(&back_writing_); // for AddTask
532 return;
533 }
534 }
535 WriteBufferQueue* buffer_queue = cs_write_queue_[cs_addr];
536 MutexLock lock(&(buffer_queue->mu), "BackgroundWrite", 1000);
537 while(!buffer_queue->buffers.empty()) {
538 WriteBuffer* buffer = buffer_queue->buffers.front();
539 if (write_windows_[cs_addr]->UpBound() < buffer->Sequence()) {// sequence is beyond the window's upper bound: leave it queued and stop
540 break;
541 }
542 buffer_queue->buffers.pop();
// The queue mutex is released while the request is built and sent; re-locked at line 578.
543 buffer_queue->mu.Unlock();
544
545 WriteBlockRequest* request = new WriteBlockRequest;
546 int64_t offset = buffer->offset();
547 int64_t seq = common::timer::get_micros();
548 request->set_sequence_id(seq);
549 request->set_block_id(buffer->block_id());
550 request->set_databuf(buffer->Data(), buffer->Size());// payload bytes
551 request->set_offset(offset);// the buffer's offset (in Write()/Close() it is constructed with the block's size at that time)
552 request->set_is_last(buffer->IsLast());// whether this buffer is flagged as the last one
553 request->set_packet_seq(buffer->Sequence()); // the buffer's sequence number
554 request->set_sync_on_close(w_options_.sync_on_close);
555 //request->add_desc("start");
556 //request->add_timestamp(common::timer::get_micros());
557 if (IsChainsWrite()) {
558 //skip chain write
562 }
563 const int max_retry_times = FLAGS_sdk_write_retry_times;
564 ChunkServer_Stub* stub = chunkservers_[cs_addr];
// The callback binds a weak_ptr to this FileImpl plus the retry budget, buffer and address.
565 std::function<void (const WriteBlockRequest*, WriteBlockResponse*, bool, int)> callback
566 = std::bind(&FileImpl::WriteBlockCallback,
567 std::weak_ptr<FileImpl>(shared_from_this()),
568 std::placeholders::_1, std::placeholders::_2,
569 std::placeholders::_3, std::placeholders::_4,
570 max_retry_times, buffer, cs_addr);
571
// Incremented before the RPC is issued.
574 common::atomic_inc(&back_writing_);
575 WriteBlockResponse* response = new WriteBlockResponse;
576 rpc_client_->AsyncRequest(stub, &ChunkServer_Stub::WriteBlock,
577 request, response, callback, 60, 1);
578 buffer_queue->mu.Lock("BackgroundWriteRelock", 1000);
579 }
580 common::atomic_dec(&back_writing_); // for AddTask
// Wake waiters on sync_signal_ (e.g. Close()) once no background work is outstanding.
581 if (back_writing_ == 0) {
582 sync_signal_.Broadcast();
583 }
584 }
由于每个`WriteBuffer`有个自增的`seq_id`,表示它是第几个缓冲区(包),由创建写缓冲区时指定,即把整个文件按缓冲区大小分割成1/2/3...这样的包并依次编号;然后由`SlidingWindow`起到滑动窗口的作用,窗口大小为100;
处理的时候,如果缓冲区的序列号超出了当前窗口的上界(`if (write_windows_[cs_addr]->UpBound() < buffer->Sequence())`),就先不发送,留在队列里等前面的包被确认、窗口向前滑动之后再处理;也就是说,对每个chunkserver最多允许窗口范围内(100个序列号)的包同时在途;然后创建`WriteBlockRequest`,同初始化`AddBlock`时那样;然后请求`ChunkServerImpl::WriteBlock`,这里姑且认为返回成功,后面再分析;
收到响应后:
// RPC completion entry point. If the FileImpl is already gone, drops this callback's
// reference on the buffer and frees the request and response; otherwise forwards to
// WriteBlockCallbackInternal.
621 void FileImpl::WriteBlockCallback(std::weak_ptr<FileImpl> wk_fp,
622 const WriteBlockRequest* request,
623 WriteBlockResponse* response,
624 bool failed, int error,
625 int retry_times,
626 WriteBuffer* buffer,
627 const std::string& cs_addr) {
628 std::shared_ptr<FileImpl> fp(wk_fp.lock());
629 if (!fp) {
631 buffer->DecRef();
632 delete request;
633 delete response;
634 return;
635 }
636 fp->WriteBlockCallbackInternal(request, response, failed, error,
637 retry_times, buffer, cs_addr);
638 }
// Handles the WriteBlock response for one buffer on one chunkserver. Only the success
// path is shown; original lines 647-684 and 699-709 are elided in this excerpt.
640 void FileImpl::WriteBlockCallbackInternal(const WriteBlockRequest* request,
641 WriteBlockResponse* response,
642 bool failed, int error,
643 int retry_times,
644 WriteBuffer* buffer,
645 const std::string& cs_addr) {
646 if (failed || response->status() != kOK) {
647 // failure handling for the various error cases (omitted in this excerpt)
685 } else {
694 int r = write_windows_[cs_addr]->Add(buffer->Sequence(), 0); // record this sequence number in the window, which may advance it
695 assert(r == 0);
696 buffer->DecRef();
697 delete request;
698 }
// Schedule another BackgroundWrite pass for this chunkserver.
710 std::function<void ()> task =
711 std::bind(&FileImpl::BackgroundWrite,
712 std::weak_ptr<FileImpl>(shared_from_this()),
713 cs_addr);
714 thread_pool_->AddTask(task); // continue with the next write task
715 }
下面分析下滑动窗口(`SlidingWindow`)的设计,主要作用是限制发送端不能超前太多:只有序列号落在窗口`[base_offset_, base_offset_ + size_ - 1]`内的包才允许发送,响应可以乱序到达,但回调严格按序列号顺序触发,窗口随之向前滑动;这是在sdk这边做的:
// Fixed-size circular window over consecutive int32 offsets. Items may be added in any
// order inside the window; the callback is invoked in offset order.
// Excerpt: original lines 33-50, 72-79 and 101-104 are not shown.
20 template <typename Item>
21 class SlidingWindow {
22 public:
23 typedef boost::function<void (int32_t, Item)> SlidingCallback;
// Allocates size slots (bitmap and items) and starts with an empty window at offset 0.
24 SlidingWindow(int32_t size, SlidingCallback callback)
25 : bitmap_(NULL), items_(NULL), item_count_(0),
26 callback_(callback), size_(size),
27 base_offset_(0), max_offset_(-1), ready_(0), notifying_(false) {
28 bitmap_ = new char[size];
29 memset(bitmap_, 0, size);
30 items_ = new Item[size];
// Redundant: size_ is already set in the initializer list.
31 size_ = size;
32 }
// Delivers every consecutive filled slot starting at ready_. Requires mu_ to be held;
// mu_ is released around each callback invocation and re-acquired afterwards.
51 void Notify() {
52 mu_.AssertHeld();
53 notifying_ = true;
54 while (bitmap_[ready_] == 1) {
55 mu_.Unlock();
56 callback_(base_offset_, items_[ready_]);
57 mu_.Lock("SlidingWindow::Notify relock");
// Free the slot and slide the window forward by one, wrapping ready_ at size_.
58 bitmap_[ready_] = 0;
59 ++ready_;
60 ++base_offset_;
61 --item_count_;
62 if (ready_ >= size_) {
63 ready_ = 0;
64 }
65 }
66 notifying_ = false;
67 }
// Largest offset that Add() currently accepts.
68 int32_t UpBound() const {
69 MutexLock lock(&mu_);
70 return base_offset_ + size_ - 1;
71 }
// Stores item at offset. Returns 0 on success, -1 if offset is beyond the window,
// 1 if offset is behind the window base or its slot is already filled.
80 int Add(int32_t offset, Item item) {
81 MutexLock lock(&mu_, "Slinding Add", 50000);
82 int32_t pos = offset - base_offset_;
83 if (pos >= size_) {// offset is beyond the window
84 return -1;
85 } else if (pos < 0) {// offset is behind the window base (already delivered)
86 return 1;
87 }
88 pos = (pos + ready_) % size_;// map the offset to its slot in the circular arrays
89 if (bitmap_[pos]) {// slot already filled: this offset was received before
90 return 1;
91 }
92 bitmap_[pos] = 1;
93 items_[pos] = item;
94 ++item_count_;
95 if (offset > max_offset_) {
96 max_offset_ = offset;
97 }
98 if (!notifying_) Notify(); // deliver ready items unless a Notify() pass is already running
99 return 0;
100 }
105 private:
106 char* bitmap_;          // one flag per slot: 1 when the slot holds an undelivered item
107 Item* items_;           // slot storage, size_ entries
108 int32_t item_count_;    // number of filled, undelivered slots
109 SlidingCallback callback_;// invoked by Notify() for each item, in offset order
110 int32_t size_;          // window size (number of slots)
111 int32_t base_offset_; // offset of the next item to deliver; may exceed size_, slots are found modulo size_
112 int32_t max_offset_;    // largest offset accepted so far, -1 initially
113 int32_t ready_;         // slot index corresponding to base_offset_
114 bool notifying_;        // true while Notify() is running
115 mutable Mutex mu_;
116 };
// Sliding-window callback registered for each chunkserver; intentionally a no-op here.
717 void FileImpl::OnWriteCommit(int32_t, int) {
718 }
下面是判断写是否完成以及已完成的副本数:把每个窗口的base offset和FileImpl中的`last_seq_ + 1`相比较,相等就说明该chunkserver上所有的包都已确认;再根据是链式写还是扇出写(fan-out,即前文所说的“扇区写”)来判断副本数是否足够:
// Counts the chunkservers whose window base offset equals last_seq_ + 1, i.e. whose
// window has advanced past every sequence number issued so far. Requires mu_ to be held.
472 int32_t FileImpl::FinishedNum() {
473 mu_.AssertHeld();
474 std::map<std::string, common::SlidingWindow<int>* >::iterator it;
475 int count = 0;
476 for (it = write_windows_.begin(); it != write_windows_.end(); ++it) {
477 if (it->second->GetBaseOffset() == last_seq_ + 1) {
478 count++;
479 }
480 }
481 return count;
482 }
// Returns whether enough chunkservers have finished: exactly one for a chains write,
// otherwise at least (number of write windows - 1).
866 bool FileImpl::EnoughReplica() {
867 int32_t last_write_finish_num = FinishedNum();
868 int32_t replica_num = write_windows_.size();
869 bool is_chains = IsChainsWrite();
870 return is_chains ? last_write_finish_num == 1 :
871 last_write_finish_num >= replica_num - 1;
872 }
最后写完后,调用file->Close()
,因为有可能还有未满256kb的buff要写,所以还会尝试写一下:
// Flushes the final buffer, waits for the chunkservers to finish, then reports the block
// to the nameserver. Excerpt: several original lines are elided, including the one that
// declares `ret` -- NOTE(review): confirm its declaration against the full source.
787 int32_t FileImpl::Close() {
788 common::timer::AutoTimer at(500, "Close", name_.c_str());
789 MutexLock lock(&mu_, "Close", 1000);
790 if (closed_) {
791 return OK;
792 }
793 bool need_report_finish = false;
794 int64_t block_id = -1;
795 int32_t finished_num = 0;
796 bool chains_write = IsChainsWrite();
797 int32_t replica_num = write_windows_.size();
798 if (block_for_write_ && (open_flags_ & O_WRONLY)) {
799 need_report_finish = true;
800 block_id = block_for_write_->block_id();
// If no partially filled buffer is pending, create a small (32-byte capacity) empty one
// so that a buffer flagged as last is always sent.
801 if (!write_buf_) {
802 write_buf_ = new WriteBuffer(++last_seq_, 32, block_id,
803 block_for_write_->block_size());
804 }
805 write_buf_->SetLast();// flag this buffer as the last one
806 StartWrite();
809 int wait_time = 0;
810 finished_num = FinishedNum();
// Wait until every replica has finished or a background error is flagged.
811 while (!bg_error_) {
812 if (finished_num == replica_num) {
813 break;
814 }
815 // TODO flag for wait_time?
// Non-chains write with exactly one replica still behind and wait_time >= 3:
// stop waiting and close with bg_error_ set.
816 if (!chains_write && finished_num == replica_num - 1 && wait_time >= 3) {
817 std::string slow_cs = GetSlowChunkserver();// pick the chunkserver whose window has not caught up
820 bg_error_ = true;
821 break;
822 }
823 bool finish = sync_signal_.TimeWait(1000, (name_ + " Close wait").c_str());
// wait_time is incremented only when TimeWait() returns false.
824 if (!finish && ++wait_time > 30 && (wait_time % 10 == 0)) {
825 //timeout
827 }
828 finished_num = FinishedNum();
829 }
830 }
838 if (need_report_finish) { // report the finished block to the nameserver
839 FinishBlockRequest request;
840 FinishBlockResponse response;
841 request.set_sequence_id(0);
842 request.set_file_name(name_);
843 request.set_block_id(block_id);
// The block version reported is the last sequence number issued.
844 request.set_block_version(last_seq_);
845 request.set_block_size(write_offset_);// total bytes written, as accumulated in write_offset_
846 request.set_close_with_error(bg_error_);
847 bool rpc_ret = fs_->nameserver_client_->SendRequest(&NameServer_Stub::FinishBlock,
848 &request, &response, 15, 1);
// RPC failure maps to TIMEOUT; a non-kOK status is converted with GetErrorCode().
849 if (!(rpc_ret && response.status() == kOK)) {
852 if (!rpc_ret) {
853 return TIMEOUT;
854 } else {
855 return GetErrorCode(response.status());
856 }
857 }
858 }
859 return ret;
860 }
// Returns the address of the first chunkserver (in map order) whose window base offset
// is not last_seq_ + 1, or an empty string if every window has caught up.
// Requires mu_ to be held.
484 std::string FileImpl::GetSlowChunkserver() {
485 mu_.AssertHeld();
486 auto it = write_windows_.begin();
487 for (; it != write_windows_.end(); ++it) {
488 common::SlidingWindow<int>* sld = it->second;
489 if (sld->GetBaseOffset() != last_seq_ + 1) {
490 break;
491 }
492 }
493 if (it == write_windows_.end()) {
494 return "";
495 } else {
496 return it->first;
497 }
498 }
总结下写流程:从源文件中把内容读到buf中,分成多个缓冲区(包)来写,为每个包设置序列号,并为每个chunkserver建立滑动窗口,序列号超出窗口上界的包要等前面的包被确认之后才能发送;由于可能存在不足一个缓冲区大小的尾部数据,会在最后close时再进行一次写;每次写都是多线程并发且线程安全的,其中主要使用了异步回调和引用计数;
chunkserver收到写请求后的具体处理细节,详细情况参考百度文件系统bfs源码分析系列(三)
回到sdk这边,写完后会请求NameServerImpl::FinishBlock报告写完成:
// Nameserver handler for the SDK's FinishBlock report: validates the file and block,
// records the reported version and size, then checks the block version.
// Excerpt: several original lines are elided.
595 void NameServerImpl::FinishBlock(::google::protobuf::RpcController* controller,
596 const FinishBlockRequest* request,
597 FinishBlockResponse* response,
598 ::google::protobuf::Closure* done) {
599 if (!is_leader_) {// not the leader: redirect to the leader (handling elided in this excerpt)
602 return;
603 }
604 int64_t block_id = request->block_id();
605 int64_t block_version = request->block_version();
606 response->set_sequence_id(request->sequence_id());
607 std::string file_name = NameSpace::NormalizePath(request->file_name());
611 if (request->close_with_error()) {// the client closed with a write error
// Mark the block incomplete and reply kOK without updating the file info.
613 block_mapping_manager_->MarkIncomplete(block_id);
614 response->set_status(kOK);
615 done->Run();
616 return;
617 }
618 FileLockGuard file_lock_guard(new WriteLock(file_name));
619 FileInfo file_info;
620 if (!namespace_->GetFileInfo(file_name, &file_info)) {
622 response->set_status(kNsNotFound);
623 done->Run();
624 return;
625 }
626
// The reported block must be one of the file's blocks.
627 if (!CheckFileHasBlock(file_info, file_name, block_id)) {
628 response->set_status(kNoPermission);
629 done->Run();
630 return;
631 }
// Record the version and size reported by the client.
632 file_info.set_version(block_version);
633 file_info.set_size(request->block_size());
634 NameServerLog log;
635 if (!namespace_->UpdateFileInfo(file_info, &log)) {
637 response->set_status(kUpdateError);
638 done->Run();
639 return;
640 }
641 StatusCode ret = block_mapping_manager_->CheckBlockVersion(block_id, block_version);
642 response->set_status(ret);
643 if (ret != kOK) {
645 done->Run();
646 } else {
// On success the log is passed to LogRemote; `done` is handed to SyncLogCallback
// rather than being run here.
648 LogRemote(log, std::bind(&NameServerImpl::SyncLogCallback, this,
649 controller, request, response, done,
650 (std::vector<FileInfo>*)NULL,
651 file_lock_guard,
652 std::placeholders::_1));
653 }
654 }
// Returns true if block_id appears in file_info's block list (linear scan).
// file_name is not used in the lines shown (original lines 590-591 are elided).
582 bool NameServerImpl::CheckFileHasBlock(const FileInfo& file_info,
583 const std::string& file_name,
584 int64_t block_id) {
585 for (int i = 0; i < file_info.blocks_size(); i++) {
586 if (file_info.blocks(i) == block_id) {
587 return true;
588 }
589 }
592 return false;
593 }
// Moves a block that is still in the kBlockWriting state to kIncomplete and records it
// under each of its incomplete replicas. Unknown block ids are ignored.
955 void BlockMapping::MarkIncomplete(int64_t block_id) {
956 MutexLock lock(&mu_);
957 NSBlock* block = NULL;
958 if (!GetBlockPtr(block_id, &block)) {
959 return;
960 }
961 // maybe cs is down and block have been marked with incomplete
962 if (block->recover_stat == kBlockWriting) {
963 for (std::set<int32_t>::iterator it = block->incomplete_replica.begin();
964 it != block->incomplete_replica.end(); ++it) {
965 incomplete_[*it].insert(block_id);// record this block id in incomplete_ under the replica's id
967 }
968 SetState(block, kIncomplete);
969 }
970 }
以上功能基本以检查为主,在namespace_->UpdateFileInfo
时会更新blockid上限且保存;并更新file_info
;检查版本号,因为在报告写完成后,此时的版本号是last_seq,即sdk那边实际分割的块数加上最后的一块(可能);在sdk向chunkserver上写内容时,在chunkserver服务上,每收到一个块,会由滑动窗口进行callback回调,并更新last_seq:
// Sliding-window callback on the chunkserver side: appends the buffered packet to the
// block, frees the packet's data and subtracts its length from the writing_bytes counter.
341 void Block::WriteCallback(int32_t seq, Buffer buffer) {
342 Append(seq, buffer.data_, buffer.len_);
343 delete[] buffer.data_;
344 disk_->counters_.writing_bytes.Sub(buffer.len_);
345 }
// Excerpt: the body (original lines 426-454) is elided; only the tail is shown.
425 StatusCode Block::Append(int32_t seq, const char* buf, int64_t len) {
426 //more code..
// Remember the sequence number of the packet just appended.
455 last_seq_ = seq;
456 return kOK;
457 }
// Excerpt of the chunkserver write handler; most of the body, including its end, is not shown.
494 void ChunkServerImpl::LocalWriteBlock(...) {
495 //more code..
// Once the block is complete and has been closed successfully, report it to the nameserver.
585 if (block->IsComplete() &&
586 block_manager_->CloseBlock(block, request->sync_on_close())) {
590 ReportFinish(block);
591 }
// Sends a BlockReceived request describing one finished block (id, size, version,
// recover flag) to the nameserver. Returns false if the request could not be sent.
340 bool ChunkServerImpl::ReportFinish(Block* block) {
341 BlockReceivedRequest request;
342 request.set_chunkserver_id(chunkserver_id_);
343 request.set_chunkserver_addr(data_server_addr_);
344
345 ReportBlockInfo* info = request.add_blocks();
346 info->set_block_id(block->Id());
347 info->set_block_size(block->Size());
348 info->set_version(block->GetVersion());// report the block's version
349 info->set_is_recover(block->IsRecover());
350 BlockReceivedResponse response;
351 if (!nameserver_->SendRequest(&NameServer_Stub::BlockReceived, &request, &response, 20)) {
353 return false;
354 }
357 return true;
358 }
当nameserver 收到该请求时,会更新blockinfo信息:
// Excerpt of the nameserver handler for BlockReceived; most of the body, including its
// end, is not shown.
202 void NameServerImpl::BlockReceived(...) {
203 //more code...
// For every reported block, update its info and, if that succeeds, register the block
// with the reporting chunkserver.
228 for (int i = 0; i < blocks.size(); i++) {
230 const ReportBlockInfo& block = blocks.Get(i);
231 int64_t block_id = block.block_id();
232 int64_t block_size = block.block_size();
233 int64_t block_version = block.version();
236 // update block -> cs;
238 if (block_mapping_manager_->UpdateBlockInfo(block_id, cs_id, block_size, block_version) ) {// the original comment says this is handled according to the block's state
241 chunkserver_manager_->AddBlock(cs_id, block_id);
243 }
247 }
上面部分后续再分析,假设block_mapping_manager_->CheckBlockVersion
为真,则后续会通过raft把这些日志同步到其他nameserver,并在某个点执行;整个过程差不多分析完成。
还有具体的块修复,各状态处理,数据异常处理,和一些类在每个服务上的作用等细节会在下下一篇具体分析,下篇分析读过程。
网友评论