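When Elasticsearch (ES) issues any kind of Action request in a clustered environment, the request usually has to be forwarded to another node, which requires RPC communication. ES handles the sending and receiving for this RPC through the NettyTransport class, which is essentially a communication module built on Netty.
A NettyTransport is created both when a Node starts and when a TransportClient is initialized.
When a Node starts
When a Node starts, it obtains the TransportService instance through dependency injection and calls its start method. The call eventually reaches NettyTransport's doStart method, which creates the client-side and server-side Netty RPC objects. Requests and responses are then handled by MessageChannelHandler.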
public Node start() {
    // ... omitted
    injector.getInstance(MappingUpdatedAction.class).setClient(client);
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndexingMemoryController.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(IndicesTTLService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SnapshotShardsService.class).start();
    injector.getInstance(RoutingService.class).start();
    injector.getInstance(SearchService.class).start();
    injector.getInstance(MonitorService.class).start();
    injector.getInstance(RestController.class).start();
    // TODO hack around circular dependencies problems
    injector.getInstance(GatewayAllocator.class).setReallocation(injector.getInstance(ClusterService.class), injector.getInstance(RoutingService.class));
    injector.getInstance(ResourceWatcherService.class).start();
    injector.getInstance(GatewayService.class).start();
    // Start the transport service now so the publish address will be added to the local disco node in ClusterService
    TransportService transportService = injector.getInstance(TransportService.class);
    transportService.start();
    injector.getInstance(ClusterService.class).start();
    // ... omitted
    return this;
}
The call then goes to the start method of AbstractLifecycleComponent:
public T start() {
    if (!lifecycle.canMoveToStarted()) {
        return (T) this;
    }
    for (LifecycleListener listener : listeners) {
        listener.beforeStart();
    }
    doStart();
    lifecycle.moveToStarted();
    for (LifecycleListener listener : listeners) {
        listener.afterStart();
    }
    return (T) this;
}
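The start chain described above follows the template-method pattern: start() takes care of the lifecycle bookkeeping and delegates the real work to doStart(). The following is a minimal sketch of that delegation, not the Elasticsearch source. All class names (SketchLifecycleComponent, SketchTransportService, SketchTransport) are hypothetical, listeners are left out, and having the service start its transport from its own doStart is an assumption based on the statement that the call eventually reaches NettyTransport's doStart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a lifecycle component (listeners omitted).
abstract class SketchLifecycleComponent {
    private boolean started = false;

    // Template method: subclasses only supply doStart().
    public final SketchLifecycleComponent start() {
        if (started) {
            return this; // already started, so doStart() is not run again
        }
        doStart();
        started = true;
        return this;
    }

    protected abstract void doStart();
}

// Hypothetical stand-in for the transport; it only records what it would create.
class SketchTransport extends SketchLifecycleComponent {
    final List<String> events = new ArrayList<>();

    @Override
    protected void doStart() {
        events.add("createClientBootstrap");
        events.add("createServerBootstrap");
    }
}

// Hypothetical stand-in for the transport service (assumed to start its transport).
class SketchTransportService extends SketchLifecycleComponent {
    private final SketchTransport transport;

    SketchTransportService(SketchTransport transport) {
        this.transport = transport;
    }

    @Override
    protected void doStart() {
        transport.start();
    }
}
```

Calling start() on the service twice runs the transport's doStart only once, which is the guard that the canMoveToStarted() check provides in the original method.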
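Of the doStart implementations, only NettyTransport's is analyzed here (LocalTransport's is not).
Two lines in that method are the key ones:
clientBootstrap = createClientBootstrap(); // start the client side
createServerBootstrap(name, mergedSettings); // start the server side
Here is how the client is constructed: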
private ClientBootstrap createClientBootstrap() {
    if (blockingClient) {
        clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
    } else {
        int bossCount = settings.getAsInt("transport.netty.boss_count", 1);
        clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)),
                bossCount,
                new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount),
                new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
    }
    clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory());
    // ... omitted
    return clientBootstrap;
}
One thing to note in the code above is clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory());
public ChannelPipelineFactory configureClientChannelPipelineFactory() {
    return new ClientChannelPipelineFactory(this);
}

protected static class ClientChannelPipelineFactory implements ChannelPipelineFactory {
    protected final NettyTransport nettyTransport;

    public ClientChannelPipelineFactory(NettyTransport nettyTransport) {
        this.nettyTransport = nettyTransport;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline channelPipeline = Channels.pipeline();
        SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
        if (nettyTransport.maxCumulationBufferCapacity != null) {
            if (nettyTransport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
                sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
            } else {
                sizeHeader.setMaxCumulationBufferCapacity((int) nettyTransport.maxCumulationBufferCapacity.bytes());
            }
        }
        if (nettyTransport.maxCompositeBufferComponents != -1) {
            sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents);
        }
        channelPipeline.addLast("size", sizeHeader);
        // using a dot as a prefix means, this cannot come from any settings parsed
        channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, ".client"));
        return channelPipeline;
    }
}
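In the code above, channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, ".client")); specifies that MessageChannelHandler is the handler that receives and processes requests and responses.
createServerBootstrap(name, mergedSettings), which starts the server side, follows essentially the same flow as createClientBootstrap(). Both end up using MessageChannelHandler to receive and process requests and responses.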
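The "size" handler placed ahead of the dispatcher in the pipeline is a frame decoder: it cuts the incoming byte stream into whole messages before they reach MessageChannelHandler. The sketch below illustrates the general idea of size-header framing with plain java.nio. It assumes a bare 4-byte length prefix, which is a simplification and not the actual Elasticsearch wire format, and the class and method names (SizeHeaderFramingSketch, encode, decode) are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of length-prefixed framing (assumed 4-byte size header).
final class SizeHeaderFramingSketch {
    static final int HEADER_BYTES = 4;

    // Prefix the payload with its length as a big-endian int.
    static byte[] encode(byte[] payload) {
        ByteBuffer out = ByteBuffer.allocate(HEADER_BYTES + payload.length);
        out.putInt(payload.length);
        out.put(payload);
        return out.array();
    }

    // Extract every complete frame from a buffer in read mode.
    // Bytes of an incomplete trailing frame are left unread for the next call.
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= HEADER_BYTES) {
            in.mark();
            int size = in.getInt();
            if (size < 0) {
                throw new IllegalStateException("negative frame size: " + size);
            }
            if (in.remaining() < size) {
                in.reset(); // body not fully received yet, rewind to the header
                break;
            }
            byte[] frame = new byte[size];
            in.get(frame);
            frames.add(frame);
        }
        return frames;
    }
}
```

If a buffer holds one full frame plus part of a second, decode returns only the first and leaves the partial bytes in place, which is why a cumulation buffer (and the limits configured on it above) is needed.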
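When a TransportClient is created
When the build method is called, the TransportService instance is obtained through dependency injection and its start method is called. The call eventually reaches NettyTransport's doStart method, which creates the client-side and server-side Netty RPC objects. Requests and responses are then handled by MessageChannelHandler.
The detailed flow is essentially the same as above.
The overall structure is shown in the figure: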
image.png
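In this way, when a client issues a request, for example to a coordinating node first, the coordinating node may need to forward it to other nodes through NettyTransport (or LocalTransport, depending on the situation). In the end, both the request and the response are handled by MessageChannelHandler.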
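To make the "one handler for both directions" idea concrete, here is a small sketch of a dispatcher that treats an incoming message either as a request (look up a handler by action name) or as a response (complete the callback that is waiting for it). It is only an illustration under assumptions: the class MessageDispatchSketch, its methods, the boolean request flag, and the correlation of responses by a numeric request id are all hypothetical simplifications, not the real MessageChannelHandler code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the dispatching role of MessageChannelHandler.
final class MessageDispatchSketch {
    // Handlers for incoming requests, keyed by action name.
    private final Map<String, Function<String, String>> requestHandlers = new HashMap<>();
    // Callbacks waiting for a response, keyed by request id (assumed correlation scheme).
    private final Map<Long, Consumer<String>> pendingResponses = new HashMap<>();
    private long nextRequestId = 1;

    void registerHandler(String action, Function<String, String> handler) {
        requestHandlers.put(action, handler);
    }

    // Remember the callback and return the id that travels with the request.
    long sendRequest(Consumer<String> onResponse) {
        long requestId = nextRequestId++;
        pendingResponses.put(requestId, onResponse);
        return requestId;
    }

    // Single entry point for every decoded message.
    // Returns the response body for a request, or null for a response.
    String messageReceived(boolean isRequest, long requestId, String action, String body) {
        if (isRequest) {
            Function<String, String> handler = requestHandlers.get(action);
            if (handler == null) {
                throw new IllegalArgumentException("no handler for action: " + action);
            }
            return handler.apply(body);
        }
        Consumer<String> callback = pendingResponses.remove(requestId);
        if (callback != null) {
            callback.accept(body);
        }
        return null;
    }
}
```

With two instances standing in for two nodes, the sender registers a callback through sendRequest, the receiver's messageReceived produces the reply, and feeding that reply back into the sender's messageReceived with the request flag cleared completes the callback.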