Elasticsearch Source Code Analysis: Indexing (Part 1)

Author: 尹亮_36cd | Published 2018-12-13 19:14

    1. A simple index request

    First, let's look at an index request:

    curl -XPUT '127.0.0.1:9200/item/show/28589790' -d '
    {
       "id": 28589790,
       "text": "这是一个索引文本"
    }'
    

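    This request adds a document to the item index under the type show. The document has:
    document id: 28589790
    field id: 28589790
    field text: 这是一个索引文本 ("this is an indexed text")
    If the index already contains a document with id 28589790, Elasticsearch overwrites it with this one.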

    2. Indexing sequence diagram

    3. Forwarding the index request

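    When Elasticsearch starts, it injects a RestIndexAction object. Its constructor registers each HTTP method and URI template in the RestController's in-memory handler table, with the object itself as the handler: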
    public class RestIndexAction extends BaseRestHandler {
        @Inject
        public RestIndexAction(Settings settings, RestController controller, Client client) {
            super(settings, controller, client);
            controller.registerHandler(POST, "/{index}/{type}", this); // auto id creation
            controller.registerHandler(PUT, "/{index}/{type}/{id}", this);
            controller.registerHandler(POST, "/{index}/{type}/{id}", this);
            CreateHandler createHandler = new CreateHandler(settings, controller, client);
            controller.registerHandler(PUT, "/{index}/{type}/{id}/_create", createHandler);
            controller.registerHandler(POST, "/{index}/{type}/{id}/_create", createHandler);
        }
    }
    

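    Elasticsearch receives user requests in HttpRequestHandler.messageReceived(), then calls dispatchRequest() to forward them.
    When the request reaches RestController, getHandler() looks up the handler registered for the request path. Given the registrations above, PUT /item/show/28589790 matches the template /{index}/{type}/{id} and is handled by RestIndexAction.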

    public class RestController extends AbstractLifecycleComponent<RestController> {
        void executeHandler(RestRequest request, RestChannel channel) throws Exception {
            final RestHandler handler = getHandler(request);
            if (handler != null) {
                handler.handleRequest(request, channel);
            } else {
                if (request.method() == RestRequest.Method.OPTIONS) {
                    // when we have OPTIONS request
                    // simply send OK by default (with the Access Control Origin header which gets automatically added)
                    channel.sendResponse(new BytesRestResponse(OK));
                } else {
                    channel.sendResponse(new BytesRestResponse(BAD_REQUEST, "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]"));
                }
            }
        }
    }
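
    To make the path matching concrete, here is a minimal sketch of a template-based handler lookup. It is not Elasticsearch's implementation, since RestController keeps its handlers in a path trie. The class SimpleRestRouter and its methods are hypothetical, and handlers are plain strings for brevity. A placeholder such as {index} matches any single path segment and is captured as a request parameter.

```java
import java.util.*;

// Hypothetical, simplified router; Elasticsearch's RestController uses a path trie instead.
final class SimpleRestRouter {
    private static final class Route {
        final String method;
        final String[] parts;
        final String handler;

        Route(String method, String[] parts, String handler) {
            this.method = method;
            this.parts = parts;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    void registerHandler(String method, String template, String handler) {
        routes.add(new Route(method, split(template), handler));
    }

    // Returns the first matching handler and copies {placeholder} values into params,
    // or null when nothing matches (the caller then picks a fallback response).
    String getHandler(String method, String path, Map<String, String> params) {
        String[] segments = split(path);
        for (Route route : routes) {
            if (!route.method.equals(method) || route.parts.length != segments.length) {
                continue;
            }
            Map<String, String> found = new HashMap<>();
            boolean matches = true;
            for (int i = 0; i < segments.length && matches; i++) {
                String part = route.parts[i];
                if (part.startsWith("{") && part.endsWith("}")) {
                    found.put(part.substring(1, part.length() - 1), segments[i]); // capture parameter
                } else if (!part.equals(segments[i])) {
                    matches = false; // literal segment differs
                }
            }
            if (matches) {
                params.putAll(found);
                return route.handler;
            }
        }
        return null;
    }

    private static String[] split(String path) {
        return Arrays.stream(path.split("/")).filter(s -> !s.isEmpty()).toArray(String[]::new);
    }
}
```

    If you register the same templates as RestIndexAction, PUT /item/show/28589790 resolves to RestIndexAction with index=item, type=show and id=28589790. A real trie would prefer more specific routes. This linear scan simply returns the first registered match, so registration order matters.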
    

    handler.handleRequest() eventually calls RestIndexAction.handleRequest(). This method parses the request parameters, builds an IndexRequest object, and calls client.index() to start indexing the document.

    public class RestIndexAction extends BaseRestHandler {
        @Override
        public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
            IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
            indexRequest.listenerThreaded(false);
            indexRequest.operationThreaded(true);
            indexRequest.routing(request.param("routing"));
            indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
            indexRequest.timestamp(request.param("timestamp"));
            if (request.hasParam("ttl")) {
                indexRequest.ttl(request.paramAsTime("ttl", null).millis());
            }
            indexRequest.source(request.content());
            indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
            indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));
            indexRequest.version(RestActions.parseVersion(request));
            indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
            String sOpType = request.param("op_type");
            if (sOpType != null) {
                try {
                    indexRequest.opType(IndexRequest.OpType.fromString(sOpType));
                } catch (ElasticsearchIllegalArgumentException eia){
                    try {
                        XContentBuilder builder = channel.newErrorBuilder();
                        channel.sendResponse(new BytesRestResponse(BAD_REQUEST, builder.startObject().field("error", eia.getMessage()).endObject()));
                    } catch (IOException e1) {
                        logger.warn("Failed to send response", e1);
                        return;
                    }
                }
            }
            String replicationType = request.param("replication");
            if (replicationType != null) {
                indexRequest.replicationType(ReplicationType.fromString(replicationType));
            }
            String consistencyLevel = request.param("consistency");
            if (consistencyLevel != null) {
                indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
            }
            client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
                @Override
                public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception {
                    builder.startObject()
                            .field(Fields._INDEX, response.getIndex())
                            .field(Fields._TYPE, response.getType())
                            .field(Fields._ID, response.getId())
                            .field(Fields._VERSION, response.getVersion())
                            .field(Fields.CREATED, response.isCreated());
                    builder.endObject();
                    RestStatus status = OK;
                    if (response.isCreated()) {
                        status = CREATED;
                    }
                    return new BytesRestResponse(status, builder);
                }
            });
        }
    }
    

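    The index request supports the following parameters:
    routing: the routing value; documents with the same routing value are stored on the same shard
    parent: the id of the document's parent; if no routing is set, the parent id is used as the routing value
    timestamp: the document's timestamp
    ttl: time to live, after which the document expires
    timeout: timeout of the operation
    refresh: whether to refresh after this index operation so that the document becomes searchable; defaults to false
    version: the document's version number
    version_type: the versioning type; defaults to internal; supports internal, external, external_gt, external_gte and force
    op_type: the operation type, create or index; create fails if a document with the same id already exists, while index overwrites it
    replication: the replication type: async, sync or default
    consistency: the write consistency level: one, quorum, all or default
    The request body (content) becomes the source of the index request, i.e. the document content.
    Once the IndexRequest is built, client.index() is called to perform the indexing.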
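
    A toy in-memory store can show how op_type=index differs from op_type=create. It also illustrates the created flag used in the response builder above, which yields 201 CREATED instead of 200 OK. This is a hypothetical model, not Elasticsearch code. ToyIndex, its Result type and the error messages are invented. It assumes internal versioning, where a new document starts at version 1 and every overwrite increments the version.

```java
import java.util.*;

// Hypothetical toy model of op_type semantics with internal versioning; not Elasticsearch code.
final class ToyIndex {
    enum OpType {
        INDEX, CREATE;

        static OpType fromString(String s) {
            if ("index".equals(s)) return INDEX;
            if ("create".equals(s)) return CREATE;
            throw new IllegalArgumentException("unknown op_type [" + s + "]");
        }
    }

    // Outcome of one write: the new version and whether the document was newly created.
    static final class Result {
        final long version;
        final boolean created;

        Result(long version, boolean created) {
            this.version = version;
            this.created = created;
        }
    }

    private final Map<String, String> docs = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    Result index(String id, String source, OpType opType) {
        boolean exists = docs.containsKey(id);
        if (exists && opType == OpType.CREATE) {
            // op_type=create refuses to overwrite an existing document
            throw new IllegalStateException("document [" + id + "] already exists");
        }
        long version = versions.getOrDefault(id, 0L) + 1; // every successful write bumps the version
        docs.put(id, source);                             // op_type=index overwrites
        versions.put(id, version);
        return new Result(version, !exists);
    }

    String get(String id) {
        return docs.get(id);
    }
}
```

    Indexing id 28589790 twice with op_type=index gives version 1 with created=true, then version 2 with created=false. A third write with op_type=create is rejected.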

    4. Entry point for indexing

    The index() method executes the action IndexAction.INSTANCE:

    public abstract class AbstractClient implements Client {
        @Override
        public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
            execute(IndexAction.INSTANCE, request, listener);
        }
    }
    

    In ActionModule, this action is bound to TransportIndexAction, the transport action that executes it:

    public class ActionModule extends AbstractModule {
        @Override
        protected void configure() {
            registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
        }
    }
    

    Therefore, when NodeClient.execute() looks up the transport action for this action, it gets TransportIndexAction:

    public class NodeClient extends AbstractClient {
        @Override
        public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
            headers.applyTo(request);
            TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);  // TransportIndexAction
            transportAction.execute(request, listener);
        }
    }
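
    The ActionModule/NodeClient pair is essentially a registry keyed by action singletons. Below is a minimal sketch of that pattern. MiniActionRegistry, its Action and TransportAction types, and the action name "index" are hypothetical stand-ins, and a Consumer<String> plays the role of ActionListener.

```java
import java.util.*;
import java.util.function.Consumer;

// Hypothetical miniature of ActionModule + NodeClient: action singletons are map keys,
// each bound to the transport action that executes it.
final class MiniActionRegistry {
    // Stand-in for an Action singleton such as IndexAction.INSTANCE (identity-based key).
    static final class Action {
        final String name;

        Action(String name) {
            this.name = name;
        }
    }

    // Stand-in for TransportAction; the Consumer plays the role of ActionListener.
    interface TransportAction {
        void execute(String request, Consumer<String> listener);
    }

    static final Action INDEX = new Action("index");

    private final Map<Action, TransportAction> actions = new HashMap<>();

    // Plays the role of registerAction(IndexAction.INSTANCE, TransportIndexAction.class).
    void registerAction(Action action, TransportAction transportAction) {
        actions.put(action, transportAction);
    }

    // Plays the role of NodeClient.execute(): look up the bound transport action and delegate.
    void execute(Action action, String request, Consumer<String> listener) {
        TransportAction transportAction = actions.get(action);
        if (transportAction == null) {
            throw new IllegalStateException("no transport action registered for [" + action.name + "]");
        }
        transportAction.execute(request, listener);
    }
}
```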
    

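    TransportIndexAction extends TransportAction through TransportShardReplicationOperationAction. The call chain is therefore NodeClient.execute() -> TransportAction.execute() -> TransportIndexAction.doExecute().
    The overall flow first checks whether the index needs to be created. If so, it creates the index and then writes the document; otherwise it writes the document directly. If index creation fails with IndexAlreadyExistsException, the flow still writes the document, because another request has already created the index.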

    public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
        @Override
        protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
            if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
                CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
                createIndexRequest.index(request.index());
                createIndexRequest.mapping(request.type());
                createIndexRequest.cause("auto(index api)");
                createIndexRequest.masterNodeTimeout(request.timeout());
                createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
                    @Override
                    public void onResponse(CreateIndexResponse result) {
                        innerExecute(request, listener);
                    }
    
                    @Override
                    public void onFailure(Throwable e) {
                        if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
                            // we have the index, do it
                            try {
                                innerExecute(request, listener);
                            } catch (Throwable e1) {
                                listener.onFailure(e1);
                            }
                        } else {
                            listener.onFailure(e);
                        }
                    }
                });
            } else {
                innerExecute(request, listener);
            }
        }
    }
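
    The control flow of doExecute() can be sketched synchronously, including why IndexAlreadyExistsException counts as success. Two requests for a new index may both see shouldAutoCreate() return true. The second request's create then fails with "already exists", but it should still write its document. AutoCreateFlow, its Listener interface and the exception class below are hypothetical, single-threaded stand-ins for the real asynchronous classes.

```java
import java.util.*;

// Hypothetical, single-threaded sketch of TransportIndexAction.doExecute():
// create the index first if needed, treating "already exists" as success.
final class AutoCreateFlow {
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    static final class IndexAlreadyExistsException extends RuntimeException {
        IndexAlreadyExistsException(String index) {
            super("[" + index + "] already exists");
        }
    }

    final Set<String> indices = new HashSet<>();
    final List<String> written = new ArrayList<>();

    void createIndex(String index, Listener<Boolean> listener) {
        if (indices.add(index)) listener.onResponse(true);
        else listener.onFailure(new IndexAlreadyExistsException(index));
    }

    void doExecute(String index, String doc, boolean shouldAutoCreate, Listener<String> listener) {
        if (!shouldAutoCreate) {
            innerExecute(index, doc, listener);
            return;
        }
        createIndex(index, new Listener<Boolean>() {
            public void onResponse(Boolean created) {
                innerExecute(index, doc, listener);
            }

            public void onFailure(Exception e) {
                if (e instanceof IndexAlreadyExistsException) {
                    innerExecute(index, doc, listener); // another request created it first: still write
                } else {
                    listener.onFailure(e);
                }
            }
        });
    }

    // Stand-in for writing the document into an existing index.
    private void innerExecute(String index, String doc, Listener<String> listener) {
        if (!indices.contains(index)) {
            listener.onFailure(new IllegalStateException("no such index [" + index + "]"));
            return;
        }
        written.add(index + "/" + doc);
        listener.onResponse("ok");
    }
}
```

    Calling doExecute("item", ..., true, ...) twice simulates the race. The second call's create fails with "already exists", yet both documents end up in written.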
    

    Elasticsearch uses AutoCreateIndex.shouldAutoCreate() to decide whether the index needs to be created:

    public class AutoCreateIndex {
        public AutoCreateIndex(Settings settings) {
            String value = settings.get("action.auto_create_index");
            if (value == null || Booleans.isExplicitTrue(value)) {
                needToCheck = true;
                globallyDisabled = false;
                matches = null;
                matches2 = null;
            } else if (Booleans.isExplicitFalse(value)) {
                needToCheck = false;
                globallyDisabled = true;
                matches = null;
                matches2 = null;
            } else {
                needToCheck = true;
                globallyDisabled = false;
                matches = Strings.commaDelimitedListToStringArray(value);
                matches2 = new String[matches.length];
                for (int i = 0; i < matches.length; i++) {
                    matches2[i] = matches[i].substring(1);
                }
            }
        }
    
        public boolean shouldAutoCreate(String index, ClusterState state) {
            if (!needToCheck) {
                return false;
            }
            if (state.metaData().hasConcreteIndex(index)) {
                return false;
            }
            if (globallyDisabled) {
                return false;
            }
            if (matches == null) {
                return true;
            }
            for (int i = 0; i < matches.length; i++) {
                char c = matches[i].charAt(0);
                if (c == '-') {
                    if (Regex.simpleMatch(matches2[i], index)) {
                        return false;
                    }
                } else if (c == '+') {
                    if (Regex.simpleMatch(matches2[i], index)) {
                        return true;
                    }
                } else {
                    if (Regex.simpleMatch(matches[i], index)) {
                        return true;
                    }
                }
            }
            return false;
        }
    }
    

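    The setting and the fields have the following meanings:
    action.auto_create_index: a setting in the Elasticsearch configuration file that controls whether indices may be created automatically. It can be true, false, or a comma-separated list of index-name patterns, each optionally prefixed with + (allow) or - (deny).
    needToCheck: whether the check needs to run at all. It is false only when action.auto_create_index is false, in which case shouldAutoCreate() immediately returns false.
    globallyDisabled: whether automatic index creation is disabled globally. It is true only when action.auto_create_index is false, which also makes shouldAutoCreate() return false.
    If the cluster already contains the index, there is nothing to create either. Otherwise the decision depends on action.auto_create_index. If it is unset or true, the index is created. If it is a pattern list, the patterns are checked in order and the first match decides: a - prefix denies, while a + prefix or no prefix allows. If no pattern matches, the index is not created. The patterns are simple wildcard patterns (Regex.simpleMatch, supporting *), not full regular expressions.
    If automatic creation is allowed, the index-creation flow begins.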
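
    The decision can be re-implemented in a few lines, which makes it easy to experiment with different action.auto_create_index values. This is a hypothetical sketch, and AutoCreateCheck is not an Elasticsearch class. simpleMatch() only approximates Regex.simpleMatch by supporting * wildcards. Only the literal strings "true" and "false" are treated as booleans, whereas Booleans.isExplicitTrue/isExplicitFalse may accept other spellings.

```java
import java.util.*;
import java.util.regex.Pattern;

// Hypothetical re-implementation of the shouldAutoCreate() decision; not Elasticsearch code.
final class AutoCreateCheck {
    // '*' matches any character sequence; everything else is matched literally.
    static boolean simpleMatch(String pattern, String s) {
        String[] parts = pattern.split("\\*", -1); // -1 keeps trailing empty parts
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) regex.append(".*");
            regex.append(Pattern.quote(parts[i]));
        }
        return Pattern.matches(regex.toString(), s);
    }

    // setting is the value of action.auto_create_index (null means unset).
    static boolean shouldAutoCreate(String setting, String index, Set<String> existingIndices) {
        if ("false".equals(setting)) return false;                 // needToCheck false / globallyDisabled
        if (existingIndices.contains(index)) return false;         // hasConcreteIndex(index)
        if (setting == null || "true".equals(setting)) return true; // no pattern list (matches == null)
        for (String raw : setting.split(",")) {
            String m = raw.trim();
            if (m.isEmpty()) continue;
            char c = m.charAt(0);
            if (c == '-') {
                if (simpleMatch(m.substring(1), index)) return false; // first match wins: deny
            } else if (c == '+') {
                if (simpleMatch(m.substring(1), index)) return true;  // first match wins: allow
            } else if (simpleMatch(m, index)) {
                return true;
            }
        }
        return false; // no pattern matched
    }
}
```

    With "+aaa*,-bbb*", aaa1 is created, bbb1 is rejected, and ccc is rejected because no pattern matches. The first matching pattern wins, so "-logs-old*,+logs*" blocks logs-old-1 but allows logs-2018.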

    5. Creating the index

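    First, a CreateIndexRequest is built with four parameters: the index name (index), the mapping for the document type (mapping), the reason for creating the index (cause), and the master node timeout (masterNodeTimeout):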

    CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
    createIndexRequest.index(request.index());
    createIndexRequest.mapping(request.type());
    createIndexRequest.cause("auto(index api)");
    createIndexRequest.masterNodeTimeout(request.timeout());
    

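    Then createIndexAction.execute() is called to create the index. This goes through the generic TransportAction.execute(), which validates the request and then calls doExecute(). The call is direct, or goes through the filter chain when filters are registered: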

    public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
        public final void execute(Request request, ActionListener<Response> listener) {
            if (forceThreadedListener()) {
                request.listenerThreaded(true);
            }
            if (request.listenerThreaded()) {
                listener = new ThreadedActionListener<>(threadPool, listener, logger);
            }
    
            ActionRequestValidationException validationException = request.validate();
            if (validationException != null) {
                listener.onFailure(validationException);
                return;
            }
    
            if (filters.length == 0) {
                try {
                    // every TransportAction subclass must implement this method
                    doExecute(request, listener);
                } catch(Throwable t) {
                    logger.trace("Error during transport action execution.", t);
                    listener.onFailure(t);
                }
            } else {
                RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
                requestFilterChain.proceed(actionName, request, listener);
            }
        }
    
        protected abstract void doExecute(Request request, ActionListener<Response> listener);
    }
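
    TransportAction.execute() is a template method. The shared steps (validation and optional filters) live in the base class, and each subclass supplies doExecute(). The sketch below has the same shape, including a filter chain in which each filter either proceeds or short-circuits. MiniTransportAction and its Filter interface are hypothetical. They mirror the idea of RequestFilterChain, not its actual API.

```java
import java.util.*;
import java.util.function.Consumer;

// Hypothetical miniature of TransportAction.execute(): validate, then run the filters
// in order, and finally call the subclass's doExecute().
abstract class MiniTransportAction {
    interface Filter {
        // Call proceed.run() to continue the chain, or answer the listener to stop it.
        void apply(String request, Consumer<String> listener, Runnable proceed);
    }

    private final List<Filter> filters;

    MiniTransportAction(List<Filter> filters) {
        this.filters = filters;
    }

    final void execute(String request, Consumer<String> listener) {
        if (request == null || request.isEmpty()) { // stand-in for request.validate()
            listener.accept("validation failed");
            return;
        }
        proceed(0, request, listener);
    }

    private void proceed(int i, String request, Consumer<String> listener) {
        if (i == filters.size()) {
            doExecute(request, listener); // end of the chain: the actual work
            return;
        }
        filters.get(i).apply(request, listener, () -> proceed(i + 1, request, listener));
    }

    // Every subclass implements the actual work here.
    protected abstract void doExecute(String request, Consumer<String> listener);
}
```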
    

    As the class diagram below shows, TransportCreateIndexAction extends TransportMasterNodeOperationAction. The call chain is therefore TransportAction.execute() -> TransportMasterNodeOperationAction.doExecute().


    TransportCreateIndexAction class diagram

    TransportMasterNodeOperationAction is mainly responsible for making sure the operation runs on the master node:

    public abstract class TransportMasterNodeOperationAction<Request extends MasterNodeOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
        @Override
        protected void doExecute(final Request request, final ActionListener<Response> listener) {
            innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false);
        }
    
        private void innerExecute(final Request request, final ActionListener<Response> listener, final ClusterStateObserver observer, final boolean retrying) {
            final ClusterState clusterState = observer.observedState();
            final DiscoveryNodes nodes = clusterState.nodes();
            if (nodes.localNodeMaster() || localExecute(request)) {
                final ClusterBlockException blockException = checkBlock(request, clusterState);
                if (blockException != null) {
                    if (!blockException.retryable()) {
                        listener.onFailure(blockException);
                        return;
                    }
                    logger.trace("can't execute due to a cluster block: [{}], retrying", blockException);
                    observer.waitForNextChange(
                            new ClusterStateObserver.Listener() {
                                @Override
                                public void onNewClusterState(ClusterState state) {
                                    innerExecute(request, listener, observer, false);
                                }
    
                                @Override
                                public void onClusterServiceClose() {
                                    listener.onFailure(blockException);
                                }
    
                                @Override
                                public void onTimeout(TimeValue timeout) {
                                    listener.onFailure(blockException);
                                }
                            }, new ClusterStateObserver.ValidationPredicate() {
                                @Override
                                protected boolean validate(ClusterState newState) {
                                    ClusterBlockException blockException = checkBlock(request, newState);
                                    return (blockException == null || !blockException.retryable());
                                }
                            }
                    );
    
                } else {
                    try {
                        threadPool.executor(executor).execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    masterOperation(request, clusterService.state(), listener);
                                } catch (Throwable e) {
                                    listener.onFailure(e);
                                }
                            }
                        });
                    } catch (Throwable t) {
                        listener.onFailure(t);
                    }
                }
            } else {
                if (nodes.masterNode() == null) {
                    if (retrying) {
                        listener.onFailure(new MasterNotDiscoveredException());
                    } else {
                        logger.debug("no known master node, scheduling a retry");
                        observer.waitForNextChange(
                                new ClusterStateObserver.Listener() {
                                    @Override
                                    public void onNewClusterState(ClusterState state) {
                                        // the cluster state changed, so run this method again
                                        innerExecute(request, listener, observer, true);
                                    }
    
                                    @Override
                                    public void onClusterServiceClose() {
                                        listener.onFailure(new NodeClosedException(clusterService.localNode()));
                                    }
    
                                    @Override
                                    public void onTimeout(TimeValue timeout) {
                                        listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
                                    }
                                }, new ClusterStateObserver.ChangePredicate() {
                                    @Override
                                    public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
                                                         ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
                                        return newState.nodes().masterNodeId() != null;
                                    }
    
                                    @Override
                                    public boolean apply(ClusterChangedEvent event) {
                                        return event.nodesDelta().masterNodeChanged();
                                    }
                                }
                        );
                    }
                    return;
                }
                processBeforeDelegationToMaster(request, clusterState);
    
                transportService.sendRequest(nodes.masterNode(), actionName, request, new BaseTransportResponseHandler<Response>() {
                    @Override
                    public Response newInstance() {
                        return newResponse();
                    }
    
                    @Override
                    public void handleResponse(Response response) {
                        listener.onResponse(response);
                    }
    
                    @Override
                    public String executor() {
                        return ThreadPool.Names.SAME;
                    }
    
                    @Override
                    public void handleException(final TransportException exp) {
                        if (exp.unwrapCause() instanceof ConnectTransportException) {
                            // we want to retry here a bit to see if a new master is elected
                            logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]",
                                    nodes.masterNode(), exp.getDetailedMessage());
                            observer.waitForNextChange(new ClusterStateObserver.Listener() {
                                                           @Override
                                                           public void onNewClusterState(ClusterState state) {
                                                               innerExecute(request, listener, observer, false);
                                                           }
    
                                                           @Override
                                                           public void onClusterServiceClose() {
                                                               listener.onFailure(new NodeClosedException(clusterService.localNode()));
                                                           }
    
                                                           @Override
                                                           public void onTimeout(TimeValue timeout) {
                                                               listener.onFailure(new MasterNotDiscoveredException());
                                                           }
                                                       }, new ClusterStateObserver.EventPredicate() {
                                                           @Override
                                                           public boolean apply(ClusterChangedEvent event) {
                                                               return event.nodesDelta().masterNodeChanged();
                                                           }
                                                       }
                            );
                        } else {
                            listener.onFailure(exp);
                        }
                    }
                });
            }
        }
    }
    

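    This logic guarantees two things:
    (1) If the local node is not the master, the request is sent to the master node, which runs masterOperation(). If no master is known yet, the node waits for a cluster state change and retries once. If forwarding fails with a connection error, it waits for a new master to be elected and retries.
    (2) If the cluster has a retryable block, the node waits for the next cluster state update and then re-runs the whole innerExecute() method. A non-retryable block fails the request immediately.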
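
    The branching in innerExecute() can be condensed into a pure decision function, which shows the cases more plainly than the nested callbacks do. This is a hypothetical simplification. MasterRouting and the Decision names are invented, localExecute(request) is ignored, and a cluster block is modeled as either null (no block) or a retryable flag.

```java
// Hypothetical condensation of TransportMasterNodeOperationAction.innerExecute()'s branches.
final class MasterRouting {
    enum Decision {
        EXECUTE_LOCALLY,   // run masterOperation() on this (master) node
        WAIT_FOR_UNBLOCK,  // retryable block: wait for the next cluster state, then retry
        FAIL_BLOCKED,      // non-retryable block: fail immediately
        WAIT_FOR_MASTER,   // no master known yet: wait for a change and retry once
        FAIL_NO_MASTER,    // still no master on the retry: MasterNotDiscoveredException
        FORWARD_TO_MASTER  // send the request to the master node over the transport layer
    }

    // blockRetryable: null means no cluster block; otherwise whether the block is retryable.
    static Decision decide(boolean localNodeIsMaster, String masterNodeId,
                           Boolean blockRetryable, boolean retrying) {
        if (localNodeIsMaster) {
            if (blockRetryable == null) return Decision.EXECUTE_LOCALLY;
            return blockRetryable ? Decision.WAIT_FOR_UNBLOCK : Decision.FAIL_BLOCKED;
        }
        if (masterNodeId == null) {
            return retrying ? Decision.FAIL_NO_MASTER : Decision.WAIT_FOR_MASTER;
        }
        return Decision.FORWARD_TO_MASTER;
    }
}
```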

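    Execution then reaches TransportCreateIndexAction.masterOperation(). It builds a CreateIndexClusterStateUpdateRequest, the request used to update the cluster state when an index is created. Its settings, mappings and aliases default to empty collections, and if the request carries no cause, "api" is used.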

    public class TransportCreateIndexAction extends TransportMasterNodeOperationAction<CreateIndexRequest, CreateIndexResponse> {
        private final MetaDataCreateIndexService createIndexService;
    
        @Override
        protected void masterOperation(final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener) throws ElasticsearchException {
            String cause = request.cause();
            if (cause.length() == 0) {
                cause = "api";
            }
    
            final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, request.index())
                    .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
                    .settings(request.settings()).mappings(request.mappings())
                    .aliases(request.aliases()).customs(request.customs());
    
            // create the index
            createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
                @Override
                public void onResponse(ClusterStateUpdateResponse response) {
                    listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
                }
    
                @Override
                public void onFailure(Throwable t) {
                    if (t instanceof IndexAlreadyExistsException) {
                        logger.trace("[{}] failed to create", t, request.index());
                    } else {
                        logger.debug("[{}] failed to create", t, request.index());
                    }
                    listener.onFailure(t);
                }
            });
        }
    }
    

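    MetaDataCreateIndexService.createIndex() then tries to acquire the per-index metadata lock, a Semaphore. If it gets the lock immediately, it calls the overloaded createIndex() directly. Otherwise it hands the work to the MANAGEMENT thread pool, which waits for the lock for up to masterNodeTimeout. If the lock cannot be acquired in time, the request fails with ProcessClusterEventTimeoutException.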

    public class MetaDataCreateIndexService extends AbstractComponent {
        public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
            final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index());
    
            if (mdLock.tryAcquire()) {
                createIndex(request, listener, mdLock);
                return;
            }
            threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) {
                @Override
                public void doRun() throws InterruptedException {
                    if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) {
                        listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
                        return;
                    }
                    createIndex(request, listener, mdLock);
                }
            });
        }
    }
    

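    The overloaded createIndex() first normalizes the setting keys with IndexMetaData.INDEX_SETTING_PREFIX. It then performs the actual index creation by submitting a cluster state update task with URGENT priority. The metadata lock is released in every terminal callback: onAllNodesAcked(), onAckTimeout() and onFailure().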

    public class MetaDataCreateIndexService extends AbstractComponent {
        private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {
    
            ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
            updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
            request.settings(updatedSettingsBuilder.build());
    
            clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
                    Priority.URGENT,
                    new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
    
                @Override
                protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
                    return new ClusterStateUpdateResponse(acknowledged);
                }
    
                @Override
                public void onAllNodesAcked(@Nullable Throwable t) {
                    mdLock.release();
                    super.onAllNodesAcked(t);
                }
    
                @Override
                public void onAckTimeout() {
                    mdLock.release();
                    super.onAckTimeout();
                }
    
                @Override
                public void onFailure(String source, Throwable t) {
                    mdLock.release();
                    super.onFailure(source, t);
                }
    
                @Override
                public ClusterState execute(ClusterState currentState) throws Exception {
                    // the actual index-creation logic
                    // ...
                }
            });
        }
    }
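
    The locking around index creation follows a common pattern. First try the per-index Semaphore without blocking; only if it is busy, wait for it on a pool thread with a timeout. The sketch below shows this pattern, but IndexLockDemo is hypothetical, and a single daemon thread stands in for the MANAGEMENT pool. Elasticsearch releases the lock in the asynchronous ack, timeout and failure callbacks shown above. This sketch simply releases it in a finally block right after the simulated work.

```java
import java.util.concurrent.*;

// Hypothetical sketch of the per-index lock pattern in MetaDataCreateIndexService.createIndex().
final class IndexLockDemo {
    private final ConcurrentMap<String, Semaphore> locks = new ConcurrentHashMap<>();
    // Daemon thread, so a forgotten shutdown() never keeps the JVM alive.
    private final ExecutorService management = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "management");
        t.setDaemon(true);
        return t;
    });

    Semaphore lockFor(String index) {
        return locks.computeIfAbsent(index, k -> new Semaphore(1));
    }

    // Completes with "created <index>", or fails with a TimeoutException cause if the lock stays busy.
    CompletableFuture<String> createIndex(String index, long timeoutMillis) {
        Semaphore lock = lockFor(index);
        if (lock.tryAcquire()) { // fast path: lock is free, run on the caller's thread
            return CompletableFuture.completedFuture(createWithLock(index, lock));
        }
        return CompletableFuture.supplyAsync(() -> { // slow path: wait on the pool thread
            try {
                if (!lock.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    throw new CompletionException(new TimeoutException("acquire index lock"));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
            return createWithLock(index, lock);
        }, management);
    }

    private String createWithLock(String index, Semaphore lock) {
        try {
            return "created " + index; // stand-in for submitting the cluster state update task
        } finally {
            lock.release();            // always release, whatever the outcome
        }
    }

    void shutdown() {
        management.shutdownNow();
    }
}
```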
    

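    submitStateUpdateTask() wraps the task in an UpdateTask and hands it to updateTasksExecutor. A task that carries a timeout (TimeoutClusterStateUpdateTask) also gets a timeout callback, which fails it with ProcessClusterEventTimeoutException. When the executor later runs UpdateTask.run(), the wrapped task is the anonymous AckedClusterStateUpdateTask created in MetaDataCreateIndexService.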

    public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {
        public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) {
            if (!lifecycle.started()) {
                return;
            }
            try {
                final UpdateTask task = new UpdateTask(source, priority, updateTask);
                if (updateTask instanceof TimeoutClusterStateUpdateTask) {
                    final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask;
                    updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() {
                        @Override
                        public void run() {
                            threadPool.generic().execute(new Runnable() {
                                @Override
                                public void run() {
                                    timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source()));
                                }
                            });
                        }
                    });
                } else {
                    updateTasksExecutor.execute(task);
                }
            } catch (EsRejectedExecutionException e) {
                if (!lifecycle.stoppedOrClosed()) {
                    throw e;
                }
            }
        }
    }
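
    updateTasksExecutor processes cluster state update tasks one at a time in priority order, which is why index creation is submitted as Priority.URGENT. The sketch below models such a queue with a PriorityBlockingQueue. It assumes tasks are ordered by priority first and by submission order among equal priorities. PrioritizedQueueDemo and its Priority constants are hypothetical, and task timeouts are not modeled.

```java
import java.util.*;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a prioritized, FIFO-within-priority task queue.
final class PrioritizedQueueDemo {
    // Hypothetical levels, most urgent first; enum declaration order drives compareTo.
    enum Priority { URGENT, HIGH, NORMAL, LOW }

    static final class Task implements Comparable<Task> {
        final Priority priority;
        final long insertionOrder;
        final String source;

        Task(Priority priority, long insertionOrder, String source) {
            this.priority = priority;
            this.insertionOrder = insertionOrder;
            this.source = source;
        }

        @Override
        public int compareTo(Task other) {
            int byPriority = priority.compareTo(other.priority);
            // equal priority: earlier submission runs first
            return byPriority != 0 ? byPriority : Long.compare(insertionOrder, other.insertionOrder);
        }
    }

    private final PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
    private final AtomicLong counter = new AtomicLong();

    void submit(String source, Priority priority) {
        queue.add(new Task(priority, counter.getAndIncrement(), source));
    }

    // Returns task sources in the order a single update thread would process them.
    List<String> drain() {
        List<String> order = new ArrayList<>();
        for (Task task = queue.poll(); task != null; task = queue.poll()) {
            order.add(task.source);
        }
        return order;
    }
}
```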
    

    UpdateTask.run() calls ClusterStateUpdateTask.execute() to compute the new cluster state:

    class UpdateTask extends TimedPrioritizedRunnable {
    
            public final ClusterStateUpdateTask updateTask;
    
    
            UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
                super(priority, source);
                this.updateTask = updateTask;
            }
    
            @Override
            public void run() {
                if (!lifecycle.started()) {
                    logger.debug("processing [{}]: ignoring, cluster_service not started", source);
                    return;
                }
                logger.debug("processing [{}]: execute", source);
                ClusterState previousClusterState = clusterState;
                // is the local node still the master?
                if (!previousClusterState.nodes().localNodeMaster() && updateTask.runOnlyOnMaster()) {
                    logger.debug("failing [{}]: local node is no longer master", source);
                    updateTask.onNoLongerMaster(source);
                    return;
                }
                // the new cluster state
                ClusterState newClusterState;
                long startTimeNS = System.nanoTime();
                try {
                    // call the task's execute() method to get the new cluster state
                    newClusterState = updateTask.execute(previousClusterState);
                } catch (Throwable e) {
                    TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                    if (logger.isTraceEnabled()) {
                        StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
                        sb.append(previousClusterState.nodes().prettyPrint());
                        sb.append(previousClusterState.routingTable().prettyPrint());
                        sb.append(previousClusterState.readOnlyRoutingNodes().prettyPrint());
                        logger.trace(sb.toString(), e);
                    }
                    warnAboutSlowTaskIfNeeded(executionTime, source);
                    updateTask.onFailure(source, e);
                    return;
                }
    
                // the cluster state did not change
                if (previousClusterState == newClusterState) {
                    if (updateTask instanceof AckedClusterStateUpdateTask) {
                        //no need to wait for ack if nothing changed, the update can be counted as acknowledged
                        ((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null);
                    }
                    if (updateTask instanceof ProcessedClusterStateUpdateTask) {
                        ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
                    }
                    TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                    logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
                    warnAboutSlowTaskIfNeeded(executionTime, source);
                    return;
                }
    
                try {
                    Discovery.AckListener ackListener = new NoOpAckListener();
                    // the local node is the master
                    if (newClusterState.nodes().localNodeMaster()) {
                        // only the master controls the version numbers
                        Builder builder = ClusterState.builder(newClusterState).version(newClusterState.version() + 1);
                        // rebuild the routing table with a bumped version if it changed
                        if (previousClusterState.routingTable() != newClusterState.routingTable()) {
                            builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1));
                        }
                        // rebuild the metadata with a bumped version if it changed
                        if (previousClusterState.metaData() != newClusterState.metaData()) {
                            builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
                        }
                        newClusterState = builder.build();
    
                        if (updateTask instanceof AckedClusterStateUpdateTask) {
                            final AckedClusterStateUpdateTask ackedUpdateTask = (AckedClusterStateUpdateTask) updateTask;
                            if (ackedUpdateTask.ackTimeout() == null || ackedUpdateTask.ackTimeout().millis() == 0) {
                                ackedUpdateTask.onAckTimeout();
                            } else {
                                try {
                                    ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool);
                                } catch (EsRejectedExecutionException ex) {
                                    if (logger.isDebugEnabled()) {
                                        logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
                                    }
                                    //timeout straightaway, otherwise we could wait forever as the timeout thread has not started
                                    ackedUpdateTask.onAckTimeout();
                                }
                            }
                        }
                    }
    
                    newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);
    
                    if (logger.isTraceEnabled()) {
                        StringBuilder sb = new StringBuilder("cluster state updated, source [").append(source).append("]\n");
                        sb.append(newClusterState.prettyPrint());
                        logger.trace(sb.toString());
                    } else if (logger.isDebugEnabled()) {
                        logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
                    }
    
                    ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
                    // new cluster state, notify all listeners
                    final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
                    if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                        String summary = nodesDelta.shortSummary();
                        if (summary.length() > 0) {
                            logger.info("{}, reason: {}", summary, source);
                        }
                    }
    
                    // TODO, do this in parallel (and wait)
                    for (DiscoveryNode node : nodesDelta.addedNodes()) {
                        if (!nodeRequiresConnection(node)) {
                            continue;
                        }
                        try {
                            transportService.connectToNode(node);
                        } catch (Throwable e) {
                            // the fault detection will detect it as failed as well
                            logger.warn("failed to connect to node [" + node + "]", e);
                        }
                    }
    
                    // if we are the master, publish the new state to all nodes
                    // we publish here before we send a notification to all the listeners, since if it fails
                    // we don't want to notify
                    if (newClusterState.nodes().localNodeMaster()) {
                        logger.debug("publishing cluster state version {}", newClusterState.version());
                        discoveryService.publish(newClusterState, ackListener);
                    }
    
                    // update the current cluster state
                    clusterState = newClusterState;
                    logger.debug("set local cluster state to version {}", newClusterState.version());
                    for (ClusterStateListener listener : preAppliedListeners) {
                        try {
                            listener.clusterChanged(clusterChangedEvent);
                        } catch (Exception ex) {
                            logger.warn("failed to notify ClusterStateListener", ex);
                        }
                    }
    
                    for (DiscoveryNode node : nodesDelta.removedNodes()) {
                        try {
                            transportService.disconnectFromNode(node);
                        } catch (Throwable e) {
                            logger.warn("failed to disconnect to node [" + node + "]", e);
                        }
                    }
    
                    newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);
    
                    for (ClusterStateListener listener : postAppliedListeners) {
                        try {
                            listener.clusterChanged(clusterChangedEvent);
                        } catch (Exception ex) {
                            logger.warn("failed to notify ClusterStateListener", ex);
                        }
                    }
    
                    //manual ack only from the master at the end of the publish
                    if (newClusterState.nodes().localNodeMaster()) {
                        try {
                            ackListener.onNodeAck(newClusterState.nodes().localNode(), null);
                        } catch (Throwable t) {
                            logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode());
                        }
                    }
    
                    // call the task's clusterStateProcessed() method
                    if (updateTask instanceof ProcessedClusterStateUpdateTask) {
                        ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
                    }
    
                    TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                    logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {})", source, executionTime, newClusterState.version());
                    warnAboutSlowTaskIfNeeded(executionTime, source);
                } catch (Throwable t) {
                    TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                    StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
                    sb.append(newClusterState.nodes().prettyPrint());
                    sb.append(newClusterState.routingTable().prettyPrint());
                    sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
                    logger.warn(sb.toString(), t);
                    // TODO: do we want to call updateTask.onFailure here?
                }
            }
        }
    

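    The master-only branch above bumps the cluster state version by one, and bumps the routing table and metadata versions only when the task actually replaced those objects; "changed" is detected by reference inequality, which works because cluster states are immutable. The sketch below models just that rule. Routing, Meta, State and applyOnMaster() are hypothetical stand-ins, not Elasticsearch classes; the task is assumed to copy the old version numbers into the objects it returns, and the "item"/"show" values only reuse the example request from the start of the article.

```java
import java.util.Map;

// Simplified, hypothetical model of the master-only version bump shown above.
// Routing, Meta, State and applyOnMaster() are illustrative names, not Elasticsearch classes.
public class VersionBumpSketch {

    // Stand-in for RoutingTable: a version plus an opaque payload (index -> shard count).
    static final class Routing {
        final long version;
        final Map<String, Integer> shards;

        Routing(long version, Map<String, Integer> shards) {
            this.version = version;
            this.shards = shards;
        }
    }

    // Stand-in for MetaData: a version plus an opaque payload (index -> type name).
    static final class Meta {
        final long version;
        final Map<String, String> indices;

        Meta(long version, Map<String, String> indices) {
            this.version = version;
            this.indices = indices;
        }
    }

    // Stand-in for ClusterState: immutable, so "changed" can be detected by reference.
    static final class State {
        final long version;
        final Routing routing;
        final Meta meta;

        State(long version, Routing routing, Meta meta) {
            this.version = version;
            this.routing = routing;
            this.meta = meta;
        }
    }

    // Same reference -> no change: return it as-is (nothing to publish).
    // Otherwise bump the state version, and bump the routing/metadata versions
    // only for the parts the task actually replaced.
    static State applyOnMaster(State previous, State updated) {
        if (previous == updated) {
            return previous;
        }
        Routing routing = updated.routing;
        if (previous.routing != updated.routing) {
            routing = new Routing(updated.routing.version + 1, updated.routing.shards);
        }
        Meta meta = updated.meta;
        if (previous.meta != updated.meta) {
            meta = new Meta(updated.meta.version + 1, updated.meta.indices);
        }
        return new State(updated.version + 1, routing, meta);
    }

    public static void main(String[] args) {
        State previous = new State(10, new Routing(3, Map.of()), new Meta(5, Map.of()));
        // A hypothetical "create index item" task replaces both routing and metadata,
        // copying the old version numbers into the new objects.
        State updated = new State(10,
                new Routing(3, Map.of("item", 5)),
                new Meta(5, Map.of("item", "show")));
        State next = applyOnMaster(previous, updated);
        System.out.println(next.version + " " + next.routing.version + " " + next.meta.version); // 11 4 6
    }
}
```

    Keeping the sub-versions separate lets a receiving node tell cheaply whether the routing table or the metadata is new, without comparing their contents.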
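    Once the index has been created, the cluster state has changed. The master node then publishes the new state to the other nodes (discoveryService.publish() in the code above) before notifying its local listeners, so that every node in the cluster keeps a consistent view of the cluster state.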
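    For acknowledged tasks, the master wraps the publish in an AckCountDownListener and, at the very end, acks on its own behalf (ackListener.onNodeAck(localNode, null)). The core idea is a countdown over the nodes that must confirm the new version. The following is a minimal sketch of that idea only: AckCountDownSketch is a hypothetical class, not the Elasticsearch one, and it leaves out the ack timeout thread and the per-task filtering of which nodes must ack.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, simplified ack counter modeled loosely on the AckCountDownListener above.
// Not the Elasticsearch class: no timeout thread and no per-task filtering of nodes.
public class AckCountDownSketch {
    private final Set<String> pending;                  // node ids that have not acked yet
    private final Consumer<Throwable> onAllNodesAcked;  // receives the last failure, or null
    private Throwable lastFailure;

    public AckCountDownSketch(Set<String> expectedNodeIds, Consumer<Throwable> onAllNodesAcked) {
        this.pending = new HashSet<>(expectedNodeIds);
        this.onAllNodesAcked = onAllNodesAcked;
    }

    // Called once per node after it applied the published state; error is null on success.
    // synchronized because acks may arrive concurrently from different threads.
    public synchronized void onNodeAck(String nodeId, Throwable error) {
        if (!pending.remove(nodeId)) {
            return; // unknown node or duplicate ack: ignore it
        }
        if (error != null) {
            lastFailure = error;
        }
        if (pending.isEmpty()) {
            // fires at most once: after this, every further remove() returns false
            onAllNodesAcked.accept(lastFailure);
        }
    }

    public static void main(String[] args) {
        AckCountDownSketch listener = new AckCountDownSketch(
                Set.of("master", "node-1", "node-2"),
                failure -> System.out.println("all nodes acked, failure=" + failure));
        listener.onNodeAck("node-1", null);
        listener.onNodeAck("node-2", null);
        // the master acks itself last, mirroring the manual ack at the end of the publish above
        listener.onNodeAck("master", null); // prints "all nodes acked, failure=null"
    }
}
```

    Recording only the last failure and firing a single callback keeps the caller's contract simple: the task learns once that every node has seen the new version, plus whether any of them reported an error.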


    Article link: https://www.haomeiwen.com/subject/ssyvhqtx.html