处理 Get 请求的 Action 为 TransportGetAction。可以先参考一下 TransportAction 的类继承层次:TransportGetAction 继承自 TransportSingleShardAction,其继承层次如下图所示:
在介绍 TransportAction 类继承层次时提到过,TransportSingleShardAction 的子类主要处理那些只涉及单个 shard 的操作;如果在某个分片副本上执行出错,可以继续到其他副本上重试。
下面我们直接看 TransportSingleShardAction.doExecute 函数:
//TransportSingleShardAction
/**
 * Entry point of the action: wraps the request and listener in an
 * {@code AsyncSingleAction} and starts it.
 *
 * @param task     the task associated with this execution (not used by this method)
 * @param request  the single-shard request to execute
 * @param listener callback handed to the {@code AsyncSingleAction}
 */
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    final AsyncSingleAction singleAction = new AsyncSingleAction(request, listener);
    singleAction.start();
}
//TransportSingleShardAction.AsyncSingleAction
/**
 * Snapshots the current cluster state and resolves everything the action needs
 * before it can run: the node set, the concrete index name, the wrapped
 * internal request and the shard iterator.
 *
 * @param request  the incoming single-shard request
 * @param listener callback stored for reporting the outcome
 * @throws ClusterBlockException if the global block check or the request-level
 *                               block check returns an exception
 */
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.listener = listener;
ClusterState clusterState = clusterService.state();
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
}
nodes = clusterState.nodes();
// Fail immediately if the global block check reports a block.
ClusterBlockException blockException = checkGlobalBlock(clusterState);
if (blockException != null) {
throw blockException;
}
String concreteSingleIndex;
// NOTE(review): the article states that for TransportGetAction resolveIndex(request)
// does no extra work and simply returns true, so the first branch is taken.
// The subclass is not visible in this excerpt - confirm there.
if (resolveIndex(request)) {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
} else {
concreteSingleIndex = request.index();
}
this.internalRequest = new InternalRequest(request, concreteSingleIndex);
// NOTE(review): the article states that TransportGetAction overrides resolveRequest
// and documents it as "update the routing"; the override is not shown here.
resolveRequest(clusterState, internalRequest);
// Second block check, this time against the resolved internal request.
blockException = checkRequestBlock(clusterState, internalRequest);
if (blockException != null) {
throw blockException;
}
// shards(...) supplies the shard iterator that start()/perform() walk; start() treats a
// null result as "execute on the local node". The article says shards(...) is defined in
// TransportGetAction (not shown here).
this.shardIt = shards(clusterState, internalRequest);
}
构造 AsyncSingleAction 之后,会调用其 start 方法:
//TransportSingleShardAction.AsyncSingleAction
/**
 * Starts execution. When a shard iterator was resolved, the iterator is walked
 * via {@code perform(null)}; otherwise the request is sent to the local node.
 */
public void start() {
    // A shard iterator is available: try its shard copies, starting with no prior failure.
    if (shardIt != null) {
        perform(null);
        return;
    }

    // No shard iterator was resolved (upstream comment: "just execute it on the local node").
    // The request is sent to the local node under the same transportShardAction name that
    // perform() uses for other nodes. The response or the exception goes straight to the
    // listener; this path does not retry.
    final TransportResponseHandler<Response> localHandler = new TransportResponseHandler<Response>() {
        @Override
        public Response newInstance() {
            return newResponse();
        }

        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }

        @Override
        public void handleResponse(final Response response) {
            listener.onResponse(response);
        }

        @Override
        public void handleException(TransportException exp) {
            listener.onFailure(exp);
        }
    };
    transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), localHandler);
}
perform 函数的逻辑如下:
/**
 * Tries the next shard copy from the shard iterator, or reports failure to the
 * listener when the iterator is exhausted. Called with {@code null} for the
 * first attempt and with the triggering exception on every retry.
 *
 * NOTE: this excerpt is abridged by the article (see the elision marker below).
 *
 * @param currentFailure the exception from the previous attempt, or {@code null}
 */
private void perform(@Nullable final Exception currentFailure) {
// Failure bookkeeping: the stored failure is replaced when nothing has been recorded yet
// or when the new failure is a read-override exception. The recorded failure is what is
// eventually reported to the listener once no shard copy is left.
Exception lastFailure = this.lastFailure;
if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
// Take the next shard copy from the iterator. This is where retrying happens: when an
// attempt fails, onFailure calls perform again and this call advances to the next ShardRouting.
final ShardRouting shardRouting = shardIt.nextOrNull();
// Iterator exhausted: report the failure to the listener and stop.
if (shardRouting == null) {
Exception failure = lastFailure;
// No failure recorded, or only a "shard not available" one: wrap it in a
// NoShardAvailableActionException. Any other failure is logged at debug level
// and passed on unchanged.
if (failure == null || isShardNotAvailableException(failure)) {
failure = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
} else {
logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null, internalRequest.request()), failure);
}
listener.onFailure(failure);
return;
}
// Look up the node that currently holds this shard copy.
DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
// Node not found: go through AsyncSingleAction.onFailure, which calls perform again.
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
// (code elided by the article at this point)
···
// Send the request to the node holding the selected shard copy,
// using the transportShardAction action name.
transportService.sendRequest(node, transportShardAction, internalRequest.request(), new TransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
// On success the response is handed to listener.onResponse.
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
// On failure AsyncSingleAction.onFailure is called, which retries via perform.
@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
}
});
}
}
}
/**
 * AsyncSingleAction.onFailure: records the failed attempt at trace level (when an
 * exception is present) and hands control back to {@code perform} for the next attempt.
 *
 * @param shardRouting the shard copy on which the attempt failed
 * @param e            the failure; may be {@code null}, in which case nothing is logged
 */
private void onFailure(ShardRouting shardRouting, Exception e) {
    final boolean hasFailure = e != null;
    if (hasFailure) {
        logger.trace(
            () -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting, internalRequest.request()),
            e);
    }
    // perform(...) advances the shard iterator, so this acts as the retry.
    perform(e);
}
从上面的代码可以看到,协调节点会依次从解析得到的分片迭代器中取出分片及其所在节点,并向该节点发送请求,使用的 action 为 transportShardAction。在 TransportSingleShardAction 的构造函数中可以看到为该 action 注册的请求处理 handler 如下:
//TransportSingleShardAction
// Registers a ShardTransportHandler as the request handler for the transportShardAction
// action name. ThreadPool.Names.SAME is presumably the executor name for the handler -
// confirm against the TransportService.registerRequestHandler signature.
transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
TransportSingleShardAction.ShardTransportHandler 的实现如下:
//TransportSingleShardAction.ShardTransportHandler
/**
 * Transport-level handler for shard requests: forwards each received request to
 * {@code asyncShardOperation} and routes the outcome back through the channel.
 */
private class ShardTransportHandler implements TransportRequestHandler<Request> {

    /**
     * Handles one incoming shard request.
     *
     * @param request the received request; its {@code internalShardId} names the target shard
     * @param channel the channel wrapped in a {@code ChannelActionListener} for the reply
     * @param task    the task associated with this request (not used by this method)
     * @throws Exception whatever {@code asyncShardOperation} throws
     */
    @Override
    public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
        final ShardId targetShard = request.internalShardId;
        if (logger.isTraceEnabled()) {
            logger.trace("executing [{}] on shard [{}]", request, targetShard);
        }
        // Start the asynchronous shard operation; the listener replies on the channel.
        asyncShardOperation(
            request,
            targetShard,
            new HandledTransportAction.ChannelActionListener<>(channel, transportShardAction, request));
    }
}
TransportSingleShardAction.asyncShardOperation 在子类 TransportGetAction 中被重写了,如下:
/**
 * TransportGetAction override: decides when the base-class shard operation is started,
 * based on the request's {@code realtime} flag.
 *
 * @param request  the GET request
 * @param shardId  the shard to read from
 * @param listener callback for the response or failure
 * @throws IOException declared by the overridden method
 */
@Override
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
    // The shard is looked up before branching, as in the original, even though only the
    // non-realtime path uses it.
    final IndexShard shard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());

    if (request.realtime() == false) {
        // Non-realtime: the base operation is deferred to the awaitShardSearchActive callback.
        // NOTE(review): the article describes this as waiting for the next index refresh;
        // this excerpt only shows the deferral - confirm against IndexShard.awaitShardSearchActive.
        shard.awaitShardSearchActive(ignored -> {
            try {
                super.asyncShardOperation(request, shardId, listener);
            } catch (Exception failure) {
                listener.onFailure(failure);
            }
        });
        return;
    }

    // Realtime: run the base operation right away.
    // we are not tied to a refresh cycle here anyway
    super.asyncShardOperation(request, shardId, listener);
}
父类 TransportSingleShardAction.asyncShardOperation 的实现如下:
//TransportSingleShardAction
/**
 * Runs {@code shardOperation} on the executor chosen by {@code getExecutor} and
 * passes the result, or any failure, to the listener.
 *
 * @param request  the shard-level request
 * @param shardId  the shard to operate on
 * @param listener callback receiving the response or the failure
 * @throws IOException declared for subclasses; nothing in this body throws it directly
 */
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
    final AbstractRunnable shardTask = new AbstractRunnable() {
        @Override
        protected void doRun() throws Exception {
            // shardOperation does the actual work; the article notes it is defined in the
            // subclass (TransportGetAction for GET requests).
            listener.onResponse(shardOperation(request, shardId));
        }

        @Override
        public void onFailure(Exception e) {
            listener.onFailure(e);
        }
    };
    threadPool.executor(getExecutor(request, shardId)).execute(shardTask);
}
下面看 TransportGetAction.shardOperation 的实现:
//TransportGetAction
/**
 * Executes the GET against the given shard and wraps the result in a response.
 *
 * @param request the GET request (type, id, stored fields, realtime flag, version
 *                constraints and fetch-source context are all forwarded)
 * @param shardId the shard to read from
 * @return a {@code GetResponse} wrapping the shard's {@code GetResult}
 */
@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
    final IndexShard shard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());

    // An explicit refresh is issued only when the request asks for one and is not realtime.
    final boolean refreshBeforeRead = request.refresh() && !request.realtime();
    if (refreshBeforeRead) {
        shard.refresh("refresh_flag_get");
    }

    final GetResult getResult = shard.getService().get(
        request.type(),
        request.id(),
        request.storedFields(),
        request.realtime(),
        request.version(),
        request.versionType(),
        request.fetchSourceContext());
    return new GetResponse(getResult);
}
最终会在 Engine 中处理 Get 操作,下面大致看一下 InternalEngine.get 函数的实现:
//InternalEngine
/**
 * Engine-level GET. For realtime reads the version map is consulted first; depending on
 * what it holds, the call returns early, reads from the translog, or refreshes before
 * falling through to a searcher lookup. Non-realtime reads go straight to the searcher
 * with the EXTERNAL scope.
 *
 * @param get             the GET to execute (uid, realtime flag, version constraints)
 * @param searcherFactory supplies the searcher used by {@code getFromSearcher}
 * @return {@code GetResult.NOT_EXISTS} when the version map marks the document as deleted;
 *         a translog-backed result when read-from-translog is requested and the operation
 *         is found; otherwise the result of {@code getFromSearcher}
 * @throws VersionConflictEngineException when the version map entry conflicts with the
 *                                        requested version
 * @throws EngineException                when reading the operation from the translog fails
 */
@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
SearcherScope scope;
// Realtime read: check the version map first. A refresh is issued below only when the
// version map holds an entry for this uid, not for every realtime read.
if (get.realtime()) {
VersionValue versionValue = null;
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// we need to lock here to access the version map to do this truly in RT
versionValue = getVersionFromMap(get.uid().bytes());
}
if (versionValue != null) {
// The version map marks the document as deleted: report it as not existing.
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
// Version conflict check against the version requested by the caller.
if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
get.versionType().explainConflictForReads(versionValue.version, get.version()));
}
// NOTE(review): the article author traced the callers and reports that
// isReadFromTranslog() is false for plain GET requests; the upstream comment
// below says this path is only used for updates. Callers are not shown here.
if (get.isReadFromTranslog()) {
// this is only used for updates - API _GET calls will always read from a reader for consistency
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
Translog.Operation operation = translog.readOperation(versionValue.getLocation());
if (operation != null) {
// in the case of a already pruned translog generation we might get null here - yet very unlikely
TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
.getIndexSettings().getIndexVersionCreated());
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), logger),
new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
throw new EngineException(shardId, "failed to read operation from translog", e);
}
} else {
trackTranslogLocation.set(true);
}
}
// The version map had an entry and nothing was returned from the translog:
// refresh with the INTERNAL scope before reading from the searcher.
refresh("realtime_get", SearcherScope.INTERNAL);
}
scope = SearcherScope.INTERNAL;
} else {
// we expose what has been externally exposed in a point in time snapshot via an explicit refresh
scope = SearcherScope.EXTERNAL;
}
// no version, get the version from the index, we know that we refresh on flush
// Fetch the document through a searcher of the chosen scope.
return getFromSearcher(get, searcherFactory, scope);
}
}
网友评论