目的
梳理 fdb 有哪些 ProcessClass,是怎么启动这些 ProcessClass 的,以及主要的 ProcessClass 的作用。
思路
在 fdb 的实现中,WorkerInterface.h 文件中有一个 startRole 函数,fdb 在启动某个 ProcessClass 的时候都会调用该函数来进行注册。所以我们可以通过该函数来帮助我们梳理 fdb 的实现。
// Declaration of startRole: takes the role's id, the id of the worker it runs on, and the role name (`as`).
// `details` defaults to an empty map and `origination` defaults to "Recruited".
void startRole(UID roleId, UID workerId, std::string as, std::map<std::string, std::string> details = std::map<std::string, std::string>(), std::string origination = "Recruited");
角色列表
通过直接搜索 startRole,可以得到完整的 ProcessClass 列表有:
- LogRouter
- Resolver
- MasterProxyServer
- StorageServer
- SharedTLog
- MasterServer
- Worker
- TLog
- Tester
- ClusterController
在 ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData localities, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix )
函数中,先根据 getDiskStores 来启动 StorageServer/TLog,并注册一个 Worker 的角色。之后通过事件循环监听 interf 来决定启动其它角色。例如其中的 MasterServer:
// Excerpt of the workerServer event loop; only the master-recruitment arm is shown here.
loop choose {
// Wait for the next RecruitMasterRequest delivered through interf.master.
when( RecruitMasterRequest req = waitNext(interf.master.getFuture()) ) {
// Build a fresh MasterInterface for the role being recruited on this worker.
MasterInterface recruited;
recruited.locality = locality;
recruited.initEndpoints();
// Register the new role under the recruited interface's id, associated with this worker's id.
startRole( recruited.id(), interf.id(), "MasterServer" );
// NOTE(review): DUMPTOKEN is defined elsewhere; presumably it records each endpoint's token for debugging -- confirm.
DUMPTOKEN( recruited.waitFailure );
DUMPTOKEN( recruited.getRateInfo );
DUMPTOKEN( recruited.tlogRejoin );
DUMPTOKEN( recruited.changeCoordinators );
DUMPTOKEN( recruited.getCommitVersion );
//printf("Recruited as masterServer\n");
// Start the master actor, passing the recruited interface, dbInfo, the coordinators from connFile and the request's lifetime.
Future<Void> masterProcess = masterServer( recruited, dbInfo, ServerCoordinators( connFile ), req.lifetime );
// Wrap the actor's future with forwardError and zombie, and track it in errorForwarders.
errorForwarders.add( zombie(recruited, forwardError( errors, "MasterServer", recruited.id(), masterProcess )) );
// Answer the recruiter with the newly created interface.
req.reply.send(recruited);
}
}
流程上:
- ClusterConnectionFile 里面有 coordinator 的地址
- fdbd 方法尝试从 coordinator 中选择一个选举为 clusterController,同时
- 会调用 workerServer 方法,这儿 workerServer 会和上一步的 clusterController 方法共用一个 ClusterControllerFullInterface 变量(之后 clusterController 会通过其来发送控制消息?)。
- clusterController 会启动 Master,见方法 clusterWatchDatabase
角色个数
fdb 期望能够自动决策“启动哪些角色以及启动多少”,默认的配置在 fdbclient/Knobs.cpp 下,如下:
// Configuration
// NOTE(review): judging by the knob names, these are the default numbers of proxies, resolvers and logs
// used when the counts are chosen automatically -- confirm against the knob definitions.
init( DEFAULT_AUTO_PROXIES, 3 );
init( DEFAULT_AUTO_RESOLVERS, 1 );
init( DEFAULT_AUTO_LOGS, 3 );
ClusterControllerData 类中的一些方法(例如 getStorageWorker)会根据请求的条件分配出合适的 worker。在分配机制中,有一个 Fitness 的概念,用来选择最合适的 worker 负责相应的角色。findWorkersForConfiguration 函数负责根据配置规划整个集群。
// Selects up to `amount` workers in datacenter `dcId` for the given cluster role.
//
// Returns a vector of (WorkerInterface, ProcessClass) pairs. The result is empty when amount <= 0 and may
// hold fewer than `amount` entries when not enough candidates qualify.
//
// A worker from id_worker is a candidate when it is available (workerAvailable with `checkStable`), its
// address is not excluded by `conf`, and its locality dcId equals `dcId`. When `minWorker` is present, the
// worker must additionally have a different interface id than minWorker, and either a fitness smaller than
// minWorker's, or an equal fitness with an id_used count no greater than minWorker's `used`.
//
// `id_used` is read and updated: every selected worker's processId counter is incremented.
vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, Optional<WorkerFitnessInfo> minWorker = Optional<WorkerFitnessInfo>(), bool checkStable = false ) {
// Candidates bucketed by (fitness, current use count). std::map iterates keys in ascending order, so
// buckets with smaller fitness values, then smaller use counts, are visited first.
// NOTE(review): presumably smaller Fitness values denote better fits -- confirm against the enum definition.
std::map<std::pair<ProcessClass::Fitness,int>, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
vector<std::pair<WorkerInterface, ProcessClass>> results;
if (amount <= 0)
return results;
for( auto& it : id_worker ) {
auto fitness = it.second.processClass.machineClassFitness( role );
// Note: id_used[it.first] uses std::map::operator[], which inserts a zero entry when the key is absent.
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && it.second.interf.locality.dcId() == dcId &&
( !minWorker.present() || ( it.second.interf.id() != minWorker.get().worker.first.id() && ( fitness < minWorker.get().fitness || (fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used ) ) ) ) ) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(std::make_pair(it.second.interf, it.second.processClass));
}
}
// Walk the buckets in key order; within a bucket the order is randomized before picking.
for( auto& it : fitness_workers ) {
auto& w = it.second;
g_random->randomShuffle(w);
for( int i=0; i < w.size(); i++ ) {
results.push_back(w[i]);
// Record that this process has been assigned one more role.
id_used[w[i].first.locality.processId()]++;
// Stop as soon as the requested number of workers has been collected.
if( results.size() == amount )
return results;
}
}
return results;
}
// Maps a cluster role and this object's process class (_class) to a Fitness value.
//
// Summary of the table below:
//  - TesterClass yields NeverAssign and UnsetClass yields UnsetFit for every listed role.
//  - For Storage and TLog, any class not listed explicitly yields NeverAssign.
//  - For Proxy, Master, Resolver, LogRouter and ClusterController, any class not listed yields WorstFit.
//  - Any role not listed in the outer switch yields NeverAssign.
ProcessClass::Fitness ProcessClass::machineClassFitness( ClusterRole role ) const {
switch( role ) {
// Storage role: only StorageClass is BestFit; unlisted classes are never assigned.
case ProcessClass::Storage:
switch( _class ) {
case ProcessClass::StorageClass:
return ProcessClass::BestFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::TransactionClass:
return ProcessClass::WorstFit;
case ProcessClass::LogClass:
return ProcessClass::WorstFit;
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::NeverAssign;
}
// TLog role: LogClass is BestFit, TransactionClass is GoodFit; unlisted classes are never assigned.
case ProcessClass::TLog:
switch( _class ) {
case ProcessClass::LogClass:
return ProcessClass::BestFit;
case ProcessClass::TransactionClass:
return ProcessClass::GoodFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::StorageClass:
return ProcessClass::WorstFit;
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::NeverAssign;
}
// Proxy role: ProxyClass is BestFit, StatelessClass is GoodFit; unlisted classes fall back to WorstFit.
case ProcessClass::Proxy:
switch( _class ) {
case ProcessClass::ProxyClass:
return ProcessClass::BestFit;
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::ResolutionClass:
return ProcessClass::BestOtherFit;
case ProcessClass::TransactionClass:
return ProcessClass::BestOtherFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
}
// Master role: MasterClass is BestFit, StatelessClass is GoodFit; unlisted classes fall back to WorstFit.
case ProcessClass::Master:
switch( _class ) {
case ProcessClass::MasterClass:
return ProcessClass::BestFit;
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::ResolutionClass:
return ProcessClass::BestOtherFit;
case ProcessClass::TransactionClass:
return ProcessClass::BestOtherFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
}
// Resolver role: ResolutionClass is BestFit, StatelessClass is GoodFit; unlisted classes fall back to WorstFit.
case ProcessClass::Resolver:
switch( _class ) {
case ProcessClass::ResolutionClass:
return ProcessClass::BestFit;
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::TransactionClass:
return ProcessClass::BestOtherFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
}
// LogRouter role: LogRouterClass is BestFit, StatelessClass is GoodFit; unlisted classes fall back to WorstFit.
case ProcessClass::LogRouter:
switch( _class ) {
case ProcessClass::LogRouterClass:
return ProcessClass::BestFit;
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::ResolutionClass:
return ProcessClass::BestOtherFit;
case ProcessClass::TransactionClass:
return ProcessClass::BestOtherFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
}
// ClusterController role: ClusterControllerClass is BestFit, StatelessClass is GoodFit;
// Master/Resolution/Proxy classes are BestOtherFit; unlisted classes fall back to WorstFit.
case ProcessClass::ClusterController:
switch( _class ) {
case ProcessClass::ClusterControllerClass:
return ProcessClass::BestFit;
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::MasterClass:
return ProcessClass::BestOtherFit;
case ProcessClass::ResolutionClass:
return ProcessClass::BestOtherFit;
case ProcessClass::ProxyClass:
return ProcessClass::BestOtherFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
}
// Roles not handled above are never assigned.
default:
return ProcessClass::NeverAssign;
}
}
// Picks a worker to host a storage server for the given recruitment request.
//
// A worker qualifies when it is available, its zoneId is not in req.excludeMachines, its dcId is in
// req.includeDCs (or includeDCs is empty), and its address is not excluded by req.excludeAddresses.
//
// Returns the (WorkerInterface, ProcessClass) of the chosen worker.
// Throws no_more_servers() when no worker can be chosen.
std::pair<WorkerInterface, ProcessClass> getStorageWorker( RecruitStorageRequest const& req ) {
// Copy the request's filter lists into sets for membership checks.
std::set<Optional<Standalone<StringRef>>> excludedMachines( req.excludeMachines.begin(), req.excludeMachines.end() );
std::set<Optional<Standalone<StringRef>>> includeDCs( req.includeDCs.begin(), req.includeDCs.end() );
std::set<AddressExclusion> excludedAddresses( req.excludeAddresses.begin(), req.excludeAddresses.end() );
// First pass: return the first qualifying worker whose storage fitness is no greater than UnsetFit.
for( auto& it : id_worker )
if( workerAvailable( it.second, false ) &&
!excludedMachines.count(it.second.interf.locality.zoneId()) &&
( includeDCs.size() == 0 || includeDCs.count(it.second.interf.locality.dcId()) ) &&
!addressExcluded(excludedAddresses, it.second.interf.address()) &&
it.second.processClass.machineClassFitness( ProcessClass::Storage ) <= ProcessClass::UnsetFit ) {
return std::make_pair(it.second.interf, it.second.processClass);
}
// Second pass, only for critical recruitments: accept any qualifying worker whose fitness is strictly
// below NeverAssign, keeping the one with the smallest fitness (the first one seen wins ties).
if( req.criticalRecruitment ) {
ProcessClass::Fitness bestFit = ProcessClass::NeverAssign;
Optional<std::pair<WorkerInterface, ProcessClass>> bestInfo;
for( auto& it : id_worker ) {
ProcessClass::Fitness fit = it.second.processClass.machineClassFitness( ProcessClass::Storage );
if( workerAvailable( it.second, false ) &&
!excludedMachines.count(it.second.interf.locality.zoneId()) &&
( includeDCs.size() == 0 || includeDCs.count(it.second.interf.locality.dcId()) ) &&
!addressExcluded(excludedAddresses, it.second.interf.address()) &&
fit < bestFit ) {
bestFit = fit;
bestInfo = std::make_pair(it.second.interf, it.second.processClass);
}
}
if( bestInfo.present() ) {
return bestInfo.get();
}
}
// Nothing suitable was found in either pass.
throw no_more_servers();
}
WorkerInterface
WorkerInterface 包含ClientWorkerInterface
网友评论