Elasticsearch Source Code Analysis: Search Analysis (Part 2)

Author: 尹亮_36cd | Published: 2018-12-06 19:40

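    0. Preface

    The previous article covered the first part of the Elasticsearch search process. Its main logic was:
    1. Receive the search request and forward it
    2. Match the corresponding action and parse the request parameters
    3. Pick the TransportSearchTypeAction that corresponds to search_type and run the actual search logic

    This article picks up where the previous one left off and uses QUERY_THEN_FETCH as the example to walk through the QUERY phase of a search.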

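    1. Initializing the search parameters

    As the previous article showed, when search_type is empty (the default) or "query_then_fetch", Elasticsearch calls the execute() method of TransportSearchQueryThenFetchAction, a subclass of TransportSearchTypeAction, to obtain the search result. The class diagram:

    [Figure: TransportSearchQueryThenFetchClass.png (class diagram)]

    From the class diagram, the methods are executed in the following order:

    [Figure: TransportSearchQueryThenFetchSeg.png (call sequence)]

    So the Elasticsearch search process comes down to calling doExecute() on TransportSearchQueryThenFetchAction, which constructs an AsyncAction object and then calls its start() method.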

    public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
        @Override
        protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
            new AsyncAction(searchRequest, listener).start();
        }
        
        private class AsyncAction extends BaseAsyncAction<QuerySearchResultProvider> {
            final AtomicArray<FetchSearchResult> fetchResults;
            final AtomicArray<IntArrayList> docIdsToLoad;
            private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
                super(request, listener);
                fetchResults = new AtomicArray<>(firstResults.length());
                docIdsToLoad = new AtomicArray<>(firstResults.length());
            }
        }
    }
    

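    The class diagram for AsyncAction, BaseAsyncAction and TransportSearchQueryThenFetchAction:

    [Figure: TransportSearchQueryThenFetchClass_1.png (class diagram)]
    From the diagram and the code above, calling start() on AsyncAction actually runs BaseAsyncAction.start(). Constructing an AsyncAction does two things:
    (1) It calls the constructor of the parent class BaseAsyncAction
    (2) It initializes two arrays, fetchResults and docIdsToLoad
    fetchResults: stores, per shardIndex, the FetchSearchResult produced once the Fetch phase completes
    docIdsToLoad: stores, per shardIndex, the IDs of the documents that the Query phase selected for fetching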
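
    Both arrays are sized by the number of shards and indexed by shardIndex, so each shard writes only to its own slot and results arriving concurrently from different shards cannot overwrite each other. A minimal sketch of that idea, using the JDK's AtomicReferenceArray as a stand-in for Elasticsearch's AtomicArray (ShardSlots and its methods are hypothetical names, not Elasticsearch classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical stand-in for AtomicArray: one slot per shard, indexed by shardIndex.
class ShardSlots<T> {
    private final AtomicReferenceArray<T> slots;

    ShardSlots(int shardCount) {
        this.slots = new AtomicReferenceArray<>(shardCount);
    }

    // Each shard writes only its own slot, so concurrent writers do not interfere.
    void set(int shardIndex, T value) {
        slots.set(shardIndex, value);
    }

    // Returns null while the shard has not reported yet.
    T get(int shardIndex) {
        return slots.get(shardIndex);
    }

    // Indices of the shards that have stored a value so far.
    List<Integer> filledIndices() {
        List<Integer> filled = new ArrayList<>();
        for (int i = 0; i < slots.length(); i++) {
            if (slots.get(i) != null) {
                filled.add(i);
            }
        }
        return filled;
    }
}
```

    A slot that is still null simply means that shard has nothing to fetch yet, which is why the later phases can iterate over the filled slots only.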

    The BaseAsyncAction constructor looks like this:

    protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
        protected BaseAsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
                ...
                String[] concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
                Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(request.routing(), request.indices());
                shardsIts = clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, routingMap, request.preference());
                expectedSuccessfulOps = shardsIts.size();
                expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
                firstResults = new AtomicArray<>(shardsIts.size());
                ...
        }
    }
    

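    The BaseAsyncAction constructor mainly initializes the following data:
    clusterState: the cluster state
    nodes: the nodes in the cluster
    concreteIndices: the indices named in the request, resolved to the concrete indices to search
    shardsIts: the shards of those indices that need to be searched, with the shard preference already applied
    expectedSuccessfulOps: the number of shards to search (one per shard iterator), i.e. how many successful results are expected
    expectedTotalOps: the total number of shard copies across all iterators, with an empty iterator counted as 1; the first phase is complete once this many operations have been accounted for
    firstResults: stores the Query result of each shard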
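
    The difference between the two counters is easy to miss, so here is a small sketch of how they relate. It models each shard iterator as a plain list of the copies (primary and replicas) that could be tried; the class and method names are made up for illustration and are not part of Elasticsearch:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ExpectedOps {
    // One inner list per shard; its elements are the copies that can be tried.
    static int expectedSuccessfulOps(List<List<String>> shardGroups) {
        return shardGroups.size();
    }

    // Total copies over all shards. A shard with no available copy still counts
    // as one operation, because its failure must be accounted for before the
    // phase can finish.
    static int expectedTotalOps(List<List<String>> shardGroups) {
        int total = 0;
        for (List<String> copies : shardGroups) {
            total += Math.max(1, copies.size());
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> groups = Arrays.asList(
                Arrays.asList("node1", "node2"),    // shard 0: primary + 1 replica
                Arrays.asList("node2", "node3"),    // shard 1: primary + 1 replica
                Collections.<String>emptyList());   // shard 2: no active copy
        System.out.println(expectedSuccessfulOps(groups)); // prints 3
        System.out.println(expectedTotalOps(groups));      // prints 5
    }
}
```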

    A search request can state a shard preference through the preference parameter. The relevant code:

    private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, DiscoveryNodes nodes, @Nullable String preference) {
        Preference preferenceType = Preference.parse(preference);
        switch (preferenceType) {
            case PREFER_NODE:
                return indexShard.preferNodeActiveInitializingShardsIt(preference.substring(Preference.PREFER_NODE.type().length() + 1));
            case LOCAL:
                return indexShard.preferNodeActiveInitializingShardsIt(localNodeId);
            case PRIMARY:
                return indexShard.primaryActiveInitializingShardIt();
            case PRIMARY_FIRST:
                return indexShard.primaryFirstActiveInitializingShardsIt();
            case ONLY_LOCAL:
                return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
            case ONLY_NODE:
                String nodeId = preference.substring(Preference.ONLY_NODE.type().length() + 1);
                ensureNodeIdExists(nodes, nodeId);
                return indexShard.onlyNodeActiveInitializingShardsIt(nodeId);
            default:
                throw new ElasticsearchIllegalArgumentException("unknown preference [" + preferenceType + "]");
        }
    }
    

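    The preference types mean the following:
    _prefer_node: prefer executing the query on the node with the given node ID
    _local: prefer executing the query on shards allocated to the local node
    _primary: execute the query only on primary shards
    _primary_first: execute the query on primary shards, falling back to other shard copies if a primary is not available
    _only_local: execute the query only on shards allocated to the local node
    _only_node: restrict the query to the node with the given node ID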
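
    The node-based preferences carry the node ID after the type, which is why the code above calls preference.substring(type.length() + 1). The sketch below illustrates that parsing step in isolation. It assumes the "type:nodeId" form (for example "_only_node:abc"); PreferenceSketch and Kind are hypothetical names, not the real Preference enum:

```java
class PreferenceSketch {
    enum Kind {
        PREFER_NODE("_prefer_node"), LOCAL("_local"), PRIMARY("_primary"),
        PRIMARY_FIRST("_primary_first"), ONLY_LOCAL("_only_local"), ONLY_NODE("_only_node");

        final String type;

        Kind(String type) {
            this.type = type;
        }

        // Only the part before ':' names the kind; the rest is its argument.
        static Kind parse(String preference) {
            int colon = preference.indexOf(':');
            String name = colon < 0 ? preference : preference.substring(0, colon);
            for (Kind kind : values()) {
                if (kind.type.equals(name)) {
                    return kind;
                }
            }
            throw new IllegalArgumentException("unknown preference [" + preference + "]");
        }
    }

    // Mirrors preference.substring(type.length() + 1): skip the type and the ':'.
    static String nodeIdOf(String preference, Kind kind) {
        return preference.substring(kind.type.length() + 1);
    }
}
```

    Matching on the whole name before the colon, rather than on a prefix, keeps "_primary" and "_primary_first" from being confused with each other.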

    Once the AsyncAction constructor has finished initializing these parameters, BaseAsyncAction.start() is called to begin the search.

    2. Overall flow of the QUERY phase

    start() iterates over every shard to be searched and calls performFirstPhase() on each one to obtain its search result.

    protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
        public void start() {
            ...
            int shardIndex = -1;
            for (final ShardIterator shardIt : shardsIts) {
                shardIndex++;
                final ShardRouting shard = shardIt.nextOrNull();
                if (shard != null) {
                    performFirstPhase(shardIndex, shardIt, shard);
                }
                ...
            }
        }
    }
    

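    performFirstPhase() looks up the node that holds the shard and runs the search on that node. When the result arrives it calls onFirstPhaseResult(), which records the shard's Query result (the doc IDs to fetch are derived from it later) and then decides whether the second phase, Fetch, can start.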

    protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
        void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
            if (shard == null) {
                ...
            } else {
                final DiscoveryNode node = nodes.get(shard.currentNodeId());
                if (node == null) {
                    ...
                } else {
                    String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
                    sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime(), useSlowScroll), new SearchServiceListener<FirstResult>() {
                        @Override
                        public void onResult(FirstResult result) {
                            onFirstPhaseResult(shardIndex, shard, result, shardIt);
                        }
    
                        @Override
                        public void onFailure(Throwable t) {
                            onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t);
                        }
                    });
                }
            }
        }
    }
    

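    onFirstPhaseResult() uses processFirstPhaseResult() to store the first-phase Query result in firstResults. It then checks whether the number of operations accounted for so far (totalOps) equals the expected total (expectedTotalOps). If it does, the second phase starts and Fetch retrieves the document details.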

    protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
        void onFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result, ShardIterator shardIt) {
            result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id()));
            processFirstPhaseResult(shardIndex, shard, result);
            successfulOps.incrementAndGet();
            final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
            if (xTotalOps == expectedTotalOps) {
                try {
                    innerMoveToSecondPhase();
                } catch (Throwable e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e);
                    }
                    raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
                }
            } 
            ...
        }
    }
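
    The completion check above can be read in isolation: a successful shard adds shardIt.remaining() + 1 to totalOps, so the copies that no longer need to be tried are written off together with the copy that answered. A minimal sketch of that counting rule, with a hypothetical FirstPhaseCounter class standing in for the fields of BaseAsyncAction:

```java
import java.util.concurrent.atomic.AtomicInteger;

class FirstPhaseCounter {
    private final int expectedTotalOps;
    private final AtomicInteger totalOps = new AtomicInteger();
    private final AtomicInteger successfulOps = new AtomicInteger();

    FirstPhaseCounter(int expectedTotalOps) {
        this.expectedTotalOps = expectedTotalOps;
    }

    // Called when one copy of a shard answers successfully. remainingCopies is
    // the number of untried copies of that shard; they are counted as done
    // along with the copy that answered.
    // Returns true when this result brings the count to the expected total,
    // which is the signal to move to the second phase.
    boolean onShardSuccess(int remainingCopies) {
        successfulOps.incrementAndGet();
        return totalOps.addAndGet(remainingCopies + 1) == expectedTotalOps;
    }

    int successfulOps() {
        return successfulOps.get();
    }
}
```

    Because addAndGet is atomic and the counter only grows, at most one caller sees the exact expected total, so the transition to the second phase is triggered once even when shard results arrive on different threads.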
    

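    3. Searching on a shard

    As the previous section showed, Elasticsearch looks up the node that holds the shard to be searched and then uses sendExecuteFirstPhase() to send the search request to that node, where the Query is executed and the search result is produced. sendExecuteFirstPhase() is overridden differently for each search_type.

    1. For QUERY_THEN_FETCH, sendExecuteFirstPhase() calls SearchServiceTransportAction.sendExecuteQuery() to pass the search request on.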

    private class AsyncAction extends BaseAsyncAction<QuerySearchResultProvider> {
        @Override
        protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<QuerySearchResultProvider> listener) {
            searchService.sendExecuteQuery(node, request, listener);
        }
    }
    

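    2. SearchServiceTransportAction.sendExecuteQuery() first checks whether the target node is the local node:
    2.1 If it is the local node, execute() is called directly. Inside execute(), a thread from the SEARCH thread pool runs the call() method of the Callable passed in. If the result is null, the listener's onFailure() is invoked; otherwise onResult() is invoked.
    The Callable's call() method simply invokes SearchService.executeQueryPhase().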

    public class SearchServiceTransportAction extends AbstractComponent {
        public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final SearchServiceListener<QuerySearchResultProvider> listener) {
            if (clusterService.state().nodes().localNodeId().equals(node.id())) {
                execute(new Callable<QuerySearchResultProvider>() {
                    @Override
                    public QuerySearchResultProvider call() throws Exception {
                        return searchService.executeQueryPhase(request);
                    }
                }, listener);
            } else {
                ...
            }
        }
    
        private <T> void execute(final Callable<? extends T> callable, final SearchServiceListener<T> listener) {
            try {
                threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
                    @Override
                    public void run() {
                        T result = null;
                        Throwable error = null;
                        try {
                            result = callable.call();
                        } catch (Throwable t) {
                            error = t;
                        } finally {
                            if (result == null) {
                                assert error != null;
                                listener.onFailure(error);
                            } else {
                                assert error == null : error;
                                listener.onResult(result);
                            }
                        }
                    }
                });
            } catch (Throwable t) {
                listener.onFailure(t);
            }
        }
    }
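
    The execute() helper above is an instance of a common pattern: run a Callable on an executor and report the outcome through a callback instead of a return value. A simplified sketch of that pattern follows. Listener is a hypothetical two-method interface standing in for SearchServiceListener, and unlike the excerpt the sketch reports failure from the catch block rather than from a null check in finally:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

class CallableToListener {
    // Hypothetical, simplified counterpart of SearchServiceListener.
    interface Listener<T> {
        void onResult(T result);

        void onFailure(Throwable t);
    }

    static <T> void execute(Executor pool, Callable<? extends T> callable, Listener<T> listener) {
        try {
            pool.execute(() -> {
                T result;
                try {
                    result = callable.call();
                } catch (Throwable t) {
                    // The work itself failed: report it on the worker thread.
                    listener.onFailure(t);
                    return;
                }
                listener.onResult(result);
            });
        } catch (Throwable t) {
            // The task could not even be submitted, for example because the
            // pool rejected it.
            listener.onFailure(t);
        }
    }
}
```

    Passing Runnable::run as the executor runs the task on the calling thread, which is convenient for trying the sketch out; a real thread pool can be passed in the same way.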
    

    2.2 If the shard to search is not on the local node, the request has to be sent to the target node. The call takes two arguments worth noting: QUERY_ACTION_NAME and a BaseTransportResponseHandler object.

    transportService.sendRequest(node, QUERY_ACTION_NAME, request, new BaseTransportResponseHandler<QuerySearchResultProvider>() {
            @Override
            public QuerySearchResult newInstance() {
                return new QuerySearchResult();
            }
            @Override
            public void handleResponse(QuerySearchResultProvider response) {
                listener.onResult(response);
            }
            @Override
            public void handleException(TransportException exp) {
                listener.onFailure(exp);
            }
            @Override
            public String executor() {
                return ThreadPool.Names.SAME;
            }
        });
    

    When Elasticsearch starts, SearchServiceTransportAction is injected, and its constructor registers a SearchQueryTransportHandler under QUERY_ACTION_NAME.

    public class SearchServiceTransportAction extends AbstractComponent {
        @Inject
        public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) {
            super(settings);
            transportService.registerHandler(QUERY_ACTION_NAME, new SearchQueryTransportHandler());
            ...
        }
    }
    

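    When SearchQueryTransportHandler receives the message, it also calls SearchService.executeQueryPhase() to obtain the search result. This is how the same code path serves both local and remote searches.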

    private class SearchQueryTransportHandler extends BaseTransportRequestHandler<ShardSearchTransportRequest> {
        @Override
        public ShardSearchTransportRequest newInstance() {
            return new ShardSearchTransportRequest();
        }
        @Override
        public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception {
            QuerySearchResultProvider result = searchService.executeQueryPhase(request);
            channel.sendResponse(result);
        }
    
        @Override
        public String executor() {
            return ThreadPool.Names.SEARCH;
        }
    }
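
    To make the local/remote symmetry concrete, here is a toy model of the dispatch. It keeps a map from action name to handler and routes every request, local or remote, to the same executeQueryPhase method. Everything in it is an assumption made for illustration: the class, the action name string, and the use of plain strings for requests and results. The network hop is elided.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class HandlerRegistrySketch {
    // Hypothetical action name; the real constant lives in SearchServiceTransportAction.
    static final String QUERY_ACTION = "sketch/search/query";

    private final Map<String, Function<String, String>> handlers = new HashMap<>();
    private final String localNodeId;

    HandlerRegistrySketch(String localNodeId) {
        this.localNodeId = localNodeId;
        // Registered once at construction time, like registerHandler(...) above.
        handlers.put(QUERY_ACTION, this::executeQueryPhase);
    }

    // Stand-in for SearchService.executeQueryPhase(): the one place the query runs.
    String executeQueryPhase(String request) {
        return "result(" + request + ")@" + localNodeId;
    }

    // What the receiving node does when a message for the given action arrives.
    String messageReceived(String action, String request) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalArgumentException("no handler for action [" + action + "]");
        }
        return handler.apply(request);
    }

    // Sender side: call directly when the target is local, otherwise go
    // through the target node's registered handler.
    static String sendExecuteQuery(HandlerRegistrySketch local, HandlerRegistrySketch target, String request) {
        if (local.localNodeId.equals(target.localNodeId)) {
            return local.executeQueryPhase(request);
        }
        return target.messageReceived(QUERY_ACTION, request);
    }
}
```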
    

    4. Query logic

    The main steps in SearchService.executeQueryPhase() are:
    (1) Create the search context
    (2) Pre-process the query
    (3) Load the result from the cache or query Lucene
    (4) If search_type is count, free the search context
    (5) Record the slow query log

    public class SearchService extends AbstractLifecycleComponent<SearchService> {
        public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
            final SearchContext context = createAndPutContext(request);
            try {
                context.indexShard().searchService().onPreQueryPhase(context);
                long time = System.nanoTime();
                contextProcessing(context);
                loadOrExecuteQueryPhase(request, context, queryPhase);
                if (context.searchType() == SearchType.COUNT) {
                    freeContext(context.id());
                } else {
                    contextProcessedSuccessfully(context);
                }
                context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
    
                return context.queryResult();
            } catch (Throwable e) {
                ...
            } finally {
                cleanContext(context);
            }
        }
    }
    

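    createAndPutContext() creates the context with the following main operations:
    (1) Get the Lucene searcher for the index shard, i.e. Lucene's search object
    (2) Set the scroll, parse the source, and set from and size
    (3) Pre-process the query, converting the Elasticsearch query into a Lucene query
    (4) Keep the search context alive for 5 minutes by default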
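
    The create / free / keep-alive life cycle can be pictured as a small registry keyed by context ID. The sketch below is only a model of that idea: the class, its fields, and the reaping rule are assumptions made for illustration, since the article does not show how Elasticsearch expires contexts internally. Only the 5-minute default comes from the text above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class ContextRegistrySketch {
    // The 5-minute default keep-alive mentioned above.
    static final long DEFAULT_KEEP_ALIVE_MILLIS = 5 * 60 * 1000L;

    static final class Context {
        final long id;
        final long keepAliveMillis;
        volatile long lastAccessMillis;

        Context(long id, long keepAliveMillis, long nowMillis) {
            this.id = id;
            this.keepAliveMillis = keepAliveMillis;
            this.lastAccessMillis = nowMillis;
        }
    }

    private final AtomicLong idGenerator = new AtomicLong();
    private final Map<Long, Context> active = new ConcurrentHashMap<>();

    // Create a context and register it so that a later phase can find it by ID.
    Context createAndPut(long nowMillis) {
        Context context = new Context(idGenerator.incrementAndGet(), DEFAULT_KEEP_ALIVE_MILLIS, nowMillis);
        active.put(context.id, context);
        return context;
    }

    // Explicit release, as done right away for count-only searches.
    boolean free(long id) {
        return active.remove(id) != null;
    }

    // Assumed expiry rule: drop contexts idle for longer than their keep-alive.
    int reapExpired(long nowMillis) {
        int freed = 0;
        for (Context context : active.values()) {
            if (nowMillis - context.lastAccessMillis > context.keepAliveMillis && free(context.id)) {
                freed++;
            }
        }
        return freed;
    }

    int activeCount() {
        return active.size();
    }
}
```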

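    Once the context has been created, loadOrExecuteQueryPhase() runs the query. If search_type is COUNT and the request qualifies for caching, the result is loaded from the cache; otherwise QueryPhase.execute() runs the Lucene search.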

    public class QueryPhase implements SearchPhase {
        public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
            boolean rescore = false;
            try {
                searchContext.queryResult().from(searchContext.from());
                searchContext.queryResult().size(searchContext.size());
    
                Query query = searchContext.query();
                TopDocs topDocs;
                int numDocs = searchContext.from() + searchContext.size();
                if (searchContext.searchType() == SearchType.COUNT || numDocs == 0) {
                    TotalHitCountCollector collector = new TotalHitCountCollector();
                    searchContext.searcher().search(query, collector);
                    topDocs = new TopDocs(collector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
                } else if (searchContext.searchType() == SearchType.SCAN) {
                    topDocs = searchContext.scanContext().execute(searchContext);
                } else {
                    if (!searchContext.useSlowScroll() && searchContext.request().scroll() != null) {
                        ...
                    } else {
                        if (searchContext.sort() != null) {
                            topDocs = searchContext.searcher().search(
                                        query, 
                                        null,
                                        numDocs, searchContext.sort(),
                                        searchContext.trackScores(),
                                        searchContext.trackScores());
                        } else {
                            topDocs = searchContext.searcher().search(query, numDocs);
                        }
                    }
                }
                searchContext.queryResult().topDocs(topDocs);
            } catch (Throwable e) {
                throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
            } finally {
                searchContext.searcher().finishStage(ContextIndexSearcher.Stage.MAIN_QUERY);
            }
            if (rescore) { // only if we do a regular search
                rescorePhase.execute(searchContext);
            }
            suggestPhase.execute(searchContext);
            facetPhase.execute(searchContext);
            aggregationPhase.execute(searchContext);
        }
    }
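
    Note that the code above asks Lucene for from + size hits, not just size. A single shard cannot know which of its hits will land on the requested page once the results of all shards are merged, so it has to return every hit that could. The following sketch shows that selection on plain score values with a bounded min-heap; it is an illustration of the idea, not how Lucene's collectors are implemented, and all names in it are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

class ShardTopDocsSketch {
    // Returns the (from + size) best scores of one shard, best first.
    static List<Double> topScores(double[] matchingScores, int from, int size) {
        int numDocs = from + size;
        List<Double> top = new ArrayList<>();
        if (numDocs == 0) {
            // Count-only request: no hits need to be kept at all.
            return top;
        }
        // Min-heap: the weakest hit kept so far sits on top and is evicted first.
        PriorityQueue<Double> heap = new PriorityQueue<>();
        for (double score : matchingScores) {
            heap.offer(score);
            if (heap.size() > numDocs) {
                heap.poll();
            }
        }
        top.addAll(heap);
        Collections.sort(top, Collections.reverseOrder());
        return top;
    }
}
```

    For example, with from = 1 and size = 2 the shard keeps its three best hits, even though at most two of them can appear on the final page.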
    

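    When the Lucene search finishes, the top docs are stored in the search context, which completes the search on one shard. Once the number of operations accounted for equals the expected total, innerMoveToSecondPhase() is called to enter the second phase, Fetch. That logic is covered in the third article.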


        Article link: https://www.haomeiwen.com/subject/wvhacqtx.html