ES查询流程
查询流程总体可以分为以下几个部分
- 请求解析
- 搜索Query阶段
- 搜索Fetch阶段
- 数据排序和合并
1.总体流程
2.请求解析
2.1 解析request对象并构建SearchRequest对象
对于客户端发过来的search请求,最终会在RestSearchAction#prepareRequest方法中处理
// Excerpt: builds a SearchRequest from the incoming REST request and returns the action that runs it.
SearchRequest searchRequest = new SearchRequest();
// Callback that writes a size value into the request's source
IntConsumer setSize = size -> searchRequest.source().size(size);
// Populate the SearchRequest from the REST request, using the parser handed to the callback
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser, setSize));
// Execute the search; the listener is constructed around the channel
return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
2.2 调用search
经过层层封装,会调用NodeClient#executeLocally方法
/**
 * Looks up the transport action registered for {@code action} and executes it with the
 * given request and listener.
 *
 * @param action   the action whose transport implementation should run
 * @param request  the request to execute
 * @param listener receives the response or the failure
 * @return the task returned by the transport action's {@code execute} call
 */
public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
        GenericAction<Request, Response> action,
        Request request,
        ActionListener<Response> listener) {
    return transportAction(action).execute(request, listener);
}
之后会执行TransportAction#execute方法
/**
 * Registers a task for the request and executes the action.
 *
 * <p>When the task manager returns a task, the caller's listener is wrapped so that the task
 * is unregistered before the response or failure is forwarded. When no task is returned the
 * action runs with a {@code null} task and the listener is used as-is.
 *
 * @param request  the request to execute
 * @param listener receives the response or the failure
 * @return the registered task, or {@code null} if the task manager returned none
 */
public final Task execute(Request request, ActionListener<Response> listener) {
    final Task registered = taskManager.register("transport", actionName, request);
    if (registered == null) {
        // Nothing was registered, so there is nothing to unregister later.
        execute(null, request, listener);
        return null;
    }

    final ActionListener<Response> unregisteringListener = new ActionListener<Response>() {
        @Override
        public void onResponse(Response response) {
            taskManager.unregister(registered);
            listener.onResponse(response);
        }

        @Override
        public void onFailure(Exception e) {
            taskManager.unregister(registered);
            listener.onFailure(e);
        }
    };
    execute(registered, request, unregisteringListener);
    return registered;
}
最后会调用TransportSearchAction#doExecute方法
3.Query阶段
3.1 获取本地shard和远程shard并合并
TransportSearchAction#doExecute方法
// Entry point of the search action (excerpt; the part marked "..." is elided, including the
// opening of the callback that is closed further down with "}, listener::onFailure);").
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
...
if (remoteClusterIndices.isEmpty()) { // no remote indices
// Local-only search: empty remote shard list, a node lookup that always returns null,
// an empty remote alias map, and the local data-node count as the node count
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
(clusterName, nodeId) -> {
return null;
}, clusterState, Collections.emptyMap(), listener, clusterState.getNodes()
.getDataNodes().size(), SearchResponse.Clusters.EMPTY);
} else { // remote indices are present
// Collect the shards of all remote clusters, then continue in the callback
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
// Fills remoteShardIterators and remoteAliasFilters and returns the remote node lookup
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
// Nodes reported by the remote responses plus the local data nodes
int numNodesInvolved = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum()
+ clusterState.getNodes().getDataNodes().size();
SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses);
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices,
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvolved,
clusters);
}, listener::onFailure));
}
// End of the callback whose opening is elided above; presumably this is the rewriteListener used below
}, listener::onFailure);
if (searchRequest.source() == null) {
// No source: invoke the listener directly with the (null) source
rewriteListener.onResponse(searchRequest.source());
} else {
// Rewrite the source first; rewriteListener is passed as the completion callback
Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener);
}
}
执行executeSearch方法
// Resolves the local shards, merges them with the remote shard iterators and starts the
// search action (excerpt; the parts marked "..." are elided).
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
Map<String, OriginalIndices> remoteClusterIndices, List<SearchShardIterator> remoteShardIterators,
BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener, int nodeCount,
SearchResponse.Clusters clusters) {
...
// Obtain the local shards
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
// Merge the local shards with the remote shards
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
remoteShardIterators);
...
// Whether shards should be pre-filtered before the search runs.
// NOTE(review): the article's original note reads "is it QUERY_THEN_FETCH" — confirm against shouldPreFilterSearchShards
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
// Build the search action and start it
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
}
3.2 遍历所有shard执行请求
执行搜索方法searchAsyncAction(...).start()
/**
 * Creates the async action that drives the search.
 *
 * <p>When {@code preFilter} is true, a {@code CanMatchPreFilterSearchPhase} is returned whose
 * follow-up factory calls this method again with {@code preFilter == false} on the shard
 * iterators it is handed. Otherwise the action is selected by the request's search type.
 *
 * @throws IllegalStateException if the search type is not one of the handled values
 */
private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest,
                                                    GroupShardsIterator<SearchShardIterator> shardIterators,
                                                    SearchTimeProvider timeProvider,
                                                    BiFunction<String, String, Transport.Connection> connectionLookup,
                                                    long clusterStateVersion,
                                                    Map<String, AliasFilter> aliasFilter,
                                                    Map<String, Float> concreteIndexBoosts,
                                                    Map<String, Set<String>> indexRoutings,
                                                    ActionListener<SearchResponse> listener,
                                                    boolean preFilter,
                                                    SearchResponse.Clusters clusters) {
    // Search requests run on the SEARCH thread pool.
    final Executor searchExecutor = threadPool.executor(ThreadPool.Names.SEARCH);

    if (preFilter) {
        // Pre-filter round first; the lambda builds the real search action afterwards.
        return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
            aliasFilter, concreteIndexBoosts, indexRoutings, searchExecutor, searchRequest, listener, shardIterators,
            timeProvider, clusterStateVersion, task, (filteredShards) -> {
                final AbstractSearchAsyncAction followUp = searchAsyncAction(task, searchRequest, filteredShards,
                    timeProvider, connectionLookup, clusterStateVersion, aliasFilter, concreteIndexBoosts,
                    indexRoutings, listener, false, clusters);
                return new SearchPhase(followUp.getName()) {
                    @Override
                    public void run() {
                        followUp.start();
                    }
                };
            }, clusters);
    }

    // No pre-filtering: choose the action by search type.
    switch (searchRequest.searchType()) {
        case DFS_QUERY_THEN_FETCH:
            return new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
                aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, searchExecutor, searchRequest,
                listener, shardIterators, timeProvider, clusterStateVersion, task, clusters);
        case QUERY_AND_FETCH:
        case QUERY_THEN_FETCH:
            return new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
                aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, searchExecutor, searchRequest,
                listener, shardIterators, timeProvider, clusterStateVersion, task, clusters);
        default:
            throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
    }
}
这里只分析QUERY_THEN_FETCH这一种搜索类型,最终会调用InitialSearchPhase的run方法,该方法会对每一个shard执行请求
// Starts the first round of per-shard requests (excerpt; the parts marked "..." are elided).
@Override
public final void run() throws IOException {
...
if (shardsIts.size() > 0) {
// Number of requests started here: the configured maximum, capped at the number of shard iterators
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
...
for (int index = 0; index < maxConcurrentShardRequests; index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
assert shardRoutings.skip() == false;
// Run the phase on this shard with the copy returned by nextOrNull()
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}
}
}
调用performPhaseOnShard方法执行shard请求
/**
 * Executes the current phase on one shard copy.
 *
 * <p>A {@code null} shard is reported as a {@code NoShardAvailableActionException} failure.
 * Otherwise the request is sent, and both the result and any failure are routed back through
 * {@code maybeFork} with the thread that called this method.
 *
 * @param shardIndex index of the shard within the search
 * @param shardIt    iterator over the copies of the shard
 * @param shard      the copy to query, or {@code null} if none is available
 */
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
    final Thread callingThread = Thread.currentThread();

    if (shard == null) {
        fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
        return;
    }

    try {
        final SearchShardTarget target = new SearchShardTarget(shard.currentNodeId(),
            shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices());
        executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(target, shardIndex) {
            // The shard answered successfully.
            @Override
            public void innerOnResponse(FirstResult result) {
                maybeFork(callingThread, () -> onShardResult(result, shardIt));
            }

            // The shard answered with a failure.
            @Override
            public void onFailure(Exception t) {
                maybeFork(callingThread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
            }
        });
    } catch (final Exception e) {
        /*
         * It is possible to run into connection exceptions here because we are getting the connection early and might run in to
         * nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
         */
        fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
    }
}
会调用TransportInterceptor#sendRequest方法发送请求
3.3 收集结果
调用onShardResult收集成功结果
/**
 * Handles a successful first-phase result from one shard: records it, then checks whether
 * the phase can advance.
 *
 * @param result  the shard's result; must carry a shard index and a search shard target
 * @param shardIt iterator of the shard that produced the result
 */
private void onShardResult(FirstResult result, SearchShardIterator shardIt) {
    assert result.getShardIndex() != -1 : "shard index is not set";
    assert result.getSearchShardTarget() != null : "search shard target must not be null";

    // Record the result.
    onShardSuccess(result);

    // Account for this shard and check whether every request has been answered.
    successfulShardExecution(shardIt);
}
onShardSuccess对所有结果进行收集
/**
 * Records a successful shard result: bumps the success counter, hands the result to the
 * result consumer, and clears any failure previously stored for the same shard index.
 *
 * @param result the successful shard result
 */
@Override
public final void onShardSuccess(Result result) {
    successfulOps.incrementAndGet();
    results.consumeResult(result);

    if (logger.isTraceEnabled()) {
        logger.trace("got first-phase result from {}", result == null ? null : result.getSearchShardTarget());
    }

    // A failure array exists only if some shard failed earlier; reset this shard's slot.
    final AtomicArray<ShardSearchFailure> recordedFailures = this.shardFailures.get();
    if (recordedFailures != null) {
        recordedFailures.set(result.getShardIndex(), null);
    }
}
successfulShardExecution判断是否所有请求都已收到回复,全部收到之后进入下一阶段
/**
 * Accounts for a finished shard and decides what happens next.
 *
 * <p>The operation counter grows by the iterator's remaining copies, plus one for the copy
 * that just completed unless the iterator is marked as skipped. Reaching the expected total
 * ends the phase; staying below it on a non-skipped iterator triggers the next pending request.
 *
 * @param shardsIt iterator of the shard that finished
 * @throws AssertionError if the counter exceeds the expected total
 */
private void successfulShardExecution(SearchShardIterator shardsIt) {
    final int opsToAdd = shardsIt.skip() ? shardsIt.remaining() : shardsIt.remaining() + 1;
    final int opsSoFar = totalOps.addAndGet(opsToAdd);

    if (opsSoFar > expectedTotalOps) {
        throw new AssertionError("unexpected higher total ops [" + opsSoFar + "] compared to expected ["
            + expectedTotalOps + "]");
    }

    if (opsSoFar == expectedTotalOps) {
        // Every expected operation has been accounted for.
        onPhaseDone();
    } else if (shardsIt.skip() == false) {
        maybeExecuteNext();
    }
}
4.Fetch阶段
4.1 发送fetch请求
onPhaseDone()会调用AbstractSearchAsyncAction#executeNextPhase方法
/**
 * Transitions from {@code currentPhase} to {@code nextPhase}.
 *
 * <p>The search fails with "all shards failed" when no shard succeeded. When the request does
 * not allow partial results, it fails with "Partial shards failure" only if failures are
 * actually still recorded. Otherwise the next phase is executed.
 *
 * @param currentPhase the phase that just finished
 * @param nextPhase    the phase to run next
 */
@Override
public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase) {
    /* This is the main search phase transition where we move to the next phase. At this point we check if there is
     * at least one successful operation left and if so we move to the next phase. If not we immediately fail the
     * search phase as "all shards failed"*/
    if (successfulOps.get() == 0) { // no shard produced a result
        final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
        Throwable cause = shardSearchFailures.length == 0 ? null :
            ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
        logger.debug(() -> new ParameterizedMessage("All shards failed for phase: [{}]", getName()), cause);
        onPhaseFailure(currentPhase, "all shards failed", cause);
        return;
    }

    Boolean allowPartialResults = request.allowPartialSearchResults();
    assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
    if (allowPartialResults == false && shardFailures.get() != null) {
        // A non-null failure array does not by itself mean that a shard is still failed:
        // onShardSuccess resets a shard's slot to null when a later attempt succeeds. Only fail
        // the phase when failures remain.
        // NOTE(review): assumes buildShardFailures() omits slots that were reset to null — confirm.
        final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
        if (shardSearchFailures.length > 0) {
            if (logger.isDebugEnabled()) {
                final Throwable cause = ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
                logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
                    shardSearchFailures.length, getName()), cause);
            }
            onPhaseFailure(currentPhase, "Partial shards failure", null);
            return;
        }
    }

    // Partial results are allowed, or no failure remains: continue with what we have.
    if (logger.isTraceEnabled()) {
        final String resultsFrom = results.getSuccessfulResults()
            .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
        logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
            currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
    }
    // Run the next phase.
    executePhase(nextPhase);
}
/**
 * Runs the given phase and reports any exception it throws as a phase failure.
 *
 * @param phase the phase to run
 */
private void executePhase(SearchPhase phase) {
    try {
        phase.run();
    } catch (Exception e) {
        // The debug message is only built when debug logging is enabled.
        if (logger.isDebugEnabled()) {
            logger.debug(
                new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()),
                e);
        }
        onPhaseFailure(phase, "", e);
    }
}
FetchSearchPhase继承了SearchPhase类,所以最终会调用FetchSearchPhase#run方法
/**
 * Submits the fetch work to the phase context. The actual work happens in {@code innerRun()};
 * a failure of the submitted task is reported as a failure of this phase.
 */
@Override
public void run() throws IOException {
    final ActionRunnable<SearchResponse> fetchTask = new ActionRunnable<SearchResponse>(context) {
        @Override
        public void doRun() throws IOException {
            // The fetch logic itself.
            innerRun();
        }

        @Override
        public void onFailure(Exception e) {
            context.onPhaseFailure(FetchSearchPhase.this, "", e);
        }
    };
    context.execute(fetchTask);
}
innerRun()方法
// Core of the fetch phase (excerpt; the parts marked "..." are elided): either finishes
// immediately or sends one fetch request per shard that has documents to load.
private void innerRun() throws IOException {
...
// Runs once everything is processed and moves on to the next phase; passes the query results
// when the query-and-fetch optimization applies, the fetch results otherwise
final Runnable finishPhase = ()
-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);
...
if (queryAndFetchOptimization) {
assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()
+ "], single result: " + phaseResults.get(0).fetchResult();
// query AND fetch optimization
finishPhase.run();
} else {
// One entry per shard; a null entry means there is nothing to fetch from that shard
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
phaseResults.stream()
.map(SearchPhaseResult::queryResult)
.forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
finishPhase.run();
} else {
// Only computed for scroll searches; null otherwise
final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
: null;
// Collector that stores each fetch result at its shard index; it is given finishPhase to run
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
finishPhase, context);
for (int i = 0; i < docIdsToLoad.length; i++) {
IntArrayList entry = docIdsToLoad[i];
SearchPhaseResult queryResult = queryResults.get(i);
if (entry == null) { // no results for this shard ID
if (queryResult != null) {
// if we got some hits from this shard we have to release the context there
// we do this as we go since it will free up resources and passing on the request on the
// transport layer is cheap.
releaseIrrelevantSearchContext(queryResult.queryResult());
}
// in any case we count down this result since we don't talk to this shard anymore
counter.countDown();
} else {
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),
searchShardTarget.getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
// Send the fetch request for this shard
executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
connection);
}
}
}
}
}
executeFetch方法会处理成功或失败的请求,executeFetch的参数querySearchResult中包含分页信息,最后定义一个Listener,每成功获取一个shard数据后就执行counter.onResult,调用对结果的处理回调,把result保存到数组中,然后执行countDown
/**
 * Sends the fetch request for one shard and wires the outcome into the counted collector.
 *
 * @param shardIndex         index of the shard within the search
 * @param shardTarget        the shard copy the request is sent to
 * @param counter            collector notified of the result or the failure
 * @param fetchSearchRequest the fetch request to send
 * @param querySearchResult  the shard's query result, used to release its context on failure
 * @param connection         connection to the node holding the shard
 */
private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
                          final CountedCollector<FetchSearchResult> counter,
                          final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,
                          final Transport.Connection connection) {
    // Send the request; the listener below handles the reply.
    context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),
        new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
            // Successful fetch: hand the result to the collector.
            @Override
            public void innerOnResponse(FetchSearchResult result) {
                counter.onResult(result);
            }

            // Failed fetch: report the failure, then release the search context.
            @Override
            public void onFailure(Exception e) {
                try {
                    logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e);
                    counter.onFailure(shardIndex, shardTarget, e);
                } finally {
                    // the search context might not be cleared on the node where the fetch was executed for example
                    // because the action was rejected by the thread pool. in this case we need to send a dedicated
                    // request to clear the search context.
                    releaseIrrelevantSearchContext(querySearchResult);
                }
            }
        });
}
onResult方法
/**
 * Passes a result to the result consumer and then counts down, even if the consumer throws.
 *
 * @param result the result to consume
 */
void onResult(R result) {
    try {
        resultConsumer.accept(result);
    } finally {
        // Count down in every case so the collector cannot stall on a throwing consumer.
        countDown();
    }
}
4.2 收集结果
当所有shard数据收集完毕后,countDown会执行finishPhase,最终会触发ExpandSearchPhase
/**
 * Creates a fetch phase whose next phase is an {@code ExpandSearchPhase}, which in turn is
 * followed by the phase that sends the response.
 */
FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer,
                 SearchPhaseController searchPhaseController,
                 SearchPhaseContext context) {
    this(resultConsumer, searchPhaseController, context,
        (fetched, scroll) -> new ExpandSearchPhase(context, fetched, // collapse only happens if the request has inner hits
            (expanded) -> sendResponsePhase(expanded, scroll, context)));
}
4.3 ExpandSearchPhase
取回所有数据之后,执行ExpandSearchPhase#run方法
/**
 * Expands collapsed hits with their inner hits.
 *
 * <p>For a collapse request with at least one hit, one group search per hit and inner-hit
 * builder is sent as a single multi-search. The responses are consumed in the same
 * hit-by-builder order and attached to the hits as inner hits. Any failed item fails the
 * phase. In all other cases the next phase runs directly.
 */
@Override
public void run() throws IOException {
    if (isCollapseRequest() && searchResponse.hits().getHits().length > 0) {
        SearchRequest searchRequest = context.getRequest();
        CollapseBuilder collapseBuilder = searchRequest.source().collapse();
        final List<InnerHitBuilder> innerHitBuilders = collapseBuilder.getInnerHits();
        MultiSearchRequest multiRequest = new MultiSearchRequest();
        if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
            multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
        }
        for (SearchHit hit : searchResponse.hits().getHits()) {
            // Restrict the group query to this hit's collapse value, or to documents
            // without the collapse field when the value is null.
            BoolQueryBuilder groupQuery = new BoolQueryBuilder();
            Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
            if (collapseValue != null) {
                groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
            } else {
                groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
            }
            QueryBuilder origQuery = searchRequest.source().query();
            if (origQuery != null) {
                groupQuery.must(origQuery);
            }
            for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
                CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();
                SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder)
                    .query(groupQuery)
                    .postFilter(searchRequest.source().postFilter());
                SearchRequest groupRequest = buildExpandSearchRequest(searchRequest, sourceBuilder);
                multiRequest.add(groupRequest);
            }
        }
        context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(),
            ActionListener.wrap(response -> {
                // Items come back in request order: for each hit, one item per inner-hit builder.
                Iterator<MultiSearchResponse.Item> it = response.iterator();
                // Use the hits() accessor here as well, consistent with the loops above.
                for (SearchHit hit : searchResponse.hits().getHits()) {
                    for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
                        MultiSearchResponse.Item item = it.next();
                        if (item.isFailure()) {
                            context.onPhaseFailure(this, "failed to expand hits", item.getFailure());
                            return;
                        }
                        SearchHits innerHits = item.getResponse().getHits();
                        if (hit.getInnerHits() == null) {
                            hit.setInnerHits(new HashMap<>(innerHitBuilders.size()));
                        }
                        hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);
                    }
                }
                context.executeNextPhase(this, nextPhaseFactory.apply(searchResponse));
            }, context::onFailure)
        );
    } else {
        // Nothing to expand.
        context.executeNextPhase(this, nextPhaseFactory.apply(searchResponse));
    }
}
4.4 回复客户端
通过sendResponsePhase方法返回给客户端
/**
 * Creates the final phase, named "response", which builds the search response from the
 * internal response and scroll id and hands it to the context.
 *
 * @param response the internal search response to build from
 * @param scrollId the scroll id passed to the response builder
 * @param context  the phase context that builds and receives the response
 * @return the phase that sends the response
 */
private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) {
    return new SearchPhase("response") {
        @Override
        public void run() throws IOException {
            // Build the response and deliver it through the context.
            context.onResponse(context.buildSearchResponse(response, scrollId));
        }
    };
}
5 数据结点执行搜索过程
所有Query和Fetch过程的入口函数都在SearchTransportService#registerRequestHandler中注册
// Excerpt: registers the handler for QUERY_ACTION_NAME. The handler runs the query phase and
// sends either the result or the exception back over the channel.
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
// Called when a request is received
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
// Execute the query phase
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
// Success: send the result back
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
// The listener method declares no checked exception, so rethrow unchecked
throw new UncheckedIOException(e);
}
}
@Override
// Failure: send the exception back
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
});
}
});
查询入口函数是searchService.executeQueryPhase,查询时会优先检查是否有缓存,如果有缓存,就查缓存
/**
 * Executes the query phase for one shard request.
 *
 * <p>Creates the search context, runs the query through {@code loadOrExecuteQueryPhase},
 * and returns the query result. When the request targets a single shard, the fetch phase is
 * executed right away and its result is returned instead.
 *
 * @param request the shard-level search request
 * @param task    the search task, set on the context
 * @return the query result, or the result of {@code executeFetchPhase} for single-shard requests
 * @throws IOException declared by the signature
 */
SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
    final SearchContext context = createAndPutContext(request);
    final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
    context.incRef();
    boolean queryPhaseSuccess = false;
    try {
        context.setTask(task);
        operationListener.onPreQueryPhase(context);
        long time = System.nanoTime();
        contextProcessing(context);
        // Load the result from the cache if possible, otherwise execute the query.
        loadOrExecuteQueryPhase(request, context);
        if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
            freeContext(context.id());
        } else {
            contextProcessedSuccessfully(context);
        }
        final long afterQueryTime = System.nanoTime();
        queryPhaseSuccess = true;
        operationListener.onQueryPhase(context, afterQueryTime - time);
        if (request.numberOfShards() == 1) {
            return executeFetchPhase(context, operationListener, afterQueryTime);
        }
        return context.queryResult();
    } catch (Exception e) {
        // execution exception can happen while loading the cache, strip it
        if (e instanceof ExecutionException) {
            final Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                e = (Exception) cause;
            } else if (cause != null) {
                e = new ElasticsearchException(cause);
            }
            // A null cause leaves e untouched: replacing it with the null cause would pass
            // null on to the logging, failure processing and rethrow below.
        }
        if (!queryPhaseSuccess) {
            operationListener.onFailedQueryPhase(context);
        }
        logger.trace("Query phase failed", e);
        processFailure(context, e);
        throw ExceptionsHelper.convertToRuntime(e);
    } finally {
        cleanContext(context);
    }
}
loadOrExecuteQueryPhase会先判断本次请求能否使用缓存(其中一个条件是index.requests.cache.enable为true,默认为true):可以使用缓存时从缓存加载数据,否则直接执行查询。这个cache由节点的所有分片共享,空间满的时候删除最近最少使用的数据,cache并不缓存全部搜索结果
/**
 * Serves the query phase from the request cache when the request is cacheable, and runs the
 * query phase directly otherwise. The query shard context is frozen before either path.
 *
 * @param request the shard-level search request
 * @param context the search context the result is loaded into
 * @throws Exception if loading from the cache or executing the query fails
 */
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
    final boolean cacheable = indicesService.canCache(request, context);
    context.getQueryShardContext().freezeContext();

    if (!cacheable) {
        // Not cacheable: run the query phase directly.
        queryPhase.execute(context);
        return;
    }

    // Cacheable: go through the request cache.
    indicesService.loadIntoContext(request, context, queryPhase);
}
queryPhase.execute查询方法
/**
 * Runs the query phase on the given search context.
 *
 * <p>Suggest-only requests run the suggest phase and store an empty top-docs result.
 * All other requests pre-process aggregations, execute the query, optionally rescore, then
 * run the suggest and aggregation phases and attach profile results when profilers are present.
 *
 * @param searchContext the context to execute against
 * @throws QueryPhaseExecutionException declared by the signature
 */
@Override
public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
    if (searchContext.hasOnlySuggest()) {
        suggestPhase.execute(searchContext);
        // TODO: fix this once we can fetch docs for suggestions
        searchContext.queryResult().topDocs(
            new TopDocs(0, Lucene.EMPTY_SCORE_DOCS, 0),
            new DocValueFormat[0]);
        return;
    }

    // Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
    // request, preProcess is called on the DFS phase phase, this is why we pre-process them
    // here to make sure it happens during the QUERY phase
    aggregationPhase.preProcess(searchContext);

    final ContextIndexSearcher searcher = searchContext.searcher();
    final boolean needsRescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled);
    if (needsRescore) {
        // The query execution reported that rescoring is required.
        rescorePhase.execute(searchContext);
    }

    // Suggestions, then aggregations.
    suggestPhase.execute(searchContext);
    aggregationPhase.execute(searchContext);

    if (searchContext.getProfilers() != null) {
        final ProfileShardResult profile = SearchProfileShardResults
            .buildShardResults(searchContext.getProfilers());
        searchContext.queryResult().profileResults(profile);
    }
}
Query慢日志(slow log)统计的时间就是本阶段的处理时间
网友评论