ES in Practice: Rebalance Features and Source Code Analysis


Author: caster | Published 2021-08-27 17:35
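    1. How rebalance tasks show up in an ES cluster:
      Call the GET _cat/tasks?v API.
      In the result, the action is internal:index/shard/recovery/start_recovery (this action is not exclusive to rebalancing).
    2. Tracking shard movement:
      Call the GET _cat/recovery?v API.
      In the result, type is peer; source_node and target_node show which direction a shard is moving, and stage shows how far the move has progressed: INIT -> ... -> DONE.
    3. Checking shard state:
      Call the GET _cat/shards?v API.
      In the result, shards that are being moved are in the RELOCATING state.
    4. Checking the number of shards on each node:
      Watch Kibana's Monitoring UI, or call GET _nodes/stats/indices?level=shards and count the shard entries under each node. (The _cat/nodes API really ought to expose a per-node shard count.)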
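    As a stopgap for the missing per-node count mentioned in item 4, the _cat/shards output can be tallied on the client side. This is a minimal sketch, assuming the request was GET _cat/shards?h=index,shard,prirep,state,node so each line carries exactly those columns. The sample lines are invented; unassigned shards (no node column) are skipped, and for a RELOCATING shard the node column is assumed to start with the source node, which is the one counted.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShardsPerNode {

    // Tallies shards per node from `_cat/shards?h=index,shard,prirep,state,node` lines.
    // Assumptions: whitespace-separated columns; unassigned shards have no node column;
    // for RELOCATING shards the node column begins with the source node name.
    static Map<String, Integer> countPerNode(List<String> catLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : catLines) {
            String[] cols = line.trim().split("\\s+");
            if (cols.length < 5) {
                continue; // UNASSIGNED shard: no node to attribute it to
            }
            counts.merge(cols[4], 1, Integer::sum);
        }
        return counts;
    }

    // Counts shards that are currently being moved between nodes.
    static long countRelocating(List<String> catLines) {
        return catLines.stream()
                .map(l -> l.trim().split("\\s+"))
                .filter(cols -> cols.length >= 4 && cols[3].equals("RELOCATING"))
                .count();
    }

    public static void main(String[] args) {
        // Hypothetical sample output, not taken from a real cluster.
        List<String> lines = List.of(
                "logs 0 p STARTED    node-1",
                "logs 0 r STARTED    node-2",
                "logs 1 p RELOCATING node-1 -> 10.0.0.3 abc123 node-3",
                "logs 1 r STARTED    node-2",
                "logs 2 r UNASSIGNED");
        System.out.println(countPerNode(lines));    // {node-1=2, node-2=2}
        System.out.println(countRelocating(lines)); // 1
    }
}
```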

    There are 3 + 3 rebalance-related settings:

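    cluster.routing.rebalance.enable//which kinds of shards may be rebalanced
    cluster.routing.allocation.allow_rebalance//when rebalancing is allowed
    cluster.routing.allocation.cluster_concurrent_rebalance//how many shard rebalances may run concurrently (shard level)
    
    cluster.routing.allocation.balance.shard//weight factor for the total number of shards per node; raising it pushes every node toward the same total shard count
    cluster.routing.allocation.balance.index//weight factor for the number of shards of each index per node; raising it spreads each index's shards more evenly across the nodes
    cluster.routing.allocation.balance.threshold//threshold; raising it makes rebalancing lazier (a larger imbalance is needed before shards move)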
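    For reference, here is a minimal sketch of a PUT _cluster/settings request that sets all six parameters, built with the JDK's java.net.http classes (Java 11+). The values are, to the best of my knowledge, the Elasticsearch 7.x defaults; the host localhost:9200 is an assumption, and the request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class RebalanceSettings {

    // Defaults as understood for Elasticsearch 7.x; verify against your version.
    static Map<String, Object> defaults() {
        Map<String, Object> s = new LinkedHashMap<>();
        s.put("cluster.routing.rebalance.enable", "all");                          // all | primaries | replicas | none
        s.put("cluster.routing.allocation.allow_rebalance", "indices_all_active"); // always | indices_primaries_active | indices_all_active
        s.put("cluster.routing.allocation.cluster_concurrent_rebalance", 2);
        s.put("cluster.routing.allocation.balance.shard", 0.45);
        s.put("cluster.routing.allocation.balance.index", 0.55);
        s.put("cluster.routing.allocation.balance.threshold", 1.0);
        return s;
    }

    // Renders {"persistent":{...}}; string values are quoted, numbers are written as-is.
    static String body(Map<String, Object> settings) {
        String inner = settings.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":" + (e.getValue() instanceof String
                        ? "\"" + e.getValue() + "\"" : e.getValue().toString()))
                .collect(Collectors.joining(","));
        return "{\"persistent\":{" + inner + "}}";
    }

    static HttpRequest request(String host, Map<String, Object> settings) {
        return HttpRequest.newBuilder(URI.create(host + "/_cluster/settings"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body(settings)))
                .build();
    }

    public static void main(String[] args) {
        Map<String, Object> s = defaults();
        s.put("cluster.routing.allocation.balance.threshold", 2.0); // e.g. make rebalancing lazier
        System.out.println(body(s));
        System.out.println(request("http://localhost:9200", s).method()); // PUT
    }
}
```

    In practice only the settings being changed need to be sent; the full map is shown to line the six parameters up with their defaults.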
    

    The details are analyzed below.

    Source code analysis

    The abstract base class AllocationDecider provides two methods for deciding whether a rebalance may happen:

    public abstract class AllocationDecider {
        //whether this particular shard may be rebalanced
        public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
            return Decision.ALWAYS;
        }
        //whether the cluster as a whole may rebalance (the focus of this article)
        public Decision canRebalance(RoutingAllocation allocation) {
            return Decision.ALWAYS;
        }
    }
    

    AllocationDeciders extends this base class and combines the answers of a whole set of deciders into one final decision:

    public Decision canRebalance(RoutingAllocation allocation) {
        Decision.Multi ret = new Decision.Multi();
        for (AllocationDecider allocationDecider : allocations) {
            Decision decision = allocationDecider.canRebalance(allocation);
            // short track if a NO is returned.
            if (decision == Decision.NO) {
                if (!allocation.debugDecision()) {
                    return decision;
                } else {
                    ret.add(decision);
                }
            } else {
                addDecision(ret, decision, allocation);
            }
        }
        return ret;
    }
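    The combining rule can be modeled in a few lines. This is a simplified, self-contained sketch, not the Elasticsearch classes: a NO from any decider short-circuits when debugging is off, and otherwise the combined type is the most restrictive one seen, ordered NO < THROTTLE < YES, which I believe matches how Decision.Multi reports its type.

```java
import java.util.List;

public class DecidersModel {

    // Ordered from most to least restrictive.
    enum Type { NO, THROTTLE, YES }

    // Simplified model of AllocationDeciders.canRebalance() plus Decision.Multi.type().
    static Type combine(List<Type> decisions, boolean debug) {
        Type result = Type.YES;
        for (Type t : decisions) {
            if (t == Type.NO && !debug) {
                return Type.NO; // short-circuit: the remaining deciders are not consulted
            }
            if (t.compareTo(result) < 0) {
                result = t;     // keep the most restrictive answer seen so far
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(combine(List.of(Type.YES, Type.THROTTLE, Type.YES), false)); // THROTTLE
        System.out.println(combine(List.of(Type.YES, Type.NO, Type.THROTTLE), true));   // NO
    }
}
```

    Since balance() only proceeds when the combined type is exactly YES, a single THROTTLE is already enough to skip a rebalance round.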
    

    The deciders that answer the cluster-level "may we rebalance?" question are:

    • EnableAllocationDecider
      for cluster.routing.rebalance.enable (with the per-index override index.routing.rebalance.enable)
    • ClusterRebalanceAllocationDecider
      for cluster.routing.allocation.allow_rebalance
    • ConcurrentRebalanceAllocationDecider
      for cluster.routing.allocation.cluster_concurrent_rebalance
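    A rough model of how these three deciders answer the cluster-level question. The ClusterView class and its fields are hypothetical simplifications of what the real deciders read from RoutingAllocation, per-index overrides are ignored, and the rules are paraphrased from my reading of the 7.x source, so check them against the version you run. Two details worth noticing: the concurrency decider answers THROTTLE rather than NO when the limit is reached (-1 means unlimited), and with the default indices_all_active a cluster that still has unassigned replicas does not rebalance at all.

```java
public class ClusterRebalanceDeciders {

    enum Type { NO, THROTTLE, YES }

    // Hypothetical, simplified view of the routing state the deciders look at.
    static final class ClusterView {
        final boolean unassignedPrimaries, inactivePrimaries, unassignedShards, inactiveShards;
        final int relocatingShards;

        ClusterView(boolean unassignedPrimaries, boolean inactivePrimaries,
                    boolean unassignedShards, boolean inactiveShards, int relocatingShards) {
            this.unassignedPrimaries = unassignedPrimaries;
            this.inactivePrimaries = inactivePrimaries;
            this.unassignedShards = unassignedShards;
            this.inactiveShards = inactiveShards;
            this.relocatingShards = relocatingShards;
        }
    }

    // cluster.routing.rebalance.enable: only "none" forbids rebalancing at cluster level.
    static Type enable(String rebalanceEnable) {
        return rebalanceEnable.equals("none") ? Type.NO : Type.YES;
    }

    // cluster.routing.allocation.allow_rebalance
    static Type allowRebalance(String mode, ClusterView c) {
        switch (mode) {
            case "indices_primaries_active":
                return (c.unassignedPrimaries || c.inactivePrimaries) ? Type.NO : Type.YES;
            case "indices_all_active":
                return (c.unassignedShards || c.inactiveShards) ? Type.NO : Type.YES;
            default: // "always"
                return Type.YES;
        }
    }

    // cluster.routing.allocation.cluster_concurrent_rebalance
    static Type concurrent(int limit, ClusterView c) {
        if (limit == -1) {
            return Type.YES; // unlimited
        }
        return c.relocatingShards >= limit ? Type.THROTTLE : Type.YES;
    }

    public static void main(String[] args) {
        // One replica still unassigned, one shard currently relocating.
        ClusterView c = new ClusterView(false, false, true, false, 1);
        System.out.println(allowRebalance("indices_all_active", c));       // NO
        System.out.println(allowRebalance("indices_primaries_active", c)); // YES
        System.out.println(concurrent(2, c));                              // YES
    }
}
```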

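    The actual rebalancing is carried out in BalancedShardsAllocator.allocate(), which calls Balancer's balance() and, through it, balanceByWeights().
    When BalancedShardsAllocator is constructed, it uses the last three settings above to set up weightFunction (settings 4 and 5) and threshold (setting 6); the registered update consumers keep both in sync when the settings change at runtime.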

    public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) {
        setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
        setThreshold(THRESHOLD_SETTING.get(settings));
        clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
        clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
    }
    
    private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
        weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
    }
    
    private void setThreshold(float threshold) {
        this.threshold = threshold;
    }
    

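    The WeightFunction balances two things at once: the total number of shards on each node, and the number of shards of each index on each node. See the comments: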

    private static class WeightFunction {
    
        private final float indexBalance;
        private final float shardBalance;
        private final float theta0;
        private final float theta1;
        //defaults: 0.45 (shard) and 0.55 (index), which sum to 1
        WeightFunction(float indexBalance, float shardBalance) {
            float sum = indexBalance + shardBalance;
            if (sum <= 0.0f) {
                throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
            }
            //normalize so that theta0 + theta1 == 1; if the factors already sum to 1, the configured values are used unchanged
            theta0 = shardBalance / sum;
            theta1 = indexBalance / sum;
            this.indexBalance = indexBalance;
            this.shardBalance = shardBalance;
        }
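        //weight of a node for a given index, computed from the node's shard counts and the Balancer's averages
        float weight(Balancer balancer, ModelNode node, String index) {
            //shards on this node minus the average number of shards per node
            final float weightShard = node.numShards() - balancer.avgShardsPerNode();
            //shards of this index on this node minus the average number of shards of this index per node
            final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
            //weighted sum using the normalized factors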
            return theta0 * weightShard + theta1 * weightIndex;
        }
    }
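    To make the formula concrete, here is a small worked example that reimplements only the weight formula above; the node counts are invented. It also shows what a single relocation does: moving one shard of the index from the heaviest node to the lightest lowers the heavy node's weight by theta0 + theta1 = 1 and raises the light node's weight by 1, so the gap between them shrinks by exactly 2.

```java
public class WeightExample {

    // Mirrors the WeightFunction formula: theta0 weights total shards, theta1 weights shards of one index.
    static final class Weights {
        final float theta0, theta1;

        Weights(float indexBalance, float shardBalance) {
            float sum = indexBalance + shardBalance;
            if (sum <= 0.0f) {
                throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
            }
            theta0 = shardBalance / sum;
            theta1 = indexBalance / sum;
        }

        // numShards / numIndexShards describe one node; the averages are taken over all nodes.
        float weight(int numShards, float avgShards, int numIndexShards, float avgIndexShards) {
            return theta0 * (numShards - avgShards) + theta1 * (numIndexShards - avgIndexShards);
        }
    }

    public static void main(String[] args) {
        Weights w = new Weights(0.55f, 0.45f); // balance.index, balance.shard defaults
        // Hypothetical 3-node cluster: 12 shards in total (avg 4 per node),
        // index "a" has 6 shards (avg 2 per node).
        float avg = 12 / 3f, avgA = 6 / 3f;
        float heavy = w.weight(6, avg, 4, avgA); // node-1: 6 shards, 4 of them from "a"
        float light = w.weight(3, avg, 1, avgA); // node-2: 3 shards, 1 of them from "a"
        System.out.printf("before: %.2f %.2f delta=%.2f%n", heavy, light, heavy - light); // before: 2.00 -1.00 delta=3.00
        // Move one shard of "a" from node-1 to node-2: both counts change by 1 on each node.
        float heavyAfter = w.weight(5, avg, 3, avgA);
        float lightAfter = w.weight(4, avg, 2, avgA);
        System.out.printf("after:  %.2f %.2f delta=%.2f%n", heavyAfter, lightAfter, heavyAfter - lightAfter); // after:  1.00 0.00 delta=1.00
    }
}
```

    Because the constructor normalizes the factors, theta0 + theta1 is always 1, whatever values are configured, so the "each move changes the gap by 2" property holds in general and not only for the defaults.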
    

    Now for the Balancer. As BalancedShardsAllocator.allocate() shows, it does three jobs (this article focuses on balance):

    public void allocate(RoutingAllocation allocation) {
        if (allocation.routingNodes().size() == 0) {
            failAllocationOfNewPrimaries(allocation);
            return;
        }
        final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
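        //allocate shards that are still unassigned
        balancer.allocateUnassigned();
        //move shards that are no longer allowed to stay on their current node (because of allocation rules)
        balancer.moveShards();
        //try to even out the number of shards across nodes
        balancer.balance();//ends up calling balanceByWeights()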
    }
    

    Next, balance():

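    • To see the balancing process in the logs, you first need to enable TRACE logging.
    • Per issue 14387, rebalancing is skipped while shard store data is still being fetched; otherwise it may do useless work, moving shards only to move them back once the fetches return.
    • The canRebalance() method described above must return YES (a THROTTLE also skips this round).
    • With only one node there is nothing to balance.
    • Otherwise, rebalancing starts.
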
    private void balance() {
        if (logger.isTraceEnabled()) {
            logger.trace("Start balancing cluster");
        }
        if (allocation.hasPendingAsyncFetch()) {
            /*
             * see https://github.com/elastic/elasticsearch/issues/14387
             * if we allow rebalance operations while we are still fetching shard store data
             * we might end up with unnecessary rebalance operations which can be super confusion/frustrating
             * since once the fetches come back we might just move all the shards back again.
             * Therefore we only do a rebalance if we have fetched all information.
             */
            logger.debug("skipping rebalance due to in-flight shard/store fetches");
            return;
        }
        if (allocation.deciders().canRebalance(allocation).type() != Type.YES) {
            logger.trace("skipping rebalance as it is disabled");
            return;
        }
        if (nodes.size() < 2) { /* skip if we only have one node */
            logger.trace("skipping rebalance as single node only");
            return;
        }
        balanceByWeights();//the core method
    }
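    The first bullet above mentions TRACE logging. One way to switch it on without a restart is a dynamic logger setting sent through PUT _cluster/settings; the sketch below only builds that request. The logger name is the package that contains BalancedShardsAllocator (org.elasticsearch.cluster.routing.allocation.allocator); the host localhost:9200 is an assumption. Setting the value back to null afterwards resets the logger to its default level.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class AllocatorTraceLogging {

    static final String LOGGER = "logger.org.elasticsearch.cluster.routing.allocation.allocator";

    // level == null produces a JSON null, which resets the logger to its default level.
    static String body(String level) {
        String value = level == null ? "null" : "\"" + level + "\"";
        return "{\"persistent\":{\"" + LOGGER + "\":" + value + "}}";
    }

    static HttpRequest request(String host, String level) {
        return HttpRequest.newBuilder(URI.create(host + "/_cluster/settings"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body(level)))
                .build();
    }

    public static void main(String[] args) {
        System.out.println(body("trace")); // enable while investigating
        System.out.println(body(null));    // reset afterwards
        System.out.println(request("http://localhost:9200", "trace").uri());
    }
}
```

    TRACE output from the allocator can be verbose on large clusters, so resetting it after the investigation is worth the extra request.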
    

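    Next, balanceByWeights(). This is the core of the rebalance logic and it is fairly long; the upstream comments have been replaced with detailed annotations, and it is well worth working through line by line: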

    private void balanceByWeights() {
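        //the deciders that decide whether shards may be rebalanced
        final AllocationDeciders deciders = allocation.deciders();
        //node models: each node's total shard count and its shard count per index
        final ModelNode[] modelNodes = sorter.modelNodes;
        //weight of each node for the index currently being balanced
        final float[] weights = sorter.weights;
        //process the indices one at a time
        for (String index : buildWeightOrderedIndices()) {
            IndexMetadata indexMetadata = metadata.index(index);
            //find the nodes that already hold shards of this index, or that shards of this index may be allocated to, and move them to the front of modelNodes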
            int relevantNodes = 0;
            for (int i = 0; i < modelNodes.length; i++) {
                ModelNode modelNode = modelNodes[i];
                if (modelNode.getIndex(index) != null
                    || deciders.canAllocate(indexMetadata, modelNode.getRoutingNode(), allocation).type() != Type.NO) {
                    // swap nodes at position i and relevantNodes
                    modelNodes[i] = modelNodes[relevantNodes];
                    modelNodes[relevantNodes] = modelNode;
                    relevantNodes++;
                }
            }
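            //skip this index if fewer than two nodes are relevant
            if (relevantNodes < 2) {
                continue;
            }
            //recompute the weights of the relevant nodes for this index and sort them in ascending order
            sorter.reset(index, 0, relevantNodes);
            //from here on, work only on the relevant nodes, i.e. the first relevantNodes entries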
            int lowIdx = 0;
            int highIdx = relevantNodes - 1;
            while (true) {
                final ModelNode minNode = modelNodes[lowIdx];
                final ModelNode maxNode = modelNodes[highIdx];
                advance_range:
                if (maxNode.numShards(index) > 0) {
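                    //weight gap between the current min and max nodes; if it is within the threshold (setting 6), stop
                    final float delta = absDelta(weights[lowIdx], weights[highIdx]);
                    if (lessThan(delta, threshold)) {
                        //...unless the lightest node and the next-heaviest candidate still differ by more than the threshold: then keep scanning
                        if (lowIdx > 0 && highIdx-1 > 0 && (absDelta(weights[0], weights[highIdx-1]) > threshold) ) {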
                            break advance_range;
                        }
                        if (logger.isTraceEnabled()) {
                            logger.trace("Stop balancing index [{}]  min_node [{}] weight: [{}]" +
                                    "  max_node [{}] weight: [{}]  delta: [{}]",
                                    index, minNode.getNodeId(), weights[lowIdx], maxNode.getNodeId(), weights[highIdx], delta);
                        }
                        break;
                    }
                    if (logger.isTraceEnabled()) {
                        logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}]  delta: [{}]",
                                maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
                    }
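                    //skip if delta <= 1. Why a hard-coded 1 rather than a setting? One relocation lowers maxNode's weight by
                    //theta0 + theta1 = 1 and raises minNode's weight by 1, so delta would become |delta - 2|,
                    //which is no smaller than delta when delta <= 1: moving a shard could not improve the balance
                    if (delta <= 1.0f) {
                        logger.trace("Couldn't find shard to relocate from node [{}] to node [{}]",
                            maxNode.getNodeId(), minNode.getNodeId());
                        //otherwise try to relocate a shard of this index from maxNode to minNode, checking the candidate shards on maxNode against the deciders
                    } else if (tryRelocateShard(minNode, maxNode, index)) {
                        //the shard counts of both nodes changed, so recompute their weights, re-sort, and start the next round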
                        weights[lowIdx] = sorter.weight(modelNodes[lowIdx]);
                        weights[highIdx] = sorter.weight(modelNodes[highIdx]);
                        sorter.sort(0, relevantNodes);
                        lowIdx = 0;
                        highIdx = relevantNodes - 1;
                        continue;
                    }
                }
                //no shard moved in this round and the weights are unchanged, so move on to the next pair of relevant nodes
                if (lowIdx < highIdx - 1) {
                    lowIdx++;
                } else if (lowIdx > 0) {
                    lowIdx = 0;
                    highIdx--;
                } else {
                    //this index is balanced
                    break;
                }
            }
        }
    }
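    The loop is easier to follow on a toy model. Below is a simplified, self-contained re-implementation of its control flow for a single index, under strong assumptions: every decider answers YES, every relocation succeeds immediately (no THROTTLE), and tryRelocateShard is reduced to moving one shard. Node, balance() and lessThan() here are hypothetical names, not Elasticsearch APIs; the 0.001 tolerance in lessThan() follows the helper of the same name in BalancedShardsAllocator as I remember it.

```java
import java.util.Arrays;
import java.util.Comparator;

public class BalanceByWeightsModel {

    // Hypothetical stand-in for ModelNode: only shard counts are tracked.
    static final class Node {
        final String id;
        int total; // all shards on the node
        int index; // shards of the index being balanced

        Node(String id, int total, int index) {
            this.id = id;
            this.total = total;
            this.index = index;
        }
    }

    // Default factors, already normalized (theta0 + theta1 == 1).
    static final float THETA0 = 0.45f, THETA1 = 0.55f;

    static float weight(Node n, float avgTotal, float avgIndex) {
        return THETA0 * (n.total - avgTotal) + THETA1 * (n.index - avgIndex);
    }

    // Treats deltas within 0.001 of the threshold as "below" it (float tolerance).
    static boolean lessThan(float delta, float threshold) {
        return delta <= threshold + 0.001f;
    }

    // Simulates the balanceByWeights() loop for one index. Returns the number of simulated moves.
    static int balance(Node[] nodes, float threshold) {
        // Moves do not change the totals, so the averages stay constant.
        final float avgTotal = (float) Arrays.stream(nodes).mapToInt(n -> n.total).sum() / nodes.length;
        final float avgIndex = (float) Arrays.stream(nodes).mapToInt(n -> n.index).sum() / nodes.length;
        final Comparator<Node> byWeight = Comparator.comparingDouble(n -> weight(n, avgTotal, avgIndex));
        Arrays.sort(nodes, byWeight); // ascending: nodes[0] is the lightest node
        int moves = 0, low = 0, high = nodes.length - 1;
        while (true) {
            Node min = nodes[low], max = nodes[high];
            advance_range:
            if (max.index > 0) {
                float delta = Math.abs(weight(min, avgTotal, avgIndex) - weight(max, avgTotal, avgIndex));
                if (lessThan(delta, threshold)) {
                    if (low > 0 && high - 1 > 0 && Math.abs(weight(nodes[0], avgTotal, avgIndex)
                            - weight(nodes[high - 1], avgTotal, avgIndex)) > threshold) {
                        break advance_range; // a wider pair still exceeds the threshold: keep scanning
                    }
                    break; // the index is balanced within the threshold
                }
                if (delta > 1.0f) { // a move must actually shrink the gap
                    max.total--; max.index--; // simplified tryRelocateShard(): always succeeds
                    min.total++; min.index++;
                    moves++;
                    Arrays.sort(nodes, byWeight);
                    low = 0;
                    high = nodes.length - 1;
                    continue;
                }
            }
            if (low < high - 1) {
                low++;
            } else if (low > 0) {
                low = 0;
                high--;
            } else {
                break; // every pair has been checked
            }
        }
        return moves;
    }

    public static void main(String[] args) {
        // Hypothetical cluster: a 6-shard index, all on node-1, nothing else allocated.
        Node[] nodes = { new Node("node-1", 6, 6), new Node("node-2", 0, 0), new Node("node-3", 0, 0) };
        int moves = balance(nodes, 1.0f);
        Arrays.sort(nodes, Comparator.comparing((Node n) -> n.id));
        System.out.println("moves=" + moves); // moves=4
        for (Node n : nodes) {
            System.out.println(n.id + " -> " + n.index); // each node ends up with 2 shards
        }
    }
}
```

    Each simulated move strictly shrinks the gap between the pair it touches, which is why the loop can restart from the outermost pair after every move and still terminate.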
    

    Next comes tryRelocateShard(), which moves shards between the two nodes:
    //TODO


        Article link: https://www.haomeiwen.com/subject/amzpiltx.html