2018-05-17 ProcessClass Management

Author: StayFoolish2 | Published 2018-05-18 17:02

Purpose

Sort out which ProcessClass types fdb has, how each of them is started, and what the main ones do.

Approach

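In the fdb implementation, WorkerInterface.h declares a function named startRole. fdb calls it to register the role every time it starts a ProcessClass, so we can use this function as a guide for walking through the fdb implementation.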

void startRole(UID roleId, UID workerId, std::string as, std::map<std::string, std::string> details = std::map<std::string, std::string>(), std::string origination = "Recruited");
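To illustrate what a registration hook of this shape can be used for, here is a minimal sketch that keeps an in-memory table of started roles. It is not FoundationDB's implementation: UID is replaced by a plain integer, the details map is left out, and Uid, RoleRecord, RoleRegistry, endRole and rolesOnWorker are hypothetical names introduced only for this example.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for FoundationDB's UID type.
using Uid = std::uint64_t;

// One entry per started role (hypothetical record, not fdb's own).
struct RoleRecord {
    Uid workerId;
    std::string as;           // role name, e.g. "MasterServer"
    std::string origination;  // e.g. "Recruited"
};

class RoleRegistry {
public:
    // Follows the parameter order of startRole(roleId, workerId, as, ..., origination).
    void startRole(Uid roleId, Uid workerId, const std::string& as,
                   const std::string& origination = "Recruited") {
        roles_[roleId] = RoleRecord{workerId, as, origination};
    }

    // Forget a role once it has stopped.
    void endRole(Uid roleId) { roles_.erase(roleId); }

    // Names of the roles currently registered on one worker, ordered by role id.
    std::vector<std::string> rolesOnWorker(Uid workerId) const {
        std::vector<std::string> names;
        for (const auto& kv : roles_)
            if (kv.second.workerId == workerId) names.push_back(kv.second.as);
        return names;
    }

private:
    std::map<Uid, RoleRecord> roles_;
};
```

With a table like this, one lookup by worker id answers the question "which roles is this process playing right now", which is what makes a single registration point a convenient search anchor.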

Role list

Searching the code base directly for startRole gives the complete list of ProcessClass (role) names:

  1. LogRouter
  2. Resolver
  3. MasterProxyServer
  4. StorageServer
  5. SharedTLog
  6. MasterServer
  7. Worker
  8. TLog
  9. Tester
  10. ClusterController

In the function ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData localities, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ), the StorageServer/TLog roles are started first, based on the result of getDiskStores, and a Worker role is registered. After that, an event loop listens on interf to decide which other roles to start. MasterServer is one example:

loop choose {

when( RecruitMasterRequest req = waitNext(interf.master.getFuture()) ) {
                MasterInterface recruited;
                recruited.locality = locality;
                recruited.initEndpoints();

                startRole( recruited.id(), interf.id(), "MasterServer" );

                DUMPTOKEN( recruited.waitFailure );
                DUMPTOKEN( recruited.getRateInfo );
                DUMPTOKEN( recruited.tlogRejoin );
                DUMPTOKEN( recruited.changeCoordinators );
                DUMPTOKEN( recruited.getCommitVersion );

                //printf("Recruited as masterServer\n");
                Future<Void> masterProcess = masterServer( recruited, dbInfo, ServerCoordinators( connFile ), req.lifetime );
                errorForwarders.add( zombie(recruited, forwardError( errors, "MasterServer", recruited.id(), masterProcess )) );
                req.reply.send(recruited);
            }
}
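The recruitment pattern in this loop (wait for a request, register the role, start it, reply to the recruiter) can be sketched without the Flow actor runtime. The version below is a synchronous stand-in, assuming a plain queue instead of futures; RecruitRequest and WorkerLoop are hypothetical names, and nothing here is fdb's actual API.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical request: which role to start and where to send the reply.
struct RecruitRequest {
    std::string role;                               // e.g. "MasterServer"
    std::function<void(const std::string&)> reply;  // stand-in for req.reply.send
};

// Synchronous stand-in for the worker's "loop choose { when(...) }" block.
class WorkerLoop {
public:
    void enqueue(RecruitRequest req) { pending_.push(std::move(req)); }

    // Drain the queue: register each role, then answer the recruiter.
    void run() {
        while (!pending_.empty()) {
            RecruitRequest req = std::move(pending_.front());
            pending_.pop();
            started_.push_back(req.role);  // plays the part of startRole(...)
            req.reply(req.role);           // plays the part of req.reply.send(recruited)
        }
    }

    const std::vector<std::string>& started() const { return started_; }

private:
    std::queue<RecruitRequest> pending_;
    std::vector<std::string> started_;
};
```

The real loop differs in one important way: each when clause waits on its own request stream (interf.master, and so on), and the recruited role keeps running as a future after the reply has been sent.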

The overall flow:

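  1. The ClusterConnectionFile holds the addresses of the coordinators.
  2. The fdbd function tries to get a clusterController elected through the coordinators, and at the same time
  3. it calls workerServer. Here workerServer shares one ClusterControllerFullInterface variable with the clusterController function from the previous step (presumably the clusterController later sends control messages through it?).
  4. The clusterController starts the Master; see the function clusterWatchDatabase.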
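For the first step of the flow, it helps to see what is in the connection file. A cluster file holds one connection string of the form description:id@ip:port,ip:port,... and the sketch below splits such a string into its parts. It is a simplified illustration, not fdb's own parser: ClusterString and parseClusterString are hypothetical names, and comments, whitespace and address suffixes are not handled.

```cpp
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Parsed form of a connection string (hypothetical struct for illustration).
struct ClusterString {
    std::string description;
    std::string id;
    std::vector<std::string> coordinators;  // "ip:port" entries
};

// Split "description:id@ip:port,ip:port,..." into its parts.
ClusterString parseClusterString(const std::string& line) {
    const auto at = line.find('@');
    if (at == std::string::npos) throw std::invalid_argument("missing '@'");
    const auto colon = line.find(':');
    if (colon == std::string::npos || colon > at)
        throw std::invalid_argument("missing ':' before '@'");

    ClusterString out;
    out.description = line.substr(0, colon);
    out.id = line.substr(colon + 1, at - colon - 1);

    // Everything after '@' is a comma-separated list of coordinator addresses.
    std::stringstream rest(line.substr(at + 1));
    std::string addr;
    while (std::getline(rest, addr, ','))
        if (!addr.empty()) out.coordinators.push_back(addr);
    if (out.coordinators.empty())
        throw std::invalid_argument("no coordinators listed");
    return out;
}
```

For example, parseClusterString("mycluster:abc123@10.0.0.1:4500,10.0.0.2:4500") yields the description "mycluster", the id "abc123" and two coordinator addresses.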

Number of roles

fdb aims to decide automatically which roles to start and how many of each. The default values are set in fdbclient/Knobs.cpp, as follows:

        // Configuration
    init( DEFAULT_AUTO_PROXIES,                      3 );
    init( DEFAULT_AUTO_RESOLVERS,                    1 );
    init( DEFAULT_AUTO_LOGS,                         3 );
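One way such defaults can take effect is as a fallback for any role count the operator has not set explicitly. The sketch below shows that idea only; the use of -1 as the "not set" marker is an assumption made for this example, and ClientKnobs, RoleCounts, desired and resolveCounts are hypothetical names rather than fdb's real types.

```cpp
// Hypothetical knob holder; the values are copied from the listing above.
struct ClientKnobs {
    int DEFAULT_AUTO_PROXIES = 3;
    int DEFAULT_AUTO_RESOLVERS = 1;
    int DEFAULT_AUTO_LOGS = 3;
};

// Hypothetical slice of a database configuration.
// Assumption: -1 means "not set by the operator".
struct RoleCounts {
    int proxies = -1;
    int resolvers = -1;
    int logs = -1;
};

// Use the explicit value when there is one, otherwise the automatic default.
inline int desired(int configured, int autoDefault) {
    return configured == -1 ? autoDefault : configured;
}

// Fill in every unset count from the knobs.
inline RoleCounts resolveCounts(const RoleCounts& cfg, const ClientKnobs& knobs) {
    RoleCounts out;
    out.proxies = desired(cfg.proxies, knobs.DEFAULT_AUTO_PROXIES);
    out.resolvers = desired(cfg.resolvers, knobs.DEFAULT_AUTO_RESOLVERS);
    out.logs = desired(cfg.logs, knobs.DEFAULT_AUTO_LOGS);
    return out;
}
```

With only proxies set to 5, the resolved counts would be 5 proxies, 1 resolver and 3 logs.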

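Several methods of the ClusterControllerData class, such as getStorageWorker, pick a suitable worker according to the conditions in the request. The allocation mechanism uses a notion called Fitness to choose the worker best suited to a given role. The function findWorkersForConfiguration is responsible for planning the whole cluster from the configuration.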

    vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, Optional<WorkerFitnessInfo> minWorker = Optional<WorkerFitnessInfo>(), bool checkStable = false ) {
        std::map<std::pair<ProcessClass::Fitness,int>, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
        vector<std::pair<WorkerInterface, ProcessClass>> results;
        if (amount <= 0)
            return results;

        for( auto& it : id_worker ) {
            auto fitness = it.second.processClass.machineClassFitness( role );
            if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && it.second.interf.locality.dcId() == dcId &&
              ( !minWorker.present() || ( it.second.interf.id() != minWorker.get().worker.first.id() && ( fitness < minWorker.get().fitness || (fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used ) ) ) ) ) {
                fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(std::make_pair(it.second.interf, it.second.processClass));
            }
        }

        for( auto& it : fitness_workers ) {
            auto& w = it.second;
            g_random->randomShuffle(w);
            for( int i=0; i < w.size(); i++ ) {
                results.push_back(w[i]);
                id_used[w[i].first.locality.processId()]++;
                if( results.size() == amount )
                    return results;
            }
        }

        return results;
    }
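The core of getWorkersForRoleInDatacenter is a bucket sort: workers are grouped under the key (fitness, number of roles already used), the buckets are visited in ascending key order, and each bucket is shuffled so that ties are broken randomly. The sketch below isolates that idea. It leaves out the availability, exclusion, datacenter and minWorker checks and does not update the usage counts. Candidate and pickWorkers are hypothetical names, and the Fitness ordering (smaller is better) is an assumption consistent with the comparisons in the listing.

```cpp
#include <algorithm>
#include <map>
#include <random>
#include <string>
#include <utility>
#include <vector>

// Assumed ordering: a smaller value means a better fit.
enum Fitness { BestFit, GoodFit, BestOtherFit, UnsetFit, WorstFit, NeverAssign };

// Hypothetical, stripped-down view of a worker.
struct Candidate {
    std::string id;
    Fitness fitness;
    int used;  // roles already placed on this process
};

std::vector<std::string> pickWorkers(const std::vector<Candidate>& candidates,
                                     int amount, std::mt19937& rng) {
    std::vector<std::string> results;
    if (amount <= 0) return results;

    // std::map iterates keys in ascending order, so buckets come out best-first:
    // better fitness first, and within equal fitness the less loaded workers first.
    std::map<std::pair<Fitness, int>, std::vector<std::string>> buckets;
    for (const auto& c : candidates)
        buckets[std::make_pair(c.fitness, c.used)].push_back(c.id);

    for (auto& kv : buckets) {
        auto& ids = kv.second;
        std::shuffle(ids.begin(), ids.end(), rng);  // random tie-break inside a bucket
        for (const auto& id : ids) {
            results.push_back(id);
            if (static_cast<int>(results.size()) == amount) return results;
        }
    }
    return results;  // fewer than `amount` if not enough candidates exist
}
```

Putting the usage count into the key is what spreads roles across processes: two BestFit workers are not equivalent if one of them already hosts another role.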

ProcessClass::Fitness ProcessClass::machineClassFitness( ClusterRole role ) const {
    switch( role ) {
    case ProcessClass::Storage:
        switch( _class ) {
        case ProcessClass::StorageClass:
            return ProcessClass::BestFit;
        case ProcessClass::UnsetClass:
            return ProcessClass::UnsetFit;
        case ProcessClass::TransactionClass:
            return ProcessClass::WorstFit;
        case ProcessClass::LogClass:
            return ProcessClass::WorstFit;
        case ProcessClass::TesterClass:
            return ProcessClass::NeverAssign;
        default:
            return ProcessClass::NeverAssign;
        }
    case ProcessClass::TLog:
        switch( _class ) {
        case ProcessClass::LogClass:
            return ProcessClass::BestFit;
        case ProcessClass::TransactionClass:
            return ProcessClass::GoodFit;
        case ProcessClass::UnsetClass:
            return ProcessClass::UnsetFit;
        case ProcessClass::StorageClass:
            return ProcessClass::WorstFit;
        case ProcessClass::TesterClass:
            return ProcessClass::NeverAssign;
        default:
            return ProcessClass::NeverAssign;
        }
    case ProcessClass::Proxy:
        switch( _class ) {
        case ProcessClass::ProxyClass:
            return ProcessClass::BestFit;
        case ProcessClass::StatelessClass:
            return ProcessClass::GoodFit;
        case ProcessClass::ResolutionClass:
            return ProcessClass::BestOtherFit;
        case ProcessClass::TransactionClass:
            return ProcessClass::BestOtherFit;
        case ProcessClass::UnsetClass:
            return ProcessClass::UnsetFit;
        case ProcessClass::TesterClass:
            return ProcessClass::NeverAssign;
        default:
            return ProcessClass::WorstFit;
        }
    case ProcessClass::Master:
        switch( _class ) {
        case ProcessClass::MasterClass:
            return ProcessClass::BestFit;
        case ProcessClass::StatelessClass:
            return ProcessClass::GoodFit;
        case ProcessClass::ResolutionClass:
            return ProcessClass::BestOtherFit;
        case ProcessClass::TransactionClass:
            return ProcessClass::BestOtherFit;
        case ProcessClass::UnsetClass:
            return ProcessClass::UnsetFit;
        case ProcessClass::TesterClass:
            return ProcessClass::NeverAssign;
        default:
            return ProcessClass::WorstFit;
        }
    case ProcessClass::Resolver:
        switch( _class ) {
        case ProcessClass::ResolutionClass:
            return ProcessClass::BestFit;
        case ProcessClass::StatelessClass:
            return ProcessClass::GoodFit;
        case ProcessClass::TransactionClass:
            return ProcessClass::BestOtherFit;
        case ProcessClass::UnsetClass:
            return ProcessClass::UnsetFit;
        case ProcessClass::TesterClass:
            return ProcessClass::NeverAssign;
        default:
            return ProcessClass::WorstFit;
        }
    case ProcessClass::LogRouter:
        switch( _class ) {
        case ProcessClass::LogRouterClass:
            return ProcessClass::BestFit;
        case ProcessClass::StatelessClass:
            return ProcessClass::GoodFit;
        case ProcessClass::ResolutionClass:
            return ProcessClass::BestOtherFit;
        case ProcessClass::TransactionClass:
            return ProcessClass::BestOtherFit;
        case ProcessClass::UnsetClass:
            return ProcessClass::UnsetFit;
        case ProcessClass::TesterClass:
            return ProcessClass::NeverAssign;
        default:
            return ProcessClass::WorstFit;
        }
    case ProcessClass::ClusterController:
        switch( _class ) {
        case ProcessClass::ClusterControllerClass:
            return ProcessClass::BestFit;   
        case ProcessClass::StatelessClass:
            return ProcessClass::GoodFit;
        case ProcessClass::MasterClass:
            return ProcessClass::BestOtherFit;
        case ProcessClass::ResolutionClass:
            return ProcessClass::BestOtherFit;
        case ProcessClass::ProxyClass:
            return ProcessClass::BestOtherFit;
        case ProcessClass::UnsetClass:
            return ProcessClass::UnsetFit;
        case ProcessClass::TesterClass:
            return ProcessClass::NeverAssign;
        default:
            return ProcessClass::WorstFit;
        }
    default:
        return ProcessClass::NeverAssign;
    }
}
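The nested switch above is in effect a two-dimensional lookup table from (role, process class) to Fitness, with a per-role default. As a reading aid, the sketch below writes the Storage and TLog rows as data. It is a hypothetical restatement rather than fdb code: the enums are reduced to the values needed here, and the numeric ordering of Fitness (smaller is better) is an assumption consistent with the comparisons used elsewhere in the listings.

```cpp
#include <map>

enum ClassType { UnsetClass, StorageClass, TransactionClass, LogClass,
                 TesterClass, StatelessClass };
// Assumed ordering: a smaller value means a better fit.
enum Fitness { BestFit, GoodFit, BestOtherFit, UnsetFit, WorstFit, NeverAssign };
enum Role { Storage, TLog };

// Table-driven restatement of the Storage and TLog cases of the switch.
Fitness machineClassFitness(ClassType cls, Role role) {
    static const std::map<Role, std::map<ClassType, Fitness>> table = {
        {Storage, {{StorageClass, BestFit},
                   {UnsetClass, UnsetFit},
                   {TransactionClass, WorstFit},
                   {LogClass, WorstFit},
                   {TesterClass, NeverAssign}}},
        {TLog, {{LogClass, BestFit},
                {TransactionClass, GoodFit},
                {UnsetClass, UnsetFit},
                {StorageClass, WorstFit},
                {TesterClass, NeverAssign}}},
    };
    const auto roleIt = table.find(role);
    if (roleIt == table.end()) return NeverAssign;
    const auto clsIt = roleIt->second.find(cls);
    // For these two roles the switch falls back to NeverAssign for unlisted classes.
    return clsIt == roleIt->second.end() ? NeverAssign : clsIt->second;
}
```

Note the difference in defaults visible in the original switch: the stateful roles (Storage, TLog) default to NeverAssign, while the stateless roles (Proxy, Master, Resolver, LogRouter, ClusterController) default to WorstFit.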
    std::pair<WorkerInterface, ProcessClass> getStorageWorker( RecruitStorageRequest const& req ) {
        std::set<Optional<Standalone<StringRef>>> excludedMachines( req.excludeMachines.begin(), req.excludeMachines.end() );
        std::set<Optional<Standalone<StringRef>>> includeDCs( req.includeDCs.begin(), req.includeDCs.end() );
        std::set<AddressExclusion> excludedAddresses( req.excludeAddresses.begin(), req.excludeAddresses.end() );

        for( auto& it : id_worker )
            if( workerAvailable( it.second, false ) &&
                    !excludedMachines.count(it.second.interf.locality.zoneId()) &&
                    ( includeDCs.size() == 0 || includeDCs.count(it.second.interf.locality.dcId()) ) &&
                    !addressExcluded(excludedAddresses, it.second.interf.address()) &&
                    it.second.processClass.machineClassFitness( ProcessClass::Storage ) <= ProcessClass::UnsetFit ) {
                return std::make_pair(it.second.interf, it.second.processClass);
            }

        if( req.criticalRecruitment ) {
            ProcessClass::Fitness bestFit = ProcessClass::NeverAssign;
            Optional<std::pair<WorkerInterface, ProcessClass>> bestInfo;
            for( auto& it : id_worker ) {
                ProcessClass::Fitness fit = it.second.processClass.machineClassFitness( ProcessClass::Storage );
                if( workerAvailable( it.second, false ) &&
                        !excludedMachines.count(it.second.interf.locality.zoneId()) &&
                        ( includeDCs.size() == 0 || includeDCs.count(it.second.interf.locality.dcId()) ) &&
                        !addressExcluded(excludedAddresses, it.second.interf.address()) &&
                        fit < bestFit ) {
                    bestFit = fit;
                    bestInfo = std::make_pair(it.second.interf, it.second.processClass);
                }
            }

            if( bestInfo.present() ) {
                return bestInfo.get();
            }
        }

        throw no_more_servers();
    }
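getStorageWorker makes two passes. The first returns the first available worker whose Storage fitness is UnsetFit or better, so it takes the first acceptable worker, not necessarily the best one. The second runs only for critical recruitments and falls back to the best remaining worker that is strictly better than NeverAssign. The sketch below reproduces just that control flow. The machine, datacenter and address filters are folded into a single available flag, and Worker, NoMoreServers and pickStorageWorker are hypothetical names.

```cpp
#include <stdexcept>
#include <string>
#include <vector>

// Assumed ordering: a smaller value means a better fit.
enum Fitness { BestFit, GoodFit, BestOtherFit, UnsetFit, WorstFit, NeverAssign };

// Hypothetical, stripped-down view of a worker.
struct Worker {
    std::string id;
    Fitness storageFitness;
    bool available;  // stands in for all availability and exclusion checks
};

// Hypothetical counterpart of no_more_servers().
struct NoMoreServers : std::runtime_error {
    NoMoreServers() : std::runtime_error("no more servers") {}
};

std::string pickStorageWorker(const std::vector<Worker>& workers, bool critical) {
    // Pass 1: first available worker that is UnsetFit or better.
    for (const auto& w : workers)
        if (w.available && w.storageFitness <= UnsetFit) return w.id;

    // Pass 2 (critical only): best remaining worker, never a NeverAssign one.
    if (critical) {
        const Worker* best = nullptr;
        Fitness bestFit = NeverAssign;
        for (const auto& w : workers)
            if (w.available && w.storageFitness < bestFit) {
                bestFit = w.storageFitness;
                best = &w;
            }
        if (best) return best->id;
    }
    throw NoMoreServers();
}
```

In this model a cluster with only log-class processes left will refuse an ordinary storage recruitment but accept a critical one, which matches the role of req.criticalRecruitment in the original.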

WorkerInterface

WorkerInterface contains ClientWorkerInterface.

Article link: https://www.haomeiwen.com/subject/pjibdftx.html