美文网首页ES
Elasticsearch源码分析-更新索引分析

Elasticsearch源码分析-更新索引分析

作者: 尹亮_36cd | 来源:发表于2019-03-18 17:58 被阅读0次

    0.前言

    如果想更新索引的某条文档数据,可以通过如下几种方式:
    (1)构造document的完整field和数据,然后使用Elasticsearch的Index API重新创建索引

    curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{
        "user" : "kimchy",
        "post_date" : "2009-11-15T14:12:12",
        "message" : "trying out Elasticsearch"
    }'
    

    (2)构造document的部分field和数据,然后使用Elasticsearch 的Update API 更新索引

    curl -XPOST 'localhost:9200/twitter/tweet/1/_update' -d '{
        "doc" : {
            "user" : "new_name"
        },
        "doc_as_upsert" : true
    }' 
    

    本篇文章讨论的是第二种方式,该请求对应的url pattern为 /{index}/{type}/{id}/_update,且仅支持Post 请求

    1 请求Handler

    update url对应的handler为RestUpdateAction,支持的参数如下:
    retry_on_conflict: 控制在最终抛出异常之前重试update的次数。
    routing: routing用于将更新请求路由到正确的分片上,并在更新的文档不存在时为upsert请求设置routing。不能用于更新已经存在文档的routing。
    parent: parent用于将更新请求路由到正确的分片上,并在更新的文档不存在时为upsert请求设置parent。不能用于更新已经存在文档的parent。
    timeout: 等待shard变为可用的超时时间。
    consistency: 索引/删除操作的写入一致性。
    refresh: 在操作发生后立即刷新相关的主分片和副本分片(而不是整个索引),以便更新的文档立即显示在搜索结果中。
    fields: 返回更新文档中的相关字段。指定_source返回完整更新的source。
    version & version_type: update API在内部使用Elasticsearch的版本控制支持,以确保在更新期间文档不会更改。

    解析完请求参数后,会调用client.update()更新索引,传入的action参数值为UpdateAction.INSTANCE,它绑定的action为TransportUpdateAction类。

    public class ActionModule extends AbstractModule {
        @Override
        protected void configure() {
            registerAction(UpdateAction.INSTANCE,TransportUpdateAction.class);
        }
    }
    

    因此,在执行TransportAction.execute() 时,会执行TransportUpdateAction.doExecute()方法,即为处理请求的Action入口

    TransportUpdateAction 类图

    2 处理 Action

    在更新索引前,使用shouldAutoCreate()方法判断是否需要创建索引

    public boolean shouldAutoCreate(String index, ClusterState state) {
            // action.auto_create_index 是 false
            // 不再继续检查, 不创建索引
            if (!needToCheck) {
                return false;
            }
            // 如果索引或者别名中已经包含了index
            if (state.metaData().hasConcreteIndex(index)) {
                return false;
            }
            // action.auto_create_index 是 false
            if (globallyDisabled) {
                return false;
            }
            // matches not set, default value of "true"
            // action.auto_create_index 是 null 或者是 true 或者是 false
            if (matches == null) {
                return true;
            }
            // 正则条件判断index是否满足action.auto_create_index
            for (int i = 0; i < matches.length; i++) {
                char c = matches[i].charAt(0);
                if (c == '-') {
                    if (Regex.simpleMatch(matches2[i], index)) {
                        return false;
                    }
                } else if (c == '+') {
                    if (Regex.simpleMatch(matches2[i], index)) {
                        return true;
                    }
                } else {
                    if (Regex.simpleMatch(matches[i], index)) {
                        return true;
                    }
                }
            }
            return false;
        }
    

    ①如果需要创建索引,则,调用TransportCreateIndexAction.execute()方法创建索引,然后执行innerExecute()方法更新文档
    ②否则直接调用innerExecute()更新文档

    在更新文档前,需要获取文档索引所在的主分片信息,然后请请求发送到对应分片的节点上,执行shardOperation()

    class AsyncSingleAction {
            protected boolean doStart() throws ElasticsearchException {
                // 集群的节点信息
                nodes = observer.observedState().nodes();
                try {
                    // 获取要操作的primary shard
                    shardIt = shards(observer.observedState(), internalRequest);
                } catch (Throwable e) {
                   
                }
    
                // this transport only make sense with an iterator that returns a single shard routing (like primary)
                // 由于一个存在的文档, 必然最多只需要一个主分片
                assert shardIt.size() == 1;
    
                internalRequest.request().shardId = shardIt.shardId().id();
    
                // 如果shard 所在的节点为当前节点
                if (shard.currentNodeId().equals(nodes.localNodeId())) {
                    internalRequest.request().beforeLocalFork();
                    try {
                        // 使用线程池方式执行操作
                        threadPool.executor(executor).execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    // 抽象方法, 需要子类实现
                                    shardOperation(internalRequest, listener);
                                } catch (Throwable e) {
                                    // ...
                                }
                            }
                        });
                    } catch (Throwable e) {
                        // ...
                    }
                } else {
                    // 如果是远程节点, 需要将请求发送到对应节点
                    DiscoveryNode node = nodes.get(shard.currentNodeId());
                    transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions(), new BaseTransportResponseHandler<Response>() {
                        @Override
                        public void handleResponse(Response response) {
                            listener.onResponse(response);
                        }
            }
    }
    

    3 执行请求

    执行索引operation的过程,主要是
    ①先构造索引请求,即先获取已经存在的文档信息
    ②merge已有文档和待更新的数据,或者执行请求中的脚本,获取完整的doc信息
    ③执行TransportIndexAction.execute() 创建索引重新创建文档,或者TransportDeleteAction.execute() 删除文档

    /**
         * shard 操作逻辑
         * @param request   InternalRequest
         * @param listener  ActionListener
         * @param retryCount    retryCount 重试次数
         * @throws ElasticsearchException   Elasticsearch 异常
         */
        protected void shardOperation(final InternalRequest request, final ActionListener<UpdateResponse> listener, final int retryCount) throws ElasticsearchException {
            // 获取index 对应的service
            IndexService indexService = indicesService.indexServiceSafe(request.concreteIndex());
            // 根据shard id 获取对应的IndexShard
            IndexShard indexShard = indexService.shardSafe(request.request().shardId());
            // 对更新请求进行转换, 获取最终要更新的索引信息
            final UpdateHelper.Result result = updateHelper.prepare(request.request(), indexShard);
            switch (result.operation()) {
                case UPSERT:
                    // 构造索引请求, 将result.action()中的type id routing 和source 拷贝到index request 对象中
                    IndexRequest upsertRequest = new IndexRequest((IndexRequest)result.action(), request.request());
                    // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
                    final BytesReference upsertSourceBytes = upsertRequest.source();
                    // 执行TransportIndexAction.execute(),  创建索引文档
                    indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
                        @Override
                        public void onResponse(IndexResponse response) {
                            // 处理请求
                    });
                    break;
                case INDEX:
                    IndexRequest indexRequest = new IndexRequest((IndexRequest)result.action(), request.request());
                    // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
                    final BytesReference indexSourceBytes = indexRequest.source();
                    indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
                        // 处理请求
                    });
                    break;
                case DELETE:
                    DeleteRequest deleteRequest = new DeleteRequest((DeleteRequest)result.action(), request.request());
                    deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
                        // 执行请求
                    });
                    break;
                case NONE:
                    // ...
                    break;
                default:
                    throw new ElasticsearchIllegalStateException("Illegal operation " + result.operation());
            }
        }
    

    在上面代码中,final UpdateHelper.Result result = updateHelper.prepare(request.request(), indexShard); 主要用来对更新请求进行转换, 获取最终要更新的索引信息
    合并待更新数据和已经存在的数据的策略,主要是覆盖和补充更新的方式:

    /**
        将提供的数据变更(changes)更新到source中
          1 已经存在source的字段将会被提供的变更数据(chanes)覆盖掉
          2 不存source在的字段会被提供的变更数据(changes)补充更新
    **/
    public static boolean update(Map<String, Object> source, Map<String, Object> changes, boolean checkUpdatesAreUnequal) {
            boolean modified = false;
            for (Map.Entry<String, Object> changesEntry : changes.entrySet()) {
                if (!source.containsKey(changesEntry.getKey())) {
                    // safe to copy, change does not exist in source
                    source.put(changesEntry.getKey(), changesEntry.getValue());
                    modified = true;
                    continue;
                }
                Object old = source.get(changesEntry.getKey());
                if (old instanceof Map && changesEntry.getValue() instanceof Map) {
                    // recursive merge maps
                    modified |= update((Map<String, Object>) source.get(changesEntry.getKey()),
                            (Map<String, Object>) changesEntry.getValue(), checkUpdatesAreUnequal && !modified);
                    continue;
                }
                // update the field
                source.put(changesEntry.getKey(), changesEntry.getValue());
                if (modified) {
                    continue;
                }
                if (!checkUpdatesAreUnequal) {
                    modified = true;
                    continue;
                }
                modified = !Objects.equal(old, changesEntry.getValue());
            }
            return modified;
        }
    

    4 处理异常

    在更新lucene索引时,会先检查获取的文档版本和索引中当前文档版本是否冲突,
    版本类型分为如下4种:
    INTERNAL
    EXTERNAL
    EXTERNAL_GTE
    FORCE
    对于每种类型,都有不同的判断冲突的标准

    private void innerIndex(Index index) throws IOException {
            synchronized (dirtyLock(index.uid())) {
                // 获取当前版本号 currentVersion
                final long currentVersion;
                VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
                // 如果version map 中没有拿到当前version, 则需要从reader 中获取当前version
                if (versionValue == null) {
                    currentVersion = loadCurrentVersionFromIndex(index.uid());
                } else {
                    // 判断版本是否待删除
                    if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
                        currentVersion = Versions.NOT_FOUND; // deleted, and GC
                    } else {
                        currentVersion = versionValue.version();
                    }
                }
    
                // 更新后的Version
                long updatedVersion;
                // 待更新索引的版本号
                long expectedVersion = index.version();
                // 使用当前version 和待更新索引version 判断是否存在版本冲突
                // 判断条件为:
                // INTERNAL 为当前版本和待更新版本不一致
                // EXTERNAL 为当前版本大于等于待更新版本
                // EXTERNAL_GTE 为当前版本大于待更新版本
                // FORCE 为待更新版本未指定
                if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
                    if (index.origin() == Operation.Origin.RECOVERY) {
                        return;
                    } else {
                        throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
                    }
                }
    
                // INTERNAL 如果当前版本为未找到或者未设置, 则为1, 否则为当前版本+1
                // EXTERNAL 待更新索引版本号
                // EXTERNAL_GTE 待更新索引版本号
                // FORCE 待更新索引版本号
                updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
    
                index.updateVersion(updatedVersion);
                // 当前不存在文档版本, 则为create
                if (currentVersion == Versions.NOT_FOUND) {
                    // document does not exists, we can optimize for create
                    index.created(true);
                    if (index.docs().size() > 1) {
                        indexWriter.addDocuments(index.docs(), index.analyzer());
                    } else {
                        indexWriter.addDocument(index.docs().get(0), index.analyzer());
                    }
                } else {
                    // 已经存在文档版本, 则update
                    if (versionValue != null) {
                        index.created(versionValue.delete()); // we have a delete which is not GC'ed...
                    }
                    if (index.docs().size() > 1) {
                        indexWriter.updateDocuments(index.uid(), index.docs(), index.analyzer());
                    } else {
                        indexWriter.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
                    }
                }
                // 增加translog
                Translog.Location translogLocation = translog.add(new Translog.Index(index));
    
                versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
    
                indexingService.postIndexUnderLock(index);
            }
        }
    

    在更新文档时,如果发生VersionConflictEngineException或者DocumentAlreadyExistsException,则会重新执行shardOperation进行重试,最大默认重试次数为3次

    相关文章

      网友评论

        本文标题:Elasticsearch源码分析-更新索引分析

        本文链接:https://www.haomeiwen.com/subject/wmoipqtx.html