transporter层支持netty,mina,http等协议。本文介绍基于netty4的实现。
一 NettyClient
1.1 类关系图
NettyClient类关系图.png1.2 初始化
1.2.1 NettyClient初始化
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
- 设置threadpool属性默认为cache
- handler封装
1.2.2 AbstractClient初始化
- send.reconnect配置支持发送时重连断开的连接
- 初始化netty client,配置channel属性,配置编解码和收包函数,配置通信事件分发器
- 保存收包异步处理线程池,停机时做销毁处理
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyClientHandler);
}
});
}
- 启动线程池定时调度任务检查channel连接状态,对断开的连接进行重连
- 和服务端建立tcp连接
ChannelFuture future = bootstrap.connect(getConnectAddress());
1.2.3 AbstractEndpoint初始化
- 保存连接超时和通信超时
1.2.4 AbstractPeer初始化
- 保存MultiMessageHandler,netty通信时调用handler处理对应通信事件
1.3 收包handler介绍
1.3.1 MultiMessageHandler
- 如果一次收包包含多个消息时,会解码成MultiMessage批量消息类型。
- 对批量消息,遍历依次调用下层收包处理函数。
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
1.3.2 HeartbeatHandler
心跳报文处理,见exchanger层介绍
1.3.3 异步线程池收包处理
- 根据dispather配置选择异步收包处理函数。
配置值 | 说明 |
---|---|
all | 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,⼼跳等。 |
direct | 所有消息都不派发到线程池,全部在 IO 线程上直接执⾏。 |
message | 只有请求响应消息派发到线程池。其它连接断开事件,⼼跳等消息,直接在 IO 线程上执⾏。 |
execution | 只有请求消息派发到线程池。响应和其它连接断开事件,⼼跳等消息,直接在 IO 线程上执⾏。 |
connection | 一个单线程线程池,处理连接断开事件,connect.queue.capacity 配置指定可缓存的连接请求最大数量。其它消息派发到另一个线程池。 |
-
WrappedChannelHandler
根据threadpool配置创建线程池。queues属性值可设置等待blockqueue的长度。
配置值 | 说明 |
---|---|
fixed | 固定⼤⼩线程池,new ThreadPoolExecutor(200, 200, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
|
cached | 缓存线程池。new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60*1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
|
limited | 最大线程数量线程池。new ThreadPoolExecutor(0, 200, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
|
- 异步线程执行任务
ChannelEventRunnable
根据channel的状态调用handler的不同接口函数,这里调用的exchange层的DecodeHandler。
public enum ChannelState {
CONNECTED,
DISCONNECTED,
SENT,[图片上传中...(image.png-b4f324-1546695374507-0)]
RECEIVED,
CAUGHT
}
1.3.4 收包handler完整层次图
收包handler.png二 NettyServer
2.1 类关系图
NettyServer类.png2.2 初始化
2.2.1 NettyServer初始化
- 封装收包处理函数
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
2.2.2 AbstractServer初始化
- 配置连接请求事件分发器
bossGroup
,通信事件分发器workerGroup
- 配置channel属性
- 配置编解码函数和收包处理函数
- 创建netty server
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
2.3 收包处理
收包处理函数与NettyClient流程相同。
网友评论