美文网首页
Elasticsearch新的Discovery---Coord

Elasticsearch新的Discovery---Coord

作者: Ombres | 来源:发表于2019-07-18 20:36 被阅读0次

    简介

    Elasticsearch目前在使用的服务发现算法,是基于zen的服务发现的第二代实现

    节点模式

    节点有三种模式,分别是CANDIDATE, LEADER, FOLLOWER

    流程分析

    doStart

    主要是服务发现的基本配置的工作,做了几件事:

    1. 获取持久化的协调状态
    2. 初始化节点发现器(探测器)
    3. 选举配置
    4. 初始化集群状态
    protected void doStart() {
            synchronized (mutex) {
                // 获取持久化的协调状态
                CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
                // 当前协调状态
                coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
                // 节点发现的数量
                peerFinder.setCurrentTerm(getCurrentTerm());
                // hosts解析器
                configuredHostsResolver.start();
                // 最近一次的状态
                final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
                if (lastAcceptedState.metaData().clusterUUIDCommitted()) {
                    logger.info("cluster UUID [{}]", lastAcceptedState.metaData().clusterUUID());
                }
                // 选举的节点的id,当集群状态变更的时候用到的配置 这里获取最近保存的配置信息
                final VotingConfiguration votingConfiguration = lastAcceptedState.getLastCommittedConfiguration();
                // 单节点发现,而且选举列表不为空,同时没有足够的选举节点就会引发异常
                if (singleNodeDiscovery &&
                    votingConfiguration.isEmpty() == false &&
                    votingConfiguration.hasQuorum(Collections.singleton(getLocalNode().getId())) == false) {
                    throw new IllegalStateException("cannot start with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" +
                        DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode() +
                        " does not have quorum in voting configuration " + votingConfiguration);
                }
                // 初始化集群状态
                ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
                    .blocks(ClusterBlocks.builder()
                        .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
                        .addGlobalBlock(noMasterBlockService.getNoMasterBlock()))
                    .nodes(DiscoveryNodes.builder().add(getLocalNode()).localNodeId(getLocalNode().getId()))
                    .build();
                applierState = initialState;
                clusterApplier.setInitialState(initialState);
            }
        }
    

    startInitialJoin

    主要用于第一次启动时进行节点发现及选举加入集群。

      synchronized (mutex) {
                // 成为候选人,Mode初始化是null,也即是什么都不是,这里会改成CANDIDATE.
                becomeCandidate("startInitialJoin");
            }
            // 如果是master,启动集群
            clusterBootstrapService.scheduleUnconfiguredBootstrap();
    

    becomeCandidate(String method)

    void becomeCandidate(String method) {
            assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
            logger.debug("{}: coordinator becoming CANDIDATE in term {} (was {}, lastKnownLeader was [{}])",
                method, getCurrentTerm(), mode, lastKnownLeader);
            // 第一次启动节点mode是null
            if (mode != Mode.CANDIDATE) {
                final Mode prevMode = mode;
                mode = Mode.CANDIDATE;
                cancelActivePublication("become candidate: " + method);
                joinAccumulator.close(mode);
                joinAccumulator = joinHelper.new CandidateJoinAccumulator();
    
                peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
                clusterFormationFailureHelper.start();
    
                if (getCurrentTerm() == ZEN1_BWC_TERM) {
                    discoveryUpgradeService.activate(lastKnownLeader, coordinationState.get().getLastAcceptedState());
                }
    
                leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
                leaderChecker.updateLeader(null);
    
                followersChecker.clearCurrentNodes();
                followersChecker.updateFastResponseState(getCurrentTerm(), mode);
                lagDetector.clearTrackedNodes();
    
                if (prevMode == Mode.LEADER) {
                    cleanMasterService();
                }
    
                if (applierState.nodes().getMasterNodeId() != null) {
                    applierState = clusterStateWithNoMasterBlock(applierState);
                    clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> {
                    });
                }
            }
    
            preVoteCollector.update(getPreVoteResponse(), null);
        }
    

    相关文章

      网友评论

          本文标题:Elasticsearch新的Discovery---Coord

          本文链接:https://www.haomeiwen.com/subject/wkykkctx.html