Elasticsearch query shard selection

Author: b2499d9c833c | Published 2019-07-09 20:47

For a single search request, ES selects one copy of each relevant shard to execute the query against.
For example, as shown in Figure 1, the index twitter has 3 primary shards, each with 2 replicas, i.e. 9 shard copies in total. One search request is served by 3 shard copies (one per shard), each of which may be a primary or a replica. In other words, a search hits exactly one copy of each shard.
In theory a primary and its replicas hold exactly the same data, and a query uses only one copy of each shard, so adding replicas does not make an individual search faster through parallelism. What extra replicas can do, in some scenarios, is give ES a copy that can respond quickly in the current cluster state, so that picking that copy keeps the search fast.

Figure 1

Replica selection

Replica selection happens in one of the following ways:

  • A shard copy designated by the preference parameter
  • Awareness-based replica selection
  • Adaptive replica selection (ARS)
  • Round-robin over the shard copies (the default)

All of these paths are dispatched in OperationRouting#preferenceActiveShardIterator (from the 6.x source):
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId,
                                                        DiscoveryNodes nodes, @Nullable String preference,
                                                        @Nullable ResponseCollectorService collectorService,
                                                        @Nullable Map<String, Long> nodeCounts) {
        // No explicit preference: use awareness attributes if configured,
        // otherwise ARS (if enabled), otherwise the round-robin iterator.
        if (preference == null || preference.isEmpty()) {
            if (awarenessAttributes.isEmpty()) {
                if (useAdaptiveReplicaSelection) {
                    return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
                } else {
                    return indexShard.activeInitializingShardsRandomIt();
                }
            } else {
                return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
            }
        }
        // Built-in preferences all start with '_'; anything else is treated
        // as a custom preference string by the hash-based path at the bottom.
        if (preference.charAt(0) == '_') {
            Preference preferenceType = Preference.parse(preference);
            if (preferenceType == Preference.SHARDS) {
                // starts with _shards, so execute on specific ones
                int index = preference.indexOf('|');

                String shards;
                if (index == -1) {
                    shards = preference.substring(Preference.SHARDS.type().length() + 1);
                } else {
                    shards = preference.substring(Preference.SHARDS.type().length() + 1, index);
                }
                String[] ids = Strings.splitStringByCommaToArray(shards);
                boolean found = false;
                for (String id : ids) {
                    if (Integer.parseInt(id) == indexShard.shardId().id()) {
                        found = true;
                        break;
                    }
                }
                if (!found) {
                    return null;
                }
                // no more preference
                if (index == -1 || index == preference.length() - 1) {
                    if (awarenessAttributes.isEmpty()) {
                        if (useAdaptiveReplicaSelection) {
                            return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
                        } else {
                            return indexShard.activeInitializingShardsRandomIt();
                        }
                    } else {
                        return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
                    }
                } else {
                    // update the preference and continue
                    preference = preference.substring(index + 1);
                }
            }
            preferenceType = Preference.parse(preference);
            switch (preferenceType) {
                case PREFER_NODES:
                    final Set<String> nodesIds =
                            Arrays.stream(
                                    preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
                            ).collect(Collectors.toSet());
                    return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
                case LOCAL:
                    return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
                case PRIMARY:
                    deprecationLogger.deprecated("[_primary] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.primaryActiveInitializingShardIt();
                case REPLICA:
                    deprecationLogger.deprecated("[_replica] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.replicaActiveInitializingShardIt();
                case PRIMARY_FIRST:
                    deprecationLogger.deprecated("[_primary_first] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.primaryFirstActiveInitializingShardsIt();
                case REPLICA_FIRST:
                    deprecationLogger.deprecated("[_replica_first] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.replicaFirstActiveInitializingShardsIt();
                case ONLY_LOCAL:
                    return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
                case ONLY_NODES:
                    String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
                    return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
                default:
                    throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
            }
        }
        // not a built-in preference: hash the custom string so that the same
        // string consistently yields the same shard ordering
        int routingHash = Murmur3HashFunction.hash(preference);
        if (nodes.getMinNodeVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
            // The AllocationService lists shards in a fixed order based on nodes
            // so earlier versions of this class would have a tendency to
            // select the same node across different shardIds.
            // Better overall balancing can be achieved if each shardId opts
            // for a different element in the list by also incorporating the
            // shard ID into the hash of the user-supplied preference key.
            routingHash = 31 * routingHash + indexShard.shardId.hashCode();
        }
        if (awarenessAttributes.isEmpty()) {
            return indexShard.activeInitializingShardsIt(routingHash);
        } else {
            return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
        }
    }
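
The hash-based path at the bottom is why a stable custom string such as a session ID pins the shard ordering: the string (combined with the shard ID on 6.0+ clusters) hashes to a fixed rotation seed. Below is a minimal sketch of that idea, using String.hashCode as a stand-in for Murmur3HashFunction.hash and a made-up shard ID hash:

import java.util.List;

public class CustomPreferenceSketch {
    public static void main(String[] args) {
        List<String> copies = List.of("copy0", "copy1", "copy2");
        int shardIdHash = 7; // stand-in for indexShard.shardId.hashCode()
        // The same preference string always yields the same starting copy,
        // as long as the copy list (i.e. the cluster state) is unchanged.
        for (String preference : List.of("session-42", "session-42", "other")) {
            int routingHash = 31 * preference.hashCode() + shardIdHash;
            int d = Math.floorMod(routingHash, copies.size());
            System.out.println(preference + " -> starts at " + copies.get(d));
        }
    }
}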

Selecting a replica with the preference parameter

Preference values and what they do:

  • _primary - The request is executed only on primary shards.
  • _primary_first - The query runs on a primary shard if possible; if the primary is unavailable (e.g. it has failed), it falls back to a replica.
  • _replica - The request is executed only on replica shards.
  • _replica_first - The query runs on a replica first; if no replica is available, it falls back to the primary.
  • _local - The query prefers shards held on the local node and only goes to other nodes for shards the local node lacks.
  • _only_local - The query executes only on shards allocated to the local node.
  • _prefer_nodes:abc,xyz - Prefer executing on the given nodes (here 'abc' or 'xyz').
  • _shards:2,3 - Restrict the operation to the specified shards (here 2 and 3). This preference can be combined with the others, but the _shards part must come first, e.g. _shards:2,3|_primary.
  • _only_nodes:node1,node2 - Execute only on the nodes with the given comma-separated IDs; if those nodes hold only some of the index's shards, only those shard copies are searched.
  • custom - Any value that does not start with _. If two searches supply the same custom string and the underlying cluster state does not change, they use the same shard ordering. This does not guarantee exactly the same shards every time: the cluster state, and with it the selected shards, can change for many reasons, including shard relocation, shard failures, and nodes occasionally rejecting searches so that a fallback copy is used. In practice, though, the shard ordering tends to stay stable for long periods. Good candidates for a custom preference value are things like a web session ID or a user name.
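For illustration, here is a minimal sketch of supplying a preference through the Java client's SearchRequest (the index name and query are made up; SearchRequest#preference is the real setter):

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

// Search only shards 2 and 3, preferring copies on the local node.
// When combining preferences, the _shards component must come first.
SearchRequest request = new SearchRequest("twitter")
        .preference("_shards:2,3|_local")
        .source(new SearchSourceBuilder()
                .query(QueryBuilders.matchQuery("title", "elasticsearch")));
// client.search(request, RequestOptions.DEFAULT) would then route accordingly.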

Notes

  • A custom preference value must not start with an underscore "_". The point of a custom preference is that repeated searches see their results in the same order.
  • Only the _only_local preference guarantees that shard copies on the local node are used, which is sometimes useful for troubleshooting. None of the other options fully guarantees that any particular shard copy is used, and on an index that is changing this can mean that repeated searches executed against shard copies in different refresh states return different results.
  • _primary, _primary_first, _replica and _replica_first are deprecated and their use is discouraged:
    They do not help avoid the inconsistent results that come from searching copies in different refresh states, and since Elasticsearch replicates synchronously, a primary does not normally hold newer data than its replicas.
    _primary_first and _replica_first silently fall back to non-preferred copies if the preferred ones cannot be searched.
    _primary and _replica silently change which copy they prefer whenever a replica is promoted to primary, which can happen at any time.
    The _primary preference puts unnecessary extra load on primary shards.
    The cache-related benefits of these options can also be obtained using _only_nodes, _prefer_nodes, or a custom string value.

Rack awareness

If you run several ES nodes in VMs on the same physical machine, or spread nodes across racks and data centers, then several nodes may end up on the same physical machine, in the same rack, or in the same data center, and a failure of that machine, rack, or data center can take them all down together.
If ES is aware of the physical layout of the hardware, it can guarantee that a primary shard and its replica shards are allocated to different physical machines, racks, or data centers, minimizing the risk from a single physical failure.

Shard allocation awareness lets us tell ES about our hardware topology.

For example, with multiple racks, each node needs to be told which rack it sits on when it starts, by giving it a rack_id: ./bin/elasticsearch -Enode.attr.rack_id=rack_one. The rack id can also be set in elasticsearch.yml:

cluster.routing.allocation.awareness.attributes: rack_id
node.attr.rack_id: rack_one

Of these two settings, the first names the attribute used for awareness, and the second sets this node's actual rack id under that attribute name.

If you start two nodes in the same rack and then create an index with 5 primary shards and 5 replica shards, the shards are spread across the two nodes.

If you then start two more nodes tagged as a different rack, ES relocates shards onto the new nodes so that a primary shard and its replica never share a rack. If rack 2 later fails, however, ES will still allocate all the shards within rack 1 in order to recover the cluster.

The prefer-local-shard mechanism: for search and get requests, when shard awareness is enabled, ES tries to serve the request from local shards, i.e. shards in the same awareness group (the same rack or data center), rather than crossing racks or data centers.

Multiple awareness attributes can be specified, for example a rack id and a zone name:

cluster.routing.allocation.awareness.attributes: rack_id,zone

Forced awareness

Suppose we have two data centers with enough hardware between them for all the shards, but each one individually can hold only half of them. With plain awareness, if one data center fails, ES moves all the shards that need recovering to the remaining data center, whose hardware cannot hold them all.

Forced awareness solves this: it simply never allows all of the shard copies to be allocated within a single data center.

For example, given an awareness attribute called zone and two data centers, zone1 and zone2, consider this configuration:

cluster.routing.allocation.awareness.attributes: zone
cluster.routing.allocation.awareness.force.zone.values: zone1,zone2


Now if two nodes are assigned to zone1 and an index with 5 primary shards and 5 replica shards is created, only the 5 primaries are allocated in zone1; the replica shards are allocated only once nodes are started in zone2.

Once racks are configured, replica selection defaults to preferring shard copies in the local rack. The excerpt below (from IndexShardRoutingTable) moves the copies whose node shares the local node's awareness attribute values to the front:

        final ArrayList<ShardRouting> to = new ArrayList<>();
        for (final String attribute : key.attributes) {
            final String localAttributeValue = nodes.getLocalNode().getAttributes().get(attribute);
            if (localAttributeValue != null) {
                for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
                    ShardRouting fromShard = iterator.next();
                    final DiscoveryNode discoveryNode = nodes.get(fromShard.currentNodeId());
                    if (discoveryNode == null) {
                        iterator.remove(); // node is not present anymore - ignore shard
                    } else if (localAttributeValue.equals(discoveryNode.getAttributes().get(attribute))) {
                        iterator.remove();
                        to.add(fromShard);
                    }
                }
            } 
        }
        return Collections.unmodifiableList(to);
    }

The default replica selection

In versions before 7.0, a random value seeds the first search, and each subsequent request rotates to the next shard copy, i.e. round-robin.

    /**
     * Returns an iterator over active and initializing shards. Making sure though that
     * its random within the active shards, and initializing shards are the last to iterate through.
     */
    public ShardIterator activeInitializingShardsRandomIt() {
        return activeInitializingShardsIt(shuffler.nextSeed());
    }
 
    @Override
    public int nextSeed() {
        return seed.getAndIncrement();
    }
    @Override
    public List<ShardRouting> shuffle(List<ShardRouting> shards, int seed) {
        return CollectionUtils.rotate(shards, seed);
    }
    /**
     * Returns an iterator over active and initializing shards. Making sure though that
     * its random within the active shards, and initializing shards are the last to iterate through.
     */
    public ShardIterator activeInitializingShardsIt(int seed) {
      // make sure started shards are iterated before shards that are still initializing
        if (allInitializingShards.isEmpty()) {
            return new PlainShardIterator(shardId, shuffler.shuffle(activeShards, seed));
        }
        ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
        ordered.addAll(shuffler.shuffle(activeShards, seed));
        ordered.addAll(allInitializingShards);
        return new PlainShardIterator(shardId, ordered);
    }
  /**
     * Return a rotated view of the given list with the given distance.
     */
  // incrementing the seed rotates the list, so successive requests are round-robined across the copies
    public static <T> List<T> rotate(final List<T> list, int distance) {
        if (list.isEmpty()) {
            return list;
        }

        int d = distance % list.size();
        if (d < 0) {
            d += list.size();
        }

        if (d == 0) {
            return list;
        }

        return new RotatedList<>(list, d);
    }

Suppose there are three shard copies with the following response latencies:

  • shard copy 1: 100ms
  • shard copy 2 (degraded): 1350ms
  • shard copy 3: 150ms

Long garbage-collection pauses, heavy disk I/O, saturated network bandwidth, heterogeneous node hardware and the like can easily degrade a single node, as with shard copy 2 here. Requests sent to shard copy 2 respond slowly; to make replica selection smarter about this, the Elasticsearch engineers built the ARS feature.

Adaptive replica selection

The Elastic blog post introducing ARS (linked in the references) describes it as follows:

Our ARS implementation is based on a formula where, for each search request, 
Elasticsearch ranks each copy of the shard to determine which is likeliest to be the "best" 
copy to send the request to. Instead of sending requests in a round-robin fashion to each 
copy of the shard, Elasticsearch selects the "best" copy and routes the request there.

The ARS formula initially seems complex, but let's break it down:

Ψ(s) = R(s) - 1/µ̄(s) + (q̂(s))^3 / µ̄(s)

Where q̂(s) is:

q̂(s) = 1 + (os(s) * n) + q(s)

In the Elasticsearch source this is implemented in ResponseCollectorService.ComputedNodeStats#innerRank:

 private double innerRank(long outstandingRequests) {
            // the concurrency compensation is defined as the number of
            // outstanding requests from the client to the node times the number
            // of clients in the system
            double concurrencyCompensation = outstandingRequests * clientNum;

            // Cubic queue adjustment factor. The paper chose 3 though we could
            // potentially make this configurable if desired.
            int queueAdjustmentFactor = 3;

            // EWMA of queue size
            double qBar = queueSize;
            double qHatS = 1 + concurrencyCompensation + qBar;

            // EWMA of response time
            double rS = responseTime / FACTOR;
            // EWMA of service time
            double muBarS = serviceTime / FACTOR;

            // The final formula
            double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS);
            return rank;
        }

Looking at the individual pieces:

  • os(s): the number of outstanding search requests to the node.
  • n: the number of nodes in the system.
  • R(s): the exponentially weighted moving average (EWMA) of the response time as observed from the coordinating node, in milliseconds.
  • q(s): the EWMA of the number of tasks waiting in the search queue.
  • µ̄(s): the EWMA of the search service time on the data node, in milliseconds.
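
A toy check with made-up numbers (working directly in milliseconds, whereas the real code keeps nanosecond EWMAs and divides by FACTOR) shows the degraded copy from the earlier latency example ranking far worse; the copy with the lowest rank wins:

// Toy version of innerRank above, taking milliseconds directly.
static double rank(double responseMs, double serviceMs, double queueEwma,
                   long outstandingRequests, int clientNum) {
    double qHat = 1 + outstandingRequests * clientNum + queueEwma;
    return responseMs - 1.0 / serviceMs + Math.pow(qHat, 3) / serviceMs;
}

// shard copy 1: fast, short queue  -> rank(100, 80, 1, 1, 1)    ~= 100.33
// shard copy 2: degraded, backlog  -> rank(1350, 1200, 8, 1, 1) ~= 1350.83

The per-node EWMAs that feed the formula are tracked in ResponseCollectorService: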
private static class NodeStatistics {
        final String nodeId;
        final ExponentiallyWeightedMovingAverage queueSize;
        final ExponentiallyWeightedMovingAverage responseTime;
        double serviceTime;

        NodeStatistics(String nodeId,
                       ExponentiallyWeightedMovingAverage queueSizeEWMA,
                       ExponentiallyWeightedMovingAverage responseTimeEWMA,
                       double serviceTimeEWMA) {
            this.nodeId = nodeId;
            this.queueSize = queueSizeEWMA;
            this.responseTime = responseTimeEWMA;
            this.serviceTime = serviceTimeEWMA;
        }
    }


"adaptive_selection": {
                "5BN2QxfZQ3yzotzQhUXzlg": {
                    "outgoing_searches": 0,
                    "avg_queue_size": 0,
                    "avg_service_time_ns": 2976073,
                    "avg_response_time_ns": 3396261,
                    "rank": "3.4"
                },
                "eAznL5r5RreHLIU16XpczA": {
                    "outgoing_searches": 0,
                    "avg_queue_size": 0,
                    "avg_service_time_ns": 8884750,
                    "avg_response_time_ns": 15520622,
                    "rank": "15.5"
                }
            }
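
As a sanity check on these numbers: with no outgoing searches and an empty queue, q̂(s) = 1 + 0 + 0 = 1, so the -1/µ̄(s) and q̂(s)³/µ̄(s) terms cancel and the rank reduces to the response-time EWMA in milliseconds: 3396261 ns ≈ 3.4 ms and 15520622 ns ≈ 15.5 ms, matching the reported ranks.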
Comparison under no load

As the Elastic team's benchmark charts show, even with the cluster under no load, ARS still improves throughput and reduces latency.

A single node under load

With one data node under heavy load, throughput improves substantially while median latency rises somewhat: routing around the overloaded node shifts work onto the less loaded nodes, which raises their latency.


Ranking is done in IndexShardRoutingTable#rankShardsAndUpdateStats, which sorts the shard copies by their node's rank and then adjusts the stats so that losing nodes still get sampled:

private static List<ShardRouting> rankShardsAndUpdateStats(List<ShardRouting> shards, final ResponseCollectorService collector,
                                                               final Map<String, Long> nodeSearchCounts) {
        if (collector == null || nodeSearchCounts == null || shards.size() <= 1) {
            return shards;
        }

        // Retrieve which nodes we can potentially send the query to
        final Set<String> nodeIds = getAllNodeIds(shards);
        final int nodeCount = nodeIds.size();

        final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(nodeIds, collector);

        // Retrieve all the nodes the shards exist on
        final Map<String, Double> nodeRanks = rankNodes(nodeStats, nodeSearchCounts);

        // sort all shards based on the shard rank
        ArrayList<ShardRouting> sortedShards = new ArrayList<>(shards);
        Collections.sort(sortedShards, new NodeRankComparator(nodeRanks));

        // adjust the non-winner nodes' stats so they will get a chance to receive queries
        if (sortedShards.size() > 1) {
            ShardRouting minShard = sortedShards.get(0);
            // If the winning shard is not started we are ranking initializing
            // shards, don't bother to do adjustments
            if (minShard.started()) {
                String minNodeId = minShard.currentNodeId();
                Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId);
                if (maybeMinStats.isPresent()) {
                    adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get());
                    // Increase the number of searches for the "winning" node by one.
                    // Note that this doesn't actually affect the "real" counts, instead
                    // it only affects the captured node search counts, which is
                    // captured once for each query in TransportSearchAction
                    nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1);
                }
            }
        }

        return sortedShards;
    }

To prevent some nodes from never being sent requests, each time a selection is made the chosen node's search count is incremented by one, and the statistics of the nodes that were not chosen are adjusted:

/**
     * Adjust the for all other nodes' collected stats. In the original ranking paper there is no need to adjust other nodes' stats because
     * Cassandra sends occasional requests to all copies of the data, so their stats will be updated during that broadcast phase. In
     * Elasticsearch, however, we do not have that sort of broadcast-to-all behavior. In order to prevent a node that gets a high score and
     * then never gets any more requests, we must ensure it eventually returns to a more normal score and can be a candidate for serving
     * requests.
     *
     * This adjustment takes the "winning" node's statistics and adds the average of those statistics with each non-winning node. Let's say
     * the winning node had a queue size of 10 and a non-winning node had a queue of 18. The average queue size is (10 + 18) / 2 = 14 so the
     * non-winning node will have statistics added for a queue size of 14. This is repeated for the response time and service times as well.
     */
    private static void adjustStats(final ResponseCollectorService collector,
                                    final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats,
                                    final String minNodeId,
                                    final ResponseCollectorService.ComputedNodeStats minStats) {
        if (minNodeId != null) {
            for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) {
                final String nodeId = entry.getKey();
                final Optional<ResponseCollectorService.ComputedNodeStats> maybeStats = entry.getValue();
                if (nodeId.equals(minNodeId) == false && maybeStats.isPresent()) {
                    final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get();
                    final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2;
                    final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2;
                    final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2;
                    collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService);
                }
            }
        }
    }

References

https://elasticsearch.cn/article/334
https://juejin.im/post/5b83b1d5e51d4538da22ef50
https://doc.yonyoucloud.com/doc/mastering-elasticsearch/chapter-4/45_README.html
https://www.elastic.co/guide/en/elasticsearch/reference/6.7/allocation-awareness.html
http://zh1cheung.com/zhi1cheung.github.io/elk/2018/10/02/elk/
https://pdfs.semanticscholar.org/99c7/f437d672abf56fdc9438c0c46a7ef716e8c7.pdf
https://www.elastic.co/guide/en/elasticsearch/reference/6.7/search-request-preference.html
https://jobs.zalando.com/tech/blog/a-closer-look-at-elasticsearch-express/?gh_src=4n3gxh1
https://www.elastic.co/blog/improving-response-latency-in-elasticsearch-with-adaptive-replica-selection
https://www.elastic.co/guide/cn/elasticsearch/guide/current/_search_options.html
