
Elasticsearch Fundamentals: The Write Flow

Author: 逍遥白亦 | Published 2020-12-25 19:37

    Write Flow

    1. Terminology

    Ingest node: a node that pre-processes and transforms incoming data; it supports pipeline definitions that filter, transform, and enrich documents before they are indexed.

    Coordinating node: handles lightweight work such as forwarding requests to the right shards and collecting and merging the responses before replying to the client.

    2. Overall Flow Diagram

    [Overall write-flow diagram omitted]

    Red: coordinating node
    Green: primary shard node
    Blue: replica shard node

    2.1 Coordinating Node Flow

    The coordinating-node logic lives in the TransportBulkAction class.

    2.1.1 Parameter checks

    The request is validated: index, type, source, and contentType must not be empty. If validation fails, the request is rejected immediately.
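
    Below is a minimal, hypothetical sketch of this style of validation (none of the names come from the Elasticsearch code base); it only illustrates the pattern of collecting every error and rejecting the request when any are found:

        import java.util.ArrayList;
        import java.util.List;

        // Hypothetical sketch: gather all validation errors, then reject the
        // request as a whole if the list is non-empty.
        public class BulkItemValidationSketch {

            static List<String> validate(String index, String type, byte[] source, String contentType) {
                List<String> errors = new ArrayList<>();
                if (index == null || index.isEmpty())     errors.add("index is missing");
                if (type == null || type.isEmpty())       errors.add("type is missing");
                if (source == null || source.length == 0) errors.add("source is missing");
                if (contentType == null)                  errors.add("content type is missing");
                return errors;
            }

            public static void main(String[] args) {
                List<String> errors = validate("logs-2020.12.25", null, "{}".getBytes(), "application/json");
                if (!errors.isEmpty()) {
                    // the coordinating node would answer the client with a validation exception here
                    System.out.println("rejecting request: " + errors);
                }
            }
        }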

    2.1.2 Handling pipeline requests

    If the request specifies a pipeline parameter, the documents are first run through that ingest pipeline before being indexed.
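
    For illustration, a hedged usage sketch (assuming the 7.x Java high-level REST client; the index name and the "strip-html" pipeline are made-up examples, and the pipeline must already exist in the cluster):

        import org.elasticsearch.action.index.IndexRequest;
        import org.elasticsearch.common.xcontent.XContentType;

        // setPipeline() asks the cluster to run the document through the named
        // ingest pipeline before it is indexed.
        IndexRequest request = new IndexRequest("web-logs")                     // hypothetical index
                .source("{\"message\":\"<b>hello</b>\"}", XContentType.JSON)
                .setPipeline("strip-html");                                     // hypothetical pipeline id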

    2.1.3 Automatic index creation

    Steps:

    1. Collect all indices referenced in the request
    2. Filter down to the indices that do not yet exist and can be auto-created
    3. Send create-index requests to the master node
    4. The master node responds with success or failure for each index
    5. Mark the failed responses; for the successful ones, run the write flow

      Relevant source code:
                // Attempt to create all the indices we need before the bulk operation runs
                // Step 1: collect all the indices in the request
                final Set<String> indices = bulkRequest.requests.stream()
                        // skip DELETE requests (they must not auto-create a missing index) unless external versioning is used
                    .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE 
                            || request.versionType() == VersionType.EXTERNAL 
                            || request.versionType() == VersionType.EXTERNAL_GTE)
                    .map(DocWriteRequest::index)
                    .collect(Collectors.toSet());
                /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
                 * that we'll use when we try to run the requests. */
                final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
                Set<String> autoCreateIndices = new HashSet<>();
                ClusterState state = clusterService.state();
                for (String index : indices) {
                    boolean shouldAutoCreate;
                    try {
                        shouldAutoCreate = shouldAutoCreate(index, state);
                    } catch (IndexNotFoundException e) {
                        shouldAutoCreate = false;
                        indicesThatCannotBeCreated.put(index, e);
                    }
                    if (shouldAutoCreate) {
                        autoCreateIndices.add(index);
                    }
                }
                // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
                if (autoCreateIndices.isEmpty()) {
                    executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
                } else {
                    final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
                    // iterate over all indices that need to be created
                    for (String index : autoCreateIndices) {
                        // send the create-index request (handled by the master node)
                        createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
                            @Override
                            // called when the index was created successfully
                            public void onResponse(CreateIndexResponse result) {
                                // decrement the counter, which was initialized to the number of indices to create
                                if (counter.decrementAndGet() == 0) {
                                    // all create-index responses are in; now run the bulk
                                    executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
                                }
                            }
    
                            @Override
                            // called when index creation failed
                            public void onFailure(Exception e) {
                                if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
                                    // null out the bulk items that target the index that failed to be created
                                    for (int i = 0; i < bulkRequest.requests.size(); i++) {
                                        DocWriteRequest request = bulkRequest.requests.get(i);
                                        if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
                                            bulkRequest.requests.set(i, null);
                                        }
                                    }
                                }
                                if (counter.decrementAndGet() == 0) {
                                    executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
                                        inner.addSuppressed(e);
                                        listener.onFailure(inner);
                                    }), responses, indicesThatCannotBeCreated);
                                }
                            }
                        });
                    }
                }
    

    2.1.4 Pre-processing the request

    This part is implemented in TransportBulkAction.BulkOperation#doRun.
    It checks the request parameters and auto-generates a document ID when the request does not already specify one (an explicit ID is used as-is).
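
    A minimal sketch of the idea (Elasticsearch itself generates a time-based, Flake-style UUID via UUIDs.base64UUID(); the random generator below is only a stand-in to show when an ID gets assigned):

        import java.security.SecureRandom;
        import java.util.Base64;

        // Sketch: keep the caller-supplied id if there is one, otherwise assign
        // a URL-safe random id, as the coordinating node does for index requests.
        public class AutoIdSketch {
            private static final SecureRandom RANDOM = new SecureRandom();

            static String idFor(String requestId) {
                if (requestId != null) {
                    return requestId;                       // explicit id wins, nothing is generated
                }
                byte[] bytes = new byte[15];                // 15 bytes -> 20-character URL-safe Base64 string
                RANDOM.nextBytes(bytes);
                return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
            }

            public static void main(String[] args) {
                System.out.println(idFor(null));            // auto-generated
                System.out.println(idFor("user-42"));       // kept as-is
            }
        }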

    2.1.5 Checking the cluster state

    Before processing, the coordinating node checks the cluster state. If the cluster is in an abnormal state, it blocks and waits for the master node until the block clears or the request times out (a blocking operation).

            private boolean handleBlockExceptions(ClusterState state) {
                ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
                if (blockException != null) {
                    if (blockException.retryable()) {
                        logger.trace("cluster is blocked, scheduling a retry", blockException);
                        retry(blockException);
                    } else {
                        onFailure(blockException);
                    }
                    return true;
                }
                return false;
            }
    

    2.1.6 Grouping requests by shard

    All the requests are analyzed, and the items whose documents route to the same primary shard are merged into a single per-shard request.

    Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest request = bulkRequest.requests.get(i);
        if (request == null) {
            continue;
        }
        String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
        ShardId shardId = clusterService.operationRouting()
            .indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
        List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
        shardRequests.add(new BulkItemRequest(i, request));
    }
    

    2.1.7 Routing

    The routing algorithm determines which shard a document belongs to, and therefore which node holds that shard's primary copy.
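
    In its classic form the rule is shard = hash(_routing) % number_of_primary_shards, where _routing defaults to the document ID. A simplified sketch (the real code is OperationRouting, which uses a Murmur3 hash and, in newer versions, also accounts for index.number_of_routing_shards; hashCode() below is just a stand-in):

        // Simplified routing sketch; the numbers will not match a real cluster
        // because Elasticsearch hashes the routing value with Murmur3, not hashCode().
        public class RoutingSketch {

            static int shardId(String routing, int numberOfPrimaryShards) {
                return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
            }

            public static void main(String[] args) {
                // _routing defaults to the document id when no routing value is supplied
                System.out.println(shardId("doc-1", 5));
                System.out.println(shardId("doc-2", 5));
            }
        }

    Because the modulus depends on number_of_primary_shards, the primary shard count of an index cannot be changed after creation without re-indexing.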

    2.1.8 Forwarding the request and waiting for responses

    Based on the primary-shard node determined by routing, the per-shard requests are forwarded and the coordinating node waits for the results; once all responses have arrived, the combined result is returned to the client.

    protected void doRun() {
                setPhase(task, "routing");
                final ClusterState state = observer.setAndGetObservedState();
                if (handleBlockExceptions(state)) {
                    return;
                }
    
                // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
                final String concreteIndex = concreteIndex(state);
                final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
                if (indexMetaData == null) {
                    retry(new IndexNotFoundException(concreteIndex));
                    return;
                }
                if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
                    throw new IndexClosedException(indexMetaData.getIndex());
                }
    
                // resolve all derived request fields, so we can route and apply it
                resolveRequest(indexMetaData, request);
                assert request.shardId() != null : "request shardId must be set in resolveRequest";
                assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
                //resolve the primary shard's routing entry (and thus the node it lives on)
                final ShardRouting primary = primary(state);
                if (retryIfUnavailable(state, primary)) {
                    return;
                }
                final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
                //if the primary shard is on this node, execute locally; otherwise forward the request
                if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
                    performLocalAction(state, primary, node, indexMetaData);
                } else {
                    performRemoteAction(state, primary, node);
                }
    }
    

    2.2 Primary Shard Node Flow

    The entry point is TransportReplicationAction.PrimaryOperationTransportHandler#messageReceived, which hands off to AsyncPrimaryAction#doRun.

        protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
    
            public PrimaryOperationTransportHandler() {
    
            }
    
            @Override
            public void messageReceived(final ConcreteShardRequest<Request> request, final TransportChannel channel) throws Exception {
                throw new UnsupportedOperationException("the task parameter is required for this operation");
            }
    
            @Override
            public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
                new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run();
            }
        }
    

    2.2.1 Validating the request

    AsyncPrimaryAction#doRun calls the acquirePrimaryShardReference method,
    which validates the request:

    1. Is the local shard copy actually the primary?
    2. Does the allocationId match the expected value?
    3. Does the primaryTerm match the expected value?

    IndexShard indexShard = getIndexShard(shardId);
            //is this shard copy actually the primary?
            if (indexShard.routingEntry().primary() == false) {
                throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(),
                    "actual shard is not a primary " + indexShard.routingEntry());
            }
            final String actualAllocationId = indexShard.routingEntry().allocationId().getId();
            //does the allocationId match the expected value?
            if (actualAllocationId.equals(allocationId) == false) {
                throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
            }
            final long actualTerm = indexShard.getPendingPrimaryTerm();
            //does the primaryTerm match the expected value?
            if (actualTerm != primaryTerm) {
                throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
                    primaryTerm, actualTerm);
            }
    

    2.2.2 Deciding whether to delay execution

    The node checks whether the operation must be delayed (for example, while operations on the shard are blocked); if so, it is put into a queue, otherwise execution continues.

        private void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution,
                            final Object debugInfo, final StackTraceElement[] stackTrace) {
            //the shard has been closed; fail the listener and return
            if (closed) {
                onAcquired.onFailure(new IndexShardClosedException(shardId));
                return;
            }
            final Releasable releasable;
            try {
                synchronized (this) {
                    if (queuedBlockOperations > 0) {
                        final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
                        final ActionListener<Releasable> wrappedListener;
                        if (executorOnDelay != null) {
                            wrappedListener =
                                new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
                                            new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution);
                        } else {
                            wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
                        }
                        //put the operation into the delayed queue
                        delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo, stackTrace));
                        return;
                    } else {
                        releasable = acquire(debugInfo, stackTrace);
                    }
                }
            } catch (final InterruptedException e) {
                onAcquired.onFailure(e);
                return;
            }
            //invoke the listener (AsyncPrimaryAction's onResponse) with the acquired permit
            onAcquired.onResponse(releasable);
        }
    

    2.2.3 Checking whether the primary shard has relocated

    If it has already relocated:

    1. Set the phase to "primary_delegation"
    2. Close the current shard's primaryShardReference to release resources promptly
    3. Look up the relocation target node, forward the request to it, and wait for the result
    4. Once the result arrives, set the task state to "finished"

                //the primary has relocated
                    if (primaryShardReference.isRelocated()) {
                        //close this shard's primaryShardReference to release resources promptly
                        primaryShardReference.close(); 
                        //set the phase to "primary_delegation"
                        setPhase(replicationTask, "primary_delegation");
                        // delegate primary phase to relocation target
                        // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
                        // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
                        final ShardRouting primary = primaryShardReference.routingEntry();
                        assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
                        //look up the relocation target node
                        DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
                        //forward the request to that node and wait for the result
                        transportService.sendRequest(relocatingNode, transportPrimaryAction,
                            new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
                            transportOptions,
                            new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
                                TransportReplicationAction.this::newResponseInstance) {
    
                                @Override
                                public void handleResponse(Response response) {
                                    setPhase(replicationTask, "finished");
                                    super.handleResponse(response);
                                }
    
                                @Override
                                //once the result arrives, set the task state to "finished"
                                public void handleException(TransportException exp) {
                                    setPhase(replicationTask, "finished");
                                    super.handleException(exp);
                                }
                            });
                    } 
                    else {  //the primary has not relocated
                        // set the task state to "primary"
                        setPhase(replicationTask, "primary");
                        final ActionListener<Response> listener = createResponseListener(primaryShardReference);
                        //create the replication operation: write the primary, then replicate to the replicas
                        createReplicatedOperation(request,
                                ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
                                primaryShardReference)
                                .execute(); //ReplicationOperation#execute
                    }
    
    

    If it has not relocated:

    1. Set the task state to "primary"
    2. Perform the operation on the primary shard (the main part)
    3. Forward the request to the replica shards

    2.2.4 Writing to the primary shard (the key step)

    After the node hosting the primary receives the write request from the coordinating node, the actual write logic begins. The entry point is the execute method of the ReplicationOperation class.

        public void execute() throws Exception {
            .......
            //key step: this performs the write on the primary shard
            primaryResult = primary.perform(request);
            .......
        }
    
    

    The perform method:

            @Override
            public PrimaryResult perform(Request request) throws Exception {
                PrimaryResult result = shardOperationOnPrimary(request, indexShard);
                assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
                    + "] with a primary failure [" + result.finalFailure + "]";
                return result;
            }
    

    This eventually calls InternalEngine#index to write the data: first into Lucene (the in-memory buffer mentioned earlier), then into the translog. The reason for this order is that Lucene performs further checks on the document when it is written, so the Lucene write can fail. If the translog were written first, ES would have to deal with operations that were recorded in the translog but could never be indexed into Lucene, which is why ES writes Lucene first.

        public IndexResult index(Index index) throws IOException {
                    .......
                    final IndexResult indexResult;
                    if (plan.earlyResultOnPreFlightError.isPresent()) {
                        indexResult = plan.earlyResultOnPreFlightError.get();
                        assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
                    } else if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                        // write the document into Lucene; this eventually calls Lucene's document-indexing API
                        indexResult = indexIntoLucene(index, plan);
                    } else {
                        indexResult = new IndexResult(
                            plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
                    }
                    if (index.origin().isFromTranslog() == false) {
                        final Translog.Location location;
                        if (indexResult.getResultType() == Result.Type.SUCCESS) {
                            // add the operation to the translog
                            location = translog.add(new Translog.Index(index, indexResult));
                        ......
                        indexResult.setTranslogLocation(location);
                    }
                  .......
            }
    
    

    2.2.5 Writing to the replica shards

        public void execute() throws Exception {
            .......
        //key step: the primary shard is written first
            primaryResult = primary.perform(request);
            .......
            final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
            if (replicaRequest != null) {
                ........
                markUnavailableShardsAsStale(replicaRequest, replicationGroup);
            // key step: after the primary write completes, forward the request to the replicas
                performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
            }
            successfulShards.incrementAndGet();  // mark primary as successful
            decPendingAndFinishIfNeeded();
        }
    
    

    After the primary write completes, the replicas are written next, which means forwarding the request to the replica nodes. If a replica shard is unassigned, it is simply skipped; if a replica shard is relocating its data to another node, the request is forwarded to the relocation target shard; otherwise it is forwarded to the replica shard directly. Note that replicaRequest is taken from primaryResult after the primary write, not the original request.

    private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
                                       final ReplicationGroup replicationGroup) {
            // for total stats, add number of unassigned shards and
            // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
            totalShards.addAndGet(replicationGroup.getSkippedShards().size());
    
            final ShardRouting primaryRouting = primary.routingEntry();
    
            for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
                if (shard.isSameAllocation(primaryRouting) == false) {
                    performOnReplica(shard, replicaRequest, globalCheckpoint);
                }
            }
        }
    

    The performOnReplica method forwards the request to the target node. If an exception occurs, such as the remote node going down or the shard write failing, the primary treats that replica shard as faulty and unavailable, reports it to the master, and the replica is removed.

    private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
            }
    
            totalShards.incrementAndGet();
            pendingActions.incrementAndGet();
            replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
                @Override
                public void onResponse(ReplicaResponse response) {
                    successfulShards.incrementAndGet();
                    try {
                        primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
                        primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
                    } catch (final AlreadyClosedException e) {
                        // okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
                    } catch (final Exception e) {
                        // fail the primary but fall through and let the rest of operation processing complete
                        final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
                        primary.failShard(message, e);
                    }
                    decPendingAndFinishIfNeeded();
                }
    
                @Override
                public void onFailure(Exception replicaException) {
                    logger.trace(() -> new ParameterizedMessage(
                        "[{}] failure while performing [{}] on replica {}, request [{}]",
                        shard.shardId(), opType, shard, replicaRequest), replicaException);
                    // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
                    if (TransportActions.isShardNotAvailableException(replicaException) == false) {
                        RestStatus restStatus = ExceptionsHelper.status(replicaException);
                        shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                            shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                    }
                    String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                    //handle a failed replica write: ask the master to fail the shard if necessary
                    replicasProxy.failShardIfNeeded(shard, message,
                        replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
                        ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
                }
            });
        }
    
    

    2.3 Replica Shard Flow

    The replica executes essentially the same write path as the primary (write to Lucene, then to the translog), minus the replication step.

    2.4 Checkpoints

    After the writes above have completed, and before the result is returned to the client, a global-checkpoint operation (GlobalCheckpointSyncAction) is performed. This module contains a very important piece of code that affects ES write performance:

        @Override
        protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
                final Request request, final IndexShard indexShard) throws Exception {
            maybeSyncTranslog(indexShard);
            return new PrimaryResult<>(request, new ReplicationResponse());
        }
    

    The code above contains a maybeSyncTranslog method, which decides whether the translog needs to be flushed synchronously, i.e. whether the operations recorded in the translog file should be fsynced to disk before returning. Its source is as follows:

        private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
            if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
                indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
                indexShard.sync();
            }
        }
    

    In ES's default configuration, to guarantee data durability, every write to the translog is synchronously flushed to disk so that acknowledged data is not lost. The default setting is:

    index.translog.durability: request

    This corresponds to Translog.Durability.REQUEST in the code above. The downside is that write performance suffers considerably, so ES also offers a periodic, asynchronous flush mode. Reference settings:

    index.translog.durability: async

    index.translog.sync_interval: 120s
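
    For reference, a hedged sketch of switching an existing index to asynchronous translog flushing with the 7.x Java high-level REST client (the index name "my-index" is an assumption):

        import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
        import org.elasticsearch.client.RequestOptions;
        import org.elasticsearch.client.RestHighLevelClient;
        import org.elasticsearch.common.settings.Settings;

        // Sketch only: async flushing trades durability for throughput; writes
        // acknowledged within the last sync_interval can be lost on a crash.
        void enableAsyncTranslog(RestHighLevelClient client) throws java.io.IOException {
            UpdateSettingsRequest request = new UpdateSettingsRequest("my-index")   // hypothetical index name
                .settings(Settings.builder()
                    .put("index.translog.durability", "async")
                    .put("index.translog.sync_interval", "120s")
                    .build());
            client.indices().putSettings(request, RequestOptions.DEFAULT);
        }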

    This concludes the analysis of the write flow.

    3. Tuning Suggestions

    1. Replicas re-index the document during the write, so avoid configuring too many replicas per primary shard
    2. Several ES defaults can hurt write performance, for example the refresh interval and the per-request translog fsync; consider lowering the refresh frequency and switching the translog to asynchronous flushing
    3. ES relies on the operating system's cache to hold segment data, so that portion of memory should not be too small
    4. Prefer bulk writes over indexing documents one at a time (see the sketch after this list)
    5. ES makes heavy use of the disk, so use SSDs if you can
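
    As an illustration of point 4, a sketch assuming the 7.x Java high-level REST client (index and field names are made up):

        import org.elasticsearch.action.bulk.BulkRequest;
        import org.elasticsearch.action.bulk.BulkResponse;
        import org.elasticsearch.action.index.IndexRequest;
        import org.elasticsearch.client.RequestOptions;
        import org.elasticsearch.client.RestHighLevelClient;
        import org.elasticsearch.common.xcontent.XContentType;

        // Sketch: batch many documents into one bulk request so the coordinating,
        // primary, and replica round trips described above are amortized.
        void bulkIndex(RestHighLevelClient client) throws java.io.IOException {
            BulkRequest bulk = new BulkRequest();
            for (int i = 0; i < 1000; i++) {
                bulk.add(new IndexRequest("my-logs")                                 // hypothetical index
                    .source("{\"seq\":" + i + "}", XContentType.JSON));
            }
            BulkResponse response = client.bulk(bulk, RequestOptions.DEFAULT);
            if (response.hasFailures()) {
                System.err.println(response.buildFailureMessage());
            }
        }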
