The previous posts analyzed the write path, but only at the level of the overall flow; several details were left out, such as the implementation and role of some intermediate classes, how exceptional cases are handled, and how data is persisted and updated once it reaches the chunkserver. Later posts will focus on those topics. This one analyzes the read path, which is simpler than the write path.
The read flow
As before, we start from a concrete usage: BfsGet, which downloads a file from BFS to the local filesystem:
int BfsGet(baidu::bfs::FS* fs, int argc, char* argv[]) {
    //more code...
    baidu::bfs::File* file;
    if (fs->OpenFile(source.c_str(), O_RDONLY, &file, baidu::bfs::ReadOptions()) != 0) {
        return 1;
    }

    FILE* fp = fopen(target.c_str(), "wb");
    if (fp == NULL) {
        delete file;
        return -1;
    }
    char buf[10240];
    int64_t bytes = 0;
    int32_t len = 0;
    int32_t ret = 0;
    while (1) {
        len = file->Read(buf, sizeof(buf));
        if (len <= 0) {
            if (len < 0) {
                ret = 1;
            }
            break;
        }
        bytes += len;
        fwrite(buf, len, 1, fp);
    }
    delete file;
    fclose(fp);
    return ret;
}
This function first validates its inputs, e.g. whether source is a directory and whether target is a directory or a file path (a rough sketch of one such check follows); it then opens the file on BFS, and assuming that succeeds, checks that target can be opened and starts writing the local file.
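The concrete checks are elided above ("//more code..."); purely as an illustration, resolving target when it is a directory might look like the following (ResolveTarget is a hypothetical helper invented for this post, not BFS code):

#include <string>
#include <sys/stat.h>

// Hypothetical helper (not BFS code): if the local target is a directory,
// append the base name of the BFS source path to form the output file name.
std::string ResolveTarget(const std::string& source, std::string target) {
    struct stat st;
    if (stat(target.c_str(), &st) == 0 && S_ISDIR(st.st_mode)) {
        size_t pos = source.find_last_of('/');
        target += "/" + source.substr(pos == std::string::npos ? 0 : pos + 1);
    }
    return target;
}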
Opening a BFS file for reading differs from opening one for writing:
int32_t FSImpl::OpenFile(const char* path, int32_t flags, File** file,
                         const ReadOptions& options) {
    if (flags != O_RDONLY) {
        return BAD_PARAMETER;
    }
    int32_t ret = OK;
    *file = NULL;

    FileLocationRequest request;
    FileLocationResponse response;
    request.set_file_name(path);
    request.set_sequence_id(0);
    bool rpc_ret = nameserver_client_->SendRequest(&NameServer_Stub::GetFileLocation,
                                                   &request, &response, 15, 1);
    if (rpc_ret && response.status() == kOK) {
        FileImpl* f = new FileImpl(this, rpc_client_, path, flags, options);
        f->located_blocks_.CopyFrom(response.blocks());
        *file = new FileImplWrapper(f);
    } else {
        //error...
    }
    return ret;
}
Here the SDK asks the nameserver how the source file is distributed; we only consider the case where the path is a regular file:
void NameServerImpl::GetFileLocation(::google::protobuf::RpcController* controller,
                                     const FileLocationRequest* request,
                                     FileLocationResponse* response,
                                     ::google::protobuf::Closure* done) {
    if (!is_leader_) {
        //redirect to the leader
        return;
    }
    response->set_sequence_id(request->sequence_id());
    std::string path = NameSpace::NormalizePath(request->file_name());

    FileInfo info;
    FileLockGuard file_lock_guard(new ReadLock(path));
    if (!namespace_->GetFileInfo(path, &info)) { //look up the file's metadata
        response->set_status(kNsNotFound);
    } else {
        for (int i = 0; i < info.blocks_size(); i++) {
            int64_t block_id = info.blocks(i);
            std::vector<int32_t> replica;
            int64_t block_size = 0;
            RecoverStat rs;
            if (!block_mapping_manager_->GetLocatedBlock(block_id, &replica, &block_size, &rs)) {
                break;
            }
            LocatedBlock* lcblock = response->add_blocks();
            lcblock->set_block_id(block_id);
            lcblock->set_block_size(block_size);
            lcblock->set_status(rs);
            for (uint32_t i = 0; i < replica.size(); i++) {
                int32_t server_id = replica[i];
                std::string addr = chunkserver_manager_->GetChunkServerAddr(server_id);
                if (addr == "") {
                    continue;
                }
                ChunkServerInfo* cs_info = lcblock->add_chains();
                cs_info->set_address(addr);
            }
        }
        response->set_status(kOK);
    }
    done->Run();
}
When the nameserver receives the request, it looks up the block ids associated with the file name (block ids are assigned when the file is written; here there is just one block), then calls block_mapping_manager_->GetLocatedBlock to get the block's distribution, i.e. the addresses of the chunkservers that hold it (each chunkserver is assigned an auto-incremented id by the nameserver when it Registers; this may be problematic and will be analyzed later):
std::string ChunkServerManager::GetChunkServerAddr(int32_t id) {
    MutexLock lock(&mu_, "GetChunkServerAddr", 10);
    ChunkServerInfo* cs = NULL;
    if (GetChunkServerPtr(id, &cs) && !cs->is_dead()) {
        return cs->ipaddress();
    }
    return "";
}
If the chunkserver is healthy, its address is included in the reply to the SDK. For each block the SDK thus receives the file's basic information: the block id, the block size, its status, and which chunkservers hold replicas.
Back in the SDK, a FileImplWrapper object is created, after which the client starts reading the file from BFS and writing it locally. The relevant part of the code again:
FILE* fp = fopen(target.c_str(), "wb");
while (1) {
    len = file->Read(buf, sizeof(buf));
    if (len <= 0) {
        if (len < 0) {
            ret = 1;
        }
        break;
    }
    bytes += len;
    fwrite(buf, len, 1, fp); //write to target
}
int32_t FileImpl::Read(char* buf, int32_t read_len) {
    if (open_flags_ != O_RDONLY) {
        return BAD_PARAMETER;
    }
    MutexLock lock(&read_offset_mu_);
    int32_t ret = Pread(buf, read_len, read_offset_, true);
    if (ret >= 0) {
        read_offset_ += ret;
    }
    return ret;
}
Here read_offset_ records how far reads have progressed, i.e. the offset at which the next Read will start.
int32_t FileImpl::Pread(char* buf, int32_t read_len, int64_t offset, bool reada) {
    if (read_len <= 0 || buf == NULL || offset < 0) {
        return BAD_PARAMETER;
    }
    {
        MutexLock lock(&mu_, "Pread read buffer", 1000);
        if (last_read_offset_ == -1
            || last_read_offset_ != offset) {
            sequential_ratio_ /= 2;
        } else {
            sequential_ratio_++;
        }
        last_read_offset_ = offset + read_len;
        if (reada_buffer_ && reada_base_ <= offset &&
                reada_base_ + reada_buf_len_ >= offset + read_len) {
            memcpy(buf, reada_buffer_ + (offset - reada_base_), read_len);
            return read_len;
        }
    }
If reada_buffer_ points at cached data covering the requested range, the read is served straight from the local cache and returns; the first read can never take that branch. The six members last_read_offset_ / reada_buffer_ / reada_base_ / reada_buf_len_ / read_offset_ / sequential_ratio_ are declared with the comments below; their exact roles show up in the code that follows (a small sketch of the cache-hit test appears after the declarations):
int64_t read_offset_;       ///< last read offset
char* reada_buffer_;        ///< Read ahead buffer
int32_t reada_buf_len_;     ///< Read ahead buffer length
int64_t reada_base_;        ///< Read ahead base offset
int32_t sequential_ratio_;  ///< Sequential read ratio
int64_t last_read_offset_;  ///< offset the next sequential read is expected at
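To make the cache-hit test concrete, here is a minimal self-contained sketch (not BFS code; the parameters mirror the members above): a read is served from the read-ahead buffer only when the whole requested range lies inside the cached range.

#include <cstdint>
#include <cstring>

// Minimal sketch (not BFS code): Pread's cache-hit test. The request
// [offset, offset + read_len) must fall entirely inside the cached range
// [reada_base, reada_base + reada_buf_len); any partial overlap is a miss.
bool ServeFromReadaBuffer(const char* reada_buffer, int64_t reada_base,
                          int32_t reada_buf_len, int64_t offset,
                          int32_t read_len, char* buf) {
    if (reada_buffer != NULL && reada_base <= offset &&
            reada_base + reada_buf_len >= offset + read_len) {
        memcpy(buf, reada_buffer + (offset - reada_base), read_len);
        return true;   // served locally, no chunkserver RPC
    }
    return false;      // miss: fall through to the ReadBlock RPC
}

Note that any partial overlap counts as a miss: the buffer either serves the whole request or none of it, which keeps the logic trivial at the cost of occasionally refetching bytes that were already cached.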
Continuing:
    LocatedBlock lcblock;
    ChunkServer_Stub* chunk_server = NULL;
    std::string cs_addr;
    int64_t block_id;
    {
        MutexLock lock(&mu_, "Pread GetStub", 1000);
        if (located_blocks_.blocks_.empty()) {
            return 0; //no block metadata at all
        } else if (located_blocks_.blocks_[0].chains_size() == 0) { //no chunkserver locations
            if (located_blocks_.blocks_[0].block_size() == 0) { //empty file
                return 0;
            } else {
                return TIMEOUT; //no chunkserver holds the block
            }
        }
        lcblock.CopyFrom(located_blocks_.blocks_[0]);
        if (last_chunkserver_index_ == -1 || !chunkserver_) {
            const std::string& local_host_name = fs_->local_host_name_;
            for (int i = 0; i < lcblock.chains_size(); i++) {
                std::string addr = lcblock.chains(i).address();
                std::string cs_name = std::string(addr, 0, addr.find_last_of(':'));
                if (cs_name == local_host_name) {
                    last_chunkserver_index_ = i;
                    cs_addr = lcblock.chains(i).address();
                    break;
                }
            }
            if (last_chunkserver_index_ == -1) {
                int server_index = rand() % lcblock.chains_size();
                cs_addr = lcblock.chains(server_index).address();
                last_chunkserver_index_ = server_index;
            }
            fs_->rpc_client_->GetStub(cs_addr, &chunkserver_);
        }
        chunk_server = chunkserver_;
        block_id = lcblock.block_id();
    }
The logic above is straightforward. For the data returned by the nameserver it handles each case in turn: no block metadata at all, a zero-size file, and a non-empty file that no chunkserver currently holds. It then obtains an RPC stub for the chosen chunkserver: the chains list is scanned from the first entry, and if one of the replicas is on the local host it is used directly (a short-circuit read that avoids a network hop); otherwise a replica is picked at random.
Continuing:
    ReadBlockRequest request;
    ReadBlockResponse response;
    request.set_sequence_id(common::timer::get_micros());
    request.set_block_id(block_id);
    request.set_offset(offset);
    int32_t rlen = read_len;
    if (sequential_ratio_ > 2
            && reada
            && read_len < FLAGS_sdk_file_reada_len) {
        rlen = std::min(static_cast<int64_t>(FLAGS_sdk_file_reada_len),
                        static_cast<int64_t>(sequential_ratio_) * read_len);
    }
    request.set_read_len(rlen);
    bool ret = false;
    for (int retry_times = 0; retry_times < lcblock.chains_size() * 2; retry_times++) {
        LOG(DEBUG, "Start Pread: %s", cs_addr.c_str());
        ret = fs_->rpc_client_->SendRequest(chunk_server, &ChunkServer_Stub::ReadBlock,
                                            &request, &response, 15, 3);

        if (!ret || response.status() != kOK) {
            cs_addr = lcblock.chains((++last_chunkserver_index_) % lcblock.chains_size()).address();
            {
                MutexLock lock(&mu_, "Pread change chunkserver", 1000);
                if (chunk_server != chunkserver_) {
                    chunk_server = chunkserver_;
                } else {
                    bad_chunkservers_.insert(chunk_server);
                    fs_->rpc_client_->GetStub(cs_addr, &chunk_server);
                    chunkserver_ = chunk_server;
                }
            }
        } else {
            break;
        }
    }
    if (!ret || response.status() != kOK) { //error handling
        if (!ret) {
            return TIMEOUT;
        } else {
            return GetErrorCode(response.status());
        }
    }
Next the request is filled in with the block id, the offset, and the length to read; the length is scaled dynamically according to sequential_ratio_ (see the sketch below). Then ChunkServerImpl::ReadBlock is invoked. On failure, the replica after last_chunkserver_index_ is selected, a new stub is created for it, and the bad chunkserver is inserted into bad_chunkservers_. Note, however, that bad_chunkservers_ is never consulted anywhere in the code, so a later round may well pick a bad chunkserver again; skipping addresses recorded there when rotating would be a simple improvement.
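As a worked illustration of the scaling, here is a minimal sketch (not BFS code; the 1 MB cap is an assumed stand-in for FLAGS_sdk_file_reada_len, whose real default I have not checked):

#include <algorithm>
#include <cstdint>
#include <cstdio>

// Minimal sketch (not BFS code): how Pread scales the requested length.
// reada_cap stands in for FLAGS_sdk_file_reada_len (assumed 1 MB below).
int64_t ReadaLen(int32_t sequential_ratio, int32_t read_len, int64_t reada_cap) {
    if (sequential_ratio > 2 && read_len < reada_cap) {
        return std::min(reada_cap,
                        static_cast<int64_t>(sequential_ratio) * read_len);
    }
    return read_len;  // access does not look sequential: no read-ahead
}

int main() {
    // Simulate eight back-to-back 10240-byte reads: each read landing at
    // the expected offset increments the ratio, so the RPC length ramps
    // up (10240, 10240, 30720, 40960, ...) until it would hit the cap.
    int32_t ratio = 0;  // assumed starting value for this simulation
    for (int i = 0; i < 8; i++) {
        ratio++;
        printf("read %d: request %lld bytes\n", i,
               (long long)ReadaLen(ratio, 10240, 1 << 20));
    }
    return 0;
}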
Finally:
    int32_t ret_len = response.databuf().size();
    if (read_len < ret_len) { //cache the surplus locally
        MutexLock lock(&mu_, "Pread fill buffer", 1000);
        int32_t cache_len = ret_len - read_len;
        if (cache_len > reada_buf_len_) { //grow the cache if it is too small
            delete[] reada_buffer_;
            reada_buffer_ = new char[cache_len];
        }
        reada_buf_len_ = cache_len;
        memcpy(reada_buffer_, response.databuf().data() + read_len, cache_len); //cache the data
        reada_base_ = offset + read_len; //start offset of the cached range, used by the next read
        ret_len = read_len;
    }
    memcpy(buf, response.databuf().data(), ret_len);
    return ret_len;
}
Once the data comes back from the chunkserver, if the response holds no more than what the caller asked for, it is copied straight into the caller's buffer; otherwise the surplus is cached locally so that subsequent reads can be served from the local buffer. The simulation below ties this together with the earlier cache-hit sketch.
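A self-contained simulation (not BFS code, assumed sizes): one over-long fetch fills the cache, and the following sequential reads hit it without any further RPC.

#include <cstdint>
#include <cstdio>
#include <cstring>

// Sketch state mirroring FileImpl's members (assumed initial values).
static char* reada_buffer = NULL;
static int32_t reada_buf_len = 0;
static int64_t reada_base = 0;

// Cache-hit test from the earlier sketch.
bool ServeFromCache(int64_t offset, int32_t read_len, char* buf) {
    if (reada_buffer != NULL && reada_base <= offset &&
            reada_base + reada_buf_len >= offset + read_len) {
        memcpy(buf, reada_buffer + (offset - reada_base), read_len);
        return true;
    }
    return false;
}

// Fill step mirroring the code above: hand read_len bytes to the caller,
// cache the surplus, and remember where the cached range starts.
void FillCache(const char* data, int32_t ret_len, int32_t read_len, int64_t offset) {
    int32_t cache_len = ret_len - read_len;
    if (cache_len > reada_buf_len) {   // grow the buffer only when needed
        delete[] reada_buffer;
        reada_buffer = new char[cache_len];
    }
    reada_buf_len = cache_len;
    memcpy(reada_buffer, data + read_len, cache_len);
    reada_base = offset + read_len;
}

int main() {
    char rpc_data[40960];              // pretend one RPC returned 40960 bytes
    memset(rpc_data, 'x', sizeof(rpc_data));
    char buf[10240];

    FillCache(rpc_data, sizeof(rpc_data), 10240, 0);  // caller got [0, 10240)
    for (int64_t off = 10240; off <= 40960; off += 10240) {
        printf("read at %lld: %s\n", (long long)off,
               ServeFromCache(off, 10240, buf) ? "cache hit" : "miss");
    }
    delete[] reada_buffer;
    return 0;
}

The three reads at offsets 10240, 20480, and 30720 hit the cache; the read at 40960 misses because the cached range ends there, so the real client would issue another ReadBlock RPC.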
Next, the implementation of ChunkServerImpl::ReadBlock:
void ChunkServerImpl::ReadBlock(...) {
    int64_t block_id = request->block_id();
    int64_t offset = request->offset();
    int32_t read_len = request->read_len();
    //more code...
    StatusCode status = kOK;
    Block* block = block_manager_->FindBlock(block_id);
    if (block == NULL) {
        status = kCsNotFound;
    } else {
        char* buf = new char[read_len];
        int64_t len = block->Read(buf, read_len, offset);
        if (len >= 0) {
            response->mutable_databuf()->assign(buf, len);
            //log
        } else {
            status = kReadError;
        }
        delete[] buf;
    }
    response->set_status(status);
    done->Run();
    if (block) {
        block->DecRef();
    }
}
Locating the block on the chunkserver is simple: the block is looked up in the block manager and then read:
Block* BlockManager::FindBlock(int64_t block_id) {
    MutexLock lock(&mu_, "BlockManger::Find", 1000);
    auto it = block_map_.find(block_id);
    if (it == block_map_.end()) {
        return NULL;
    }
    Block* block = it->second;
    // for user
    block->AddRef();
    return block;
}
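Note the "// for user" comment: FindBlock takes an extra reference on the caller's behalf, which is why ReadBlock above calls DecRef() after done->Run(), even when the read itself failed. A minimal sketch of that contract (not BFS code; the real Block presumably reclaims itself once the count reaches zero):

#include <atomic>
#include <cstddef>

// Minimal sketch (not BFS code) of the lookup/release contract.
struct Block {
    std::atomic<int> refs{1};      // one reference owned by block_map_
    void AddRef() { ++refs; }
    void DecRef() { --refs; }      // real code reclaims the block at zero
};

void Reader(Block* block) {        // block as returned by FindBlock
    if (block == NULL) return;     // kCsNotFound path: nothing to release
    // ... serve the read from the block ...
    block->DecRef();               // pairs with the AddRef in FindBlock
}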
How the data is actually read off a block is not analyzed here; that will be covered in detail together with how blocks are organized. The above is the whole BFS file-read path; some details will be filled in later.
Overall, reading a file is fairly simple. The SDK asks the nameserver for the file's information, including block ids, length, and the chunkserver locations, then talks to a chunkserver for the actual data. When the local buffer is fully consumed on every read, a ratio is adjusted dynamically so that more data is requested per RPC and the surplus is cached locally for subsequent reads. If a chunkserver returns an error, the next one is selected and the process repeats. Note that this dynamic adjustment takes no account of network conditions or chunkserver load.
Could BFS borrow from Ceph's design here, splitting files into fixed-size pieces and spreading them across servers according to a placement policy? Worth thinking about.
Finally, on error the half-downloaded local file is not deleted. The next post will analyze a few key classes.