美文网首页
Elasticsearch源码分析-索引分析(二)

Elasticsearch源码分析-索引分析(二)

作者: 尹亮_36cd | 来源:发表于2018-12-14 19:52 被阅读0次

    1. 写lucene索引入口

    在上篇文章中主要讲述了elasticsearch索引的创建过程,即CreateIndexAction.execute()方法的执行过程。在完成索引创建时,会调用listener.onResponse()方法,回调TransportIndexAction.innerExecute()方法写入索引数据

    public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
        @Override
        protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
            if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
                CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
                createIndexRequest.index(request.index());
                createIndexRequest.mapping(request.type());
                createIndexRequest.cause("auto(index api)");
                createIndexRequest.masterNodeTimeout(request.timeout());
    
                createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
                    @Override
                    public void onResponse(CreateIndexResponse result) {
                        innerExecute(request, listener);
                    }
    
                    @Override
                    public void onFailure(Throwable e) {
                        if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
                            try {
                                innerExecute(request, listener);
                            } catch (Throwable e1) {
                                listener.onFailure(e1);
                            }
                        } else {
                            listener.onFailure(e);
                        }
                    }
                });
            } else {
                innerExecute(request, listener);
            }
        }
    
        private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
            super.doExecute(request, listener); // TransportShardReplicationOperationAction.doExecute()
        }
    }
    

    2. 获取索引对应的主分片

    由于TransportIndexAction继承TransportShardReplicationOperationAction,因此会调用TransportIndexAction的doExecute()方法,具体的逻辑由PrimaryPhase.doRun()实现

    public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
        @Override
        protected void doExecute(Request request, ActionListener<Response> listener) {
            new PrimaryPhase(request, listener).run();
        }
        final class PrimaryPhase extends AbstractRunnable {
            private final ActionListener<Response> listener;
            private final InternalRequest internalRequest;
            private final ClusterStateObserver observer;
            private final AtomicBoolean finished = new AtomicBoolean(false);
            private volatile Releasable indexShardReference;
    
            PrimaryPhase(Request request, ActionListener<Response> listener) {
                this.internalRequest = new InternalRequest(request);
                this.listener = listener;
                this.observer = new ClusterStateObserver(clusterService, internalRequest.request().timeout(), logger);
            }
    
            @Override
            public void onFailure(Throwable e) {
                finishWithUnexpectedFailure(e);
            }
            protected void doRun() {
                if (checkBlocks() == false) {
                    return;
                }
                final ShardIterator shardIt = shards(observer.observedState(), internalRequest);
                final ShardRouting primary = resolvePrimary(shardIt);
                if (primary == null) {
                    retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
                    return;
                }
                if (primary.active() == false) {
                    logger.trace("primary shard [{}] is not yet active, scheduling a retry.", primary.shardId());
                    retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
                    return;
                }
                if (observer.observedState().nodes().nodeExists(primary.currentNodeId()) == false) {
                    logger.trace("primary shard [{}] is assigned to anode we do not know the node, scheduling a retry.", primary.shardId(), primary.currentNodeId());
                    retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
                    return;
                }
                routeRequestOrPerformLocally(primary, shardIt);
            }
        }
    }
    

    3. 获取索引分片逻辑

    在上面操作中,主要是查找请求写入的索引数据对应的所有shard,然后筛选出主分片primary shard,根据请求信息获取shard的逻辑如下:

    public class PlainOperationRouting extends AbstractComponent implements OperationRouting {
        @Inject
        public PlainOperationRouting(Settings indexSettings, HashFunction hashFunction, 
    AwarenessAllocationDecider awarenessAllocationDecider) {
            this.useType = indexSettings.getAsBoolean("cluster.routing.operation.use_type", false);
        }
    
        private int shardId(ClusterState clusterState, String index, String type, @Nullable String id, @Nullable String routing) {
            if (routing == null) {
                if (!useType) {
                    return Math.abs(hash(id) % indexMetaData(clusterState, index).numberOfShards());
                } else {
                    return Math.abs(hash(type, id) % indexMetaData(clusterState, index).numberOfShards());
                }
            }
            return Math.abs(hash(routing) % indexMetaData(clusterState, index).numberOfShards());
        }
    

    (1)如果请求中没有设置routing,分两种情况
    ① 如果配置文件中没有设置cluster.routing.operation.use_type参数或者设置为false,则shard id为对索引id进行DJB哈希,取绝对值后对索引总分片数求余
    dbj_hash(id) % number_of_shards
    ②如果配置cluster.routing.operation.use_type参数为true,则shard id为对索引的type和id进行DJB哈希,取绝对值后对索引总分片数求余
    (2)如果配置了routing信息,则shard id为直接对routing进行DJB哈希,取绝对值后对索引总分片数求余

    4. 发送写入索引请求

    获取到Primary Shard后通过routeRequestOrPerformLocally()方法,将请求发送到shard对应的节点上执行performOnPrimary()方法,如果shard的节点就在当前node上,那么直接执行performOnPrimary()

    public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
        protected TransportShardReplicationOperationAction(Settings settings, String actionName, TransportService transportService,
                                                           ClusterService clusterService, IndicesService indicesService,
                                                           ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters) {
            transportService.registerHandler(actionName, new OperationTransportHandler());
        }
    
        protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) {
                if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
                    try {
                        if (internalRequest.request().operationThreaded()) {
                            threadPool.executor(executor).execute(new AbstractRunnable() {
                                @Override
                                public void onFailure(Throwable t) {
                                    finishAsFailed(t);
                                }
    
                                @Override
                                protected void doRun() throws Exception {
                                    performOnPrimary(primary, shardsIt);
                                }
                            });
                        } else {
                            performOnPrimary(primary, shardsIt);
                        }
                    } catch (Throwable t) {
                        finishAsFailed(t);
                    }
                } else {
                    DiscoveryNode node = observer.observedState().nodes().get(primary.currentNodeId());
                    transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
    
                        @Override
                        public Response newInstance() {
                            return newResponseInstance();
                        }
    
                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
    
                        @Override
                        public void handleResponse(Response response) {
                            finishOnRemoteSuccess(response);
                        }
    
                        @Override
                        public void handleException(TransportException exp) {
                            try {
                                if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
                                        retryPrimaryException(exp)) {
                                    internalRequest.request().setCanHaveDuplicates();
                                    logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
                                    retry(exp);
                                } else {
                                    finishAsFailed(exp);
                                }
                            } catch (Throwable t) {
                                finishWithUnexpectedFailure(t);
                            }
                        }
                    });
                }
            }
    }
    

    通过构造方法,我们可以看到elasticsearch会将这个action注册到OperationTransportHandler对象上。这个handler接收消息后,重新执行doExecute()方法

    class OperationTransportHandler extends BaseTransportRequestHandler<Request> {
        @Override
        public Request newInstance() {
           return newRequestInstance();
        }
    
        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }
    
        @Override
        public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
            request.listenerThreaded(false);
            request.operationThreaded(true);
            execute(request, new ActionListener<Response>() {
                @Override
                public void onResponse(Response result) {
                    try {
                        channel.sendResponse(result);
                    } catch (Throwable e) {
                        onFailure(e);
                    }
                }
                @Override
                public void onFailure(Throwable e) {
                    try {
                        channel.sendResponse(e);
                    } catch (Throwable e1) {
                        logger.warn("Failed to send response for " + actionName, e1);
                    }
                }
            });
        }
    }
    

    最终都是通过performOnPrimary()方法来执行写入索引操作

    1. 在主分片上写数据之前,会通过checkWriteConsistency()方法检查在写入一致性设置前提下是否可写。如果可写,先创建IndexShard的引用对象,然后创建操作主分片的PrimaryOperation对象,最终通过shardOperationOnPrimary在primary shard上写入索引。写入完成后,创建replication phase对象,通过finishAndMoveToReplication在replica shard上写入索引。
    public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
        void performOnPrimary(final ShardRouting primary, final ShardIterator shardsIt) {
                // 检查在写入一致性设置前提下是否可写
                final String writeConsistencyFailure = checkWriteConsistency(primary);
                if (writeConsistencyFailure != null) {
                    retryBecauseUnavailable(primary.shardId(), writeConsistencyFailure);
                    return;
                }
                final ReplicationPhase replicationPhase;
                try {
                    // 获取index shard的引用
                    indexShardReference = getIndexShardOperationsCounter(primary.shardId());
                    // 构建primary operation请求
                    PrimaryOperationRequest por = new PrimaryOperationRequest(primary.id(), internalRequest.concreteIndex(), internalRequest.request());
                    // 执行shard上的索引操作, TransportDeleteAction  TransportIndexAction  TransportShardBulkAction  TransportShardDeleteAction  TransportShardDeleteByQueryAction
                    Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(observer.observedState(), por);
                    logger.trace("operation completed on primary [{}]", primary);
    
                    // 构建replication阶段
                    replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference);
                } catch (Throwable e) {
                    // ...
                }
                // 将primary shard上的数据拷贝到replication上
                finishAndMoveToReplication(replicationPhase);
            }
    }
    

    在shardOperationOnPrimary()方法中,先通过SourceToParse将请求的信息转化为包含source、type、id、routing、parent、timestamp和 ttl 的对象,然后通过请求中的opType判断是Index还是Create操作,默认为Index,最终通过indexShard.index(index)或者indexShard.create(create)写入lucene索引

    public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
        @Override
        protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
            final IndexRequest request = shardRequest.request;
    
            // validate, if routing is required, that we got routing
            // index meta data
            IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex());
            MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
            if (mappingMd != null && mappingMd.routing().required()) {
                if (request.routing() == null) {
                    throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id());
                }
            }
    
            // index service
            IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
            // index shard
            IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
    
            // request转化为source  type  id  routing  parent  timestamp  ttl
            SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
                    .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
    
            long version;
            boolean created;
            try {
                Engine.IndexingOperation op;
                // 如果是 index request
                if (request.opType() == IndexRequest.OpType.INDEX) {
                    Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
                    if (index.parsedDoc().mappingsModified()) {
                        mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
                    }
                    // 执行index请求
                    indexShard.index(index);
                    version = index.version();
                    op = index;
                    created = index.created();
                } else {
                    // 如果是 create request
                    Engine.Create create = indexShard.prepareCreate(sourceToParse,
                            request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
                    if (create.parsedDoc().mappingsModified()) {
                        mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
                    }
                    // 执行create请求
                    indexShard.create(create);
                    version = create.version();
                    op = create;
                    created = true;
                }
                // _refresh 参数
                if (request.refresh()) {
                    try {
                        indexShard.refresh("refresh_flag_index");
                    } catch (Throwable e) {
                        // ignore
                    }
                }
                // update the version on the request, so it will be used for the replicas
                request.version(version);
                request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
                assert request.versionType().validateVersionForWrites(request.version());
    
                // 返回响应
                IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
                return new Tuple<>(response, shardRequest.request);
    
            } catch (WriteFailureException e) {
                if (e.getMappingTypeToUpdate() != null) {
                    DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
                    if (docMapper != null) {
                        mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
                    }
                }
                throw e.getCause();
            }
        }
    }
    

    在IndexShard的index方法中,先判断分片是否可写,判断方法主要是通过分片状态来判断,对于主分片来说,如果不是STARTED和RELOCATED,那么就不允许写入分片。若分片可写,就通过内部引擎InternalEngine对象来操作lucene索引

    public class IndexShard extends AbstractIndexShardComponent {
        public ParsedDocument index(Engine.Index index) throws ElasticsearchException {
            // 判断分片可写
            writeAllowed(index.origin());
            index = indexingService.preIndex(index);
            try {
                if (logger.isTraceEnabled()) {
                    logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
                }
                engine().index(index);
                index.endTime(System.nanoTime());
            } catch (RuntimeException ex) {
                indexingService.failedIndex(index);
                throw ex;
            }
            indexingService.postIndex(index);
            return index.parsedDoc();
        }
    }
    

    6. 操作lucene索引

    (1)写入lucene索引主要是通过lucene的IndexWriter对象来操作,在写入lucene前需要先更新待写入文档的version。
    (2)更新version的方式有四种,分别是EXTERNAL、EXTERNAL_GTE、FORCE和INTERNAL,前三种都是将index请求的version当中文档的version,第四种是将当前的version+1当作文档的version。
    (3)先从内存缓存versionMap中获取当前文档的version,如果没有获取到,则从lucene的reader中获取version,否则判断当前版本是否被GC删除,如果没有删除,那么就当作当前版本
    (4)在写入lucene前,通过是否存在当前文档版本来决定是调用addDocuments添加索引还是调用updateDocument()更新文档

    public class InternalEngine extends Engine {
        @Override
        public void index(Index index) throws EngineException {
            try (ReleasableLock lock = readLock.acquire()) {
                ensureOpen();
                if (index.origin() == Operation.Origin.RECOVERY) {
                    // Don't throttle recovery operations
                    innerIndex(index);
                } else {
                    try (Releasable r = throttle.acquireThrottle()) {
                        innerIndex(index);
                    }
                }
            } catch (OutOfMemoryError | IllegalStateException | IOException t) {
                maybeFailEngine("index", t);
                throw new IndexFailedEngineException(shardId, index, t);
            }
            checkVersionMapRefresh();
        }
    
        private void innerIndex(Index index) throws IOException {
            synchronized (dirtyLock(index.uid())) {
                // 获取currentVersion
                final long currentVersion;
                VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
                if (versionValue == null) {
                    currentVersion = loadCurrentVersionFromIndex(index.uid());
                } else {
                    if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
                        currentVersion = Versions.NOT_FOUND; // deleted, and GC
                    } else {
                        currentVersion = versionValue.version();
                    }
                }
    
                // 获取更新后的Version
                long updatedVersion;
                long expectedVersion = index.version();
                if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
                    if (index.origin() == Operation.Origin.RECOVERY) {
                        return;
                    } else {
                        throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
                    }
                }
                updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
    
                index.updateVersion(updatedVersion);
                // 当前不存在文档版本, 则为create
                if (currentVersion == Versions.NOT_FOUND) {
                    // document does not exists, we can optimize for create
                    index.created(true);
                    if (index.docs().size() > 1) {
                        indexWriter.addDocuments(index.docs(), index.analyzer());
                    } else {
                        indexWriter.addDocument(index.docs().get(0), index.analyzer());
                    }
                } else {
                    // 已经存在文档版本, 则update
                    if (versionValue != null) {
                        index.created(versionValue.delete()); // we have a delete which is not GC'ed...
                    }
                    if (index.docs().size() > 1) {
                        indexWriter.updateDocuments(index.uid(), index.docs(), index.analyzer());
                    } else {
                        indexWriter.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
                    }
                }
                // 增加translog
                Translog.Location translogLocation = translog.add(new Translog.Index(index));
    
                versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
    
                indexingService.postIndexUnderLock(index);
            }
        }
    }
    

    至此完成了在主分片上创建lucene索引,完成后将进入到下一阶段,执行TransportShardReplicationOperationAction.PrimaryPhase.finishAndMoveToReplication()在分片上创建索引

    7. 开始写副本

    在操作shard之前,会判断shard是主分片还是副本。如果是primary shard, 那么只在新shard上拷贝数据,否则在所有的replication上拷贝数据
    在performOnReplica()方法中会判断副本所在的shard是否是当前节点,如果不是,则需要将请求发送到对应的节点执行,action为transportReplicaAction对象,会被注册到ReplicaOperationTransportHandler上。

    public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
        protected TransportShardReplicationOperationAction(Settings settings, String actionName, TransportService transportService,
                                                           ClusterService clusterService, IndicesService indicesService,
                                                           ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters) {
            transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());
    }
    
        final class PrimaryPhase extends AbstractRunnable {
            void finishAndMoveToReplication(ReplicationPhase replicationPhase) {
                if (finished.compareAndSet(false, true)) {
                    replicationPhase.run();
                } else {
                    assert false : "finishAndMoveToReplication called but operation is already finished";
                }
            }
        }
    
        final class ReplicationPhase extends AbstractRunnable {
            @Override
            protected void doRun() {
                if (pending.get() == 0) {
                    doFinish();
                    return;
                }
                ShardRouting shard;
                shardIt.reset(); 
                while ((shard = shardIt.nextOrNull()) != null) {
                    if (shard.unassigned()) {
                        continue;
                    }
                    if (shard.primary()) {
                        if (originalPrimaryShard.currentNodeId().equals(shard.currentNodeId()) == false) {
                            performOnReplica(shard, shard.currentNodeId());
                        }
                        if (shard.relocating()) {
                            performOnReplica(shard, shard.relocatingNodeId());
                        }
                    } else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings()) == false) {  
                        // index.shadow_replicas = false
                        performOnReplica(shard, shard.currentNodeId());
                        if (shard.relocating()) {
                            performOnReplica(shard, shard.relocatingNodeId());
                        }
                    }
                }
            }
    
            void performOnReplica(final ShardRouting shard, final String nodeId) {
                if (!observer.observedState().nodes().nodeExists(nodeId)) {
                    onReplicaFailure(nodeId, null);
                    return;
                }
                final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), replicaRequest);
                if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
                    final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
                    transportService.sendRequest(node, transportReplicaAction, shardRequest,
                            transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                                @Override
                                public void handleResponse(TransportResponse.Empty vResponse) {
                                    onReplicaSuccess();
                                }
    
                                @Override
                                public void handleException(TransportException exp) {
                                    // ...
                                }
    
                            });
                } else {
                    // 提交给线程池操作
                    if (replicaRequest.operationThreaded()) {
                        try {
                            threadPool.executor(executor).execute(new AbstractRunnable() {
                                @Override
                                protected void doRun() {
                                    try {
                                        shardOperationOnReplica(shardRequest);
                                        onReplicaSuccess();
                                    } catch (Throwable e) {
                                        onReplicaFailure(nodeId, e);
                                        failReplicaIfNeeded(shard.index(), shard.id(), e);
                                    }
                                }
                                @Override
                                public boolean isForceExecution() {
                                    return true;
                                }
                                @Override
                                public void onFailure(Throwable t) {
                                    onReplicaFailure(nodeId, t);
                                }
                            });
                        } catch (Throwable e) {
                            failReplicaIfNeeded(shard.index(), shard.id(), e);
                            onReplicaFailure(nodeId, e);
                        }
                    } else {
                        try {
                            shardOperationOnReplica(shardRequest);
                            onReplicaSuccess();
                        } catch (Throwable e) {
                            failReplicaIfNeeded(shard.index(), shard.id(), e);
                            onReplicaFailure(nodeId, e);
                        }
                    }
                }
            }
        }
    }
    

    ReplicaOperationTransportHandler在接收到消息后,依然调用shardOperationOnReplica()方法来写入副本数据

    class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
            @Override
            public ReplicaOperationRequest newInstance() {
                return new ReplicaOperationRequest();
            }
            @Override
            public String executor() {
                return executor;
            }
    
            @Override
            public boolean isForceExecution() {
                return true;
            }
            @Override
            public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
                try (Releasable shardReference = getIndexShardOperationsCounter(request.shardId)) {
                    shardOperationOnReplica(request);
                } catch (Throwable t) {
                    failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);
                    throw t;
                }
                channel.sendResponse(TransportResponse.Empty.INSTANCE);
            }
        }
    

    从shardOperationOnReplica()可以看出写入副本的操作和写入主分片的操作类似,都是先将请求信息解析成SourceToParse,然后在通过IndexShard在分片上写入lucene索引。

    public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
         @Override
        protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
            // index shard
            IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
            // request
            IndexRequest request = shardRequest.request;
            // source
            SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
                    .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
            if (request.opType() == IndexRequest.OpType.INDEX) {
                Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
                indexShard.index(index);
            } else {
                Engine.Create create = indexShard.prepareCreate(sourceToParse,
                        request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
                indexShard.create(create);
            }
            if (request.refresh()) {
                try {
                    indexShard.refresh("refresh_flag_index");
                } catch (Exception e) {
                    // ignore
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Elasticsearch源码分析-索引分析(二)

          本文链接:https://www.haomeiwen.com/subject/pohghqtx.html