TransportAction类继承层次
ElasticSearch内部RPC通信发送请求会使用action
(String类型)来告知请求处理方使用哪个Handler处理该请求。
请求处理方会事先注册action
(String)->handler
(TransportRequestHandler)的映射,在请求到来时会在注册的handlers中找到处理该请求的Handler进行响应。
TransportAction
类继承层次如下图所示:

可见,主要有三类TransportAction
:HandledTransportAction
、TransportReplicationAction
、TransportSingleShardAction
。下面分别进行介绍:
1 HandledTransportAction
先看源码注释:
A TransportAction that self registers a handler into the transport service
表示一个自己向transport
注册一个handler的TransportAction
。其构造函数会接受一个String
类型的action参数,并注册handler,注册的handler为TransportHandler
:
//HandledTransportAction
protected HandledTransportAction(Settings settings, String actionName, ...) {
super(settings, actionName, actionFilters, transportService.getTaskManager());
//TransportHandler是
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, canTripCircuitBreaker, requestReader,
new TransportHandler());
}
//HandledTransportAction.TransportHandler
class TransportHandler implements TransportRequestHandler<Request> {
@Override
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
// We already got the task created on the network layer - no need to create it again on the transport layer
Logger logger = HandledTransportAction.this.logger;
//直接调用外部类HandledTransportAction的execute方法进行请求//处理并返回response,参数ChannelActionListener可以用于在请求处理
//完毕时获取Channel向请求发起方返回响应。
execute(task, request, new ChannelActionListener<>(channel, actionName, request));
}
}
2 TransportReplicationAction
该类的源码注释如下:
Base class for requests that should be executed on a primary copy followed by replica copies. Subclasses can resolve the target shard and provide implementation for primary and replica operations.The action samples cluster state on the receiving node to reroute to node with primary copy and on the primary node to validate request before primary operation followed by sampling state again for resolving nodes with replica copies to perform replication.
即TransportReplicationAction
是所有那些为了完成请求处理,先在主分片处理之后,然后发送到副分片处理的Action类的基类。
这些类一般会在会在构造函数向TransportReplicationAction
传递一个actionName,TransportReplicationAction
会在其自己构造函数中注册三个Handler,如下:
//TransportReplicationAction
protected TransportReplicationAction(Settings settings, String actionName, ...) {
super(settings, actionName, actionFilters,
...
//这里会拼接主分片和副分片接受请求时的handler使用的action
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);
...
}
protected void registerRequestHandlers(String actionName, TransportService transportService, ...) {
//注册接收请求的handler,这里用于协调节点接受请求,属于请求入口,
//后续协调节点会解析处理该请求的主分片所在节点,然后使用
//transportPrimaryAction将该请求转发给主分片节点,主分片节点
//处理完请求之后,会使用transportReplicaAction将请求发送给
//所有副分片所在节点,副分片会对请求进行处理。
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
//注册主分片节点处理handler
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
new PrimaryOperationTransportHandler());
// we must never reject on because of thread pool capacity on replicas
//注册副分片节点处理handler
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
}
3 TransportSingleShardAction
同样,先看源码注释:
A base class for operations that need to perform a read operation on a single shard copy. If the operation fails, the read operation can be performed on other shard copies. Concrete implementations can provide their own list of candidate shards to try the read operation on.
即处理只需要在一个分片上即可完成的操作,如果在该分片上处理失败之后,可以将请求发送到其他分片再次尝试。
其注册Handler如下:
//TransportSingleShardAction
protected TransportSingleShardAction(Settings settings, String actionName, ...) {
...
if (!isSubAction()) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
}
transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
}
网友评论