Elasticsearch的选举机制

作者: kason_zhang | 来源:发表于2018-05-12 16:53 被阅读386次

    关于Elasticsearch的选举机制:
    ES选举master机制不像Hbase的HMaster选举, HMaster选举是借助ZK,通过各个节点向ZK注册临时节点(ZK保证只有一个节点能够注册成功, 此节点就是master节点),其余节点加入备节点,而且会监测ZNODE是否消失,消失的时候,备节点会争相向ZK注册临时节点进而选出新的master。
    Elasticsearch选举master的时候, 当加入一个节点, 如果之前的Elasticsearch集群已经正常的在运行, 那么此时这个节点的加入会选择接受之前的master, 然后自己连接master并加入这个master构成的集群。如果是整个master集群刚开始初始启动的时候,这时候情况就会不同,就会出现选举master的过程。 这时候的选举可能选到了自己作为master, 也有可能是接受其他节点的master。

    代码流程图如下所以:


    Elasticsearch Master选举机制

    其代码主要是ZenDiscovery这个类,在它的doStart方法中,

    protected void doStart() {
            nodesFD.setLocalNode(clusterService.localNode());
            joinThreadControl.start();
            pingService.start();
            this.nodeJoinController = new NodeJoinController(clusterService, routingService, discoverySettings, settings);
    
            // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
            clusterService.submitStateUpdateTask("initial_join", new ClusterStateNonMasterUpdateTask() {
                @Override
                public ClusterState execute(ClusterState currentState) throws Exception {
                    // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
                    joinThreadControl.startNewThreadIfNotRunning();
                    return currentState;
                }
    
                @Override
                public void onFailure(String source, @org.elasticsearch.common.Nullable Throwable t) {
                    logger.warn("failed to start initial join process", t);
                }
            });
        }
    
    public void startNewThreadIfNotRunning() {
                assertClusterStateThread();
                if (joinThreadActive()) {
                    return;
                }
                threadPool.generic().execute(new Runnable() {
                    @Override
                    public void run() {
                        Thread currentThread = Thread.currentThread();
                        if (!currentJoinThread.compareAndSet(null, currentThread)) {
                            return;
                        }
                        while (running.get() && joinThreadActive(currentThread)) {
                            try {
                                innerJoinCluster();
                                return;
                            } catch (Exception e) {
                                logger.error("unexpected error while joining cluster, trying again", e);
                                // Because we catch any exception here, we want to know in
                                // tests if an uncaught exception got to this point and the test infra uncaught exception
                                // leak detection can catch this. In practise no uncaught exception should leak
                                assert ExceptionsHelper.reThrowIfNotNull(e);
                            }
                        }
                        // cleaning the current thread from currentJoinThread is done by explicit calls.
                    }
                });
            }
    
    

    最终会调用 innerJoinCluster();函数
    innerJoinCluster函数中,最主要的一部分代码就是

    // 一直阻塞直到找到master节点,在集群刚刚启动,或者集群master丢失的情况,这种阻塞能够保证集群一致性
            while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
                masterNode = findMaster(); // 找到Master, 可能是自己也可能不是自己
            }
    

    findMaster的代码逻辑是:

    private DiscoveryNode findMaster() {
            logger.trace("starting to ping");
            // 通过ping 其他节点来判定本节点能够连接上的节点的个数
            ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout);
            if (fullPingResponses == null) {
                logger.trace("No full ping responses");
                return null;
            }
            if (logger.isTraceEnabled()) {
                StringBuilder sb = new StringBuilder("full ping responses:");
                if (fullPingResponses.length == 0) {
                    sb.append(" {none}");
                } else {
                    for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                        sb.append("\n\t--> ").append(pingResponse);
                    }
                }
                logger.trace(sb.toString());
            }
    
            // filter responses
            // 过滤PingResponse, 排除掉client节点,单纯的data节点
            List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
            for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                DiscoveryNode node = pingResponse.node();
                if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
                    // filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
                } else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
                    // filter out data node that is not also master
                } else {
                    pingResponses.add(pingResponse);
                }
            }
    
            if (logger.isDebugEnabled()) {
                StringBuilder sb = new StringBuilder("filtered ping responses: (filter_client[").append(masterElectionFilterClientNodes).append("], filter_data[").append(masterElectionFilterDataNodes).append("])");
                if (pingResponses.isEmpty()) {
                    sb.append(" {none}");
                } else {
                    for (ZenPing.PingResponse pingResponse : pingResponses) {
                        sb.append("\n\t--> ").append(pingResponse);
                    }
                }
                logger.debug(sb.toString());
            }
    
            final DiscoveryNode localNode = clusterService.localNode();
            List<DiscoveryNode> pingMasters = new ArrayList<>();
            //获取所有ping响应中的master节点,如果master节点是节点本身则过滤掉。
            // 要么是同一个节点(出现不同节点则集群出现了问题不过没关系,后面会进行选举)
            // 正常情况下, pingMasters只有一个值
            for (ZenPing.PingResponse pingResponse : pingResponses) {
                if (pingResponse.master() != null) {
                    // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
                    // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
                    if (!localNode.equals(pingResponse.master())) {
                        pingMasters.add(pingResponse.master());
                    }
                }
            }
    
            // nodes discovered during pinging
            Set<DiscoveryNode> activeNodes = Sets.newHashSet();
            // nodes discovered who has previously been part of the cluster and do not ping for the very first time
            Set<DiscoveryNode> joinedOnceActiveNodes = Sets.newHashSet();
           //本节点暂时是master也要加入候选节点进行选举
            if (localNode.masterNode()) { // 本节点被人选举为master
                activeNodes.add(localNode);
                long joinsCounter = clusterJoinsCounter.get();
                if (joinsCounter > 0) {
                    logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
                    joinedOnceActiveNodes.add(localNode);
                }
            }
    
            for (ZenPing.PingResponse pingResponse : pingResponses) {
                activeNodes.add(pingResponse.node());
                if (pingResponse.hasJoinedOnce()) {
                    joinedOnceActiveNodes.add(pingResponse.node());
                }
            }
            //pingMasters为空,则本节点是master节点,
            if (pingMasters.isEmpty()) { // pingMasters时空有两种情况,一种本地节点就是master节点
                // 保证选举数量,说明有足够多的节点选举本节点为master,但是这还不够,本节点还需要再选举一次,如果本次选举节点仍旧是自己,那么本节点才能成为master。
                if (electMaster.hasEnoughMasterNodes(activeNodes)) {   // 判断是否包含足够的节点数,是否大于n/2 + 1
                    // we give preference to nodes who have previously already joined the cluster. Those will
                    // have a cluster state in memory, including an up to date routing table (which is not persistent to disk
                    // by the gateway)
                    DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes); // 既然本节点已经被ping同的节点选为了master, 也要自己选择自己一把,才能成为master, 不然的话对activeNodes进行重新选举
                    if (master != null) {
                        return master;
                    }
                    return electMaster.electMaster(activeNodes); // 重新选举
                } else {
                    // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                    logger.trace("not enough master nodes [{}]", activeNodes);
                    return null;
                }
            } else {
                //pingMasters不为空(pingMasters列表中应该都是同一个节点),本节点没有被选举为master,那就接受之前的选举。
                assert !pingMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
                // lets tie break between discovered nodes
                return electMaster.electMaster(pingMasters);
            }
        }
    
    

    这里我来使用场景来说明一下初始启动集群的场景, 正常集群加入新节点的过程这里就不再进行描述。
    假设有三个节点node1, node2, node3, 假设我们配置每个节点都有机会(node.master: true)可以成为master,刚开始启动的时候, node1启动了,此时node1去执行findMaster(),由于此时只有一个节点, node1只能发现自己这个节点, 不满足节点数大于n/2+1的条件(配置文件指定的),所以此时找不到master, node1会不断的执行while循环直到找到master位置。
    然后此时node2上线启动,node1和node2构成了两个节点,node2选择自己作为master节点, 此时node2 通过ping可以发现node1, 此时

    if (localNode.masterNode()) { // 本节点被人选举为master
                activeNodes.add(localNode);
                long joinsCounter = clusterJoinsCounter.get();
                if (joinsCounter > 0) {
                    logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
                    joinedOnceActiveNodes.add(localNode);
                }
            }
    
            for (ZenPing.PingResponse pingResponse : pingResponses) {
                activeNodes.add(pingResponse.node());
                if (pingResponse.hasJoinedOnce()) {
                    joinedOnceActiveNodes.add(pingResponse.node());
                }
            }
    

    可以知道activeNodes里面将会存放node1,node2,
    joinedOnceActiveNodes存放的是node2
    然后进行到此处:

    if (pingMasters.isEmpty()) { // pingMasters时空有两种情况,一种本地节点就是master节点,另一种一开始初始启动,还没选出master,而本节点也没被选为master就可能出现空
                // 保证选举数量,说明有足够多的节点选举本节点为master,但是这还不够,本节点还需要再选举一次,如果本次选举节点仍旧是自己,那么本节点才能成为master。
                if (electMaster.hasEnoughMasterNodes(activeNodes)) {   // 判断是否包含足够的节点数,是否大于n/2 + 1
                    // we give preference to nodes who have previously already joined the cluster. Those will
                    // have a cluster state in memory, including an up to date routing table (which is not persistent to disk
                    // by the gateway)
                    DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes); // 既然本节点已经被ping同的节点选为了master, 也要自己选择自己一把,才能成为master, 不然的话对activeNodes进行重新选举
                    if (master != null) {
                        return master;
                    }
                    return electMaster.electMaster(activeNodes); // 重新选举
                } else {
                    // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                    logger.trace("not enough master nodes [{}]", activeNodes);
                    return null;
                }
            } 
    

    此时node2还会选举自己一把DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes);这样node2才能真正成为master, 不然的话会执行return electMaster.electMaster(activeNodes);也就是在node1和node2上重新选举。
    此时node1 的循环又开始了,他就会发现master不是自己而是node2, 这样就会接受node2是master

    然后第三个节点node3上线了, 此时他也会执行while循环中的findMaster方法, 发现集群中已经有一个正常的master, 这时候也是接受那个master, 并与之联系,加入集群。

    相关文章

      网友评论

      • 划船不用桨_1948:也就是说,如果按顺序启动集群,正常情况,永远都是node2是主节点?
        kason_zhang:@划船不用桨_1948 是的, 不过前提是你的配置是node.master=true让节点可以成为master,如果是node.data=true或者node.client=true类的作为数据节点或者协调节点的话就不一定了.

      本文标题:Elasticsearch的选举机制

      本文链接:https://www.haomeiwen.com/subject/xyhhdftx.html