1. Entry point for writing the Lucene index
The previous article walked through index creation in Elasticsearch, i.e. the execution of CreateIndexAction.execute(). When index creation completes, listener.onResponse() fires, and the callback in TransportIndexAction.doExecute() invokes innerExecute() to write the document:
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
@Override
protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
createIndexRequest.index(request.index());
createIndexRequest.mapping(request.type());
createIndexRequest.cause("auto(index api)");
createIndexRequest.masterNodeTimeout(request.timeout());
createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener);
}
@Override
public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
try {
innerExecute(request, listener);
} catch (Throwable e1) {
listener.onFailure(e1);
}
} else {
listener.onFailure(e);
}
}
});
} else {
innerExecute(request, listener);
}
}
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
super.doExecute(request, listener); // TransportShardReplicationOperationAction.doExecute()
}
}
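For context, here is a minimal sketch of a client-side call that ends up in doExecute() above. It assumes the 1.x Java transport client API; the index name, type, id and document body are made up for illustration.
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;

public class IndexRequestExample {
    // index a single document; if the "blog" index does not exist yet,
    // the auto-create branch in doExecute() fires before innerExecute()
    public static void indexDocument(Client client) {
        IndexResponse response = client.prepareIndex("blog", "post", "1")
                .setSource("{\"title\":\"hello\",\"body\":\"world\"}")
                .execute()
                .actionGet();
        System.out.println("created: " + response.isCreated());
    }
}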
2. Locating the primary shard for the index
Since TransportIndexAction extends TransportShardReplicationOperationAction, innerExecute() delegates to the parent class's doExecute(), which hands the work to PrimaryPhase; the actual logic lives in PrimaryPhase.doRun():
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new PrimaryPhase(request, listener).run();
}
final class PrimaryPhase extends AbstractRunnable {
private final ActionListener<Response> listener;
private final InternalRequest internalRequest;
private final ClusterStateObserver observer;
private final AtomicBoolean finished = new AtomicBoolean(false);
private volatile Releasable indexShardReference;
PrimaryPhase(Request request, ActionListener<Response> listener) {
this.internalRequest = new InternalRequest(request);
this.listener = listener;
this.observer = new ClusterStateObserver(clusterService, internalRequest.request().timeout(), logger);
}
@Override
public void onFailure(Throwable e) {
finishWithUnexpectedFailure(e);
}
protected void doRun() {
if (checkBlocks() == false) {
return;
}
final ShardIterator shardIt = shards(observer.observedState(), internalRequest);
final ShardRouting primary = resolvePrimary(shardIt);
if (primary == null) {
retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
return;
}
if (primary.active() == false) {
logger.trace("primary shard [{}] is not yet active, scheduling a retry.", primary.shardId());
retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
return;
}
if (observer.observedState().nodes().nodeExists(primary.currentNodeId()) == false) {
logger.trace("primary shard [{}] is assigned to a node [{}] we do not know, scheduling a retry.", primary.shardId(), primary.currentNodeId());
retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
return;
}
routeRequestOrPerformLocally(primary, shardIt);
}
}
}
3. Shard selection logic
The code above resolves the shard copies for the document being written and picks out the primary shard. The shard a request maps to is computed from the request as follows:
public class PlainOperationRouting extends AbstractComponent implements OperationRouting {
@Inject
public PlainOperationRouting(Settings indexSettings, HashFunction hashFunction,
AwarenessAllocationDecider awarenessAllocationDecider) {
this.useType = indexSettings.getAsBoolean("cluster.routing.operation.use_type", false);
}
private int shardId(ClusterState clusterState, String index, String type, @Nullable String id, @Nullable String routing) {
if (routing == null) {
if (!useType) {
return Math.abs(hash(id) % indexMetaData(clusterState, index).numberOfShards());
} else {
return Math.abs(hash(type, id) % indexMetaData(clusterState, index).numberOfShards());
}
}
return Math.abs(hash(routing) % indexMetaData(clusterState, index).numberOfShards());
}
(1) If the request carries no routing value, there are two cases:
① If cluster.routing.operation.use_type is unset or false (the default), the shard id is the DJB hash of the document id, taken as an absolute value, modulo the number of shards of the index:
djb_hash(id) % number_of_shards
② If cluster.routing.operation.use_type is true, the shard id is the DJB hash of the type and id, taken as an absolute value, modulo the number of shards (see the sketch below).
(2) If a routing value is set, the shard id is the DJB hash of the routing value, taken as an absolute value, modulo the number of shards.
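A minimal, self-contained sketch of the computation described above: djbHash follows the classic djb2 algorithm (hash = hash * 33 + c), which is what the default DjbHashFunction implements, and the shard id is the absolute value of the hash modulo the shard count. The class and method names below are illustrative, not the actual Elasticsearch ones.
public class ShardRoutingSketch {
    // djb2 string hash
    static int djbHash(String value) {
        long hash = 5381;
        for (int i = 0; i < value.length(); i++) {
            hash = ((hash << 5) + hash) + value.charAt(i); // hash * 33 + c
        }
        return (int) hash;
    }

    // when no routing is supplied (and use_type is false), the document id is used
    static int shardId(String id, String routing, int numberOfShards) {
        String effectiveRouting = routing != null ? routing : id;
        return Math.abs(djbHash(effectiveRouting) % numberOfShards);
    }

    public static void main(String[] args) {
        // e.g. a 5-shard index, document id "1", no explicit routing
        System.out.println(shardId("1", null, 5));
    }
}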
4. Dispatching the index write request
Once the primary shard is known, routeRequestOrPerformLocally() sends the request to the node that hosts the primary, where performOnPrimary() is executed; if the primary lives on the current node, performOnPrimary() is invoked directly:
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
protected TransportShardReplicationOperationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters) {
transportService.registerHandler(actionName, new OperationTransportHandler());
}
protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) {
if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
try {
if (internalRequest.request().operationThreaded()) {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
finishAsFailed(t);
}
@Override
protected void doRun() throws Exception {
performOnPrimary(primary, shardsIt);
}
});
} else {
performOnPrimary(primary, shardsIt);
}
} catch (Throwable t) {
finishAsFailed(t);
}
} else {
DiscoveryNode node = observer.observedState().nodes().get(primary.currentNodeId());
transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponseInstance();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(Response response) {
finishOnRemoteSuccess(response);
}
@Override
public void handleException(TransportException exp) {
try {
if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
retryPrimaryException(exp)) {
internalRequest.request().setCanHaveDuplicates();
logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
retry(exp);
} else {
finishAsFailed(exp);
}
} catch (Throwable t) {
finishWithUnexpectedFailure(t);
}
}
});
}
}
}
The constructor shows that Elasticsearch registers this action name with an OperationTransportHandler. When the handler on the target node receives the request, it runs doExecute() again there:
class OperationTransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequestInstance();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
request.listenerThreaded(false);
request.operationThreaded(true);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("Failed to send response for " + actionName, e1);
}
}
});
}
}
Either way, the actual write is carried out by performOnPrimary():
- Before writing to the primary shard, checkWriteConsistency() verifies that the configured write consistency allows the write. If it does, a reference to the index shard (its operations counter) is acquired, a PrimaryOperationRequest is built, and shardOperationOnPrimary() writes the document into the primary shard. Once that succeeds, a ReplicationPhase object is created and finishAndMoveToReplication() propagates the write to the replica shards.
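Before looking at performOnPrimary() itself, here is a hedged, standalone sketch of the consistency check it starts with: given the total number of copies of the shard (primary plus replicas) and how many of them are currently active, decide whether the write may proceed. The shape mirrors the 1.x implementation; the names and message text below are illustrative.
public class WriteConsistencySketch {
    enum WriteConsistencyLevel { ONE, QUORUM, ALL }

    // returns null when the write is allowed, otherwise a failure reason (triggering a retry)
    static String checkWriteConsistency(WriteConsistencyLevel level, int totalCopies, int activeCopies) {
        final int required;
        if (level == WriteConsistencyLevel.QUORUM && totalCopies > 2) {
            required = (totalCopies / 2) + 1;  // a quorum is only demanded with more than two copies
        } else if (level == WriteConsistencyLevel.ALL) {
            required = totalCopies;
        } else {
            required = 1;                      // ONE, or QUORUM on a shard with 1-2 copies
        }
        return activeCopies < required
                ? "not enough active shard copies [" + activeCopies + "], required [" + required + "]"
                : null;
    }

    public static void main(String[] args) {
        // 1 primary + 2 replicas, only the primary active: QUORUM needs 2, so the write is retried
        System.out.println(checkWriteConsistency(WriteConsistencyLevel.QUORUM, 3, 1));
    }
}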
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
void performOnPrimary(final ShardRouting primary, final ShardIterator shardsIt) {
// check whether the write is allowed under the configured write-consistency level
final String writeConsistencyFailure = checkWriteConsistency(primary);
if (writeConsistencyFailure != null) {
retryBecauseUnavailable(primary.shardId(), writeConsistencyFailure);
return;
}
final ReplicationPhase replicationPhase;
try {
// acquire a reference (operations counter) on the index shard
indexShardReference = getIndexShardOperationsCounter(primary.shardId());
// build the primary operation request
PrimaryOperationRequest por = new PrimaryOperationRequest(primary.id(), internalRequest.concreteIndex(), internalRequest.request());
// perform the shard-level operation; implemented by TransportDeleteAction, TransportIndexAction, TransportShardBulkAction, TransportShardDeleteAction, TransportShardDeleteByQueryAction
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(observer.observedState(), por);
logger.trace("operation completed on primary [{}]", primary);
// build the replication phase
replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference);
} catch (Throwable e) {
// ...
}
// move on to the replication phase: copy the write from the primary shard to the replicas
finishAndMoveToReplication(replicationPhase);
}
}
In shardOperationOnPrimary(), the request is first converted via SourceToParse into an object holding source, type, id, routing, parent, timestamp and ttl. The request's opType then selects an Index or a Create operation (Index is the default), and the document is written to the Lucene index through indexShard.index(index) or indexShard.create(create):
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
@Override
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
final IndexRequest request = shardRequest.request;
// validate, if routing is required, that we got routing
// index meta data
IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex());
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (request.routing() == null) {
throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id());
}
}
// index service
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
// index shard
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
// convert the request into source / type / id / routing / parent / timestamp / ttl
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version;
boolean created;
try {
Engine.IndexingOperation op;
// index request
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
if (index.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
}
// execute the index operation
indexShard.index(index);
version = index.version();
op = index;
created = index.created();
} else {
// create request
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
if (create.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
}
// execute the create operation
indexShard.create(create);
version = create.version();
op = create;
created = true;
}
// honor the refresh flag on the request
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_index");
} catch (Throwable e) {
// ignore
}
}
// update the version on the request, so it will be used for the replicas
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
// build the response
IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
return new Tuple<>(response, shardRequest.request);
} catch (WriteFailureException e) {
if (e.getMappingTypeToUpdate() != null) {
DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
if (docMapper != null) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
}
}
throw e.getCause();
}
}
}
5. Writing the document on the shard
IndexShard.index() first checks whether the shard is writable, which is decided by the shard state: a primary shard only accepts writes while it is STARTED or RELOCATED (a sketch of writeAllowed() follows the code below). If the write is allowed, the internal engine (InternalEngine) performs the actual Lucene operation:
public class IndexShard extends AbstractIndexShardComponent {
public ParsedDocument index(Engine.Index index) throws ElasticsearchException {
// check that the shard is writable
writeAllowed(index.origin());
index = indexingService.preIndex(index);
try {
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
}
engine().index(index);
index.endTime(System.nanoTime());
} catch (RuntimeException ex) {
indexingService.failedIndex(index);
throw ex;
}
indexingService.postIndex(index);
return index.parsedDoc();
}
}
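The writeAllowed() check at the top of index() is not shown in the excerpt; the following is a hedged sketch of its logic. A primary only accepts writes while STARTED or RELOCATED, whereas a replica additionally accepts writes while RECOVERING or POST_RECOVERY so that operations can be replayed during recovery. The enum values mirror IndexShardState and Engine.Operation.Origin; the exception type is simplified.
public class WriteAllowedSketch {
    enum ShardState { CREATED, RECOVERING, POST_RECOVERY, STARTED, RELOCATED, CLOSED }
    enum Origin { PRIMARY, REPLICA, RECOVERY }

    static void writeAllowed(Origin origin, ShardState state) {
        if (origin == Origin.PRIMARY) {
            // the cluster must have acknowledged the primary as started (or relocated)
            if (state != ShardState.STARTED && state != ShardState.RELOCATED) {
                throw new IllegalStateException("primary write not allowed in state " + state);
            }
        } else {
            // replica and recovery writes are also allowed while the shard is still recovering
            if (state != ShardState.STARTED && state != ShardState.RELOCATED
                    && state != ShardState.RECOVERING && state != ShardState.POST_RECOVERY) {
                throw new IllegalStateException("replica write not allowed in state " + state);
            }
        }
    }
}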
6. Writing to the Lucene index
(1) The Lucene write itself goes through Lucene's IndexWriter; before writing, the version of the document being written has to be resolved.
(2) The new version can be derived in four ways, matching the version types EXTERNAL, EXTERNAL_GTE, FORCE and INTERNAL: the first three take the version carried on the index request as the document version, while INTERNAL uses the current version + 1 (a sketch follows the engine code below).
(3) The current version is first looked up in the in-memory versionMap; if it is absent, it is loaded from the Lucene reader. If the cached entry is a delete that has already passed the GC-deletes window, the document counts as not found; otherwise the cached version is used.
(4) Whether a current version exists decides between addDocument(s)() to add the document and updateDocument(s)() to update it.
public class InternalEngine extends Engine {
@Override
public void index(Index index) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (index.origin() == Operation.Origin.RECOVERY) {
// Don't throttle recovery operations
innerIndex(index);
} else {
try (Releasable r = throttle.acquireThrottle()) {
innerIndex(index);
}
}
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine("index", t);
throw new IndexFailedEngineException(shardId, index, t);
}
checkVersionMapRefresh();
}
private void innerIndex(Index index) throws IOException {
synchronized (dirtyLock(index.uid())) {
// resolve the current version
final long currentVersion;
VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid());
} else {
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
}
}
// compute the updated version
long updatedVersion;
long expectedVersion = index.version();
if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
if (index.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
}
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.updateVersion(updatedVersion);
// no existing version: treat as a create (add)
if (currentVersion == Versions.NOT_FOUND) {
// document does not exist, we can optimize for create
index.created(true);
if (index.docs().size() > 1) {
indexWriter.addDocuments(index.docs(), index.analyzer());
} else {
indexWriter.addDocument(index.docs().get(0), index.analyzer());
}
} else {
// an existing version was found: treat as an update
if (versionValue != null) {
index.created(versionValue.delete()); // we have a delete which is not GC'ed...
}
if (index.docs().size() > 1) {
indexWriter.updateDocuments(index.uid(), index.docs(), index.analyzer());
} else {
indexWriter.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
}
}
// append the operation to the translog
Translog.Location translogLocation = translog.add(new Translog.Index(index));
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
indexingService.postIndexUnderLock(index);
}
}
}
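As promised in point (2) above, here is a hedged, standalone sketch of how the updated document version is derived for each version type. The real logic lives in org.elasticsearch.index.VersionType; the sketch only reproduces the resulting version number and uses its own enum and constant.
public class VersionUpdateSketch {
    static final long NOT_FOUND = -1; // stands in for Versions.NOT_FOUND

    enum SketchVersionType { INTERNAL, EXTERNAL, EXTERNAL_GTE, FORCE }

    static long updateVersion(SketchVersionType type, long currentVersion, long requestVersion) {
        switch (type) {
            case EXTERNAL:
            case EXTERNAL_GTE:
            case FORCE:
                // the version supplied on the index request becomes the document version
                return requestVersion;
            case INTERNAL:
            default:
                // the first write starts at 1, later writes increment the current version
                return currentVersion == NOT_FOUND ? 1 : currentVersion + 1;
        }
    }

    public static void main(String[] args) {
        System.out.println(updateVersion(SketchVersionType.INTERNAL, NOT_FOUND, 0)); // 1
        System.out.println(updateVersion(SketchVersionType.INTERNAL, 3, 0));         // 4
        System.out.println(updateVersion(SketchVersionType.EXTERNAL, 3, 10));        // 10
    }
}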
This completes the Lucene write on the primary shard. The next phase, TransportShardReplicationOperationAction.PrimaryPhase.finishAndMoveToReplication(), replicates the write to the replica shards.
7. Writing the replicas
Before operating on a shard copy, ReplicationPhase.doRun() checks whether it is the primary or a replica. For the primary, data is only copied to its relocation target (or to a primary sitting on a different node from the one just written); for replicas, data is copied to every assigned replica and its relocation target, unless the index uses shadow replicas.
performOnReplica() then checks whether the replica lives on the current node; if not, the request is sent to the target node under the transportReplicaAction name, which is registered with a ReplicaOperationTransportHandler:
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
protected TransportShardReplicationOperationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters) {
transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());
}
final class PrimaryPhase extends AbstractRunnable {
void finishAndMoveToReplication(ReplicationPhase replicationPhase) {
if (finished.compareAndSet(false, true)) {
replicationPhase.run();
} else {
assert false : "finishAndMoveToReplication called but operation is already finished";
}
}
}
final class ReplicationPhase extends AbstractRunnable {
@Override
protected void doRun() {
if (pending.get() == 0) {
doFinish();
return;
}
ShardRouting shard;
shardIt.reset();
while ((shard = shardIt.nextOrNull()) != null) {
if (shard.unassigned()) {
continue;
}
if (shard.primary()) {
if (originalPrimaryShard.currentNodeId().equals(shard.currentNodeId()) == false) {
performOnReplica(shard, shard.currentNodeId());
}
if (shard.relocating()) {
performOnReplica(shard, shard.relocatingNodeId());
}
} else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings()) == false) {
// index.shadow_replicas = false
performOnReplica(shard, shard.currentNodeId());
if (shard.relocating()) {
performOnReplica(shard, shard.relocatingNodeId());
}
}
}
}
void performOnReplica(final ShardRouting shard, final String nodeId) {
if (!observer.observedState().nodes().nodeExists(nodeId)) {
onReplicaFailure(nodeId, null);
return;
}
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), replicaRequest);
if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest,
transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty vResponse) {
onReplicaSuccess();
}
@Override
public void handleException(TransportException exp) {
// ...
}
});
} else {
// hand the operation off to the thread pool
if (replicaRequest.operationThreaded()) {
try {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() {
try {
shardOperationOnReplica(shardRequest);
onReplicaSuccess();
} catch (Throwable e) {
onReplicaFailure(nodeId, e);
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
}
@Override
public boolean isForceExecution() {
return true;
}
@Override
public void onFailure(Throwable t) {
onReplicaFailure(nodeId, t);
}
});
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
onReplicaFailure(nodeId, e);
}
} else {
try {
shardOperationOnReplica(shardRequest);
onReplicaSuccess();
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
onReplicaFailure(nodeId, e);
}
}
}
}
}
}
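The excerpt above omits onReplicaSuccess(), onReplicaFailure() and doFinish(). A hedged sketch of how the phase completes: pending holds the number of outstanding replica operations; every success or failure decrements it, and once it reaches zero the response built on the primary is sent back to the original listener. The class below is a simplified standalone model, not the actual Elasticsearch code.
import java.util.concurrent.atomic.AtomicInteger;

public class ReplicationCompletionSketch {
    private final AtomicInteger pending;
    private final Runnable respond; // stands in for listener.onResponse(finalResponse)

    ReplicationCompletionSketch(int replicaOperations, Runnable respond) {
        this.pending = new AtomicInteger(replicaOperations);
        this.respond = respond;
    }

    void onReplicaSuccess() {
        decPendingAndFinishIfNeeded();
    }

    void onReplicaFailure(String nodeId, Throwable t) {
        // the failure is recorded (and the replica shard may be failed), but the phase still completes
        decPendingAndFinishIfNeeded();
    }

    private void decPendingAndFinishIfNeeded() {
        if (pending.decrementAndGet() <= 0) {
            respond.run(); // doFinish(): return the primary's response to the caller
        }
    }
}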
When ReplicaOperationTransportHandler receives the request, it likewise calls shardOperationOnReplica() to write the replica data:
class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
@Override
public ReplicaOperationRequest newInstance() {
return new ReplicaOperationRequest();
}
@Override
public String executor() {
return executor;
}
@Override
public boolean isForceExecution() {
return true;
}
@Override
public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
try (Releasable shardReference = getIndexShardOperationsCounter(request.shardId)) {
shardOperationOnReplica(request);
} catch (Throwable t) {
failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);
throw t;
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
As shardOperationOnReplica() shows, writing a replica mirrors writing the primary: the request is parsed into a SourceToParse and then written to the Lucene index on that shard through IndexShard:
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
// index shard
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
// request
IndexRequest request = shardRequest.request;
// source
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
indexShard.create(create);
}
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_index");
} catch (Exception e) {
// ignore
}
}
}
}