美文网首页
百度文件系统bfs源码分析系列(五)

百度文件系统bfs源码分析系列(五)

作者: fooboo | 来源:发表于2019-02-21 23:42 被阅读4次

之前几篇都是分析的写过程,但有些细节没有再说明,只是分析大概流程,像中间一些类的实现和作用,对于异常的情况处理,数据到chunkserver后怎么落地和更新等等,后续分析主要围绕这些来,这篇分析下读的过程,相比写简单些。

读流程

还是从具体使用方法说起,BfsGet从bfs上下载某个文件至本地:

153 int BfsGet(baidu::bfs::FS* fs, int argc, char* argv[]) {
154     //more code...
177     baidu::bfs::File* file;
178     if (fs->OpenFile(source.c_str(), O_RDONLY, &file, baidu::bfs::ReadOptions()) != 0) {
180         return 1;
181     }
182 
183     FILE* fp = fopen(target.c_str(), "wb");
184     if (fp == NULL) {
186         delete file;
187         return -1;
188     }
189     char buf[10240];
190     int64_t bytes = 0;
191     int32_t len = 0;
192     int32_t ret = 0;
193     while (1) {
194         len = file->Read(buf, sizeof(buf));
195         if (len <= 0) {
196             if (len < 0) {
198                 ret = 1;
199             }
200             break;
201         }
202         bytes += len;
203         fwrite(buf, len, 1, fp);
204     }
206     delete file;
207     fclose(fp);
208     return ret;
209 }

在这个接口中,会对输入的参数处理,比如source是否是目录,对target判断是否是目录或文件路径等情况;然后打开bfs上的文件,假设成功,接着判断target能否打开,然后开始进行写本地文件;

以读打开bfs文件和写bfs文件不一样的:

363 int32_t FSImpl::OpenFile(const char* path, int32_t flags, File** file, const ReadOptions& option    s) {
364     if (flags != O_RDONLY) {
365         return BAD_PARAMETER;
366     }       
368     int32_t ret = OK;
369     *file = NULL;
370 
371     FileLocationRequest request;
372     FileLocationResponse response;
373     request.set_file_name(path);
374     request.set_sequence_id(0);
375     bool rpc_ret = nameserver_client_->SendRequest(&NameServer_Stub::GetFileLocation,
376         &request, &response, 15, 1);
377     if (rpc_ret && response.status() == kOK) {
378         FileImpl* f = new FileImpl(this, rpc_client_, path, flags, options);
379         f->located_blocks_.CopyFrom(response.blocks());
380         *file = new FileImplWrapper(f);
381     } else {
383         //error...
388     }
389     return ret;
390 }

这里其实是sdk向nameserver查询source文件相关的信息分布情况,考虑只是文件的情形:

 656 void NameServerImpl::GetFileLocation(::google::protobuf::RpcController* controller,
 657                                      const FileLocationRequest* request,
 658                                      FileLocationResponse* response,
 659                                      ::google::protobuf::Closure* done) {
 660     if (!is_leader_) {
 661         //重定向到leader
 663         return;
 664     } 
 665     response->set_sequence_id(request->sequence_id());
 666     std::string path = NameSpace::NormalizePath(request->file_name());
 670 
 671     FileInfo info;
 672     FileLockGuard file_lock_guard(new ReadLock(path));
 673     if (!namespace_->GetFileInfo(path, &info)) {//获取文件所关联的信息
 677         response->set_status(kNsNotFound); 
 678     } else {
 679         for (int i = 0; i < info.blocks_size(); i++) {
 680             int64_t block_id = info.blocks(i);
 681             std::vector<int32_t> replica;
 682             int64_t block_size = 0;
 683             RecoverStat rs;
 684             if (!block_mapping_manager_->GetLocatedBlock(block_id, &replica, &block_size, &rs)) {
 686                 break;
 687             }
 688             LocatedBlock* lcblock = response->add_blocks();
 689             lcblock->set_block_id(block_id);
 690             lcblock->set_block_size(block_size);
 691             lcblock->set_status(rs);
 692             for (uint32_t i = 0; i < replica.size(); i++) {
 693                 int32_t server_id = replica[i];
 694                 std::string addr = chunkserver_manager_->GetChunkServerAddr(server_id);
 695                 if (addr == "") {
 697                     continue;
 698                 }
 700                 ChunkServerInfo* cs_info = lcblock->add_chains();
 701                 cs_info->set_address(addr);
 702             }
 705         }
 707         response->set_status(kOK);
 708     }
 709     done->Run();
 710 }

nameserver收到请求后,根据文件名查找相关的blockid信息(写文件时会对每个文件编码),这里只有一个block,然后block_mapping_manager_->GetLocatedBlock来获取blockid所表示文件的分布信息,即chunkserver的地址(这里每个chunkserver在nameserver那边Register时由nameserver分配的自增id,后面分析下,这里可能会有问题):

603 std::string ChunkServerManager::GetChunkServerAddr(int32_t id) {
604     MutexLock lock(&mu_, "GetChunkServerAddr", 10);
605     ChunkServerInfo* cs = NULL;
606     if (GetChunkServerPtr(id, &cs) && !cs->is_dead()) {
607         return cs->ipaddress();
608     }
609     return "";
610 }

如果该chunkserver状态健康的话则会发给sdk那边;sdk会收到该文件所对应的基本信息有:文件所表示的blockid,文件大小,状态,和分布于哪些chunkserver服务上;

回到sdk后,会创建FileImplWrapper对象,之后开始从bfs读文件,并写入本地,部分代码如下:

183     FILE* fp = fopen(target.c_str(), "wb");
193     while (1) {
194         len = file->Read(buf, sizeof(buf));
195         if (len <= 0) {
196             if (len < 0) {
198                 ret = 1;
199             }
200             break;
201         }
202         bytes += len;
203         fwrite(buf, len, 1, fp); //写target
204     }

299 int32_t FileImpl::Read(char* buf, int32_t read_len) {
302     if (open_flags_ != O_RDONLY) {
303         return BAD_PARAMETER;
304     }
305     MutexLock lock(&read_offset_mu_);
306     int32_t ret = Pread(buf, read_len, read_offset_, true);
308     if (ret >= 0) {
309         read_offset_ += ret;
310     }
311     return ret;
312 }

其中read_offset_记录上次读到的偏移量;

139 int32_t FileImpl::Pread(char* buf, int32_t read_len, int64_t offset, bool reada) {
140     if (read_len <= 0 || buf == NULL || offset < 0) {
143         return BAD_PARAMETER;
144     }
145     {
146         MutexLock lock(&mu_, "Pread read buffer", 1000);
147         if (last_read_offset_ == -1
148             || last_read_offset_ != offset) {
149             sequential_ratio_ /= 2;
152         } else {
153             sequential_ratio_++;
154         }
155         last_read_offset_ = offset + read_len;
156         if (reada_buffer_ && reada_base_ <= offset &&
157                 reada_base_ + reada_buf_len_ >= offset + read_len) {
158             memcpy(buf, reada_buffer_ + (offset - reada_base_), read_len);
160             return read_len;
161         }
162     }

如果reada_buffer_指向缓存区域,直接从本地缓存中读并返回,第一次读肯定不会走上面if的逻辑;
其中这六个变量last_read_offset_/reada_buffer_/reada_base_/reada_buf_len_/read_offset_ /sequential_ratio_分别如下面的注释,具体作用在后面的代码实现中体现:

153     int64_t read_offset_;               ///< last read offset
155     char* reada_buffer_;                ///< Read ahead buffer
156     int32_t reada_buf_len_;             ///< Read ahead buffer length
157     int64_t reada_base_;                ///< Read ahead base offset
158     int32_t sequential_ratio_;          ///< Sequential read ratio
159     int64_t last_read_offset_; 

接上:

164     LocatedBlock lcblock;
165     ChunkServer_Stub* chunk_server = NULL;
166     std::string cs_addr;
167     int64_t block_id;
168     {   
169         MutexLock lock(&mu_, "Pread GetStub", 1000);
170         if (located_blocks_.blocks_.empty()) {
171             return 0; //没有块信息
172         } else if (located_blocks_.blocks_[0].chains_size() == 0) {//没有chunkserver分布信息
173             if (located_blocks_.blocks_[0].block_size() == 0) {//文件大小为0
174                 return 0;
175             } else {
178                 return TIMEOUT;//没有chunkserver存有相应的文件
179             }
180         }
181         lcblock.CopyFrom(located_blocks_.blocks_[0]);
182         if (last_chunkserver_index_ == -1 || !chunkserver_) {
183             const std::string& local_host_name = fs_->local_host_name_;
184             for (int i = 0; i < lcblock.chains_size(); i++) {
185                 std::string addr = lcblock.chains(i).address();
186                 std::string cs_name = std::string(addr, 0, addr.find_last_of(':'));
187                 if (cs_name == local_host_name) {
188                     last_chunkserver_index_ = i;
189                     cs_addr = lcblock.chains(i).address();
190                     break;              
191                 }
192             }
193             if (last_chunkserver_index_ == -1) {
194                 int server_index = rand() % lcblock.chains_size();
195                 cs_addr = lcblock.chains(server_index).address();
196                 last_chunkserver_index_ = server_index;
197             }
198             fs_->rpc_client_->GetStub(cs_addr, &chunkserver_);
199         }
200         chunk_server = chunkserver_;
201         block_id = lcblock.block_id();
202     }

以上的实现逻辑也比较简单,对于从nameserver请求回来的数据,文件大小为0的情况,文件大小不为0但不在任何一个chunkserver服务上的情况,再或者空的block元数据信息等,都要处理了一遍;
然后根据chunkserver的地址,获取跟这个chunkserver的通信通道;这里从第一个开始查找,如果是本地地址的话直接与此通信,否则随机一个chunkserver地址;

接上:

204     ReadBlockRequest request;
205     ReadBlockResponse response;
206     request.set_sequence_id(common::timer::get_micros());
207     request.set_block_id(block_id);
208     request.set_offset(offset);
209     int32_t rlen = read_len;
210     if (sequential_ratio_ > 2
211         && reada
212         && read_len < FLAGS_sdk_file_reada_len) {
213         rlen = std::min(static_cast<int64_t>(FLAGS_sdk_file_reada_len),
214                         static_cast<int64_t>(sequential_ratio_) * read_len);
217     }
218     request.set_read_len(rlen);
219     bool ret = false;
221     for (int retry_times = 0; retry_times < lcblock.chains_size() * 2; retry_times++) {
222         LOG(DEBUG, "Start Pread: %s", cs_addr.c_str());
223         ret = fs_->rpc_client_->SendRequest(chunk_server, &ChunkServer_Stub::ReadBlock,
224                     &request, &response, 15, 3);
225 
226         if (!ret || response.status() != kOK) {
227             cs_addr = lcblock.chains((++last_chunkserver_index_) % lcblock.chains_size()).addres    s();
229             {
230                 MutexLock lock(&mu_, "Pread change chunkserver", 1000);
231                 if (chunk_server != chunkserver_) {
232                     chunk_server = chunkserver_;
233                 } else {
234                     bad_chunkservers_.insert(chunk_server);
235                     fs_->rpc_client_->GetStub(cs_addr, &chunk_server);
236                     chunkserver_ = chunk_server;
237                 }
238             }
239         } else {
240             break;
241         }
242     }
244     if (!ret || response.status() != kOK) {//错误处理
246         if (!ret) {
247             return TIMEOUT;
248         } else {
249             return GetErrorCode(response.status());
250         }
251     }

接着,设置要读的blockid,偏移量,和长度,这里长度会根据sequential_ratio_动态调整;接着请求ChunkServerImpl::ReadBlock读,如果不成功则会根据上一次连接到的chunkserver_index,选择下一个chunkserver,并建立通道;并把坏的chunkserver放到bad_chunkservers_;其实这里可能会在下一次还会选择到坏的,代码中并没有使用到bad_chunkservers_
最后:

255     int32_t ret_len = response.databuf().size();
256     if (read_len < ret_len) {//缓存多余的至本地
257         MutexLock lock(&mu_, "Pread fill buffer", 1000);
258         int32_t cache_len = ret_len - read_len;
259         if (cache_len > reada_buf_len_) {//处理cache内存不够情况
260             delete[] reada_buffer_;
261             reada_buffer_ = new char[cache_len];
262         }
263         reada_buf_len_ = cache_len;
264         memcpy(reada_buffer_, response.databuf().data() + read_len, cache_len);//缓存数据
265         reada_base_ = offset + read_len;//记录下一次从缓存中读的开始位置(相对于```read_offset_```)
266         ret_len = read_len;
267     }
269     memcpy(buf, response.databuf().data(), ret_len);
270     return ret_len;
271 }

从chunkserver读到内容后,如果读完则直接拷贝到buffer中,否则需要缓存一部分在本地,供下次读时直接从本地缓冲读取;

下面分析ChunkServerImpl::ReadBlock的实现:

634 void ChunkServerImpl::ReadBlock(...) {
638     int64_t block_id = request->block_id();
639     int64_t offset = request->offset();
640     int32_t read_len = request->read_len();
641     //more code...

656     StatusCode status = kOK;
659     Block* block = block_manager_->FindBlock(block_id);
660     if (block == NULL) {
661         status = kCsNotFound;
664     } else {
666         char* buf = new char[read_len];
667         int64_t len = block->Read(buf, read_len, offset);
669         if (len >= 0) {
670             response->mutable_databuf()->assign(buf, len);
671             //log 
681         } else {
682             status = kReadError;
685         }
686         delete[] buf;
687     }
688     response->set_status(status);
689     done->Run();
690     if (block) {
691         block->DecRef();
692     }
693 }

以上在chunkserver上定位block的实现比较简单,从block管理器中查找到block后,进行读数据:

332 Block* BlockManager::FindBlock(int64_t block_id) {
334     MutexLock lock(&mu_, "BlockManger::Find", 1000);
335     auto it = block_map_.find(block_id);
336     if (it == block_map_.end()) {
338         return NULL;
339     }   
340     Block* block = it->second;
341     // for user
342     block->AddRef();
343     return block;
344 }

具体怎么从block上读数据的,这部分不分析,后面会分析block组织时会详细分析,以上是整个从bfs读文件的实现方式,有些细节后面补充。

总的来说,读文件还是挺简单的,sdk与nameserver请求文件的信息,包括blockid,长度,和存储在chunkserver的位置信息,然后与chunkserver通信,请求文件具体数据,如果本地每次全部读完,则可能会动态调整一个比较,然后请求更多的数据,一部分到本地缓存供下次的读取;如果某个chunkserver返回错误,则会选择下一个chunkserver并重复做上面的事情;这里动态调整并没有根据网络状态或者chunkserver负载情况;

这里是否可以参考下ceph的实现,把文件按照固定大小,根据一定的策略,分散到不同的server上存储?可以多思考下;

最后对于出错时,并没有把下载到一半的文件给删除;下一篇分析下几个关键的类。

相关文章

网友评论

      本文标题:百度文件系统bfs源码分析系列(五)

      本文链接:https://www.haomeiwen.com/subject/fvafyqtx.html