ES Source Code Notes: Routing

Author: 多喝水JS | Published: 2019-10-31 14:12

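When indexing, ES by default uses the document identifier _id to spread documents evenly across the shards. When searching, ES by default queries every shard and merges the results, so the caller does not need to know which shard actually holds the data.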

The routing algorithm computes the target shard id from the routing value and the document id.
In the common case the formula is:

    shard_num = hash(_routing) % num_primary_shards
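A minimal Java sketch of this formula. It assumes String.hashCode() as a stand-in for the Murmur3 hash ES actually uses, so the shard numbers will not match a real cluster; the class name BasicRouting is hypothetical. It uses Math.floorMod, as the calculateScaledShardId excerpt in this note does, because a hash can be negative and Java's % would then yield a negative shard number.

```java
// Sketch of: shard_num = hash(_routing) % num_primary_shards
// Assumption: String.hashCode() stands in for Murmur3HashFunction.hash(String),
// so the shard numbers printed here will not match a real cluster.
class BasicRouting {

    // Stand-in hash; ES hashes the routing value with Murmur3.
    static int hash(String routing) {
        return routing.hashCode();
    }

    // floorMod keeps the result in [0, numPrimaryShards) even for negative hashes,
    // where Java's % operator would return a negative number.
    static int shardNum(String routing, int numPrimaryShards) {
        return Math.floorMod(hash(routing), numPrimaryShards);
    }

    public static void main(String[] args) {
        // Without custom routing, _routing defaults to the document's _id.
        for (String id : new String[] {"1", "2", "3", "doc-42"}) {
            System.out.println(id + " -> shard " + shardNum(id, 5));
        }
    }
}
```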
    

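By default, the _routing value is the document's _id.
ES relies on random ids and a hash function to spread documents evenly across shards. With custom ids or custom routing values, those values may not be random enough, which causes data skew: some shards grow much larger than others. In that case the index.routing_partition_size setting can reduce the risk of skew. The larger routing_partition_size is, the more evenly the data is distributed, at the cost of each routed search having to query more shards.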
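To make the skew concrete, the hypothetical sketch below counts how many of 1,000 documents land on each of 5 shards, first routed by their own ids and then all routed by one custom value. As before, String.hashCode() is only a stand-in for Murmur3, so the exact counts differ from a real cluster; only the shape of the result matters.

```java
import java.util.Arrays;

// Hypothetical skew demo: String.hashCode() stands in for Murmur3HashFunction.hash.
class SkewDemo {

    static int shardNum(String routing, int numPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numPrimaryShards);
    }

    // Counts documents per shard; customRouting == null means "route by _id" (the default).
    static int[] countPerShard(int docs, String customRouting, int numPrimaryShards) {
        int[] counts = new int[numPrimaryShards];
        for (int i = 0; i < docs; i++) {
            String id = "doc-" + i;
            String routing = customRouting == null ? id : customRouting;
            counts[shardNum(routing, numPrimaryShards)]++;
        }
        return counts;
    }

    // Number of shards that received at least one document.
    static int nonEmptyShards(int[] counts) {
        int n = 0;
        for (int c : counts) {
            if (c > 0) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println("routed by _id:      " + Arrays.toString(countPerShard(1000, null, 5)));
        System.out.println("routed by tenant-a: " + Arrays.toString(countPerShard(1000, "tenant-a", 5)));
    }
}
```

Routed by _id, the documents reach every shard; with a single custom routing value they all pile onto one shard, which is the skew routing_partition_size is meant to soften.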
When index.routing_partition_size is set, the formula becomes:

    shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
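A sketch of the partitioned formula for a single document, again with String.hashCode() standing in for Murmur3 and the routing factor ignored. As far as I can tell from the 7.x sources, ES computes the offset as Math.floorMod(hash(_id), routing_partition_size) in OperationRouting.generateShardId and then passes it to calculateScaledShardId; the sketch follows that shape. PartitionedRouting is a hypothetical name.

```java
// Sketch of:
// shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
// Assumption: String.hashCode() stands in for Murmur3, and the routing factor is ignored.
class PartitionedRouting {

    static int hash(String s) {
        return s.hashCode(); // stand-in for Murmur3HashFunction.hash
    }

    static int shardNum(String routing, String id, int routingPartitionSize, int numPrimaryShards) {
        // The _id only picks an offset in [0, routingPartitionSize).
        int partitionOffset = Math.floorMod(hash(id), routingPartitionSize);
        // The routing hash fixes the start of the shard set; the offset picks a shard within it.
        return Math.floorMod(hash(routing) + partitionOffset, numPrimaryShards);
    }

    public static void main(String[] args) {
        // Five documents sharing one routing value, 5 primary shards, partition size 3.
        for (int i = 0; i < 5; i++) {
            String id = "doc-" + i;
            System.out.println(id + " -> shard " + shardNum("tenant-a", id, 3, 5));
        }
    }
}
```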
    

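In other words, for a given routing value hash(_routing) is fixed, while hash(_id) % routing_partition_size can take routing_partition_size different values. Combining the two, the documents that share one routing value are spread over routing_partition_size possible shards, that is, a set of shards rather than a single shard.
index.routing_partition_size should be greater than 1 and less than index.number_of_shards.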
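Because the offset can only take routing_partition_size values, the shards a routed request has to visit can be enumerated up front. The sketch below (hypothetical names, String.hashCode() standing in for Murmur3, routing factor ignored) lists that set and encodes the size rule above as a boolean check; note that ES itself defaults the setting to 1, meaning no partitioning.

```java
import java.util.Set;
import java.util.TreeSet;

class PartitionShardSet {

    static int hash(String s) {
        return s.hashCode(); // stand-in for Murmur3HashFunction.hash
    }

    // Hypothetical check encoding the rule above: 1 < routing_partition_size < number_of_shards.
    static boolean isUsefulPartitionSize(int routingPartitionSize, int numberOfShards) {
        return routingPartitionSize > 1 && routingPartitionSize < numberOfShards;
    }

    // Every shard a document with this routing value could have been written to.
    static Set<Integer> candidateShards(String routing, int routingPartitionSize, int numPrimaryShards) {
        Set<Integer> shards = new TreeSet<>();
        for (int offset = 0; offset < routingPartitionSize; offset++) {
            shards.add(Math.floorMod(hash(routing) + offset, numPrimaryShards));
        }
        return shards;
    }

    public static void main(String[] args) {
        System.out.println(candidateShards("tenant-a", 3, 5));
    }
}
```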

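The computation is implemented in OperationRouting.calculateScaledShardId. Note that it does not take the hash modulo the current number of primary shards: it takes it modulo the index's routing shard count (getRoutingNumShards()) and then divides by the routing factor, so that documents keep hashing consistently after the index is shrunk or split: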

        private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
            final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
    
            // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
            // of original index to hash document
            return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
        }
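To see why the code divides by the routing factor, here is a sketch with plain parameters in place of IndexMetaData, assuming (as IndexMetaData does) that the routing factor is getRoutingNumShards() / number_of_shards. Shrinking an 8-shard index to 2 shards keeps routing_num_shards at 8, so target shard i is built from old shards 4i through 4i+3, and dividing by the factor of 4 sends every document to the shard that now holds its old shard. The class name and parameter list are hypothetical.

```java
class ScaledShardId {

    // Hypothetical stand-in for calculateScaledShardId: the IndexMetaData accessors are
    // replaced by plain parameters. Assumes routingNumShards is a multiple of numberOfShards.
    static int calculateScaledShardId(int hash, int partitionOffset, int routingNumShards, int numberOfShards) {
        int routingFactor = routingNumShards / numberOfShards;
        return Math.floorMod(hash + partitionOffset, routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        int hash = "beijing".hashCode(); // stand-in for Murmur3HashFunction.hash("beijing")
        int before = calculateScaledShardId(hash, 0, 8, 8); // 8 primary shards, routing_num_shards = 8
        int after = calculateScaledShardId(hash, 0, 8, 2);  // shrunk to 2 shards, routing_num_shards still 8
        System.out.println("before shrink: shard " + before + ", after shrink: shard " + after);
    }
}
```

When routing_num_shards equals the number of shards the factor is 1, and the expression reduces to floorMod(hash, num_primary_shards), the simple formula at the top of this note.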
    

How does a search use routing to find the target shards?

Example

    GET /{index}/{type}/_search?routing=beijing
    

This search request specifies routing=beijing, so only the shards that this routing value maps to need to be queried.

1. Parsing the request

When ES receives the request above, it is handled by org.elasticsearch.rest.action.search.RestSearchAction, whose prepareRequest method parses the request into a SearchRequest object:

        public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
            SearchRequest searchRequest = new SearchRequest();
            IntConsumer setSize = size -> searchRequest.source().size(size);
            request.withContentOrSourceParamParserOrNull(parser ->
                parseSearchRequest(searchRequest, request, parser, setSize));

            return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
        }

        public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
                                              XContentParser requestContentParser,
                                              IntConsumer setSize) throws IOException {
            String searchType = request.param("search_type");
            parseSearchSource(searchRequest.source(), request, setSize);
            searchRequest.requestCache(request.paramAsBoolean("request_cache", null));
            String scroll = request.param("scroll");
            searchRequest.routing(request.param("routing")); // read the routing parameter and store it on the SearchRequest
            searchRequest.preference(request.param("preference"));
            searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
            searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", true));
            checkRestTotalHits(request, searchRequest);
        }
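The routing value travels as a plain URL query parameter, and multiple values can be passed comma-separated (for example routing=beijing,shanghai). As a rough illustration only, not ES's own RestRequest parsing, the hypothetical helper below splits a query string into parameters; a missing routing parameter yields null, which is what searchRequest.routing(...) receives when the request does not specify routing.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not ES's RestRequest: splits "k1=v1&k2=v2" into a map so the
// "routing" parameter can be looked up the way request.param("routing") is used above.
class RoutingParamSketch {

    static Map<String, String> parseQuery(String query) {
        Map<String, String> params = new HashMap<>();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            String key = eq >= 0 ? pair.substring(0, eq) : pair;
            String value = eq >= 0 ? pair.substring(eq + 1) : "";
            // Decode percent-escapes such as %2C (a comma).
            params.put(URLDecoder.decode(key, StandardCharsets.UTF_8),
                       URLDecoder.decode(value, StandardCharsets.UTF_8));
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parseQuery("routing=beijing&preference=_local");
        System.out.println(params.get("routing"));                   // the routing value
        System.out.println(parseQuery("size=10").get("routing"));    // null: no routing given
    }
}
```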
    
2. Building the target shard list

After prepareRequest has built the SearchRequest, client.search passes it to org.elasticsearch.action.search.TransportSearchAction, whose executeSearch method resolves the target shards:

        private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                                   OriginalIndices localIndices, String[] concreteIndices, Map<String, Set<String>> routingMap,
                                   Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
                                   List<SearchShardIterator> remoteShardIterators, BiFunction<String, String, DiscoveryNode> remoteConnections,
                                   ClusterState clusterState, ActionListener<SearchResponse> listener, SearchResponse.Clusters clusters) {
    
            Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
            GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
                    concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
            GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
                searchRequest.getLocalClusterAlias(), remoteShardIterators);
    
            // ...
        }
    

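mergeShardsIterators then merges the local cluster's shards targeted by the request with those of remote clusters (remote clusters are used for cross-cluster search). The routingMap passed to searchShards maps each concrete index to the set of routing values that apply to it; it is derived from searchRequest.routing() together with any routing defined on aliases.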
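A simplified, hypothetical sketch of the shape of routingMap: it assumes every concrete index gets the same comma-separated routing values from the request and ignores alias routing, which the real resolution also merges in. The method name resolveRouting is made up for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification of how routingMap is built: every concrete index gets the
// same comma-separated routing values from the request. Alias routing is ignored here.
class RoutingMapSketch {

    static Map<String, Set<String>> resolveRouting(String routing, String[] concreteIndices) {
        if (routing == null) {
            return null; // matches the @Nullable routing argument: no routing, all shards are searched
        }
        Set<String> values = new LinkedHashSet<>();
        for (String r : routing.split(",")) {
            if (!r.isEmpty()) {
                values.add(r);
            }
        }
        Set<String> shared = Collections.unmodifiableSet(values);
        Map<String, Set<String>> routingMap = new HashMap<>();
        for (String index : concreteIndices) {
            routingMap.put(index, shared);
        }
        return routingMap;
    }

    public static void main(String[] args) {
        System.out.println(resolveRouting("beijing,shanghai", new String[] {"city_a", "city_b"}));
    }
}
```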

The logic that maps routing values to shards lives in org.elasticsearch.cluster.routing.OperationRouting.searchShards(ClusterState, String[], Map<String, Set<String>>, String, ResponseCollectorService, Map<String, Long>):

        public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState,
                                                               String[] concreteIndices,
                                                               @Nullable Map<String, Set<String>> routing,
                                                               @Nullable String preference,
                                                               @Nullable ResponseCollectorService collectorService,
                                                               @Nullable Map<String, Long> nodeCounts) {
            final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
            // ...
        }
    
        private Set<IndexShardRoutingTable> computeTargetedShards(ClusterState clusterState, String[] concreteIndices,
                                                                  @Nullable Map<String, Set<String>> routing) {
            // ...
            for (String index : concreteIndices) {
                // ...
                if (effectiveRouting != null) {
                    for (String r : effectiveRouting) {
                        // ... (partitionOffset is defined in the elided code)
                        set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetaData, r, partitionOffset)));
                        // ...
                    }
                }
                // ...
            }
            return set;
        }
    

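Finally, calculateScaledShardId computes the target shard for each routing value. Two pieces are trimmed from the excerpt above: the call sits inside a loop over partitionOffset from 0 to routing_partition_size - 1 (a single iteration when the setting is left at its default of 1), so every shard in the routing value's shard set is searched; and when routingMap holds no routing for an index, all shards of that index are searched.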
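Putting the pieces together, here is a self-contained sketch of the targeting logic under stated assumptions: String.hashCode() replaces Murmur3, IndexMeta is a hypothetical stand-in for IndexMetaData, and the result maps index names to shard numbers instead of IndexShardRoutingTable objects. It loops over every partition offset of each routing value and, for indices without routing, returns all shards.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class TargetedShardsSketch {

    // Hypothetical stand-in for IndexMetaData: only the numbers the routing math reads.
    static final class IndexMeta {
        final int numberOfShards;
        final int routingNumShards;
        final int routingPartitionSize;

        IndexMeta(int numberOfShards, int routingNumShards, int routingPartitionSize) {
            this.numberOfShards = numberOfShards;
            this.routingNumShards = routingNumShards;
            this.routingPartitionSize = routingPartitionSize;
        }
    }

    static int hash(String s) {
        return s.hashCode(); // stand-in for Murmur3HashFunction.hash
    }

    static int calculateScaledShardId(IndexMeta meta, String routing, int partitionOffset) {
        int routingFactor = meta.routingNumShards / meta.numberOfShards;
        return Math.floorMod(hash(routing) + partitionOffset, meta.routingNumShards) / routingFactor;
    }

    // Returns index name -> shard numbers the search must visit.
    static Map<String, Set<Integer>> computeTargetedShards(Map<String, IndexMeta> indices,
                                                           Map<String, Set<String>> routing) {
        Map<String, Set<String>> effective = routing;
        if (effective == null) {
            effective = Map.of(); // no routing at all: every index falls into the "all shards" branch
        }
        Map<String, Set<Integer>> targets = new TreeMap<>();
        for (Map.Entry<String, IndexMeta> e : indices.entrySet()) {
            IndexMeta meta = e.getValue();
            Set<Integer> shards = new TreeSet<>();
            Set<String> effectiveRouting = effective.get(e.getKey());
            if (effectiveRouting != null) {
                // One shard per (routing value, partition offset) pair; duplicates collapse in the set.
                for (String r : effectiveRouting) {
                    for (int offset = 0; offset < meta.routingPartitionSize; offset++) {
                        shards.add(calculateScaledShardId(meta, r, offset));
                    }
                }
            } else {
                // No routing for this index: every shard is searched.
                for (int s = 0; s < meta.numberOfShards; s++) {
                    shards.add(s);
                }
            }
            targets.put(e.getKey(), shards);
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, IndexMeta> indices = new TreeMap<>();
        indices.put("city", new IndexMeta(5, 5, 1));
        indices.put("tenants", new IndexMeta(5, 5, 3));
        Map<String, Set<String>> routing = new TreeMap<>();
        routing.put("city", Set.of("beijing"));
        System.out.println(computeTargetedShards(indices, routing));
    }
}
```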


Article link: https://www.haomeiwen.com/subject/wtxtbctx.html