Write Flow
1. Terminology
Ingest (pre-processing) node: a node that pre-processes and transforms data before it is indexed; it supports pipeline definitions used to filter, transform and enrich documents.
Coordinating node: handles lightweight work such as forwarding requests and assembling the responses.
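To illustrate what an ingest pipeline is, below is a minimal sketch (not taken from this article's source; the pipeline id "log-pipeline" and the trim processor are assumptions for illustration) that registers a pipeline through the high-level REST client:
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;

public class PipelineSetup {
    // Registers a hypothetical "log-pipeline" that trims whitespace from the message field
    // before the document enters the normal write flow.
    static void createPipeline(RestHighLevelClient client) throws Exception {
        String source =
            "{ \"description\": \"trim the message field before indexing\"," +
            "  \"processors\": [ { \"trim\": { \"field\": \"message\" } } ] }";
        PutPipelineRequest request =
            new PutPipelineRequest("log-pipeline", new BytesArray(source), XContentType.JSON);
        client.ingest().putPipeline(request, RequestOptions.DEFAULT);
    }
}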
2. Overall Flow Diagram
(Flow diagram) Red: coordinating node; Green: primary shard node; Blue: replica shard node
2.1 Coordinating Node Flow
The coordinating node logic lives in the TransportBulkAction class.
2.1.1 Parameter Checks
The request is validated: index, type, source and contentType must not be empty; any validation failure rejects the request immediately.
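A minimal sketch of what this kind of check looks like (a simplification for illustration, not the actual IndexRequest#validate implementation):
import org.elasticsearch.action.ActionRequestValidationException;

import static org.elasticsearch.action.ValidateActions.addValidationError;

// Simplified stand-in for the per-request checks: each missing field adds a
// validation error, and a non-null result means the request is rejected.
static ActionRequestValidationException validate(String index, String type, byte[] source, String contentType) {
    ActionRequestValidationException validationException = null;
    if (index == null) {
        validationException = addValidationError("index is missing", validationException);
    }
    if (type == null) {
        validationException = addValidationError("type is missing", validationException);
    }
    if (source == null) {
        validationException = addValidationError("source is missing", validationException);
    }
    if (contentType == null) {
        validationException = addValidationError("content type is missing", validationException);
    }
    return validationException;
}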
2.1.2 Handling Pipeline Requests
If the request specifies a pipeline parameter, the documents are first processed by the corresponding ingest pipeline.
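For example, a hypothetical index request that reuses the "log-pipeline" id from the sketch above; the coordinating node sees the pipeline parameter and routes the document through an ingest node before the normal write flow starts:
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

// Index name, type and document body are placeholders for illustration.
static IndexRequest buildRequest() {
    return new IndexRequest("logs", "_doc")
        .source("{\"message\": \"  hello  \"}", XContentType.JSON)
        .setPipeline("log-pipeline");
}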
2.1.3 Automatic Index Creation
The steps are:
- Collect all indices referenced by the request
- Determine which of them do not yet exist and create them
- The create-index requests are sent to the master node
- The master node responds with success or failure for each index
- Failed responses are marked as such; the write flow continues for the successful ones
Relevant source:
// Attempt to create all the indices we are going to need, before the bulk operation executes
// Step 1: collect all the indices in the request
final Set<String> indices = bulkRequest.requests.stream()
// delete requests must not auto-create a missing index, unless external versioning is used
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE)
.map(DocWriteRequest::index)
.collect(Collectors.toSet());
/* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
* that we'll use when we try to run the requests. */
final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
Set<String> autoCreateIndices = new HashSet<>();
ClusterState state = clusterService.state();
for (String index : indices) {
boolean shouldAutoCreate;
try {
shouldAutoCreate = shouldAutoCreate(index, state);
} catch (IndexNotFoundException e) {
shouldAutoCreate = false;
indicesThatCannotBeCreated.put(index, e);
}
if (shouldAutoCreate) {
autoCreateIndices.add(index);
}
}
// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
if (autoCreateIndices.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
} else {
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
// iterate over every index that needs to be created
for (String index : autoCreateIndices) {
// send the create-index request
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
@Override
// creation succeeded
public void onResponse(CreateIndexResponse result) {
// decrement the counter, which was initialized to the number of indices to create
if (counter.decrementAndGet() == 0) {
// run the bulk once every index has been created
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
}
}
@Override
// creation failed
public void onFailure(Exception e) {
if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
// null out the requests that target the index that could not be created
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest request = bulkRequest.requests.get(i);
if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
bulkRequest.requests.set(i, null);
}
}
}
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}), responses, indicesThatCannotBeCreated);
}
}
});
}
}
2.1.4 Pre-processing the Requests
This part is implemented in TransportBulkAction.BulkOperation.doRun.
It checks the request parameters and auto-generates a document id when the request does not supply one (a supplied id is used as-is).
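A minimal sketch of the auto-id step (the real logic lives in IndexRequest#process; UUIDs.base64UUID is the helper ES uses for time-based, base64-encoded ids):
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.UUIDs;

// If the client did not supply an id, generate one so the document can be routed;
// a client-supplied id is left untouched.
static void ensureId(IndexRequest request) {
    if (request.id() == null) {
        request.id(UUIDs.base64UUID());
    }
}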
2.1.5 Checking the Cluster State
Before processing, the coordinating node checks the cluster state; if the cluster is blocked, it waits for the master node (a blocking operation) until the block clears or the request times out.
private boolean handleBlockExceptions(ClusterState state) {
ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked, scheduling a retry", blockException);
retry(blockException);
} else {
onFailure(blockException);
}
return true;
}
return false;
}
2.1.6 Grouping Requests by Shard
All requests are analyzed and grouped by the primary shard their documents route to; requests whose documents belong to the same shard are merged into a single shard-level bulk request.
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest request = bulkRequest.requests.get(i);
if (request == null) {
continue;
}
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}
2.1.7 Routing
The routing algorithm decides which shard each document belongs to, and therefore which node holds that shard's primary copy.
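A simplified sketch of the default routing rule (ignoring custom routing partitions and shard splitting; the real code lives in OperationRouting and IndexMetaData): the routing value, which defaults to the document id, is murmur3-hashed and reduced modulo the number of primary shards.
import org.elasticsearch.cluster.routing.Murmur3HashFunction;

// shard_num = hash(_routing) % number_of_primary_shards
static int shardNum(String routing, int numberOfPrimaryShards) {
    return Math.floorMod(Murmur3HashFunction.hash(routing), numberOfPrimaryShards);
}
The cluster state then maps that shard number to the node currently holding the shard's primary copy.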
2.1.8 Forwarding Requests and Waiting for Responses
The requests are forwarded to the nodes holding the primary shards determined above; once all shard-level responses have come back, the combined result is returned to the client.
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.setAndGetObservedState();
if (handleBlockExceptions(state)) {
return;
}
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final String concreteIndex = concreteIndex(state);
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex));
return;
}
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(indexMetaData.getIndex());
}
// resolve all derived request fields, so we can route and apply it
resolveRequest(indexMetaData, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
// look up the routing entry of the primary shard
final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
// if the primary shard is on the local node, execute locally; otherwise forward the request
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
performLocalAction(state, primary, node, indexMetaData);
} else {
performRemoteAction(state, primary, node);
}
}
2.2 Primary Shard Node Flow
The entry point is TransportReplicationAction.PrimaryOperationTransportHandler#messageReceived, which hands off to AsyncPrimaryAction#doRun.
protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
public PrimaryOperationTransportHandler() {
}
@Override
public void messageReceived(final ConcreteShardRequest<Request> request, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}
@Override
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run();
}
}
2.2.1 Request Checks
AsyncPrimaryAction#doRun calls acquirePrimaryShardReference,
which validates the request:
- is the local shard copy actually the primary?
- does the allocationId match the expected value?
- does the primaryTerm match the expected value?
IndexShard indexShard = getIndexShard(shardId);
// is the local shard copy the primary?
if (indexShard.routingEntry().primary() == false) {
throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(),
"actual shard is not a primary " + indexShard.routingEntry());
}
final String actualAllocationId = indexShard.routingEntry().allocationId().getId();
// is the allocationId the expected one?
if (actualAllocationId.equals(allocationId) == false) {
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
}
final long actualTerm = indexShard.getPendingPrimaryTerm();
// is the primaryTerm the expected one?
if (actualTerm != primaryTerm) {
throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
primaryTerm, actualTerm);
}
2.2.2 Deciding Whether to Delay Execution
The request may have to be delayed; if so it is put into a queue, otherwise processing continues.
private void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution,
final Object debugInfo, final StackTraceElement[] stackTrace) {
// the shard is closed, fail the listener and return
if (closed) {
onAcquired.onFailure(new IndexShardClosedException(shardId));
return;
}
final Releasable releasable;
try {
synchronized (this) {
if (queuedBlockOperations > 0) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
final ActionListener<Releasable> wrappedListener;
if (executorOnDelay != null) {
wrappedListener =
new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution);
} else {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
}
// put the operation into the delayed queue
delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo, stackTrace));
return;
} else {
releasable = acquire(debugInfo, stackTrace);
}
}
} catch (final InterruptedException e) {
onAcquired.onFailure(e);
return;
}
// calls AsyncPrimaryAction#onResponse
onAcquired.onResponse(releasable);
}
2.2.3 Checking Whether the Primary Has Relocated
If the primary has relocated:
- close the primaryShardReference of the local shard so its resources are released promptly
- set the phase to "primary_delegation"
- look up the relocation target node, forward the request to it, and wait for the result
- once the result arrives, set the task phase to "finished"
// the primary has relocated
if (primaryShardReference.isRelocated()) {
// close the primaryShardReference of the local shard so its resources are released promptly
primaryShardReference.close();
// set the phase to "primary_delegation"
setPhase(replicationTask, "primary_delegation");
// delegate primary phase to relocation target
// it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
// phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
final ShardRouting primary = primaryShardReference.routingEntry();
assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
// look up the node the primary has relocated to
DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
// forward the request to that node and wait for the result
transportService.sendRequest(relocatingNode, transportPrimaryAction,
new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
transportOptions,
new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
TransportReplicationAction.this::newResponseInstance) {
@Override
public void handleResponse(Response response) {
setPhase(replicationTask, "finished");
super.handleResponse(response);
}
@Override
// whether it succeeds or fails, the task phase ends up as "finished"
public void handleException(TransportException exp) {
setPhase(replicationTask, "finished");
super.handleException(exp);
}
});
}
else { // the primary has not relocated
// set the task phase to "primary"
setPhase(replicationTask, "primary");
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
// build the replication operation: write the primary, then replicate to the replica shards
createReplicatedOperation(request,
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
primaryShardReference)
.execute(); // ReplicationOperation#execute
}
If the primary has not relocated:
- set the task phase to "primary"
- prepare and execute the operation on the primary shard (the main part)
- forward the request to the replica shards
2.2.4 Writing to the Primary Shard (the Key Step)
Once the node holding the primary receives the write request from the coordinating node, the actual write logic begins. The entry point is the execute method of the ReplicationOperation class.
public void execute() throws Exception {
.......
// key step: the write to the primary shard starts here
primaryResult = primary.perform(request);
.......
}
The perform method:
@Override
public PrimaryResult perform(Request request) throws Exception {
PrimaryResult result = shardOperationOnPrimary(request, indexShard);
assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
+ "] with a primary failure [" + result.finalFailure + "]";
return result;
}
Eventually InternalEngine#index is called to write the data: Lucene (the in-memory buffer mentioned earlier) is written first, then the translog. The reason is that Lucene performs additional checks when a document is written, so the Lucene write can fail; if the translog were written first, ES would have to cope with operations that were recorded in the translog but can never be written to Lucene. ES therefore writes Lucene before the translog.
public IndexResult index(Index index) throws IOException {
.......
final IndexResult indexResult;
if (plan.earlyResultOnPreFlightError.isPresent()) {
indexResult = plan.earlyResultOnPreFlightError.get();
assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
} else if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
// write the document into Lucene; this eventually calls Lucene's document-write API
indexResult = indexIntoLucene(index, plan);
} else {
indexResult = new IndexResult(
plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
}
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
// write the translog
location = translog.add(new Translog.Index(index, indexResult));
......
indexResult.setTranslogLocation(location);
}
.......
}
2.2.5 Writing the Replica Shards
public void execute() throws Exception {
.......
// the primary shard is written first (same step as in the previous section)
primaryResult = primary.perform(request);
.......
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
........
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
// key step: after writing the primary, forward the request to the replicas here
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
}
successfulShards.incrementAndGet(); // mark primary as successful
decPendingAndFinishIfNeeded();
}
After the primary has been written, the replicas are written next: the request has to be forwarded to the replica shards. If a replica shard is unassigned it is simply skipped; if a replica shard is relocating to another node, the request is forwarded to the relocation target shard; otherwise it is sent to the replica shard itself. Note that replicaRequest is taken from primaryResult after the primary write; it is not the original request.
private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
final ReplicationGroup replicationGroup) {
// for total stats, add number of unassigned shards and
// number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
totalShards.addAndGet(replicationGroup.getSkippedShards().size());
final ShardRouting primaryRouting = primary.routingEntry();
for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
if (shard.isSameAllocation(primaryRouting) == false) {
performOnReplica(shard, replicaRequest, globalCheckpoint);
}
}
}
performOnReplica forwards the request to the target node. If an exception occurs, for example the remote node goes down or the shard write fails, the primary treats that replica shard as faulty and unavailable, reports it to the master, and the replica is removed.
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
}
totalShards.incrementAndGet();
pendingActions.incrementAndGet();
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
} catch (final AlreadyClosedException e) {
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
// fail the primary but fall through and let the rest of operation processing complete
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
primary.failShard(message, e);
}
decPendingAndFinishIfNeeded();
}
@Override
public void onFailure(Exception replicaException) {
logger.trace(() -> new ParameterizedMessage(
"[{}] failure while performing [{}] on replica {}, request [{}]",
shard.shardId(), opType, shard, replicaRequest), replicaException);
// Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
if (TransportActions.isShardNotAvailableException(replicaException) == false) {
RestStatus restStatus = ExceptionsHelper.status(replicaException);
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
}
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
// handle a failed replica write
replicasProxy.failShardIfNeeded(shard, message,
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
}
});
}
2.3 Replica Shard Flow
The replica executes essentially the same flow as the primary: the request arrives at TransportReplicationAction.ReplicaOperationTransportHandler#messageReceived, the replica validates it and acquires its operation permits, and the document is written to Lucene and then to the translog via InternalEngine#index. The difference is that a replica does not forward the request any further.
2.4 Checkpoints
After the writes above complete, and before the result is returned to the client, the global checkpoint logic of GlobalCheckpointSyncAction runs. This module contains a short but important piece of code that directly affects ES write performance:
@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request, final IndexShard indexShard) throws Exception {
maybeSyncTranslog(indexShard);
return new PrimaryResult<>(request, new ReplicationResponse());
}
The maybeSyncTranslog method in the code above decides whether the translog has to be fsynced, i.e. whether the operations recorded in the translog files must be synchronously persisted to disk before responding. Its source is:
private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.sync();
}
}
With the default configuration, ES fsyncs the translog to disk on every request in order to guarantee durability and avoid data loss. The default setting is:
index.translog.durability: request
This corresponds to Translog.Durability.REQUEST in the code above. This setting is very expensive for write throughput, so ES also provides an asynchronous, interval-based sync mode. Reference settings:
index.translog.durability: async
index.translog.sync_interval: 120s
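A hypothetical sketch of applying these settings to an existing index through the high-level REST client (the index name and interval are placeholders):
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

// Switch the translog to asynchronous fsync: faster writes, but up to sync_interval
// worth of operations can be lost on a crash. Note (assumption): whether sync_interval
// can be changed dynamically depends on the ES version; it may have to be set at
// index creation time instead.
static void relaxTranslogDurability(RestHighLevelClient client, String index) throws Exception {
    UpdateSettingsRequest request = new UpdateSettingsRequest(index)
        .settings(Settings.builder()
            .put("index.translog.durability", "async")
            .put("index.translog.sync_interval", "120s")
            .build());
    client.indices().putSettings(request, RequestOptions.DEFAULT);
}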
This concludes the analysis of the write flow.
3. Tuning Recommendations
- A replica re-indexes each document during its own write, so do not configure too many replicas per primary shard
- The default configuration hides several performance pitfalls, such as the refresh interval and the per-request synchronous translog fsync; consider lowering the refresh frequency and switching the translog to asynchronous flushing (see the settings sketch above)
- ES relies on the operating system's page cache to hold segment data temporarily, so do not leave that part of memory too small
- Prefer bulk writes over single-document writes (see the sketch after this list)
- ES is disk-intensive; use SSDs if you can
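As referenced in the list above, a hypothetical sketch of batching documents into one bulk request (index name, type and documents are placeholders):
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.List;

// One bulk request amortizes the coordinating-node and network overhead over
// many documents instead of paying it once per document.
static BulkResponse indexInBulk(RestHighLevelClient client, List<String> jsonDocs) throws Exception {
    BulkRequest bulk = new BulkRequest();
    for (String json : jsonDocs) {
        bulk.add(new IndexRequest("logs", "_doc").source(json, XContentType.JSON));
    }
    return client.bulk(bulk, RequestOptions.DEFAULT);
}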