Elasticsearch Source Code Analysis: Suggest


Author: 尹亮_36cd | Published: 2019-03-29 16:41

    0 Preface

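    Most search systems offer search suggestions: as the user types a term, the system proposes entries that contain that term or start with it as a prefix. Elasticsearch's suggest feature supports this well.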

    Figure: search suggestions

    A suggest request maps to the URL patterns /_suggest and /{index}/_suggest, and supports both GET and POST:

    curl -X POST 'localhost:9200/test_index/_suggest?pretty' -d '{
        "myCompletion" : {
            "text" : "10",
            "completion" : {
                "field" : "suggest term"
            }
        }
    }'
    

    1 Request Handler

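    The handler for the suggest URLs is RestSuggestAction, which supports the following parameters:
    routing: routing information
    preference: shard preference
    After parsing the parameters, it calls client.suggest() to run the suggestion. The action passed in is SuggestAction.INSTANCE, which is bound to the TransportSuggestAction class: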

    public class ActionModule extends AbstractModule {
        @Override
        protected void configure() {
            registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class);
        }
    }
    

    As a result, TransportAction.execute() ends up invoking doExecute() of TransportBroadcastOperationAction, the parent class of TransportSuggestAction. This method is the entry point of the action that handles the request.
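
    The binding between an action and its transport action can be pictured as a lookup table. The following is a minimal sketch under that assumption only; the real ActionModule relies on dependency injection, and the names ActionRegistrySketch, Action and the String-based request are hypothetical stand-ins that do not exist in Elasticsearch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ActionRegistrySketch {
    // Hypothetical stand-in for an action singleton such as SuggestAction.INSTANCE.
    static final class Action {
        final String name;
        Action(String name) { this.name = name; }
    }

    // Maps each action to the handler bound to it (the role registerAction() plays in the article).
    private final Map<Action, Function<String, String>> handlers = new HashMap<>();

    void registerAction(Action action, Function<String, String> transportAction) {
        handlers.put(action, transportAction);
    }

    // Looks up the handler bound to the action and runs it with the request.
    String execute(Action action, String request) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalStateException("no transport action bound to " + action.name);
        }
        return handler.apply(request);
    }

    public static void main(String[] args) {
        Action suggest = new Action("indices:data/read/suggest");
        ActionRegistrySketch registry = new ActionRegistrySketch();
        registry.registerAction(suggest, req -> "suggest handled: " + req);
        System.out.println(registry.execute(suggest, "10"));
    }
}
```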

    Figure: TransportSuggestAction class diagram

    2 Handling the Action

    The doExecute() method of TransportBroadcastOperationAction mainly calls start() on an AsyncBroadcastAction to run the suggest logic:

    public abstract class TransportBroadcastOperationAction<Request extends BroadcastOperationRequest, 
                  Response extends BroadcastOperationResponse, 
                  ShardRequest extends BroadcastShardOperationRequest, 
                  ShardResponse extends BroadcastShardOperationResponse>
            extends TransportAction<Request, Response> {
        @Override
        protected void doExecute(Request request, ActionListener<Response> listener) {
            new AsyncBroadcastAction(request, listener).start();
        }
    }
    

    The main suggest flow is as follows:
    (1) Resolve all the shards that belong to the requested indices

    protected class AsyncBroadcastAction {
            private final GroupShardsIterator shardsIts;
            private final int expectedOps;
    
            protected AsyncBroadcastAction(Request request, ActionListener<Response> listener) {
                // ...
                shardsIts = shards(clusterState, request, concreteIndices);
                expectedOps = shardsIts.size();
    
            }
    }
    

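    Shards are resolved for a suggest request in the same way as for a search request:
    ① First resolve the routing information for the indices, including expanding any wildcards
    ② Then select the shards based on the routing and preference values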

    public class TransportSuggestAction extends 
                    TransportBroadcastOperationAction<SuggestRequest, SuggestResponse, ShardSuggestRequest, ShardSuggestResponse> {
        @Override
        protected GroupShardsIterator shards(ClusterState clusterState, SuggestRequest request, String[] concreteIndices) {
            // Resolve the routing information for the requested indices
            Map<String, Set<String>> routingMap = clusterState.metaData()
                    .resolveSearchRouting(request.routing(), request.indices());
            // Select the shards according to routing and preference
            return clusterService.operationRouting()
                    .searchShards(clusterState, request.indices(), concreteIndices, routingMap, request.preference());
        }
    }
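
    To make the effect of routing concrete, here is a minimal sketch. It assumes that a routing value is mapped to one shard by hashing it modulo the shard count, and it ignores preference entirely. The hash function, the class name RoutingSketch and both method names are illustrative assumptions, not Elasticsearch's actual implementation.

```java
import java.util.Set;
import java.util.TreeSet;

public class RoutingSketch {
    // Assumption: a routing value maps to exactly one shard via hash modulo shard count.
    static int shardFor(String routing, int numberOfShards) {
        return Math.floorMod(routing.hashCode(), numberOfShards);
    }

    // Without routing every shard is queried; with routing only the shards
    // that the routing values hash to are queried.
    static Set<Integer> searchShards(Set<String> routings, int numberOfShards) {
        Set<Integer> shards = new TreeSet<>();
        if (routings == null || routings.isEmpty()) {
            for (int i = 0; i < numberOfShards; i++) {
                shards.add(i);
            }
        } else {
            for (String routing : routings) {
                shards.add(shardFor(routing, numberOfShards));
            }
        }
        return shards;
    }
}
```

    With five shards, calling searchShards(null, 5) yields all five shard ids, while a single routing value narrows the request to one shard.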
    

    (2) Iterate over the shard groups and call performOperation() on the first available shard of each

    protected class AsyncBroadcastAction {
            private final GroupShardsIterator shardsIts;
            private final int expectedOps;
     
            public void start() {
                if (shardsIts.size() == 0) {
                    // no shards
                    try {
                        listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));
                    } catch (Throwable e) {
                        listener.onFailure(e);
                    }
                    return;
                }
                // count the local operations, and perform the non local ones
                int shardIndex = -1;
                for (final ShardIterator shardIt : shardsIts) {
                    shardIndex++;
                    final ShardRouting shard = shardIt.nextOrNull();
                    if (shard != null) {
                        performOperation(shardIt, shard, shardIndex);
                    } else {
                        // really, no shards active in this group
                        onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                    }
                }
            }
    }
    

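    (3) For each shard, run TransportSuggestAction's shardOperation() to query the index
    ① If the shard lives on the current node, shardOperation() runs on the thread pool and its result is passed to onOperation()
    ② If the shard lives on another node, the request is sent to that node over the TCP transport. The action name is SuggestAction.NAME + [s], that is indices:data/read/suggest[s], and the receiving handler is ShardTransportHandler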

    // Constructor of the enclosing TransportBroadcastOperationAction: it registers the
    // shard-level handler under actionName + "[s]"
    protected TransportBroadcastOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
            super(settings, actionName, threadPool, actionFilters);
            this.transportShardAction = actionName + "[s]";
            transportService.registerHandler(transportShardAction, new ShardTransportHandler());
    }
    
    protected class AsyncBroadcastAction {
            protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) {
                if (shard == null) {
                    // no more active shards... (we should not really get here, just safety)
                    onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                } else {
                    try {
                        final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
                        if (shard.currentNodeId().equals(nodes.localNodeId())) {
                            threadPool.executor(executor).execute(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        onOperation(shard, shardIndex, shardOperation(shardRequest));
                                    } catch (Throwable e) {
                                        onOperation(shard, shardIt, shardIndex, e);
                                    }
                                }
                            });
                        } else {
                            DiscoveryNode node = nodes.get(shard.currentNodeId());
                            if (node == null) {
                                // no node connected, act as failure
                                onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                            } else {
                                transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
                                    // ...
                                    @Override
                                    public void handleResponse(ShardResponse response) {
                                        onOperation(shard, shardIndex, response);
                                    }
                                });
                            }
                        }
                    } catch (Throwable e) {
                        onOperation(shard, shardIt, shardIndex, e);
                    }
                }
            }
    }
    

    When the node that hosts the shard receives the request, it executes the same shardOperation() method:

    class ShardTransportHandler extends BaseTransportRequestHandler<ShardRequest> {
            @Override
            public ShardRequest newInstance() {
                return newShardRequest();
            }
            @Override
            public String executor() {
                return executor;
            }
            @Override
            public void messageReceived(final ShardRequest request, final TransportChannel channel) throws Exception {
                channel.sendResponse(shardOperation(request));
            }
        }
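
    The local versus remote decision can be sketched in-process. This is a simplified model under stated assumptions: the "transport" is just a map of handlers keyed by action name, and ShardDispatchSketch, Node, newNode and the String-based request are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ShardDispatchSketch {
    // Shard-level action name: the top-level action name plus "[s]".
    static final String SHARD_ACTION = "indices:data/read/suggest" + "[s]";

    // Hypothetical node holding handlers registered by action name.
    static final class Node {
        final String id;
        final Map<String, Function<String, String>> handlers = new HashMap<>();
        Node(String id) { this.id = id; }
    }

    // Stand-in for shardOperation(): the same logic runs wherever the shard lives.
    static String shardOperation(String nodeId, String request) {
        return nodeId + " handled " + request;
    }

    // Every node registers a shard-level handler that delegates to shardOperation().
    static Node newNode(String id) {
        Node node = new Node(id);
        node.handlers.put(SHARD_ACTION, request -> shardOperation(id, request));
        return node;
    }

    // Local shard: call shardOperation() directly.
    // Remote shard: go through the handler registered on the target node.
    static String performOperation(Node local, Node target, String request) {
        if (target == null) {
            throw new IllegalStateException("no node available for shard");
        }
        if (target.id.equals(local.id)) {
            return shardOperation(local.id, request);
        }
        return target.handlers.get(SHARD_ACTION).apply(request);
    }
}
```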
    

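    (4) Once every shard has been processed, finishHim() is called to merge the shard results and return the response
    Each time a shard reports back, counterOps is incremented by one. When counterOps equals expectedOps (the number of shards resolved for the requested indices), finishHim() is executed
    finishHim() mainly calls newResponse() to build the response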

    protected class AsyncBroadcastAction {
            protected void onOperation(ShardRouting shard, int shardIndex, ShardResponse response) {
                logger.trace("received response for {}", shard);
                shardsResponses.set(shardIndex, response);
                if (expectedOps == counterOps.incrementAndGet()) {
                    finishHim();
                }
            }
            protected void finishHim() {
                try {
                    listener.onResponse(newResponse(request, shardsResponses, clusterState));
                } catch (Throwable e) {
                    listener.onFailure(e);
                }
            }
    }
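
    The counting pattern above can be reproduced with standard JDK classes. The sketch below is an assumption-laden simplification: it fans out one task per shard, stores either the result or the thrown exception in the shard's slot, and fires the callback once the counter reaches the expected number of operations. Unlike the real class it does not retry a failed shard on another copy, and BroadcastSketch and broadcast are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.function.IntFunction;

public class BroadcastSketch {
    // Runs shardOp once per shard index and calls onDone exactly once after all shards reported.
    static void broadcast(int expectedOps, ExecutorService pool, IntFunction<Object> shardOp,
                          Consumer<AtomicReferenceArray<Object>> onDone) {
        AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(expectedOps);
        AtomicInteger counterOps = new AtomicInteger();
        if (expectedOps == 0) {
            onDone.accept(responses); // no shards: answer immediately
            return;
        }
        for (int i = 0; i < expectedOps; i++) {
            final int shardIndex = i;
            pool.execute(() -> {
                Object result;
                try {
                    result = shardOp.apply(shardIndex);
                } catch (Throwable e) {
                    result = e; // a failure fills the slot as well
                }
                responses.set(shardIndex, result);
                // Only the task that completes the last operation triggers the callback.
                if (counterOps.incrementAndGet() == expectedOps) {
                    onDone.accept(responses);
                }
            });
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(1);
        broadcast(3, pool, i -> "shard-" + i, responses -> {
            System.out.println(responses);
            done.countDown();
        });
        done.await();
        pool.shutdown();
    }
}
```

    Using incrementAndGet() for the comparison means no lock is needed: exactly one thread observes the counter reaching expectedOps.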
    

    newResponse() iterates over the response of every shard, merges them, and returns a SuggestResponse object:

    public class TransportSuggestAction extends 
                    TransportBroadcastOperationAction<SuggestRequest, SuggestResponse, ShardSuggestRequest, ShardSuggestResponse> {
            @Override
        protected SuggestResponse newResponse(SuggestRequest request, 
                             AtomicReferenceArray shardsResponses, 
                            ClusterState clusterState) {
            int successfulShards = 0;
            int failedShards = 0;
    
            final Map<String, List<Suggest.Suggestion>> groupedSuggestions = new HashMap<>();
    
            List<ShardOperationFailedException> shardFailures = null;
            for (int i = 0; i < shardsResponses.length(); i++) {
                Object shardResponse = shardsResponses.get(i);
                if (shardResponse == null) {
                    // simply ignore non active shards
                } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
                    failedShards++;
                    if (shardFailures == null) {
                        shardFailures = newArrayList();
                    }
                    shardFailures.add(new DefaultShardOperationFailedException(
                            (BroadcastShardOperationFailedException) shardResponse)
                    );
                } else {
                    Suggest suggest = ((ShardSuggestResponse) shardResponse).getSuggest();
                    Suggest.group(groupedSuggestions, suggest);
                    successfulShards++;
                }
            }
    
            return new SuggestResponse(
                    new Suggest(Suggest.reduce(groupedSuggestions)), 
                    shardsResponses.length(), 
                    successfulShards, 
                    failedShards, 
                    shardFailures
            );
        }
    }
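
    The three-way branch in newResponse() (inactive shard, failed shard, successful shard) can be sketched as below. This is a simplified model: a shard result is assumed to be a plain map from suggestion name to option texts, grouping merely concatenates the options, and the sorting and deduplication that a real reduce step would perform are left out. MergeSketch and Merged are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeSketch {
    static final class Merged {
        int successfulShards;
        int failedShards;
        // Suggestion name -> options collected from all successful shards.
        final Map<String, List<String>> grouped = new HashMap<>();
    }

    // Each slot is null (inactive shard), a Throwable (failed shard),
    // or a Map of suggestion name -> options returned by that shard.
    @SuppressWarnings("unchecked")
    static Merged merge(List<Object> shardResponses) {
        Merged merged = new Merged();
        for (Object response : shardResponses) {
            if (response == null) {
                continue; // inactive shards are simply ignored
            } else if (response instanceof Throwable) {
                merged.failedShards++;
            } else {
                Map<String, List<String>> suggest = (Map<String, List<String>>) response;
                for (Map.Entry<String, List<String>> entry : suggest.entrySet()) {
                    merged.grouped.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
                            .addAll(entry.getValue());
                }
                merged.successfulShards++;
            }
        }
        return merged;
    }
}
```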
    

    3 Suggest Logic Implementation

    The shardOperation() method mainly does the following:
    ① Obtains the IndexService and IndexShard objects
    ② Parses the suggest source
    ③ Creates the suggest context
    ④ Calls SuggestPhase.execute() to enter the suggest phase

    public class TransportSuggestAction extends 
                    TransportBroadcastOperationAction<SuggestRequest, SuggestResponse, ShardSuggestRequest, ShardSuggestResponse> {
        @Override
        protected ShardSuggestResponse shardOperation(ShardSuggestRequest request) throws ElasticsearchException {
            // Obtain the index service
            // ...
            try (Engine.Searcher searcher = indexShard.acquireSearcher("suggest")) {
                BytesReference suggest = request.suggest();
                if (suggest != null && suggest.length() > 0) {
                    parser = XContentFactory.xContent(suggest).createParser(suggest);
                    // Create the suggest context
                    final SuggestionSearchContext context = suggestPhase.parseElement().parseInternal(
                            parser, 
                            indexService.mapperService(),
                            indexService.queryParserService(), 
                            request.shardId().getIndex(), 
                            request.shardId().id()
                    );
                    final Suggest result = suggestPhase.execute(context, searcher.searcher());
                    return new ShardSuggestResponse(request.shardId(), result);
                }
                return new ShardSuggestResponse(request.shardId(), new Suggest());
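            } catch (Throwable ex) {
                // wrap and rethrow so that the failure is reported for this shard
                throw new ElasticsearchException("failed to execute suggest", ex);
            } finally {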
                // ...
            }
        }
    }
    

    The suggest phase mainly iterates over every suggestion and calls Suggester.execute() to run it:

    public class SuggestPhase extends AbstractComponent implements SearchPhase {
        public Suggest execute(SuggestionSearchContext suggest, IndexSearcher searcher) {
            try {
                CharsRefBuilder spare = new CharsRefBuilder();
                final List<Suggestion<? extends Entry<? extends Option>>> suggestions = 
                        new ArrayList<>(suggest.suggestions().size());
    
                for (Map.Entry<String, SuggestionSearchContext.SuggestionContext> entry : suggest.suggestions().entrySet()) {
                    SuggestionSearchContext.SuggestionContext suggestion = entry.getValue();
                    Suggester<SuggestionContext> suggester = suggestion.getSuggester();
                    Suggestion<? extends Entry<? extends Option>> result = suggester
                            .execute(entry.getKey(), suggestion, searcher, spare);
                    if (result != null) {
                        assert entry.getKey().equals(result.name);
                        suggestions.add(result);
                    }
                }
    
                return new Suggest(Suggest.Fields.SUGGEST, suggestions);
            } catch (IOException e) {
                throw new ElasticsearchException("I/O exception during suggest phase", e);
            }
        }
    }
    

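    Suggester.execute() checks whether the reader behind the searcher holds zero documents. If it does, the method returns null; otherwise it calls innerExecute(), leaving the concrete logic to the Suggester subclass: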

    public abstract class Suggester<T extends SuggestionSearchContext.SuggestionContext> {
            public Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>
            execute(String name, T suggestion, IndexSearcher searcher, CharsRefBuilder spare) throws IOException {
            // #3469 We want to ignore empty shards
    
            if (searcher.getIndexReader().numDocs() == 0) {
                return null;
            }
            return innerExecute(name, suggestion, searcher, spare);
        }
    }
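
    This is the template method pattern: the base class owns the guard and subclasses only supply the core step. A minimal sketch of the same shape follows, with hypothetical names (SuggesterSketch, BaseSuggester, PrefixEchoSuggester) and plain strings standing in for the real suggestion types.

```java
public class SuggesterSketch {
    // Hypothetical stand-in for Suggester: the base class owns the empty-shard check.
    abstract static class BaseSuggester {
        final String execute(String name, String text, int numDocs) {
            if (numDocs == 0) {
                return null; // an empty shard contributes nothing
            }
            return innerExecute(name, text);
        }

        // Subclasses implement only the suggestion logic itself.
        protected abstract String innerExecute(String name, String text);
    }

    // Toy subclass used only to exercise the base class.
    static final class PrefixEchoSuggester extends BaseSuggester {
        @Override
        protected String innerExecute(String name, String text) {
            return name + ":" + text + "*";
        }
    }
}
```

    Because execute() is final, no subclass can skip the empty-shard check by accident.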
    

    Suggester has three subclasses, CompletionSuggester, PhraseSuggester and TermSuggester, one for each type of suggestion

    Figure: Suggester class diagram

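    The code below is the concrete logic of the completion suggester. It mainly uses Lucene's lookup method to query the prefix index of each segment, keeps the highest score per suggestion text, and finally calls CollectionUtil.introSort() to sort the options with introsort: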

    public class CompletionSuggester extends Suggester<CompletionSuggestionContext> {
        @Override
        protected Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> innerExecute(String name,
                CompletionSuggestionContext suggestionContext, IndexSearcher searcher, CharsRefBuilder spare) throws IOException {
            if (suggestionContext.mapper() == null || !(suggestionContext.mapper() instanceof CompletionFieldMapper)) {
                throw new ElasticsearchException("Field [" + suggestionContext.getField() + "] is not a completion suggest field");
            }
            final IndexReader indexReader = searcher.getIndexReader();
            CompletionSuggestion completionSuggestion = new CompletionSuggestion(name, suggestionContext.getSize());
            spare.copyUTF8Bytes(suggestionContext.getText());
    
            CompletionSuggestion.Entry completionSuggestEntry = new CompletionSuggestion.Entry(new StringText(spare.toString()), 0, spare.length());
            completionSuggestion.addTerm(completionSuggestEntry);
    
            String fieldName = suggestionContext.getField();
            Map<String, CompletionSuggestion.Entry.Option> results = Maps.newHashMapWithExpectedSize(indexReader.leaves().size() * suggestionContext.getSize());
            for (AtomicReaderContext atomicReaderContext : indexReader.leaves()) {
                AtomicReader atomicReader = atomicReaderContext.reader();
                Terms terms = atomicReader.fields().terms(fieldName);
                if (terms instanceof Completion090PostingsFormat.CompletionTerms) {
                    final Completion090PostingsFormat.CompletionTerms lookupTerms = (Completion090PostingsFormat.CompletionTerms) terms;
                    final Lookup lookup = lookupTerms.getLookup(suggestionContext.mapper(), suggestionContext);
                    if (lookup == null) {
                        // we don't have a lookup for this segment.. this might be possible if a merge dropped all
                        // docs from the segment that had a value in this segment.
                        continue;
                    }
                    List<Lookup.LookupResult> lookupResults = lookup.lookup(spare.get(), false, suggestionContext.getSize());
                    for (Lookup.LookupResult res : lookupResults) {
    
                        final String key = res.key.toString();
                        final float score = res.value;
                        final Option value = results.get(key);
                        if (value == null) {
                            final Option option = new CompletionSuggestion.Entry.Option(new StringText(key), score, res.payload == null ? null
                                    : new BytesArray(res.payload));
                            results.put(key, option);
                        } else if (value.getScore() < score) {
                            value.setScore(score);
                            value.setPayload(res.payload == null ? null : new BytesArray(res.payload));
                        }
                    }
                }
            }
            final List<CompletionSuggestion.Entry.Option> options = new ArrayList<>(results.values());
            CollectionUtil.introSort(options, scoreComparator);
    
            int optionCount = Math.min(suggestionContext.getSize(), options.size());
            for (int i = 0 ; i < optionCount ; i++) {
                completionSuggestEntry.addOption(options.get(i));
            }
    
            return completionSuggestion;
        }
    }
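
    The merge step across segments can be isolated into a small sketch: keep the best score for each key, sort by score in descending order, and cut the list to the requested size. The names CompletionMergeSketch, Option and topOptions are hypothetical, the lookup results are passed in as plain lists, and the tie-break on text is an assumption added to make the order deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompletionMergeSketch {
    static final class Option {
        final String text;
        float score;
        Option(String text, float score) {
            this.text = text;
            this.score = score;
        }
    }

    // segments: one list of lookup results per index segment.
    static List<Option> topOptions(List<List<Option>> segments, int size) {
        Map<String, Option> best = new HashMap<>();
        for (List<Option> segment : segments) {
            for (Option hit : segment) {
                Option seen = best.get(hit.text);
                if (seen == null) {
                    best.put(hit.text, new Option(hit.text, hit.score));
                } else if (seen.score < hit.score) {
                    seen.score = hit.score; // keep the highest score for a duplicate key
                }
            }
        }
        List<Option> options = new ArrayList<>(best.values());
        // Highest score first; ties are broken by text (an assumption of this sketch).
        options.sort((a, b) -> {
            int byScore = Float.compare(b.score, a.score);
            return byScore != 0 ? byScore : a.text.compareTo(b.text);
        });
        return new ArrayList<>(options.subList(0, Math.min(size, options.size())));
    }
}
```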
    

    4 FST Structure

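    Completion suggestions are stored in an FST (Finite State Transducer), a structure similar to a prefix tree (trie)
    Take the following six words: mop, moth, pop, star, stop and top. The FST built from them is illustrated by the following figure: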


    Figure: FST file storage
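
    Since the text compares the FST to a prefix tree, a plain trie is enough to show how prefix completion over the six example words works. Note that this sketch is a trie, not an FST: it shares prefixes only, whereas an FST also shares suffixes and can attach an output value to each key. TrieSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TrieSketch {
    private static final class Node {
        // TreeMap keeps children sorted, so completions come out in lexicographic order.
        final Map<Character, Node> children = new TreeMap<>();
        boolean terminal;
    }

    private final Node root = new Node();

    void add(String word) {
        Node node = root;
        for (char c : word.toCharArray()) {
            node = node.children.computeIfAbsent(c, k -> new Node());
        }
        node.terminal = true;
    }

    // Returns every stored word that starts with the given prefix.
    List<String> complete(String prefix) {
        Node node = root;
        for (char c : prefix.toCharArray()) {
            node = node.children.get(c);
            if (node == null) {
                return new ArrayList<>();
            }
        }
        List<String> out = new ArrayList<>();
        collect(node, new StringBuilder(prefix), out);
        return out;
    }

    // Depth-first walk that rebuilds each word from the path taken.
    private void collect(Node node, StringBuilder path, List<String> out) {
        if (node.terminal) {
            out.add(path.toString());
        }
        for (Map.Entry<Character, Node> entry : node.children.entrySet()) {
            path.append(entry.getKey());
            collect(entry.getValue(), path, out);
            path.setLength(path.length() - 1);
        }
    }

    public static void main(String[] args) {
        TrieSketch trie = new TrieSketch();
        for (String word : new String[] {"mop", "moth", "pop", "star", "stop", "top"}) {
            trie.add(word);
        }
        System.out.println(trie.complete("st")); // prints [star, stop]
        System.out.println(trie.complete("mo")); // prints [mop, moth]
    }
}
```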
