1. 一个简单的索引请求示例
首先,我们来看一个索引请求:
curl -XPUT '127.0.0.1:9200/item/show/28589790' -d '
{
"id": 28589790,
"text": "这是一个索引文本"
}'
这个请求的主要作用是向item索引中添加一个索引文档,文档信息:
文档 id: 28589790
字段id: 28589790
字段text: 这是一个索引文本
如果索引中已经包含id为28589790的文档,elasticsearch将会使用这条数据覆盖原有文档
2. 索引时序图
3. 索引请求转发
- 在elasticsearch启动时,会注入RestIndexAction对象,并在其构造方法中把HTTP方法、URI模板和处理对象(handler)注册到RestController中
public class RestIndexAction extends BaseRestHandler {
    /**
     * Registers the index REST endpoints with the controller.
     *
     * <p>{@code POST /{index}/{type}} (document id generated by elasticsearch) and
     * {@code PUT}/{@code POST /{index}/{type}/{id}} are routed to this handler; the
     * {@code _create} variants of the id path are routed to a separate {@code CreateHandler}.
     */
    @Inject
    public RestIndexAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        final String typePath = "/{index}/{type}";
        final String docPath = typePath + "/{id}";
        // No id in the path: auto id creation.
        controller.registerHandler(POST, typePath, this);
        controller.registerHandler(PUT, docPath, this);
        controller.registerHandler(POST, docPath, this);

        final CreateHandler create = new CreateHandler(settings, controller, client);
        final String createPath = docPath + "/_create";
        controller.registerHandler(PUT, createPath, create);
        controller.registerHandler(POST, createPath, create);
    }
}
elasticsearch使用HttpRequestHandler.messageReceived()方法接收用户请求,然后调用dispatchRequest()方法对请求进行转发。
当请求跳转到RestController时,会调用getHandler()方法根据请求的方法和Path获取对应的handler。由上文可以看出,PUT /item/show/28589790 会匹配到RestIndexAction注册的 PUT /{index}/{type}/{id},因此由RestIndexAction处理
public class RestController extends AbstractLifecycleComponent<RestController> {
    /**
     * Dispatches a REST request to the handler registered for it.
     *
     * <p>When no handler matches, an {@code OPTIONS} request is answered with a plain 200 OK
     * (the Access Control Origin header is added automatically), and any other method gets a
     * 400 BAD_REQUEST naming the unmatched uri and method.
     */
    void executeHandler(RestRequest request, RestChannel channel) throws Exception {
        final RestHandler handler = getHandler(request);
        if (handler != null) {
            handler.handleRequest(request, channel);
            return;
        }
        final BytesRestResponse fallback;
        if (request.method() == RestRequest.Method.OPTIONS) {
            fallback = new BytesRestResponse(OK);
        } else {
            fallback = new BytesRestResponse(BAD_REQUEST, "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]");
        }
        channel.sendResponse(fallback);
    }
}
handler.handleRequest()方法最终会调用RestIndexAction.handleRequest()方法对请求参数进行解析,创建索引请求对象indexRequest,然后调用client.index()执行索引操作
public class RestIndexAction extends BaseRestHandler {
    /**
     * Parses an index REST request into an {@link IndexRequest} and executes it via
     * {@code client.index()}, answering with the index/type/id/version/created fields
     * (status 201 CREATED for a new document, 200 OK otherwise).
     *
     * <p>An unparseable {@code op_type} is answered with 400 BAD_REQUEST and the request is not
     * executed. Previously the error response was sent and execution then fell through to
     * {@code client.index()}, which indexed the document anyway and answered the same channel a
     * second time.
     */
    @Override
    public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
        IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
        indexRequest.listenerThreaded(false);
        indexRequest.operationThreaded(true);
        indexRequest.routing(request.param("routing"));
        indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
        indexRequest.timestamp(request.param("timestamp"));
        if (request.hasParam("ttl")) {
            indexRequest.ttl(request.paramAsTime("ttl", null).millis());
        }
        indexRequest.source(request.content());
        indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
        indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));
        indexRequest.version(RestActions.parseVersion(request));
        indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
        String sOpType = request.param("op_type");
        if (sOpType != null) {
            try {
                indexRequest.opType(IndexRequest.OpType.fromString(sOpType));
            } catch (ElasticsearchIllegalArgumentException eia) {
                try {
                    XContentBuilder builder = channel.newErrorBuilder();
                    channel.sendResponse(new BytesRestResponse(BAD_REQUEST, builder.startObject().field("error", eia.getMessage()).endObject()));
                } catch (IOException e1) {
                    logger.warn("Failed to send response", e1);
                }
                // The request has been rejected: do not index the document and do not
                // answer the channel a second time.
                return;
            }
        }
        String replicationType = request.param("replication");
        if (replicationType != null) {
            indexRequest.replicationType(ReplicationType.fromString(replicationType));
        }
        String consistencyLevel = request.param("consistency");
        if (consistencyLevel != null) {
            indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
        }
        client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
            @Override
            public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception {
                builder.startObject()
                        .field(Fields._INDEX, response.getIndex())
                        .field(Fields._TYPE, response.getType())
                        .field(Fields._ID, response.getId())
                        .field(Fields._VERSION, response.getVersion())
                        .field(Fields.CREATED, response.isCreated());
                builder.endObject();
                RestStatus status = OK;
                if (response.isCreated()) {
                    status = CREATED;
                }
                return new BytesRestResponse(status, builder);
            }
        });
    }
}
在索引请求中,支持下列参数:
routing: 路由信息,具有相同路由信息的文档存储在同一分片上
parent: 文档的parent id, 如果未设置路由,则会自动将其设置为路由
timestamp: 文档产生的时间戳
ttl: 过期时间
timeout: 超时时间
refresh: 此索引操作之后是否执行刷新,从而使文档可被搜索,默认为false
version: 文档的版本号
version_type: 版本类型,默认internal,支持internal、external、external_gt、external_gte和force
op_type: 索引操作类型,支持create和index
replication: 副本类型,支持async、sync和default
consistency: 一致性,支持one、quorum、all和default
请求的content即索引的source,文档内容
在封装完索引请求后,就要调用 client.index() 执行索引
4. 创建索引入口
在index()方法中,使用的Action是IndexAction.INSTANCE
public abstract class AbstractClient implements Client {
    /**
     * Indexes a document by delegating to the generic {@code execute()} with the
     * {@code IndexAction} singleton; the outcome is reported to {@code listener}.
     */
    @Override
    public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
        final IndexAction indexAction = IndexAction.INSTANCE;
        execute(indexAction, request, listener);
    }
}
这个action在ActionModule中被TransportIndexAction注册
// Excerpt: only the registration of the index action is shown.
public class ActionModule extends AbstractModule {
@Override
protected void configure() {
// Bind IndexAction.INSTANCE to its transport implementation; NodeClient.execute() later looks
// the transport action up by this action object.
registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
}
}
因此在NodeClient的execute()方法中根据action获取到的transport action为TransportIndexAction
public class NodeClient extends AbstractClient {
    /**
     * Executes {@code action} in-process.
     *
     * <p>This client's headers are applied to the request, then the {@code TransportAction}
     * registered for the action is looked up and given the request and listener. For
     * {@code IndexAction.INSTANCE} that transport action is {@code TransportIndexAction}.
     */
    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
        headers.applyTo(request);
        final TransportAction<Request, Response> target = actions.get((ClientAction) action);
        target.execute(request, listener);
    }
}
由于TransportIndexAction(通过TransportShardReplicationOperationAction)间接继承了TransportAction,因此调用过程为NodeClient.execute() -> TransportAction.execute() -> TransportIndexAction.doExecute()
索引的大体流程为:先判断是否需要创建索引,如果是则先创建索引,然后写入文档数据,否则直接写入文档数据
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
    /**
     * Entry point for an index request.
     *
     * <p>If {@code autoCreateIndex} says the target index should be created, a create-index
     * request (cause {@code "auto(index api)"}) is executed first and the document is written
     * once it succeeds; an {@code IndexAlreadyExistsException} from that step is treated as
     * success. Otherwise the document is written directly.
     */
    @Override
    protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
        if (!autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
            innerExecute(request, listener);
            return;
        }
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
        createIndexRequest.index(request.index());
        createIndexRequest.mapping(request.type());
        createIndexRequest.cause("auto(index api)");
        createIndexRequest.masterNodeTimeout(request.timeout());
        createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
            @Override
            public void onResponse(CreateIndexResponse result) {
                innerExecute(request, listener);
            }

            @Override
            public void onFailure(Throwable e) {
                if (!(ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException)) {
                    listener.onFailure(e);
                    return;
                }
                // The index already exists, so the document can be written.
                try {
                    innerExecute(request, listener);
                } catch (Throwable t) {
                    listener.onFailure(t);
                }
            }
        });
    }
}
elasticsearch主要使用AutoCreateIndex.shouldAutoCreate()方法来判断是否需要创建索引
public class AutoCreateIndex {
    /**
     * Reads the {@code action.auto_create_index} setting.
     *
     * <ul>
     *   <li>unset or explicitly true: auto-creation allowed, no patterns;</li>
     *   <li>explicitly false: auto-creation globally disabled;</li>
     *   <li>otherwise: a comma-separated pattern list. {@code matches} keeps each pattern as
     *       written, {@code matches2} the same pattern with its first character (a possible
     *       {@code +}/{@code -} prefix) removed.</li>
     * </ul>
     */
    public AutoCreateIndex(Settings settings) {
        final String setting = settings.get("action.auto_create_index");
        if (setting == null || Booleans.isExplicitTrue(setting)) {
            needToCheck = true;
            globallyDisabled = false;
            matches = null;
            matches2 = null;
        } else if (Booleans.isExplicitFalse(setting)) {
            needToCheck = false;
            globallyDisabled = true;
            matches = null;
            matches2 = null;
        } else {
            needToCheck = true;
            globallyDisabled = false;
            matches = Strings.commaDelimitedListToStringArray(setting);
            matches2 = new String[matches.length];
            int idx = 0;
            for (String pattern : matches) {
                matches2[idx++] = pattern.substring(1);
            }
        }
    }

    /**
     * Decides whether {@code index} should be created automatically.
     *
     * <p>Returns false when checking is off, the index already exists, or auto-creation is
     * globally disabled, and true when no patterns are configured. Otherwise the patterns are
     * tried in order and the first match decides: {@code -pattern} denies, {@code +pattern} or
     * a bare pattern allows. If nothing matches, the index is not created.
     */
    public boolean shouldAutoCreate(String index, ClusterState state) {
        if (!needToCheck || state.metaData().hasConcreteIndex(index) || globallyDisabled) {
            return false;
        }
        if (matches == null) {
            return true;
        }
        for (int i = 0; i < matches.length; i++) {
            switch (matches[i].charAt(0)) {
                case '-':
                    if (Regex.simpleMatch(matches2[i], index)) {
                        return false;
                    }
                    break;
                case '+':
                    if (Regex.simpleMatch(matches2[i], index)) {
                        return true;
                    }
                    break;
                default:
                    if (Regex.simpleMatch(matches[i], index)) {
                        return true;
                    }
                    break;
            }
        }
        return false;
    }
}
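其中配置项action.auto_create_index以及字段needToCheck和globallyDisabled的含义:
action.auto_create_index: elasticsearch配置文件中的配置项,表示是否允许自动创建索引。未配置或配置为true时允许,配置为false时禁止,也可以配置为逗号分隔的模式列表(例如 +aaa*,-bbb*)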
needToCheck: 是否需要检查能否创建索引,只有当action.auto_create_index为false时不需要检查,直接返回无法创建索引
globallyDisabled: 是否全局禁用创建索引,只有当action.auto_create_index为false时全局禁用创建索引,直接返回无法创建索引
如果当前集群中已经包含了要创建的索引,那么也不需要创建索引。其他情况则按顺序使用action.auto_create_index中配置的通配符模式(Regex.simpleMatch,并非正则表达式)进行匹配,由第一个匹配上的模式决定结果:以-开头表示禁止创建,以+开头或没有前缀表示允许创建;如果都没有匹配上,则不创建索引
如果允许自动创建索引,则开始创建索引的流程
5. 创建索引
首先构造创建索引的请求createIndexRequest,设置了4个参数,分别是索引名index、索引mapping、创建索引的原因cause和master节点超时时间masterNodeTimeout
// Excerpt from TransportIndexAction.doExecute(): build the request that auto-creates the missing index.
CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
// Index to create: the index the document is being written to.
createIndexRequest.index(request.index());
// Pass the document's type to mapping().
createIndexRequest.mapping(request.type());
// Cause recorded for this creation; later used in the cluster-state update task's source string.
createIndexRequest.cause("auto(index api)");
// The index request's timeout doubles as the master-node timeout of the create step.
createIndexRequest.masterNodeTimeout(request.timeout());
然后调用createIndexAction.execute()方法创建索引
public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
public final void execute(Request request, ActionListener<Response> listener) {
if (forceThreadedListener()) {
request.listenerThreaded(true);
}
if (request.listenerThreaded()) {
listener = new ThreadedActionListener<>(threadPool, listener, logger);
}
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
if (filters.length == 0) {
try {
// TransportAction 子类都要重写这个方法
doExecute(request, listener);
} catch(Throwable t) {
logger.trace("Error during transport action execution.", t);
listener.onFailure(t);
}
} else {
RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
requestFilterChain.proceed(actionName, request, listener);
}
}
rotected abstract void doExecute(Request request, ActionListener<Response> listener);
}
从下面的类图可以看出,TransportCreateIndexAction继承了TransportMasterNodeOperationAction,调用过程即TransportAction.execute() -> TransportMasterNodeOperationAction.doExecute()
TransportCreateIndexAction类图
TransportMasterNodeOperationAction主要用于保证操作在master节点上执行
// Base class for actions that must run on the master node: executes locally when this node is
// master (or localExecute(request) is true), otherwise forwards the request to the master.
public abstract class TransportMasterNodeOperationAction<Request extends MasterNodeOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
@Override
protected void doExecute(final Request request, final ActionListener<Response> listener) {
// The observer waits for cluster-state changes for at most request.masterNodeTimeout().
innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false);
}
// retrying: true only after a "no known master" wait; a second miss then fails immediately.
private void innerExecute(final Request request, final ActionListener<Response> listener, final ClusterStateObserver observer, final boolean retrying) {
final ClusterState clusterState = observer.observedState();
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster() || localExecute(request)) {
final ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException != null) {
// A non-retryable block fails the request right away.
if (!blockException.retryable()) {
listener.onFailure(blockException);
return;
}
logger.trace("can't execute due to a cluster block: [{}], retrying", blockException);
// Retryable block: wait for a cluster state in which the block is gone (or no longer
// retryable), then start over; close/timeout fail with the original block exception.
observer.waitForNextChange(
new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
innerExecute(request, listener, observer, false);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(blockException);
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(blockException);
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
ClusterBlockException blockException = checkBlock(request, newState);
return (blockException == null || !blockException.retryable());
}
}
);
} else {
// Not blocked: run masterOperation() on this action's executor; failures, including
// rejection by the executor, go to the listener.
try {
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
masterOperation(request, clusterService.state(), listener);
} catch (Throwable e) {
listener.onFailure(e);
}
}
});
} catch (Throwable t) {
listener.onFailure(t);
}
}
} else {
// This node is not the master.
if (nodes.masterNode() == null) {
if (retrying) {
listener.onFailure(new MasterNotDiscoveredException());
} else {
logger.debug("no known master node, scheduling a retry");
// Wait until a master is known, then retry once (retrying = true).
observer.waitForNextChange(
new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
// Cluster state changed: run this method again
innerExecute(request, listener, observer, true);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
}
}, new ClusterStateObserver.ChangePredicate() {
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
return newState.nodes().masterNodeId() != null;
}
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
}
);
}
return;
}
// A master is known: forward the request to it under the same action name.
processBeforeDelegationToMaster(request, clusterState);
transportService.sendRequest(nodes.masterNode(), actionName, request, new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleException(final TransportException exp) {
// Connection failures to the master are retried once the master changes; any other
// transport error is passed straight to the listener.
if (exp.unwrapCause() instanceof ConnectTransportException) {
// we want to retry here a bit to see if a new master is elected
logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]",
nodes.masterNode(), exp.getDetailedMessage());
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
innerExecute(request, listener, observer, false);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new MasterNotDiscoveredException());
}
}, new ClusterStateObserver.EventPredicate() {
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
}
);
} else {
listener.onFailure(exp);
}
}
});
}
}
}
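这个操作主要保证了以下几点:
(1)如果当前节点是master(或请求允许在本地执行),则在线程池中执行masterOperation()方法;如果当前节点不是master,则将请求转发到master节点执行
(2)如果当前集群存在可重试的block,则等待集群状态更新,然后重新执行完整的innerExecute()方法;如果block不可重试,则直接返回失败
(3)如果集群中还没有已知的master,或者转发到master时出现连接异常,则等待master发生变化后重试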
然后进入到TransportCreateIndexAction.masterOperation()方法中,创建CreateIndexClusterStateUpdateRequest对象,用来创建索引时更新集群状态信息的请求,其中settings和mappings及aliases默认为空集合
public class TransportCreateIndexAction extends TransportMasterNodeOperationAction<CreateIndexRequest, CreateIndexResponse> {
    private final MetaDataCreateIndexService createIndexService;

    /**
     * Runs on the master node: converts the create-index request into a
     * {@code CreateIndexClusterStateUpdateRequest} and hands it to {@code createIndexService}.
     *
     * <p>An empty cause is replaced by {@code "api"}. Failures are logged (trace level for
     * {@code IndexAlreadyExistsException}, debug level otherwise) and passed to the listener.
     */
    @Override
    protected void masterOperation(final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener) throws ElasticsearchException {
        final String requestedCause = request.cause();
        final String cause = requestedCause.isEmpty() ? "api" : requestedCause;

        final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, request.index())
                .ackTimeout(request.timeout())
                .masterNodeTimeout(request.masterNodeTimeout())
                .settings(request.settings())
                .mappings(request.mappings())
                .aliases(request.aliases())
                .customs(request.customs());

        final ActionListener<ClusterStateUpdateResponse> onUpdate = new ActionListener<ClusterStateUpdateResponse>() {
            @Override
            public void onResponse(ClusterStateUpdateResponse response) {
                listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
            }

            @Override
            public void onFailure(Throwable t) {
                if (t instanceof IndexAlreadyExistsException) {
                    logger.trace("[{}] failed to create", t, request.index());
                } else {
                    logger.debug("[{}] failed to create", t, request.index());
                }
                listener.onFailure(t);
            }
        };
        // Execute the index creation.
        createIndexService.createIndex(updateRequest, onUpdate);
    }
}
然后调用MetaDataCreateIndexService的createIndex()方法:如果能立即获取到该索引的元数据锁(Semaphore),则直接执行重载的createIndex()方法;否则交给MANAGEMENT线程池,在masterNodeTimeout时间内等待获取锁,超时则返回ProcessClusterEventTimeoutException
public class MetaDataCreateIndexService extends AbstractComponent {
    /**
     * Creates an index while holding the per-index metadata lock.
     *
     * <p>If the lock is free it is taken and creation proceeds on the calling thread. Otherwise
     * a task on the MANAGEMENT pool waits up to {@code masterNodeTimeout} for the lock and
     * reports a {@code ProcessClusterEventTimeoutException} if it cannot get it. The lock is
     * handed on to the overloaded {@code createIndex}, which is responsible for releasing it.
     */
    public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
        final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index());
        if (mdLock.tryAcquire()) {
            createIndex(request, listener, mdLock);
        } else {
            threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) {
                @Override
                public void doRun() throws InterruptedException {
                    if (mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) {
                        createIndex(request, listener, mdLock);
                    } else {
                        listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
                    }
                }
            });
        }
    }
}
在重载的createIndex()方法中,通过提交一个更新集群状态的任务(优先级为URGENT)来实现创建索引的具体逻辑;无论所有节点确认完成、确认超时还是任务失败,都会释放元数据锁
public class MetaDataCreateIndexService extends AbstractComponent {
// Submits the cluster-state update that actually creates the index. The caller already holds
// mdLock; this method makes sure it is released on ack, ack timeout and failure.
private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {
// Rebuild the request settings with keys normalized against IndexMetaData.INDEX_SETTING_PREFIX
// (presumably adds the "index." prefix where missing -- TODO confirm normalizePrefix semantics).
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
request.settings(updatedSettingsBuilder.build());
// The task source string names the index and the cause; the task runs with URGENT priority.
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
Priority.URGENT,
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
// Each terminal callback below releases the metadata lock before delegating.
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
mdLock.release();
super.onAllNodesAcked(t);
}
@Override
public void onAckTimeout() {
mdLock.release();
super.onAckTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
mdLock.release();
super.onFailure(source, t);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// Actual index-creation logic, elided in this article (the real method returns the new ClusterState)
// ...
}
});
}
}
提交StateUpdateTask任务时,会把传入的updateTask(即MetaDataCreateIndexService中创建的AckedClusterStateUpdateTask匿名对象)包装成一个UpdateTask对象,交给updateTasksExecutor执行,最终执行的是UpdateTask的run()方法
public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {
    /**
     * Queues a cluster-state update task on {@code updateTasksExecutor}.
     *
     * <p>Tasks submitted while the service is not started are silently dropped. For a
     * {@code TimeoutClusterStateUpdateTask}, the executor also receives the task's timeout and a
     * callback that, on the generic pool, fails the task with a
     * {@code ProcessClusterEventTimeoutException}. An {@code EsRejectedExecutionException} is
     * rethrown unless the service is stopped or closed.
     */
    public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) {
        if (!lifecycle.started()) {
            return;
        }
        try {
            final UpdateTask task = new UpdateTask(source, priority, updateTask);
            if (!(updateTask instanceof TimeoutClusterStateUpdateTask)) {
                updateTasksExecutor.execute(task);
                return;
            }
            final TimeoutClusterStateUpdateTask timed = (TimeoutClusterStateUpdateTask) updateTask;
            final Runnable failOnTimeout = new Runnable() {
                @Override
                public void run() {
                    threadPool.generic().execute(new Runnable() {
                        @Override
                        public void run() {
                            timed.onFailure(task.source(), new ProcessClusterEventTimeoutException(timed.timeout(), task.source()));
                        }
                    });
                }
            };
            updateTasksExecutor.execute(task, threadPool.scheduler(), timed.timeout(), failOnTimeout);
        } catch (EsRejectedExecutionException e) {
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }
    }
}
在UpdateTask的run()方法中,会调用ClusterStateUpdateTask.execute()方法获取新的集群状态。如果集群状态发生了变化,master节点会递增集群状态的版本号,把新状态发布(publish)到其他节点,然后更新本地集群状态并通知各个监听器:
class UpdateTask extends TimedPrioritizedRunnable {
    // The submitted cluster-state update this runnable executes.
    public final ClusterStateUpdateTask updateTask;

    UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
        super(priority, source);
        this.updateTask = updateTask;
    }

    /**
     * Computes and applies one cluster-state update.
     *
     * <ol>
     *   <li>Ignored if the cluster service is not started; a master-only task is failed via
     *       {@code onNoLongerMaster} if this node is not the master.</li>
     *   <li>{@code updateTask.execute(previousClusterState)} computes the new state; an exception
     *       is reported via {@code updateTask.onFailure}.</li>
     *   <li>If the same state instance is returned, nothing changed: acked tasks are completed as
     *       acknowledged, processed tasks are notified, and nothing is published.</li>
     *   <li>Otherwise, on the master, the state version (plus the routing-table / metadata versions
     *       when those changed) is incremented and an ack listener is prepared. The new state is
     *       published, connections to added nodes are opened, the local state is replaced, the
     *       pre-applied listeners are notified, removed nodes are disconnected, and the
     *       post-applied listeners are notified.</li>
     * </ol>
     *
     * <p>The "failed to apply" warning now includes the version of the state that failed;
     * previously the {@code version [...]} field was always empty.
     */
    @Override
    public void run() {
        if (!lifecycle.started()) {
            logger.debug("processing [{}]: ignoring, cluster_service not started", source);
            return;
        }
        logger.debug("processing [{}]: execute", source);
        ClusterState previousClusterState = clusterState;
        // Master-only tasks are rejected once this node is no longer the master.
        if (!previousClusterState.nodes().localNodeMaster() && updateTask.runOnlyOnMaster()) {
            logger.debug("failing [{}]: local node is no longer master", source);
            updateTask.onNoLongerMaster(source);
            return;
        }
        // The cluster state produced by the task.
        ClusterState newClusterState;
        long startTimeNS = System.nanoTime();
        try {
            // Let the task compute the new cluster state from the current one.
            newClusterState = updateTask.execute(previousClusterState);
        } catch (Throwable e) {
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
            if (logger.isTraceEnabled()) {
                StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
                sb.append(previousClusterState.nodes().prettyPrint());
                sb.append(previousClusterState.routingTable().prettyPrint());
                sb.append(previousClusterState.readOnlyRoutingNodes().prettyPrint());
                logger.trace(sb.toString(), e);
            }
            warnAboutSlowTaskIfNeeded(executionTime, source);
            updateTask.onFailure(source, e);
            return;
        }
        // Same instance returned: the cluster state did not change.
        if (previousClusterState == newClusterState) {
            if (updateTask instanceof AckedClusterStateUpdateTask) {
                //no need to wait for ack if nothing changed, the update can be counted as acknowledged
                ((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null);
            }
            if (updateTask instanceof ProcessedClusterStateUpdateTask) {
                ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
            }
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
            logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
            warnAboutSlowTaskIfNeeded(executionTime, source);
            return;
        }
        try {
            Discovery.AckListener ackListener = new NoOpAckListener();
            // This node is the master.
            if (newClusterState.nodes().localNodeMaster()) {
                // only the master controls the version numbers
                Builder builder = ClusterState.builder(newClusterState).version(newClusterState.version() + 1);
                // Bump the routing table version if the task replaced the routing table.
                if (previousClusterState.routingTable() != newClusterState.routingTable()) {
                    builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1));
                }
                // Bump the metadata version if the task replaced the metadata.
                if (previousClusterState.metaData() != newClusterState.metaData()) {
                    builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
                }
                newClusterState = builder.build();
                if (updateTask instanceof AckedClusterStateUpdateTask) {
                    final AckedClusterStateUpdateTask ackedUpdateTask = (AckedClusterStateUpdateTask) updateTask;
                    // No (or zero) ack timeout: report the ack timeout immediately instead of waiting.
                    if (ackedUpdateTask.ackTimeout() == null || ackedUpdateTask.ackTimeout().millis() == 0) {
                        ackedUpdateTask.onAckTimeout();
                    } else {
                        try {
                            ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool);
                        } catch (EsRejectedExecutionException ex) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
                            }
                            //timeout straightaway, otherwise we could wait forever as the timeout thread has not started
                            ackedUpdateTask.onAckTimeout();
                        }
                    }
                }
            }
            newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);
            if (logger.isTraceEnabled()) {
                StringBuilder sb = new StringBuilder("cluster state updated, source [").append(source).append("]\n");
                sb.append(newClusterState.prettyPrint());
                logger.trace(sb.toString());
            } else if (logger.isDebugEnabled()) {
                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
            }
            ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
            // new cluster state, notify all listeners
            final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
            if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                String summary = nodesDelta.shortSummary();
                if (summary.length() > 0) {
                    logger.info("{}, reason: {}", summary, source);
                }
            }
            // TODO, do this in parallel (and wait)
            for (DiscoveryNode node : nodesDelta.addedNodes()) {
                if (!nodeRequiresConnection(node)) {
                    continue;
                }
                try {
                    transportService.connectToNode(node);
                } catch (Throwable e) {
                    // the fault detection will detect it as failed as well
                    logger.warn("failed to connect to node [" + node + "]", e);
                }
            }
            // if we are the master, publish the new state to all nodes
            // we publish here before we send a notification to all the listeners, since if it fails
            // we don't want to notify
            if (newClusterState.nodes().localNodeMaster()) {
                logger.debug("publishing cluster state version {}", newClusterState.version());
                discoveryService.publish(newClusterState, ackListener);
            }
            // update the current cluster state
            clusterState = newClusterState;
            logger.debug("set local cluster state to version {}", newClusterState.version());
            for (ClusterStateListener listener : preAppliedListeners) {
                try {
                    listener.clusterChanged(clusterChangedEvent);
                } catch (Exception ex) {
                    logger.warn("failed to notify ClusterStateListener", ex);
                }
            }
            for (DiscoveryNode node : nodesDelta.removedNodes()) {
                try {
                    transportService.disconnectFromNode(node);
                } catch (Throwable e) {
                    logger.warn("failed to disconnect to node [" + node + "]", e);
                }
            }
            newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);
            for (ClusterStateListener listener : postAppliedListeners) {
                try {
                    listener.clusterChanged(clusterChangedEvent);
                } catch (Exception ex) {
                    logger.warn("failed to notify ClusterStateListener", ex);
                }
            }
            //manual ack only from the master at the end of the publish
            if (newClusterState.nodes().localNodeMaster()) {
                try {
                    ackListener.onNodeAck(newClusterState.nodes().localNode(), null);
                } catch (Throwable t) {
                    logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode());
                }
            }
            // Tell the task that the new state has been processed.
            if (updateTask instanceof ProcessedClusterStateUpdateTask) {
                ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
            }
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
            logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {})", source, executionTime, newClusterState.version());
            warnAboutSlowTaskIfNeeded(executionTime, source);
        } catch (Throwable t) {
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
            // Log the version of the state that failed to apply (this field used to be left empty).
            StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
            sb.append(newClusterState.nodes().prettyPrint());
            sb.append(newClusterState.routingTable().prettyPrint());
            sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
            logger.warn(sb.toString(), t);
            // TODO: do we want to call updateTask.onFailure here?
        }
    }
}
在索引创建完成后,集群状态信息会发生变化,master节点会将这个变化发布到其他节点,以维持集群统一的状态信息
网友评论