Elasticsearch shard split analysis (part 2)


Author: 华饼 | Published: 2018-02-01 14:58

    Internal implementation analysis

    First, RestController's registerHandler function is called to register RestSplitIndexAction as the handler of the split API, for both PUT and POST:

    RestSplitIndexAction

    controller.registerHandler(RestRequest.Method.PUT, "/{index}/_split/{target}", this);
    controller.registerHandler(RestRequest.Method.POST, "/{index}/_split/{target}", this);
    

    When Elasticsearch receives a request, it enters RestController's dispatchRequest function:

    RestController

    boolean dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client,
                                final Optional<RestHandler> mHandler) throws Exception {
            final int contentLength = request.hasContent() ? request.content().length() : 0;
    
            RestChannel responseChannel = channel;
            // Indicator of whether a response was sent or not
            boolean requestHandled;
    
            if (contentLength > 0 && mHandler.map(h -> hasContentType(request, h) == false).orElse(false)) {
               ...
            } else if (contentLength > 0 && mHandler.map(h -> h.supportsContentStream()).orElse(false) &&
                request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) {
               ...
            } else if (mHandler.isPresent()) {
                //the split request is handled in this branch
                try {
                    if (canTripCircuitBreaker(mHandler)) {
                        inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
                    } else {
                        inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
                    }
                    // iff we could reserve bytes for the request we need to send the response also over this channel
                    responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
    
                    final RestHandler wrappedHandler = mHandler.map(h -> handlerWrapper.apply(h)).get();
                    //wrappedHandler is essentially the handler registered with RestController, i.e. RestSplitIndexAction
                    wrappedHandler.handleRequest(request, responseChannel, client);
                    requestHandled = true;
                } catch (Exception e) {
                    responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
                    // We "handled" the request by returning a response, even though it was an error
                    requestHandled = true;
                }
            } else {
                ...
            }
            // Return true if the request was handled, false otherwise.
            return requestHandled;
        }
    

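    The code above omits several unrelated branches. The relevant one is the mHandler.isPresent() branch: because the split handler was registered with RestController at startup, mHandler is always present for a split request. handleRequest is actually a method of BaseRestHandler, so let's follow it.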
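    To make the handler lookup concrete, here is a minimal sketch of matching an HTTP method plus a path template such as /{index}/_split/{target} against a request path and binding the named parameters. It is a hypothetical simplification: SimpleRouter and its methods are invented for illustration, while the real RestController stores handlers in a PathTrie and also performs the content-type and circuit-breaker checks shown above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified router; Elasticsearch's RestController really uses a PathTrie.
class SimpleRouter {
    // A registered route: HTTP method, template segments and the handler it maps to.
    static final class Route {
        final String method;
        final String[] segments;
        final String handler;
        Route(String method, String[] segments, String handler) {
            this.method = method;
            this.segments = segments;
            this.handler = handler;
        }
    }

    // A successful lookup: the handler plus parameters bound from "{...}" segments.
    static final class Match {
        final String handler;
        final Map<String, String> params;
        Match(String handler, Map<String, String> params) {
            this.handler = handler;
            this.params = params;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    void registerHandler(String method, String template, String handler) {
        routes.add(new Route(method, split(template), handler));
    }

    // Returns Optional.empty() when nothing matches (the "no handler" branch).
    Optional<Match> dispatch(String method, String path) {
        String[] parts = split(path);
        for (Route route : routes) {
            if (!route.method.equals(method) || route.segments.length != parts.length) {
                continue;
            }
            Map<String, String> params = new HashMap<>();
            boolean matches = true;
            for (int i = 0; i < parts.length && matches; i++) {
                String seg = route.segments[i];
                if (seg.startsWith("{") && seg.endsWith("}")) {
                    // "{index}" binds the parameter "index" to the actual path segment
                    params.put(seg.substring(1, seg.length() - 1), parts[i]);
                } else {
                    matches = seg.equals(parts[i]);
                }
            }
            if (matches) {
                return Optional.of(new Match(route.handler, params));
            }
        }
        return Optional.empty();
    }

    // "/a/b" -> ["a", "b"]
    private static String[] split(String path) {
        return path.replaceFirst("^/", "").split("/");
    }

    public static void main(String[] args) {
        SimpleRouter router = new SimpleRouter();
        router.registerHandler("PUT", "/{index}/_split/{target}", "RestSplitIndexAction");
        router.registerHandler("POST", "/{index}/_split/{target}", "RestSplitIndexAction");
        router.dispatch("POST", "/logs/_split/logs-split")
              .ifPresent(m -> System.out.println(
                  m.handler + " " + m.params.get("index") + " -> " + m.params.get("target")));
        // prints RestSplitIndexAction logs -> logs-split
    }
}
```

    Registering the same handler under two methods, as RestSplitIndexAction does, simply produces two routes that resolve to the same object.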

    BaseRestHandler

    public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
            // prepare the request for execution; has the side effect of touching the request parameters
            final RestChannelConsumer action = prepareRequest(request, client);
            ...
            usageCount.increment();
            // execute the action
            //the real execution happens here: invoke the RestChannelConsumer returned by prepareRequest, passing it the channel
            action.accept(channel);
        }
    

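    This method first calls prepareRequest, which returns a RestChannelConsumer. Elasticsearch uses many consumer interfaces of this kind; they are Java 8 functional interfaces with a single accept method that takes one argument. The consumer is whatever prepareRequest returns, and RestSplitIndexAction overrides prepareRequest: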
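    The prepare-then-accept pattern can be reduced to a small standalone sketch. Channel, ChannelConsumer, BaseHandler and SplitHandler are hypothetical stand-ins for RestChannel, RestChannelConsumer, BaseRestHandler and RestSplitIndexAction. Only the shape mirrors the code here: a final handleRequest calls the overridable prepareRequest, which validates eagerly and returns a lambda, and then calls accept on that lambda.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

class ConsumerDemo {
    // Hypothetical stand-in for RestChannel: records every response sent on it.
    static class Channel {
        final List<String> sent = new ArrayList<>();
        void sendResponse(String body) { sent.add(body); }
    }

    // Single abstract method, so a lambda can implement it (like RestChannelConsumer).
    @FunctionalInterface
    interface ChannelConsumer {
        void accept(Channel channel);
    }

    abstract static class BaseHandler {
        final LongAdder usageCount = new LongAdder();

        // Subclasses parse and validate eagerly, then return the deferred work.
        protected abstract ChannelConsumer prepareRequest(Map<String, String> params);

        // final, as in BaseRestHandler.handleRequest: prepare, count, then execute.
        public final void handleRequest(Map<String, String> params, Channel channel) {
            ChannelConsumer action = prepareRequest(params);
            usageCount.increment();
            action.accept(channel);
        }
    }

    static class SplitHandler extends BaseHandler {
        @Override
        protected ChannelConsumer prepareRequest(Map<String, String> params) {
            String target = params.get("target");
            String index = params.get("index");
            if (target == null) throw new IllegalArgumentException("no target index");
            if (index == null) throw new IllegalArgumentException("no source index");
            // Nothing is sent yet: the lambda only captures the parsed values.
            return channel -> channel.sendResponse("split " + index + " -> " + target);
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        new SplitHandler().handleRequest(Map.of("index", "src", "target", "dst"), channel);
        System.out.println(channel.sent); // prints [split src -> dst]
    }
}
```

    A consequence of this split is that validation errors surface while preparing, before any usage is counted or anything is written to the channel.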

    RestSplitIndexAction

    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
            if (request.param("target") == null) {
                throw new IllegalArgumentException("no target index");
            }
            if (request.param("index") == null) {
                throw new IllegalArgumentException("no source index");
            }
            ResizeRequest shrinkIndexRequest = new ResizeRequest(request.param("target"), request.param("index"));
            shrinkIndexRequest.setResizeType(ResizeType.SPLIT);
            request.applyContentParser(parser -> ResizeRequest.PARSER.parse(parser, shrinkIndexRequest, null));
            shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout()));
            shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout()));
            shrinkIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards")));
            return channel -> client.admin().indices().resizeIndex(shrinkIndexRequest, new AcknowledgedRestListener<ResizeResponse>(channel) {
                @Override
                public void addCustomFields(XContentBuilder builder, ResizeResponse response) throws IOException {
                    response.addCustomFields(builder);
                }
            });
        }
    

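    This method builds a ResizeRequest from the target and source index names and sets its resize type to SPLIT. Elasticsearch supports two resize operations: split, which splits shards into more shards, and shrink, which merges shards into fewer. After setting a few timeout-related parameters, the method returns an anonymous function (a lambda). BaseRestHandler.handleRequest eventually invokes that function, which leads into the resizeIndex function of IndicesAdmin: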

    IndicesAdmin

    public void resizeIndex(ResizeRequest request, ActionListener<ResizeResponse> listener) {
                execute(ResizeAction.INSTANCE, request, listener);
            }
    

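    This function calls execute and passes in a ResizeAction. In Elasticsearch, every externally exposed API is handled by an internal action object. Because Elasticsearch provides both a RESTful interface and an RPC interface (the transport client), there are two families of actions. A REST call is first handled by an action whose name starts with Rest (RestController maintains the mapping between REST endpoints and these handlers); after a conversion step in between, the matching action whose name starts with Transport is found and does the actual work. For the split API, RestSplitIndexAction handles the request first, control then passes to IndicesAdmin.resizeIndex, and that function directly executes ResizeAction.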

    IndicesAdmin

    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(
                    Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
                client.execute(action, request, listener);
            }
    

    It then calls the client's execute method:

    AbstractClient

    public final <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(
                Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
            listener = threadedWrapper.wrap(listener);
            doExecute(action, request, listener);
        }
    

    execute then calls doExecute. Because the client held by IndicesAdmin is a NodeClient, this goes straight to NodeClient.doExecute:

    NodeClient

    public <    Request extends ActionRequest,
                    Response extends ActionResponse,
                    RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>
                > void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
            // Discard the task because the Client interface doesn't use it.
            executeLocally(action, request, listener);
        }
    

    which in turn calls executeLocally:

    public <    Request extends ActionRequest,
                    Response extends ActionResponse
                > Task executeLocally(GenericAction<Request, Response> action, Request request, ActionListener<Response> listener) {
            return transportAction(action).execute(request, listener);
        }
    

    Here transportAction is called first, with ResizeAction as its argument. This step resolves the transport-layer handler:

    private <    Request extends ActionRequest,
                    Response extends ActionResponse
                > TransportAction<Request, Response> transportAction(GenericAction<Request, Response> action) {
            if (actions == null) {
                throw new IllegalStateException("NodeClient has not been initialized");
            }
            //look up, in actions, the transport-layer action registered for the given action
            TransportAction<Request, Response> transportAction = actions.get(action);
            if (transportAction == null) {
                throw new IllegalStateException("failed to find action [" + action + "] to execute");
            }
            return transportAction;
        }
    

    actions is just a map. ActionModule.setupActions registers every transport-layer action into it:

    ActionModule

    static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
            // Subclass NamedRegistry for easy registration
            class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
                ActionRegistry() {
                    super("action");
                }
    
                public void register(ActionHandler<?, ?> handler) {
                    register(handler.getAction().name(), handler);
                }
    
                public <Request extends ActionRequest, Response extends ActionResponse> void register(
                        GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction,
                        Class<?>... supportTransportActions) {
                    register(new ActionHandler<>(action, transportAction, supportTransportActions));
                }
            }
            ActionRegistry actions = new ActionRegistry();
    
            ...
            //registration of many other actions omitted here
            actions.register(ResizeAction.INSTANCE, TransportResizeAction.class);
            ...
    
            actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);
    
            return unmodifiableMap(actions.getRegistry());
        }
    

    As this shows, ResizeAction is handled by TransportResizeAction. Back in NodeClient.executeLocally, once the matching transport action has been found, its execute method is called:
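    The lookup amounts to a map from action to transport action that is built once at startup and fails loudly on a miss. A minimal sketch under simplifying assumptions: GenericAction and TransportAction here are hypothetical, non-generic stand-ins, the map is keyed by action name for simplicity, and transport actions are passed in by hand rather than created through dependency injection as in Elasticsearch. The string "indices:admin/resize" is only an illustrative label.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class ActionRegistryDemo {
    // Hypothetical stand-in for GenericAction: only its name is used for the lookup.
    static final class GenericAction {
        final String name;
        GenericAction(String name) { this.name = name; }
    }

    // Hypothetical stand-in for TransportAction: maps a request string to a response string.
    static final class TransportAction {
        final Function<String, String> body;
        TransportAction(Function<String, String> body) { this.body = body; }
        String execute(String request) { return body.apply(request); }
    }

    private final Map<String, TransportAction> actions;

    ActionRegistryDemo(Map<String, TransportAction> registered) {
        // Like setupActions, the registry is frozen once built.
        this.actions = Collections.unmodifiableMap(new HashMap<>(registered));
    }

    // Mirrors NodeClient.transportAction: fail loudly when nothing is registered.
    TransportAction transportAction(GenericAction action) {
        TransportAction transportAction = actions.get(action.name);
        if (transportAction == null) {
            throw new IllegalStateException("failed to find action [" + action.name + "] to execute");
        }
        return transportAction;
    }

    String executeLocally(GenericAction action, String request) {
        return transportAction(action).execute(request);
    }

    public static void main(String[] args) {
        GenericAction resize = new GenericAction("indices:admin/resize");
        Map<String, TransportAction> registered = new HashMap<>();
        registered.put(resize.name, new TransportAction(req -> "resized " + req));
        ActionRegistryDemo client = new ActionRegistryDemo(registered);
        System.out.println(client.executeLocally(resize, "src -> dst")); // prints resized src -> dst
    }
}
```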

    TransportAction

    public final Task execute(Request request, ActionListener<Response> listener) {
            /*
             * While this version of execute could delegate to the TaskListener
             * version of execute that'd add yet another layer of wrapping on the
             * listener and prevent us from using the listener bare if there isn't a
             * task. That just seems like too many objects. Thus the two versions of
             * this method.
             */
            Task task = taskManager.register("transport", actionName, request);
            if (task == null) {
                execute(null, request, listener);
            } else {
                execute(task, request, new ActionListener<Response>() {
                    @Override
                    public void onResponse(Response response) {
                        taskManager.unregister(task);
                        listener.onResponse(response);
                    }
    
                    @Override
                    public void onFailure(Exception e) {
                        taskManager.unregister(task);
                        listener.onFailure(e);
                    }
                });
            }
            return task;
        }
    

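    This first registers a task with taskManager. A task wraps a unit of work and records its type, its creation time, the action being executed, the task id and information about its parent task. After the task is created, the other overload of execute is called with a re-wrapped listener. The purpose of this wrapper is to call taskManager.unregister, so that the task is removed once the action completes, whether it succeeds or fails.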
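    The effect of the re-wrapping is that the task is unregistered on every completion path before the caller's listener is notified. A simplified sketch: Listener, Task, TaskManager and execute are hypothetical reductions of ActionListener, Task, TaskManager and TransportAction.execute, and unlike the real register, this one never returns null.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

class TaskWrappingDemo {
    // Hypothetical reduction of ActionListener.
    interface Listener<R> {
        void onResponse(R response);
        void onFailure(Exception e);
    }

    // A task records what is running: id, type, action name and start time.
    static final class Task {
        final long id;
        final String type;
        final String action;
        final long startTimeMillis = System.currentTimeMillis();
        Task(long id, String type, String action) {
            this.id = id;
            this.type = type;
            this.action = action;
        }
    }

    static final class TaskManager {
        private final AtomicLong nextId = new AtomicLong();
        final Map<Long, Task> tasks = new ConcurrentHashMap<>();

        Task register(String type, String action) {
            Task task = new Task(nextId.incrementAndGet(), type, action);
            tasks.put(task.id, task);
            return task;
        }

        void unregister(Task task) {
            tasks.remove(task.id);
        }
    }

    // Registers a task, runs `body` with a wrapped listener, and unregisters on either outcome.
    static <R> Task execute(TaskManager taskManager, String action, Listener<R> listener,
                            Consumer<Listener<R>> body) {
        Task task = taskManager.register("transport", action);
        body.accept(new Listener<R>() {
            @Override
            public void onResponse(R response) {
                taskManager.unregister(task);
                listener.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                taskManager.unregister(task);
                listener.onFailure(e);
            }
        });
        return task;
    }

    public static void main(String[] args) {
        TaskManager tm = new TaskManager();
        Listener<String> print = new Listener<String>() {
            @Override public void onResponse(String r) { System.out.println("response: " + r); }
            @Override public void onFailure(Exception e) { System.out.println("failure: " + e.getMessage()); }
        };
        execute(tm, "indices:admin/resize", print, l -> l.onResponse("acknowledged"));
        execute(tm, "indices:admin/resize", print, l -> l.onFailure(new IllegalStateException("boom")));
        System.out.println("tasks still registered: " + tm.tasks.size()); // prints tasks still registered: 0
    }
}
```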

    public final void execute(Task task, Request request, ActionListener<Response> listener) {
            ActionRequestValidationException validationException = request.validate();
            if (validationException != null) {
                listener.onFailure(validationException);
                return;
            }
    
            if (task != null && request.getShouldStoreResult()) {
                listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
            }
    
            RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
            requestFilterChain.proceed(task, actionName, request, listener);
        }
    

    execute first calls request.validate() to check that the request is valid. If validation passes, it constructs a RequestFilterChain and calls its proceed method:

    RequestFilterChain

    public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
                int i = index.getAndIncrement();
                try {
                    if (i < this.action.filters.length) {
                        this.action.filters[i].apply(task, actionName, request, listener, this);
                    } else if (i == this.action.filters.length) {
                        this.action.doExecute(task, request, listener);
                    } else {
                        listener.onFailure(new IllegalStateException("proceed was called too many times"));
                    }
                } catch(Exception e) {
                    logger.trace("Error during transport action execution.", e);
                    listener.onFailure(e);
                }
            }
    

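    The action keeps an array of filters. If any filters are present, they are called one after another, each handing control back through proceed, until the action's own doExecute is finally called. In the setup analyzed here TransportResizeAction has no filters, so action.doExecute is called directly. Because TransportResizeAction extends TransportMasterNodeAction, this ends up in TransportMasterNodeAction.doExecute: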
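    The index-based chain lets each filter decide whether to continue by calling proceed again, and the slot after the last filter runs the action itself. A minimal sketch, assuming a hypothetical Filter interface and a String request, with failures reported to the listener as strings for brevity (the real chain passes an ActionListener and exceptions):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class FilterChainDemo {
    // A filter either answers the listener itself or calls chain.proceed to continue.
    interface Filter {
        void apply(String request, Consumer<String> listener, Chain chain);
    }

    static final class Chain {
        private final AtomicInteger index = new AtomicInteger();
        private final List<Filter> filters;
        private final BiConsumer<String, Consumer<String>> doExecute;

        Chain(List<Filter> filters, BiConsumer<String, Consumer<String>> doExecute) {
            this.filters = filters;
            this.doExecute = doExecute;
        }

        void proceed(String request, Consumer<String> listener) {
            int i = index.getAndIncrement();
            try {
                if (i < filters.size()) {
                    filters.get(i).apply(request, listener, this);
                } else if (i == filters.size()) {
                    doExecute.accept(request, listener);   // end of the chain: the action itself
                } else {
                    listener.accept("failure: proceed was called too many times");
                }
            } catch (RuntimeException e) {
                listener.accept("failure: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        Filter audit = (req, l, next) -> next.proceed(req + " [audited]", l);
        Chain chain = new Chain(List.of(audit), (req, l) -> l.accept("executed " + req));
        chain.proceed("resize", System.out::println); // prints executed resize [audited]
        // With no filters (the case described for TransportResizeAction), doExecute runs at once.
        new Chain(List.of(), (req, l) -> l.accept("executed " + req))
            .proceed("resize", System.out::println); // prints executed resize
    }
}
```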

    TransportMasterNodeAction

    protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
            new AsyncSingleAction(task, request, listener).start();
        }
    

    This creates an AsyncSingleAction object and calls its start method:

    AsyncSingleAction

    public void start() {
                ClusterState state = clusterService.state();
                this.observer = new ClusterStateObserver(state, clusterService, request.masterNodeTimeout(), logger, threadPool.getThreadContext());
                doStart(state);
            }
    

    It first takes the current cluster state and then calls doStart:

    protected void doStart(ClusterState clusterState) {
                final Predicate<ClusterState> masterChangePredicate = MasterNodeChangePredicate.build(clusterState);
                final DiscoveryNodes nodes = clusterState.nodes();
                if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
                    // check for block, if blocked, retry, else, execute locally
                    final ClusterBlockException blockException = checkBlock(request, clusterState);
                    if (blockException != null) {
                        if (!blockException.retryable()) {
                            listener.onFailure(blockException);
                        } else {
                            logger.trace("can't execute due to a cluster block, retrying", blockException);
                            retry(blockException, newState -> {
                                ClusterBlockException newException = checkBlock(request, newState);
                                return (newException == null || !newException.retryable());
                            });
                        }
                    } else {
                        ActionListener<Response> delegate = new ActionListener<Response>() {
                            @Override
                            public void onResponse(Response response) {
                                listener.onResponse(response);
                            }
    
                            @Override
                            public void onFailure(Exception t) {
                                if (t instanceof Discovery.FailedToCommitClusterStateException
                                        || (t instanceof NotMasterException)) {
                                    logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
                                    retry(t, masterChangePredicate);
                                } else {
                                    listener.onFailure(t);
                                }
                            }
                        };
                        threadPool.executor(executor).execute(new ActionRunnable(delegate) {
                            @Override
                            protected void doRun() throws Exception {
                                masterOperation(task, request, clusterState, delegate);
                            }
                        });
                    }
                } else {
                    if (nodes.getMasterNode() == null) {
                        logger.debug("no known master node, scheduling a retry");
                        retry(null, masterChangePredicate);
                    } else {
                        DiscoveryNode masterNode = nodes.getMasterNode();
                        final String actionName = getMasterActionName(masterNode);
                        transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler<Response>(listener,
                            TransportMasterNodeAction.this::newResponse) {
                            @Override
                            public void handleException(final TransportException exp) {
                                Throwable cause = exp.unwrapCause();
                                if (cause instanceof ConnectTransportException) {
                                    // we want to retry here a bit to see if a new master is elected
                                    logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
                                            actionName, nodes.getMasterNode(), exp.getDetailedMessage());
                                    retry(cause, masterChangePredicate);
                                } else {
                                    listener.onFailure(exp);
                                }
                            }
                        });
                    }
                }
            }
    

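    This first checks whether the local node is the elected master. If it is not, the master node has to be looked up and the request forwarded to it for execution; otherwise the request is executed locally. For local execution the listener is wrapped once more, this time so that the action can be retried when it fails because the master could not publish the cluster state or stepped down. With the preparation done, an ActionRunnable is created and passed to the executor, which calls its run method. Note that this is still synchronous: the executor used here simply runs the command on the calling thread.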
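    The routing decision and the retry-on-master-change wrapper can be isolated into a small sketch. route and retryingDelegate are hypothetical helpers, NotMasterException here is a local stand-in for the Elasticsearch exception of the same name (it also stands in for FailedToCommitClusterStateException), and cluster blocks, timeouts and the ClusterStateObserver are left out:

```java
import java.util.function.Consumer;

class MasterRoutingDemo {
    // Local stand-in for NotMasterException / FailedToCommitClusterStateException.
    static final class NotMasterException extends RuntimeException {
        NotMasterException(String message) { super(message); }
    }

    interface Listener {
        void onResponse(String response);
        void onFailure(Exception e);
    }

    // The three branches of doStart, reduced to a decision (cluster blocks ignored).
    static String route(boolean localNodeIsMaster, String masterNode) {
        if (localNodeIsMaster) {
            return "execute locally";
        }
        if (masterNode == null) {
            return "retry: no known master";
        }
        return "forward to " + masterNode;
    }

    // Wraps the caller's listener so master-change failures are retried instead of reported.
    static Listener retryingDelegate(Listener listener, Consumer<Exception> retry) {
        return new Listener() {
            @Override
            public void onResponse(String response) {
                listener.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                if (e instanceof NotMasterException) {
                    retry.accept(e);
                } else {
                    listener.onFailure(e);
                }
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(route(false, "node-2")); // prints forward to node-2
        Listener caller = new Listener() {
            @Override public void onResponse(String r) { System.out.println("ok: " + r); }
            @Override public void onFailure(Exception e) { System.out.println("failed: " + e.getMessage()); }
        };
        Listener delegate = retryingDelegate(caller, e -> System.out.println("retrying after: " + e.getMessage()));
        delegate.onFailure(new NotMasterException("master stepped down")); // prints retrying after: master stepped down
        delegate.onFailure(new IllegalArgumentException("bad request"));   // prints failed: bad request
    }
}
```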

    EsExecutor

    public void execute(Runnable command) {
                command.run();
            }
    

    AbstractRunnable

    public final void run() {
            try {
                doRun();
            } catch (Exception t) {
                onFailure(t);
            } finally {
                onAfter();
            }
        }
    

    ActionRunnable

    protected void doRun() throws Exception {
              masterOperation(task, request, clusterState, delegate);
       }
    

    TransportMasterNodeAction

    protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
            masterOperation(request, state, listener);
        }
    

    Here the task information is simply ignored, and TransportResizeAction's masterOperation method is finally called:

    TransportResizeAction

    protected void masterOperation(final ResizeRequest resizeRequest, final ClusterState state,
                                       final ActionListener<ResizeResponse> listener) {
    
            // there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code
            final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
            final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
            client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener<IndicesStatsResponse>() {
                @Override
                public void onResponse(IndicesStatsResponse indicesStatsResponse) {
                    CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
                        (i) -> {
                            IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
                            return shard == null ? null : shard.getPrimary().getDocs();
                        }, sourceIndex, targetIndex);
                    createIndexService.createIndex(
                        updateRequest,
                        ActionListener.wrap(response ->
                                listener.onResponse(new ResizeResponse(response.isAcknowledged(), response.isShardsAcked(),
                                    updateRequest.index())), listener::onFailure
                        )
                    );
                }
    
                @Override
                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }
            });
    
        }
    

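    This looks like yet another action being executed. In fact it only fetches stats (document counts) for the source index, and those stats appear to matter only for the shrink API, so they are not analyzed here. Once the stats have been retrieved, the listener's onResponse is called. Note that by the time onResponse runs, we are already on a different thread. Inside onResponse, prepareCreateIndexRequest is called right away: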

    static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final ResizeRequest resizeRequest, final ClusterState state
           , final IntFunction<DocsStats> perShardDocStats, String sourceIndexName, String targetIndexName) {
         //some code omitted here
           ...
           //for each shard of the target index
           for (int i = 0; i < numShards; i++) {
               if (resizeRequest.getResizeType() == ResizeType.SHRINK) {
                   Set<ShardId> shardIds = IndexMetaData.selectShrinkShards(i, metaData, numShards);
                   long count = 0;
                   for (ShardId id : shardIds) {
                       DocsStats docsStats = perShardDocStats.apply(id.id());
                       if (docsStats != null) {
                           count += docsStats.getCount();
                       }
                       if (count > IndexWriter.MAX_DOCS) {
                           throw new IllegalStateException("Can't merge index with more than [" + IndexWriter.MAX_DOCS
                               + "] docs - too many documents in shards " + shardIds);
                       }
                   }
               } else {
                   //select a source shard for each target shard; if the selected source shard is null, an exception is thrown
                   Objects.requireNonNull(IndexMetaData.selectSplitShard(i, metaData, numShards));
                   // we just execute this to ensure we get the right exceptions if the number of shards is wrong or less then etc.
               }
           }
    
           if (IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING.exists(targetIndexSettings)) {
               throw new IllegalArgumentException("cannot provide a routing partition size value when resizing an index");
           }
           if (IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(targetIndexSettings)) {
               throw new IllegalArgumentException("cannot provide index.number_of_routing_shards on resize");
           }
           String cause = resizeRequest.getResizeType().name().toLowerCase(Locale.ROOT) + "_index";
           targetIndex.cause(cause);
           Settings.Builder settingsBuilder = Settings.builder().put(targetIndexSettings);
           settingsBuilder.put("index.number_of_shards", numShards);
           targetIndex.settings(settingsBuilder);
    
           return new CreateIndexClusterStateUpdateRequest(targetIndex,
               cause, targetIndex.index(), targetIndexName, true)
               // mappings are updated on the node when creating in the shards, this prevents race-conditions since all mapping must be
               // applied once we took the snapshot and if somebody messes things up and switches the index read/write and adds docs we miss
               // the mappings for everything is corrupted and hard to debug
               .ackTimeout(targetIndex.timeout())
               .masterNodeTimeout(targetIndex.masterNodeTimeout())
               .settings(targetIndex.settings())
               .aliases(targetIndex.aliases())
               .customs(targetIndex.customs())
               .waitForActiveShards(targetIndex.waitForActiveShards())
               .recoverFrom(metaData.getIndex())
               .resizeType(resizeRequest.getResizeType());
       }
    

    This function is fairly long. Its most important step is this line:

    Objects.requireNonNull(IndexMetaData.selectSplitShard(i, metaData, numShards));
    

    Internally, this line captures how Elasticsearch performs the split: it determines which shard of the source index each shard of the target index is split from.

    IndexMetaData

    public static ShardId selectSplitShard(int shardId, IndexMetaData sourceIndexMetadata, int numTargetShards) {
            if (shardId >= numTargetShards) {
                throw new IllegalArgumentException("the number of target shards (" + numTargetShards + ") must be greater than the shard id: "
                    + shardId);
            }
            int numSourceShards = sourceIndexMetadata.getNumberOfShards();
            if (numSourceShards > numTargetShards) {
                throw new IllegalArgumentException("the number of source shards [" + numSourceShards
                     + "] must be less that the number of target shards [" + numTargetShards + "]");
            }
            int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
            // now we verify that the numRoutingShards is valid in the source index
            int routingNumShards = sourceIndexMetadata.getRoutingNumShards();
            if (routingNumShards % numTargetShards != 0) {
                throw new IllegalStateException("the number of routing shards ["
                    + routingNumShards + "] must be a multiple of the target shards [" + numTargetShards + "]");
            }
            // this is just an additional assertion that ensures we are a factor of the routing num shards.
            assert getRoutingFactor(numTargetShards, sourceIndexMetadata.getRoutingNumShards()) >= 0;
            return new ShardId(sourceIndexMetadata.getIndex(), shardId/routingFactor);
        }
    

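    This function takes three arguments: the id of a shard in the target index, the metadata of the source index, and the total number of shards in the target index. It first computes routingFactor, which here is the factor by which the split multiplies the shard count, i.e. numTargetShards / numSourceShards. A routingFactor is computed again later on, but there it means something different. From the return value, the final expression (with integer division) is: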

      sourceShardId = targetShardId / (numTargetShards / numSourceShards)
    

    For example, if the source index has two shards and is split into four, the target shard ids relate to the source shard ids as follows:

    Source shard    Target shard
    0               0
    0               1
    1               2
    1               3
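    The table can be reproduced with a simplified restatement of selectSplitShard. This is a sketch, not the Elasticsearch source: the index metadata is replaced by plain integers, routingNumShards (index.number_of_routing_shards) is passed in explicitly, and the factor check assumes, as the split API requires, that the target shard count is a multiple of the source shard count.

```java
class SplitShardDemo {
    // Simplified restatement of the checks in IndexMetaData.selectSplitShard (not the ES source).
    static int selectSplitShard(int targetShardId, int numSourceShards, int numTargetShards,
                                int routingNumShards) {
        if (targetShardId >= numTargetShards) {
            throw new IllegalArgumentException("target shard id " + targetShardId + " >= " + numTargetShards);
        }
        if (numSourceShards > numTargetShards || numTargetShards % numSourceShards != 0) {
            throw new IllegalArgumentException("target shard count must be a multiple of the source shard count");
        }
        // How many target shards each source shard is split into.
        int routingFactor = numTargetShards / numSourceShards;
        if (routingNumShards % numTargetShards != 0) {
            throw new IllegalStateException(
                "routing shards " + routingNumShards + " is not a multiple of " + numTargetShards);
        }
        // Integer division: each consecutive block of routingFactor target shards shares one source shard.
        return targetShardId / routingFactor;
    }

    public static void main(String[] args) {
        // 2 source shards -> 4 target shards, assuming index.number_of_routing_shards = 4
        for (int target = 0; target < 4; target++) {
            System.out.println("target " + target + " <- source " + selectSplitShard(target, 2, 4, 4));
        }
        // prints target 0 <- source 0, target 1 <- source 0, target 2 <- source 1, target 3 <- source 1
    }
}
```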

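    Back in prepareCreateIndexRequest: once validation passes, a CreateIndexClusterStateUpdateRequest is created. As the name suggests, this is a cluster state change request, specifically one that creates an index. Several properties are then set on it; the most important, in my view, is recoverFrom, which determines where the target index gets its data from. Then we return to masterOperation. There, the object returned by prepareCreateIndexRequest is assigned to updateRequest and passed to createIndexService.createIndex, with the listener wrapped once more. This wrapper exists to replace the response object, converting it into a ResizeResponse. createIndexService is in fact a MetaDataCreateIndexService, which handles index creation requests: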

    MetaDataCreateIndexService

    public void createIndex(final CreateIndexClusterStateUpdateRequest request,
                                final ActionListener<CreateIndexClusterStateUpdateResponse> listener) {
            onlyCreateIndex(request, ActionListener.wrap(response -> {
                if (response.isAcknowledged()) {
                    activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),
                        shardsAcked -> {
                            if (shardsAcked == false) {
                                logger.debug("[{}] index created, but the operation timed out while waiting for " +
                                                 "enough shards to be started.", request.index());
                            }
                            listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcked));
                        }, listener::onFailure);
                } else {
                    listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false));
                }
            }, listener::onFailure));
        }
    

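    This again only wraps the listener, this time to check whether the cluster state change was acknowledged; if it was, it waits for the required number of shards to become active. Elasticsearch relies heavily on asynchronous code, and the constant wrapping and passing around of listeners makes the control flow easy to lose track of, while comments are sparse. That makes the Elasticsearch code base fairly hard to read. Moving on: after this wrapping, onlyCreateIndex is called. As its name suggests, it only creates the index. Creating the index and waiting for its shards to become active are therefore separate asynchronous steps, so the index may be created successfully while its shards are not yet active.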
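    The two-stage response can be sketched with plain callbacks. onlyCreateIndex and waitForActiveShards are hypothetical stand-ins for the asynchronous steps described above, and the failure path is omitted. The point is that acknowledged=true with shardsAcked=false is a legitimate outcome.

```java
import java.util.function.Consumer;

class CreateIndexAckDemo {
    // Mirrors the two flags of CreateIndexClusterStateUpdateResponse.
    static final class Response {
        final boolean acknowledged;
        final boolean shardsAcked;
        Response(boolean acknowledged, boolean shardsAcked) {
            this.acknowledged = acknowledged;
            this.shardsAcked = shardsAcked;
        }
    }

    // onlyCreateIndex and waitForActiveShards are hypothetical async steps that each report a boolean.
    static void createIndex(Consumer<Consumer<Boolean>> onlyCreateIndex,
                            Consumer<Consumer<Boolean>> waitForActiveShards,
                            Consumer<Response> listener) {
        onlyCreateIndex.accept(acknowledged -> {
            if (acknowledged) {
                // The index exists in the cluster state; now wait for enough shards to start.
                waitForActiveShards.accept(shardsAcked -> listener.accept(new Response(true, shardsAcked)));
            } else {
                listener.accept(new Response(false, false));
            }
        });
    }

    public static void main(String[] args) {
        // Cluster state acknowledged, but shards did not start before the timeout.
        createIndex(done -> done.accept(true), done -> done.accept(false),
            r -> System.out.println("acknowledged=" + r.acknowledged + " shardsAcked=" + r.shardsAcked));
        // prints acknowledged=true shardsAcked=false
    }
}
```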

    private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request,
                                     final ActionListener<ClusterStateUpdateResponse> listener) {
            Settings.Builder updatedSettingsBuilder = Settings.builder();
            Settings build = updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build();
            indexScopedSettings.validate(build, true); // we do validate here - index setting must be consistent
            request.settings(build);
            clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
                new IndexCreationTask(logger, allocationService, request, listener, indicesService, aliasValidator, xContentRegistry, settings,
                    this::validate));
        }
    

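    At this point a state update task is submitted to the cluster service, with a source string stating the cause of the operation and an IndexCreationTask wrapping the work: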

    ClusterService

    public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener>
            void submitStateUpdateTask(String source, T updateTask) {
            submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
        }
    
    public <T> void submitStateUpdateTask(String source, T task,
                                              ClusterStateTaskConfig config,
                                              ClusterStateTaskExecutor<T> executor,
                                              ClusterStateTaskListener listener) {
            submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
        }
    
    public <T> void submitStateUpdateTasks(final String source,
                                               final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
                                               final ClusterStateTaskExecutor<T> executor) {
            masterService.submitStateUpdateTasks(source, tasks, config, executor);
        }
    

    From here the call is delegated to MasterService:

    MasterService

    public <T> void submitStateUpdateTasks(final String source,
                                               final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
                                               final ClusterStateTaskExecutor<T> executor) {
            if (!lifecycle.started()) {
                return;
            }
            try {
                List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
                    .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue()), executor))
                    .collect(Collectors.toList());
                taskBatcher.submitTasks(safeTasks, config.timeout());
            } catch (EsRejectedExecutionException e) {
                // ignore cases where we are shutting down..., there is really nothing interesting
                // to be done here...
                if (!lifecycle.stoppedOrClosed()) {
                    throw e;
                }
            }
        }
    

    This uses Java 8 streams, but all it does is wrap each task in an UpdateTask object and submit the tasks through the TaskBatcher:

    UpdateTask

    UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
                           ClusterStateTaskExecutor<?> executor) {
                    super(priority, source, executor, task);
                    this.listener = listener;
                }
    

    TaskBatcher

    public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
            if (tasks.isEmpty()) {
                return;
            }
            final BatchedTask firstTask = tasks.get(0);
            assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
                "tasks submitted in a batch should share the same batching key: " + tasks;
            // convert to an identity map to check for dups based on task identity
            final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
                BatchedTask::getTask,
                Function.identity(),
                (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
                IdentityHashMap::new));
    
            synchronized (tasksPerBatchingKey) {
                LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
                    k -> new LinkedHashSet<>(tasks.size()));
                for (BatchedTask existing : existingTasks) {
                    // check that there won't be two tasks with the same identity for the same batching key
                    BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
                    if (duplicateTask != null) {
                        throw new IllegalStateException("task [" + duplicateTask.describeTasks(
                            Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");
                    }
                }
                existingTasks.addAll(tasks);
            }
    
            if (timeout != null) {
                threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
            } else {
                threadExecutor.execute(firstTask);
            }
        }
    

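    This method performs two duplicate checks. The first checks whether the tasks passed in contain duplicates among themselves; the second checks whether any of them duplicates a task already queued under the same batching key. Both checks compare tasks by identity (the map is an IdentityHashMap), not with equals(). tasksPerBatchingKey holds all queued tasks for each batchingKey; for an UpdateTask the batching key is the ClusterStateTaskExecutor passed to super(...) in its constructor. Finally the task is handed to threadExecutor for execution; note that only firstTask is passed to execute, together with a timeout callback when a timeout is set.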
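    The two duplicate checks can be sketched with plain JDK collections. BatchedTask here is a hypothetical simplification that holds only a batching key and a task object, and queuedCount is an illustrative helper that does not exist in Elasticsearch. The detail worth noticing is the four-argument Collectors.toMap: its merge function throws on a key collision, and IdentityHashMap::new makes the comparison identity-based.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateCheckSketch {

    // Hypothetical simplified BatchedTask: a batching key plus the wrapped task object
    static final class BatchedTask {
        final Object batchingKey;
        final Object task;

        BatchedTask(Object batchingKey, Object task) {
            this.batchingKey = batchingKey;
            this.task = task;
        }
    }

    // batching key -> tasks queued so far, in insertion order (like tasksPerBatchingKey)
    private final Map<Object, LinkedHashSet<BatchedTask>> tasksPerBatchingKey = new HashMap<>();

    void submit(List<BatchedTask> tasks) {
        if (tasks.isEmpty()) {
            return;
        }
        Object key = tasks.get(0).batchingKey;
        // Check 1: duplicates inside this submission, compared by identity (==)
        Map<Object, BatchedTask> byIdentity = tasks.stream().collect(Collectors.toMap(
            t -> t.task,
            Function.identity(),
            (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a.task); },
            IdentityHashMap::new));
        synchronized (tasksPerBatchingKey) {
            LinkedHashSet<BatchedTask> existing =
                tasksPerBatchingKey.computeIfAbsent(key, k -> new LinkedHashSet<>());
            // Check 2: duplicates against tasks already queued under the same key
            for (BatchedTask queued : existing) {
                if (byIdentity.containsKey(queued.task)) {
                    throw new IllegalStateException("task [" + queued.task + "] is already queued");
                }
            }
            existing.addAll(tasks);
        }
    }

    int queuedCount(Object key) {
        synchronized (tasksPerBatchingKey) {
            LinkedHashSet<BatchedTask> queued = tasksPerBatchingKey.get(key);
            return queued == null ? 0 : queued.size();
        }
    }

    public static void main(String[] args) {
        DuplicateCheckSketch batcher = new DuplicateCheckSketch();
        Object executorKey = new Object(); // stands in for the ClusterStateTaskExecutor
        Object taskA = new Object();
        batcher.submit(Arrays.asList(new BatchedTask(executorKey, taskA)));
        try {
            batcher.submit(Collections.singletonList(new BatchedTask(executorKey, taskA)));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

    Because the comparison is by identity, two distinct task objects that happen to be equal(), such as two new String("x") instances, are both accepted.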

    PrioritizedEsThreadPoolExecutor

    public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
            command = wrapRunnable(command);
            doExecute(command);
            if (timeout.nanos() >= 0) {
                if (command instanceof TieBreakingPrioritizedRunnable) {
                    ((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout);
                } else {
                    // We really shouldn't be here. The only way we can get here if somebody created PrioritizedFutureTask
                    // and passed it to execute, which doesn't make much sense
                    throw new UnsupportedOperationException("Execute with timeout is not supported for future tasks");
                }
            }
        }
    

    The command is wrapped first, so let's look at what that wrapping actually does.

    protected Runnable wrapRunnable(Runnable command) {
            if (command instanceof PrioritizedRunnable) {
                if ((command instanceof TieBreakingPrioritizedRunnable)) {
                    return command;
                }
                Priority priority = ((PrioritizedRunnable) command).priority();
                // An UpdateTask ends up wrapped in this object
                return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), priority, insertionOrder.incrementAndGet());
            } else if (command instanceof PrioritizedFutureTask) {
                return command;
            } else { // it might be a callable wrapper...
                if (command instanceof TieBreakingPrioritizedRunnable) {
                    return command;
                }
                return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), Priority.NORMAL, insertionOrder.incrementAndGet());
            }
        }
    

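    The command is in fact the UpdateTask described above, which is a prioritized task (a PrioritizedRunnable), so it ends up wrapped in a TieBreakingPrioritizedRunnable that carries its priority plus an insertionOrder value taken from a counter incremented on every wrap. This class implements Runnable's run method: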
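    A plausible reason for the insertionOrder counter: the JDK's PriorityBlockingQueue makes no guarantee about the order of elements with equal priority, and its Javadoc suggests breaking ties with a secondary sequence number. The following is a minimal sketch of that idea, assuming a priority-queue-backed executor. The Priority values, TieBreakingTask and drain are hypothetical simplifications, and draining the queue on the calling thread stands in for the executor's worker thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class TieBreakingSketch {

    // Lower ordinal = more urgent (hypothetical subset of priority levels)
    enum Priority { IMMEDIATE, URGENT, HIGH, NORMAL, LOW }

    static final AtomicLong insertionOrder = new AtomicLong();

    // Simplified TieBreakingPrioritizedRunnable: order by priority, then by submission order
    static final class TieBreakingTask implements Runnable, Comparable<TieBreakingTask> {
        final Runnable runnable;
        final Priority priority;
        final long order;

        TieBreakingTask(Runnable runnable, Priority priority) {
            this.runnable = runnable;
            this.priority = priority;
            this.order = insertionOrder.incrementAndGet();
        }

        @Override
        public int compareTo(TieBreakingTask other) {
            int byPriority = priority.compareTo(other.priority);
            // Equal priority: the earlier submission wins, giving FIFO within a level
            return byPriority != 0 ? byPriority : Long.compare(order, other.order);
        }

        @Override
        public void run() {
            runnable.run();
        }
    }

    // Queues one task per name, then runs them in queue order and records the run order
    static List<String> drain(List<String> names, List<Priority> priorities) {
        PriorityBlockingQueue<TieBreakingTask> queue = new PriorityBlockingQueue<>();
        List<String> ran = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(i);
            queue.add(new TieBreakingTask(() -> ran.add(name), priorities.get(i)));
        }
        TieBreakingTask next;
        while ((next = queue.poll()) != null) {
            next.run();
        }
        return ran;
    }

    public static void main(String[] args) {
        List<String> order = drain(
            Arrays.asList("n1", "u1", "n2", "u2"),
            Arrays.asList(Priority.NORMAL, Priority.URGENT, Priority.NORMAL, Priority.URGENT));
        System.out.println(order); // [u1, u2, n1, n2]
    }
}
```

    With URGENT and NORMAL tasks submitted interleaved, the URGENT ones run first and each priority level keeps its submission order.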

    TieBreakingPrioritizedRunnable

    public void run() {
                synchronized (this) {
                    // mark the task as started. This is needed for synchronization with the timeout handling
                    // see  #scheduleTimeout()
                    started = true;
                    FutureUtils.cancel(timeoutFuture);
                }
                runAndClean(runnable);
            }
    

    Back in execute: after wrapping the command, it calls doExecute, which is defined in EsThreadPoolExecutor, the parent class of PrioritizedEsThreadPoolExecutor.

    EsThreadPoolExecutor

    protected void doExecute(final Runnable command) {
            try {
                super.execute(command);
            } catch (EsRejectedExecutionException ex) {
                if (command instanceof AbstractRunnable) {
                    // If we are an abstract runnable we can handle the rejection
                    // directly and don't need to rethrow it.
                    try {
                        ((AbstractRunnable) command).onRejection(ex);
                    } finally {
                        ((AbstractRunnable) command).onAfter();
    
                    }
                } else {
                    throw ex;
                }
            }
        }
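    The rejection fallback in doExecute above can be reproduced with plain JDK types. In this sketch SelfHandlingTask is a hypothetical stand-in for AbstractRunnable, and the JDK's RejectedExecutionException stands in for EsRejectedExecutionException. Shutting the executor down first is simply a deterministic way to trigger a rejection, because the default AbortPolicy throws for any task submitted after shutdown().

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RejectionSketch {

    // Hypothetical stand-in for AbstractRunnable: a task that can handle its own rejection
    abstract static class SelfHandlingTask implements Runnable {
        abstract void onRejection(Exception e);

        void onAfter() {
            // hook that always runs after onRejection
        }
    }

    static final class SketchExecutor extends ThreadPoolExecutor {
        SketchExecutor() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        // Same shape as doExecute: submit, and let self-handling tasks absorb a rejection
        void doExecute(Runnable command) {
            try {
                super.execute(command);
            } catch (RejectedExecutionException ex) {
                if (command instanceof SelfHandlingTask) {
                    try {
                        ((SelfHandlingTask) command).onRejection(ex);
                    } finally {
                        ((SelfHandlingTask) command).onAfter();
                    }
                } else {
                    throw ex; // plain Runnables get the exception rethrown
                }
            }
        }
    }

    static boolean rejectedTaskIsHandled() {
        SketchExecutor executor = new SketchExecutor();
        executor.shutdown(); // every later submission is rejected
        AtomicBoolean handled = new AtomicBoolean();
        executor.doExecute(new SelfHandlingTask() {
            @Override
            public void run() {
            }

            @Override
            void onRejection(Exception e) {
                handled.set(true);
            }
        });
        return handled.get();
    }

    static boolean plainRunnableRethrows() {
        SketchExecutor executor = new SketchExecutor();
        executor.shutdown();
        try {
            executor.doExecute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedTaskIsHandled()); // true
        System.out.println(plainRunnableRethrows()); // true
    }
}
```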
    

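    EsThreadPoolExecutor extends Java's ThreadPoolExecutor; Elasticsearch's executors will be analyzed separately later. doExecute calls the parent class's execute method, which finally submits the task to the thread pool. If the submission is rejected, a command that is an AbstractRunnable handles the rejection itself through onRejection (followed by onAfter), while any other command rethrows the exception. After doExecute returns, execute checks the timeout: if it is non-negative, scheduleTimeout arranges for the timeout callback to run once the timeout elapses, and, as the run method above shows, a task that starts first marks itself as started and cancels that pending timeout.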
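    The started/timeoutFuture handshake can be sketched with a ScheduledExecutorService. This is a simplified model, not the Elasticsearch code: the timedOut flag is a hypothetical stand-in for whatever keeps a timed-out task from running later, and the helper method names are illustrative. Both run() and the timeout action take the same lock, so exactly one of them wins.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimeoutSketch {

    // Simplified take on the started/timeoutFuture handshake of TieBreakingPrioritizedRunnable
    static final class TimedTask implements Runnable {
        private final Runnable runnable;
        private boolean started;   // set when run() begins
        private boolean timedOut;  // hypothetical: set when the timeout fires first
        private ScheduledFuture<?> timeoutFuture;

        TimedTask(Runnable runnable) {
            this.runnable = runnable;
        }

        // Arms the timeout unless the task has already started (returns null in that case)
        synchronized ScheduledFuture<?> scheduleTimeout(ScheduledExecutorService timer,
                                                        Runnable timeoutCallback, long delayMillis) {
            if (started) {
                return null;
            }
            timeoutFuture = timer.schedule(() -> {
                synchronized (this) {
                    if (started) {
                        return; // run() won the race, so no callback
                    }
                    timedOut = true;
                }
                timeoutCallback.run();
            }, delayMillis, TimeUnit.MILLISECONDS);
            return timeoutFuture;
        }

        @Override
        public void run() {
            synchronized (this) {
                if (timedOut) {
                    return; // the timeout already fired; the body is skipped
                }
                started = true;
                if (timeoutFuture != null) {
                    timeoutFuture.cancel(false); // disarm the pending timeout
                }
            }
            runnable.run();
        }
    }

    // The timeout fires before run(): the callback runs and the body does not
    static boolean timeoutFiresWhenTaskHasNotStarted() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicBoolean bodyRan = new AtomicBoolean();
            AtomicBoolean callbackRan = new AtomicBoolean();
            TimedTask task = new TimedTask(() -> bodyRan.set(true));
            task.scheduleTimeout(timer, () -> callbackRan.set(true), 0).get(); // wait for it
            task.run();
            return callbackRan.get() && !bodyRan.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
    }

    // run() starts first and cancels the pending timeout
    static boolean runCancelsPendingTimeout() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicBoolean bodyRan = new AtomicBoolean();
            AtomicBoolean callbackRan = new AtomicBoolean();
            TimedTask task = new TimedTask(() -> bodyRan.set(true));
            ScheduledFuture<?> timeout = task.scheduleTimeout(timer, () -> callbackRan.set(true), 60_000);
            task.run();
            return bodyRan.get() && !callbackRan.get() && timeout.isCancelled();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(timeoutFiresWhenTaskHasNotStarted()); // true
        System.out.println(runCancelsPendingTimeout());          // true
    }
}
```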
    At this point the task that creates the target index has been submitted. The next article will analyze how the task is executed.


          Article link: https://www.haomeiwen.com/subject/dhudzxtx.html