美文网首页
百度文件系统bfs源码分析系列(六)

百度文件系统bfs源码分析系列(六)

作者: fooboo | 来源:发表于2019-02-24 21:20 被阅读5次

    这部分主要是分析几个类的实现及作用,会在分析时标注是哪个进程下的,因为可能会有相同的类名但不同的实现。
    在nameserver启动时,会在NameServerImpl的构造函数中初始化一些数据成员,部分代码如下:

      59 NameServerImpl::NameServerImpl(Sync* sync) {
      60     //more init code...
      64     block_mapping_manager_ = new BlockMappingManager(FLAGS_blockmapping_bucket_num);
      69     chunkserver_manager_ = new ChunkServerManager(work_thread_pool_, block_mapping_manager_);
      70     namespace_ = new NameSpace(false);
      74     if (sync_) {
      75         //ha
      84     }
      85     CheckLeader();
      88 }
    
     20 BlockMappingManager::BlockMappingManager(int32_t bucket_num) :
     21     blockmapping_bucket_num_(bucket_num) {
     22     thread_pool_ = new ThreadPool(FLAGS_blockmapping_working_thread_num);
     23     block_mapping_.resize(blockmapping_bucket_num_);
     24     for (size_t i = 0; i < block_mapping_.size(); i++) {
     25         block_mapping_[i] = new BlockMapping(thread_pool_);
     26     }
     28 }
    
    107 ChunkServerManager::ChunkServerManager(ThreadPool* thread_pool, BlockMappingManager* block_mapping_manager)             
    108     : thread_pool_(thread_pool),
    109       block_mapping_manager_(block_mapping_manager),
    110       chunkserver_num_(0),
    111       next_chunkserver_id_(1) {
    113     thread_pool_->AddTask(std::bind(&ChunkServerManager::DeadCheck, this));
    114     thread_pool_->AddTask(std::bind(&ChunkServerManager::LogStats, this));
    117     //set params_ code...
    123 }
    
     35 NameSpace::NameSpace(bool standalone): version_(0), last_entry_id_(1),
     36     block_id_upbound_(1), next_block_id_(1) {
     37     leveldb::Options options;
     38     options.create_if_missing = true;
     39     db_cache_ = leveldb::NewLRUCache(FLAGS_namedb_cache_size * 1024L * 1024L);
     40     options.block_cache = db_cache_;
     41     leveldb::Status s = leveldb::DB::Open(options, FLAGS_namedb_path, &db_);
     42     if (!s.ok()) {
     43         db_ = NULL;
     45         exit(EXIT_FAILURE);
     46     }
     47     if (standalone) {
     48         Activate(NULL, NULL);
     49     }
     50 }
    

    以上是三个比较重要的类,从类名可以看出,一个是block管理器,一个是chunkserver管理器,一个是元数据管理器,具体作用和实现接着分析;

      93 void NameServerImpl::CheckLeader() {
      94     if (!sync_ || sync_->IsLeader()) {
      96         NameServerLog log;
      97         std::function<void (const FileInfo&)> task =
      98             std::bind(&NameServerImpl::RebuildBlockMapCallback, this, std::placeholders::_1);
      99         namespace_->Activate(task, &log);
     100         if (!LogRemote(log, std::function<void (bool)>())) {
     102         }
     103         recover_timeout_ = FLAGS_nameserver_start_recover_timeout;
     105         work_thread_pool_->DelayTask(1000, std::bind(&NameServerImpl::CheckRecoverMode, this));
     106         is_leader_ = true;
     107     } else {
     108         is_leader_ = false;
     109         work_thread_pool_->DelayTask(100, std::bind(&NameServerImpl::CheckLeader, this));
     111     }
     112 }
    

    在nameserver启动后,待前面的都初始化好后进行CheckLeader:如果没有开启ha功能,或者开启了ha且当前节点是leader,则依次进行Activate和LogRemote;否则延迟100ms后再次CheckLeader:

     52 void NameSpace::Activate(std::function<void (const FileInfo&)> callback, NameServerLog* log) {
     53     std::string version_key(8, 0);
     54     version_key.append("version");
     55     std::string version_str;
     56     leveldb::Status s = db_->Get(leveldb::ReadOptions(), version_key, &version_str);
     57     if (s.ok()) {//读取版本号
     58         if (version_str.size() != sizeof(int64_t)) {
     60         }
     61         version_ = *(reinterpret_cast<int64_t*>(&version_str[0]));
     63     } else {//更新
     64         version_ = common::timer::get_micros();
     65         version_str.resize(8);
     66         *(reinterpret_cast<int64_t*>(&version_str[0])) = version_;
     67 
     68         leveldb::Status s = db_->Put(leveldb::WriteOptions(), version_key, version_str);
     69         if (!s.ok()) {
     71         }
     72         EncodeLog(log, kSyncWrite, version_key, version_str);
     74     }
     75     SetupRoot();//初始化root_path_
     76     RebuildBlockMap(callback);
     77     InitBlockIdUpbound(log);//初始化block_id_upbound_
     78 }
    
    675 bool NameSpace::RebuildBlockMap(std::function<void (const FileInfo&)> callback) {
    679     std::set<int64_t> entry_id_set;
    680     entry_id_set.insert(root_path_.entry_id());
    681     leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
    682     for (it->Seek(std::string(7, '\0') + '\1'); it->Valid(); it->Next()) {
    683         FileInfo file_info;
    684         bool ret = file_info.ParseFromArray(it->value().data(), it->value().size());
    685         assert(ret);
    686         if (last_entry_id_ < file_info.entry_id()) {
    687             last_entry_id_ = file_info.entry_id();
    688         }
    689         FileType file_type = GetFileType(file_info.type());
    690         if (file_type == kDefault) {
    691             //a file
    692             for (int i = 0; i < file_info.blocks_size(); i++) {
    693                 if (file_info.blocks(i) >= next_block_id_) {
    694                     next_block_id_ = file_info.blocks(i) + 1;
    695                     block_id_upbound_ = next_block_id_;
    696                 }
    698             }
    700             if (callback) {
    701                 callback(file_info);
    702             }
    703         } else if (file_type == kSymlink) {
    705         } else {
    706             entry_id_set.insert(file_info.entry_id());
    707         }
    708     }
    709     //more code...
    730     delete it;
    731     return true;
    732 }
    

    以上从namespace中初始化块的元数据,主要是从leveldb中读取块的数据,并更新next_block_id_/block_id_upbound_/last_entry_id_几个重要的字段,并进行RebuildBlockMapCallback回调;而entry_id_set保存的是目录的entry_id,并在后面再次遍历leveldb中的元数据,查找出哪些条目的父目录已经不在entry_id_set中(即孤儿条目)、但自身还残留在leveldb中:

    714         for (it->Seek(std::string(7, '\0') + '\1'); it->Valid(); it->Next()) {
    715             FileInfo file_info;
    716             bool ret = file_info.ParseFromArray(it->value().data(), it->value().size());
    717             assert(ret);
    718             int64_t parent_entry_id = 0;
    719             std::string filename;
    720             DecodingStoreKey(it->key().ToString(), &parent_entry_id, &filename);
    721             if (entry_id_set.find(parent_entry_id) == entry_id_set.end()) {
    722                 //Orphan entry PE
    726             }
    727         }
    

    InitBlockIdUpbound做的事情也比较简单,简单的说就是预先自增一定的大小,避免每次都加1并更新leveldb,后面使用next_block_id_时可以暂时不用管block_id_upbound_

    元数据声明如下:

      4 message FileInfo {
      5     optional int64 entry_id = 1;
      6     optional int64 version = 2;
      7     optional int32 type = 3 [default = 0755];
      8     repeated int64 blocks = 4;
      9     optional uint32 ctime = 5;
     10     optional string name = 6;
     11     optional int64 size = 7;
     12     optional int32 replicas = 8;
     13     optional int64 parent_entry_id = 9;
     14     optional int32 owner = 10;
     15     repeated string cs_addrs = 11;
     16     optional string sym_link = 12;
     17 }
    

    RebuildBlockMapCallback则会根据块的block_id,副本数,版本号(seq),大小去创建block数据:

    105 void BlockMapping::RebuildBlock(int64_t block_id, int32_t replica,
    106                                 int64_t version, int64_t size) {
    107     NSBlock* nsblock = NULL;
    108     nsblock = new NSBlock(block_id, replica, version, size);
    109     if (size) {//新建的时候会这样,后面由chunkserver同步过来
    110         nsblock->recover_stat = kLost;
    111         lost_blocks_.insert(block_id);
    112     } else {
    113         nsblock->recover_stat = kBlockWriting;
    114     }
    115     
    116     if (version < 0) {        
    118     } else {
    120     }                                         
    121     
    123     MutexLock lock(&mu_);
    125     std::pair<NSBlockMap::iterator, bool> ret =
    126         block_map_.insert(std::make_pair(block_id, nsblock));
    129 }
    
     23 struct NSBlock {
     24     int64_t id;
     25     int64_t version;
     26     std::set<int32_t> replica; //同步
     27     int64_t block_size;       
     28     uint32_t expect_replica_num; //期望副本数则配置设置
     29     RecoverStat recover_stat;
     30     std::set<int32_t> incomplete_replica; //同步
     31     NSBlock();                                
     32     NSBlock(int64_t block_id, int32_t replica, int64_t version, int64_t size);
     33     bool operator<(const NSBlock &b) const {
     34         return (this->replica.size() >= b.replica.size());
     35     }
     36 };
    
     30 NSBlock::NSBlock(int64_t block_id, int32_t replica,
     31                  int64_t block_version, int64_t block_size)
     32     : id(block_id), version(block_version),
     33       block_size(block_size), expect_replica_num(replica),
     34       recover_stat(block_version < 0 ? kBlockWriting : kNotInRecover) {
     35 }
    

    之后创建和初始化rpc服务这些,包括开启raft等,这些就是nameserver要做的事情,介绍得比较粗糙,其中nameserver下ChunkServerManager模块的一些逻辑会同接下来chunkserver服务一起分析。

    在chunkserver服务启动时,部分代码如下:

     76 ChunkServerImpl::ChunkServerImpl()
     77     : chunkserver_id_(-1),
     78      heartbeat_task_id_(-1),
     79      blockreport_task_id_(-1),
     80      last_report_blockid_(-1),
     81      report_id_(0),
     82      is_first_round_(true),
     83      first_round_report_start_(-1),
     84      service_stop_(false) {
     93     block_manager_ = new BlockManager(FLAGS_block_store_path);
     94     bool s_ret = block_manager_->LoadStorage();
     95     assert(s_ret == true);
     96     rpc_client_ = new RpcClient();
     97     nameserver_ = new NameServerClient(rpc_client_, FLAGS_nameserver_nodes);
     98     heartbeat_thread_->AddTask(std::bind(&ChunkServerImpl::LogStatus, this, true));
     99     heartbeat_thread_->AddTask(std::bind(&ChunkServerImpl::Register, this));
    100 }
    
     36 BlockManager::BlockManager(const std::string& store_path)
     37     : thread_pool_(new ThreadPool(1)), disk_quota_(0), counter_manager_(new DiskCounterManager)     {   
     38     CheckStorePath(store_path);
     39     file_cache_ = new FileCache(FLAGS_chunkserver_file_cache_size);
     41 }
    
     95 void BlockManager::CheckStorePath(const std::string& store_path) {
     96     std::string fsid_str;
     97     struct statfs fs_info;
     98     std::string home_fs;
     99     if (statfs("/home", &fs_info) == 0) {
    100         home_fs.assign((const char*)&fs_info.f_fsid, sizeof(fs_info.f_fsid));
    101     } else if (statfs("/", &fs_info) == 0) {
    103         home_fs.assign((const char*)&fs_info.f_fsid, sizeof(fs_info.f_fsid));
    104     } else {
    106     }
    107     
    108     std::vector<std::string> store_path_list;
    109     common::SplitString(store_path, ",", &store_path_list);
    110     //more code...处理每一个store_path_list中的空格并去重
    117     std::sort(store_path_list.begin(), store_path_list.end());
    118     auto it = std::unique(store_path_list.begin(), store_path_list.end());
    119     store_path_list.resize(std::distance(store_path_list.begin(), it));
    120 
    121     std::set<std::string> fsids;
    123     for (uint32_t i = 0; i < store_path_list.size(); ++i) {
    124         std::string& disk_path = store_path_list[i];
    125         int stat_ret = statfs(disk_path.c_str(), &fs_info);
    126         std::string fs_tmp((const char*)&fs_info.f_fsid, sizeof(fs_info.f_fsid));
    127         if (stat_ret != 0 ||
    128             (!FLAGS_chunkserver_multi_path_on_one_disk && fsids.find(fs_tmp) != fsids.end()) ||
    129             (!FLAGS_chunkserver_use_root_partition && fs_tmp == home_fs)) {
    130             // statfs failed
    131             // do not allow multi data path on the same disk
    132             // do not allow using root as data path
    133             if (stat_ret != 0) {
    136             } else {
    138             }
    139             store_path_list[i] = store_path_list[store_path_list.size() - 1];
    140             store_path_list.resize(store_path_list.size() - 1);
    141             --i;
    142         } else {
    143             int64_t disk_size = fs_info.f_blocks * fs_info.f_bsize;
    144             int64_t user_quota = fs_info.f_bavail * fs_info.f_bsize;
    145             int64_t super_quota = fs_info.f_bfree * fs_info.f_bsize;
    152             Disk* disk = new Disk(store_path_list[i], user_quota);
    153             disks_.push_back(std::make_pair(DiskStat(), disk));
    154             fsids.insert(fs_tmp);
    155         }
    156     }
    158     assert(store_path_list.size() > 0);
    159     CheckChunkserverMeta(store_path_list);
    160 }
    

    以上逻辑还是以目录存储路径去获取磁盘相关的信息,并根据目录和使用情况建立映射关系;CheckChunkserverMeta从leveldb中检查目录版本号;

     1 struct statfs 
     2 { 
     3   long f_type; /* 文件系统类型*/ 
     4   long f_bsize; /* 经过优化的传输块大小*/ 
     5   long f_blocks; /* 文件系统数据块总数*/ 
     6   long f_bfree; /* 可用块数*/
     7   long f_bavail; /* 非超级用户可获取的块数*/ 
     8   long f_files; /* 文件结点总数*/ 
     9   long f_ffree; /* 可用文件结点数*/ 
    10   fsid_t f_fsid; /* 文件系统标识*/ 
    11   long f_namelen; /* 文件名的最大长度*/ 
    12 };
    

    statfs结构中可用空间块数有两种f_bfree和 f_bavail,前者是硬盘所有剩余空间,后者为非root用户剩余空间;

    FileCache的实现类似leveldb的cache(见leveldb/util/cache.cc),对fd和file_path进行了lru缓存,部分实现如下:

     39 common::Cache::Handle* FileCache::FindFile(const std::string& file_path) {
     40     common::Slice key(file_path);
     41     common::Cache::Handle* handle = cache_->Lookup(key);
     42     if (handle == NULL) {
     43         int fd = open(file_path.c_str(), O_RDONLY);
     44         if (fd < 0) {
     46             return NULL;
     47         }
     48         FileEntity* file = new FileEntity;
     49         file->fd = fd;
     50         file->file_name = file_path;
     51         handle = cache_->Insert(key, file, 1, &DeleteEntry);
     52     }
     53     return handle;
     54 }
    
    167 bool BlockManager::LoadStorage() {
    168     bool ret = true;
    169     for (auto it = disks_.begin(); it != disks_.end(); ++it) {
    170         Disk* disk = it->second;
    171         ret = ret && disk->LoadStorage(std::bind(&BlockManager::AddBlock,
    172                                                       this, std::placeholders::_1,
    173                                                       std::placeholders::_2,
    174                                                       std::placeholders::_3));
    175         disk_quota_ += disk->GetQuota();
    176     }
    177     return ret;
    178 }
    
     38 bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback) {
     57     //more code...从leveldb中读并检查版本号
     58     leveldb::Iterator* it = metadb_->NewIterator(leveldb::ReadOptions());
     59     for (it->Seek(version_key+'\0'); it->Valid(); it->Next()) {
     60         int64_t block_id = 0;
     61         if (1 != sscanf(it->key().data(), "%ld", &block_id)) {
     63             delete it;
     64             return false;
     65         }
     66         BlockMeta meta;
     67         if (!meta.ParseFromArray(it->value().data(), it->value().size())) {
     69             assert(0); // TODO: fault tolerant
     70         }
     71         // TODO: do not need store_path in meta any more
     72         std::string file_path = meta.store_path() + Block::BuildFilePath(block_id);
     73         if (meta.version() < 0) {
     76             metadb_->Delete(leveldb::WriteOptions(), it->key());
     77             remove(file_path.c_str());
     78             continue;
     79         } else {
     80             struct stat st;
     81             if (stat(file_path.c_str(), &st) ||
     82                 st.st_size != meta.block_size() ||
     83                 access(file_path.c_str(), R_OK)) {
     87                 metadb_->Delete(leveldb::WriteOptions(), it->key());
     88                 remove(file_path.c_str());
     89                 continue;
     90             } else {
     93             }
     94         }
     95         callback(block_id, this, meta);
     97     }
     98     //more code...
    

    根据上面的目录store_path_list读每个BlockMeta块的元数据,其中会处理一些错误的BlockMeta,并回调BlockManager::AddBlock

    325 bool BlockManager::AddBlock(int64_t block_id, Disk* disk, BlockMeta meta) {
    326     Block* block = new Block(meta, disk, file_cache_);
    327     block->AddRef();
    328     MutexLock lock(&mu_);
    329     return block_map_.insert(std::make_pair(block_id, block)).second;
    330 }
    
      4 message BlockMeta {
      5     optional int64 block_id = 1;
      6     optional int64 block_size = 2;
      7     optional int64 checksum = 3;
      8     optional int64 version = 4 [default = -1];
      9     optional string store_path = 5;
     10 }
    
     34 Block::Block(const BlockMeta& meta, Disk* disk, FileCache* file_cache) :
     35   disk_(disk), meta_(meta),
     36   last_seq_(-1), slice_num_(-1), blockbuf_(NULL), buflen_(0),
     37   bufdatalen_(0), disk_writing_(false),
     38   disk_file_size_(meta.block_size()), file_desc_(kNotCreated), refs_(0),
     39   close_cv_(&mu_), is_recover_(false), expected_size_(-1), deleted_(false),
     40   file_cache_(file_cache) {
     41     assert(meta_.block_id() < (1L<<40));
     42     disk_->counters_.data_size.Add(meta.block_size());
     43     disk_file_ = meta.store_path() + BuildFilePath(meta_.block_id());
     44     disk_->counters_.blocks.Inc();
     45     if (meta_.version() >= 0) {
     46         finished_ = true;
     47         recv_window_ = NULL;
     48     } else {
     49         finished_ = false;
     50         recv_window_ = new common::SlidingWindow<Buffer>(100,
     51                        std::bind(&Block::WriteCallback, this,
     52                        std::placeholders::_1, std::placeholders::_2));
     53     }
     54 }
    

    其中chunkserver中的Block类是比较复杂的,代表真实的数据信息,且在加载的时候,根据version判断是否已完成(version >= 0表示已完成),否则会创建一个滑动窗口recv_window_;

    最后ChunkServerImpl::Register

    135 void ChunkServerImpl::Register() {
    136     RegisterRequest request;
    137     request.set_chunkserver_addr(data_server_addr_);
    138     request.set_disk_quota(block_manager_->DiskQuota());
    139     request.set_namespace_version(block_manager_->NamespaceVersion());
    140     request.set_tag(FLAGS_chunkserver_tag);
    143     RegisterResponse response;
    144     if (!nameserver_->SendRequest(&NameServer_Stub::Register, &request, &response, 20)) {
    146         work_thread_pool_->DelayTask(5000, std::bind(&ChunkServerImpl::Register, this));
    147         return;
    148     }
    149     //more code...
    160     int64_t new_version = response.namespace_version();
    161     if (block_manager_->NamespaceVersion() != new_version) {
    162         // NameSpace change
    163         if (!FLAGS_chunkserver_auto_clean) {
    164             /// abort
    166         }
    167         LOG(INFO, "Use new namespace version: %ld, clean local data", new_version);
    168         // Clean
    169         if (!block_manager_->CleanUp(new_version)) {
    171         }
    172         if (!block_manager_->SetNamespaceVersion(new_version)) {
    174         }
    175         work_thread_pool_->AddTask(std::bind(&ChunkServerImpl::Register, this));
    176         return;
    177     }
    179     //more code...
    190 }
    

    以上注册到nameserver服务上,会带上当前磁盘配额和版本号,后面收到响应后会做些处理,比如版本号不一致时,需要重置一些数据:

    310 bool BlockManager::CleanUp(int64_t namespace_version) {
    311     for (auto it = block_map_.begin(); it != block_map_.end();) {
    312         Block* block = it->second;
    313         if (block->CleanUp(namespace_version)) {
    314             file_cache_->EraseFileCache(block->GetFilePath());
    315             block->DecRef(); 
    316             block_map_.erase(it++);
    317         } else {
    318             ++it;
    319         }
    320     }       
    322     return true;
    323 }
    
    140 bool Block::CleanUp(int64_t namespace_version) {
    141     if (namespace_version != disk_->NamespaceVersion()) {
    142         SetDeleted();
    143         return true;
    144     }       
    145     return false;
    146 }
    
    148 StatusCode Block::SetDeleted() {
    149     disk_->RemoveBlockMeta(meta_.block_id());
    150     int deleted = common::atomic_swap(&deleted_, 1);
    151     if (deleted != 0) {
    152         return kCsNotFound;
    153     }
    154     return kOK;
    155 }
    

    剩下的字段作用后面再说,其中chunkserver_id_是nameserver服务上分配的id,屏蔽了具体ip/port等信息,接着SendBlockReport/SendHeartbeat,后面再分析,回到nameserver上收到RegisterRequest消息的处理:

     170 void NameServerImpl::Register(::google::protobuf::RpcController* controller,
     171                               const ::baidu::bfs::RegisterRequest* request,
     172                               ::baidu::bfs::RegisterResponse* response,
     173                               ::google::protobuf::Closure* done) {
     174     if (!is_leader_) {
     175         //send to leader
     177         return;
     178     }
     179     sofa::pbrpc::RpcController* sofa_cntl =
     180         reinterpret_cast<sofa::pbrpc::RpcController*>(controller);
     181     const std::string& address = request->chunkserver_addr();
     182     std::string port = address.substr(address.find(':'));
     183     std::string ip_address = sofa_cntl->RemoteAddress();
     184     ip_address = ip_address.substr(0, ip_address.find(':')) + port;
     186     int64_t version = request->namespace_version();
     187     if (version != namespace_->Version()) {
     190         chunkserver_manager_->RemoveChunkServer(address);
     191     } else {
     193         if (chunkserver_manager_->HandleRegister(ip_address, request, response)) {
     194             LeaveReadOnly();
     195         }
     196     }
     197     response->set_namespace_version(namespace_->Version());
     198     done->Run();
     199 }
    

    当nameserver收到chunkserver注册信息时,如果发现版本号不一致,那么会调用chunkserver_manager_->RemoveChunkServer(address)处理该chunkserver的状态:

    166 bool ChunkServerManager::RemoveChunkServer(const std::string& addr) {
    167     MutexLock lock(&mu_, "RemoveChunkServer", 10);
    168     std::map<std::string, int32_t>::iterator it = address_map_.find(addr);
    169     if (it == address_map_.end()) {
    170         return false;
    171     }
    172     ChunkServerInfo* cs_info = NULL;
    173     bool ret = GetChunkServerPtr(it->second, &cs_info);
    174     assert(ret);
    175     if (cs_info->status() == kCsActive) {//如果在活跃状态
    176         cs_info->set_status(kCsWaitClean);
    177         std::function<void ()> task =
    178             std::bind(&ChunkServerManager::CleanChunkServer,
    179                         this, cs_info, std::string("Dead"));
    180         thread_pool_->AddTask(task);
    181     }
    182     return true;
    183 }
    
    125 void ChunkServerManager::CleanChunkServer(ChunkServerInfo* cs, const std::string& reason) {
    126     int32_t id = cs->id();
    127     MutexLock lock(&mu_, "CleanChunkServer", 10);
    128     chunkserver_num_--;
    129     auto it = block_map_.find(id);
    130     assert(it != block_map_.end());
    131     std::set<int64_t> blocks;
    132     it->second->CleanUp(&blocks);
    135     cs->set_status(kCsCleaning);
    136     mu_.Unlock();
    137     block_mapping_manager_->DealWithDeadNode(id, blocks);
    138     mu_.Lock("CleanChunkServerRelock", 10);
    139     cs->set_w_qps(0);
    140     cs->set_w_speed(0);
    141     cs->set_r_qps(0);
    142     cs->set_r_speed(0);
    143     cs->set_recover_speed(0);
    144     if (std::find(chunkservers_to_offline_.begin(),
    145                   chunkservers_to_offline_.end(),
    146                   cs->ipaddress()) == chunkservers_to_offline_.end()) {
    147         if (cs->is_dead()) {
    148             cs->set_status(kCsOffLine);
    149         } else {
    150             cs->set_status(kCsStandby);
    151         }
    152     } else {
    153         cs->set_status(kCsReadonly);
    154     }
    155 }
    
    607 void BlockMapping::DealWithDeadNode(int32_t cs_id, const std::set<int64_t>& blocks) {
    608     for (std::set<int64_t>::iterator it = blocks.begin(); it != blocks.end(); ++it) {
    609         MutexLock lock(&mu_);
    610         DealWithDeadBlockInternal(cs_id, *it);
    611     }
    612     MutexLock lock(&mu_);
    613     NSBlock* block = NULL;
    614     for (std::set<int64_t>::iterator it = hi_recover_check_[cs_id].begin();
    615             it != hi_recover_check_[cs_id].end(); ++it) {
    616         if (!GetBlockPtr(*it, &block)) { //没有找到
    618         } else {
    619             block->recover_stat = kNotInRecover;
    620         }
    621     }
    622     hi_recover_check_.erase(cs_id);
    623     for (std::set<int64_t>::iterator it = lo_recover_check_[cs_id].begin();
    624             it != lo_recover_check_[cs_id].end(); ++it) {
    625         if (!GetBlockPtr(*it, &block)) { //没有找到
    627         } else {
    628             block->recover_stat = kNotInRecover;
    629         }
    630     }
    631     lo_recover_check_.erase(cs_id);
    632 }
    

    如果chunkserver服务的版本号和nameserver相同,则调用chunkserver_manager_->HandleRegister:

    227 bool ChunkServerManager::HandleRegister(const std::string& ip,
    228                                         const RegisterRequest* request,
    229                                         RegisterResponse* response) {
    230     const std::string& address = request->chunkserver_addr();
    231     StatusCode status = kOK;
    232     int cs_id = -1;
    233     MutexLock lock(&mu_, "HandleRegister", 10);
    234     std::map<std::string, int32_t>::iterator it = address_map_.find(address);
    235     if (it == address_map_.end()) {
    236         cs_id = AddChunkServer(request->chunkserver_addr(), ip,
    237                                request->tag(), request->disk_quota());//加入新的chunkserver节点
    238         assert(cs_id >= 0);
    239         response->set_chunkserver_id(cs_id);
    240     } else {
    241         cs_id = it->second;
    242         ChunkServerInfo* cs_info;
    243         bool ret = GetChunkServerPtr(cs_id, &cs_info);
    244         assert(ret);
    245         if (cs_info->status() == kCsWaitClean || cs_info->status() == kCsCleaning) {
    246             status = kNotOK;
    249         } else {
    250             UpdateChunkServer(cs_id, request->tag(), request->disk_quota());
    251             auto it = block_map_.find(cs_id);
    252             assert(it != block_map_.end());
    253             it->second->MoveNew();
    254             response->set_report_id(it->second->GetReportId());
    257         }
    258     }
    259     response->set_chunkserver_id(cs_id);
    262     response->set_status(status);
    263     return chunkserver_num_ >= FLAGS_expect_chunkserver_num;
    264 }
    
    561 int32_t ChunkServerManager::AddChunkServer(const std::string& address,
    562                                            const std::string& ipaddress,
    563                                            const std::string& tag,
    564                                            int64_t quota) {
    565     mu_.AssertHeld();                      
    566     ChunkServerInfo* info = new ChunkServerInfo;
    567     int32_t id = next_chunkserver_id_++; //自增分配
    568     char buf[20];
    569     common::timer::now_time_str(buf, 20, common::timer::kMin);
    570     info->set_start_time(std::string(buf));
    571     info->set_id(id);
    572     //more code...
    575     info->set_disk_quota(quota);
    576     if (std::find(chunkservers_to_offline_.begin(), chunkservers_to_offline_.end(),
    577                 address) != chunkservers_to_offline_.end()) {
    578         info->set_status(kCsReadonly);
    579     } else {
    580         info->set_status(kCsActive);
    581     }
    582     info->set_kick(false);
    583     std::string host = address.substr(0, address.find(':'));
    584     std::string ip = ipaddress.substr(0, ipaddress.find(':'));
    585     //more code...
    592     chunkservers_[id] = info;
    593     address_map_[address] = id;
    594     int32_t now_time = common::timer::now_time();
    595     heartbeat_list_[now_time].insert(info);//加入检测保活
    596     info->set_last_heartbeat(now_time);
    597     ++chunkserver_num_;
    598     Blocks* blocks = new Blocks(id);
    599     block_map_.insert(std::make_pair(id, blocks));
    600     return id;
    601 }
    
     130 void NameServerImpl::LeaveReadOnly() {
     132     if (readonly_) {
     133         readonly_ = false;
     134     }
     135 }
    

    当注册到nameserver的chunkserver节点数量还未达到配置值(即不满足chunkserver_num_ >= FLAGS_expect_chunkserver_num)时,nameserver只能提供读;满足该条件后HandleRegister返回true,才会调用LeaveReadOnly。后面会结合具体例子来具体分析chunkserver节点状态的变化,以及block状态在什么情况下发生变化:

     39 enum ChunkServerStatus {//chunkserver节点状态
     40     kCsActive = 101;
     41     kCsWaitClean = 102;
     42     kCsCleaning = 103;
     43     kCsOffLine = 104;
     44     kCsStandby = 105;
     45     kCsReadonly = 106;
     46 }
    
     48 enum RecoverStat {//block块状态
     49     kNotInRecover = 0;
     50     kLoRecover = 1;
     51     kHiRecover = 2;
     52     kCheck = 3;
     53     kIncomplete = 4;
     54     kLost = 5;
     55     kBlockWriting = 6;
     56     kAny = 20;
     57 }
    

    接着回到chunkserver服务下,在ChunkServerImpl::Register中发送RegisterRequest(即上面分析的流程)并收到响应后,后续处理如下:

    135 void ChunkServerImpl::Register() {
    136     //more code...
    179     chunkserver_id_ = response.chunkserver_id();
    180     report_id_ = response.report_id() + 1;
    181     first_round_report_start_ = last_report_blockid_;
    182     is_first_round_ = true;
    188     work_thread_pool_->DelayTask(1, std::bind(&ChunkServerImpl::SendBlockReport, this));
    189     heartbeat_thread_->DelayTask(1, std::bind(&ChunkServerImpl::SendHeartbeat, this));
    190 }
    

    上面部分涉及到同步块的信息和心跳相关的,会在下面一篇中分析,并会举例分析状态的变化。

    相关文章

      网友评论

          本文标题:百度文件系统bfs源码分析系列(六)

          本文链接:https://www.haomeiwen.com/subject/sgclyqtx.html