之前我们说过,Dubbo是用Netty实现的网络通信,下面我们就来分析一下Dubbo使用Netty的具体实现。
先来看一下服务端的实现。先看一下NettyServer的实现
/**
 * Starts the Netty server: configures the {@link ServerBootstrap}, installs the codec and
 * the server handler on every accepted connection, and binds the listen address.
 *
 * <p>The call blocks until the bind attempt has completed; the resulting server channel
 * is stored in {@code channel}.
 */
protected void doOpen() {
    // Install the logger factory used by Netty.
    NettyHelper.setNettyLoggerFactory();

    bootstrap = new ServerBootstrap();

    // A single boss thread plus a worker pool sized from the URL's IO_THREADS_KEY parameter
    // (falling back to DEFAULT_IO_THREADS). The "true" flag creates daemon threads.
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(
            getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    // One handler instance is created here and added to the pipeline of every accepted
    // channel; its channel map is exposed through the `channels` field.
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            // SO_REUSEADDR governs re-binding of the listening socket, so it must be set
            // with option() (server channel) rather than childOption() (accepted channels).
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            // Options applied to each accepted connection.
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            // Pipeline for each accepted connection.
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {
                    // A fresh codec adapter per connection.
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()
                            //.addLast("logging", new LoggingHandler(LogLevel.INFO)) // for debug
                            .addLast("decoder", adapter.getDecoder())   // inbound: decode
                            .addLast("encoder", adapter.getEncoder())   // outbound: encode
                            .addLast("handler", nettyServerHandler);    // business handler
                }
            });

    // Bind and wait (uninterruptibly) for the bind to finish before publishing the channel.
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
整个过程是:先创建ServerBootstrap和boss/worker两个线程组,再创建NettyServerHandler;每当有新连接接入时,把解码器、编码器和NettyServerHandler依次添加到该连接的ChannelPipeline上;最后绑定端口,启动服务端。
再来看一下NettyServerHandler的实现
/**
 * Invoked when a connection becomes active. Propagates the event down the pipeline,
 * registers the wrapped channel under its remote address, and notifies the handler.
 *
 * @param ctx the context of the channel that became active
 * @throws Exception if the handler's {@code connected} callback fails
 */
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Forward the event to the next handler in the pipeline first.
    ctx.fireChannelActive();

    // Look up (or create) the NettyChannel wrapper for the underlying Netty channel.
    NettyChannel wrapped = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        if (wrapped != null) {
            // Key the channel map by the remote peer's address string.
            InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
            String key = NetUtils.toAddressString(remote);
            channels.put(key, wrapped);
        }
        // Hand the connection event to the handler.
        handler.connected(wrapped);
    } finally {
        // Drop the wrapper again if the channel has already disconnected.
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
/**
 * Invoked for every inbound message; passes it to the handler together with the
 * NettyChannel wrapper of the connection it arrived on.
 *
 * @param ctx the context of the channel the message was read from
 * @param msg the inbound message
 * @throws Exception if the handler's {@code received} callback fails
 */
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel wrapped = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // Deliver the message to the handler.
        handler.received(wrapped, msg);
    } finally {
        // Drop the wrapper again if the channel has already disconnected.
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
/**
 * Invoked for every outbound message. Performs the write via the superclass and then
 * notifies the handler that the message was sent.
 *
 * @param ctx     the context of the channel being written to
 * @param msg     the outbound message
 * @param promise the promise associated with the write
 * @throws Exception if the write or the handler's {@code sent} callback fails
 */
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    // Perform the write before notifying the handler.
    super.write(ctx, msg, promise);

    NettyChannel wrapped = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // Report the sent message to the handler.
        handler.sent(wrapped, msg);
    } finally {
        // Drop the wrapper again if the channel has already disconnected.
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
NettyCodecAdapter,管理Netty的编解码器
/**
 * Netty encoder handler, created once per adapter as an {@code InternalEncoder}.
 */
private final ChannelHandler encoder = new InternalEncoder();
/**
 * Netty decoder handler, created once per adapter as an {@code InternalDecoder}.
 */
private final ChannelHandler decoder = new InternalDecoder();
再来看一下客户端的实现
/**
 * Prepares the Netty client: configures the {@link Bootstrap} with its event loop group,
 * socket options, connect timeout and channel pipeline.
 *
 * <p>No connection is made here; this method only initializes {@code bootstrap}.
 */
protected void doOpen() {
    // Install the logger factory used by Netty.
    NettyHelper.setNettyLoggerFactory();

    // One handler instance, added to the pipeline of each channel this bootstrap creates.
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);

    // Client-side bootstrap (not a ServerBootstrap).
    bootstrap = new Bootstrap();
    bootstrap
            .group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(NioSocketChannel.class);

    // The connect timeout is the configured timeout, but never less than 3000 ms.
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(getTimeout(), 3000));

    // Pipeline for each connection; the initializer is parameterized instead of raw.
    bootstrap.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            // A fresh codec adapter per connection.
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()
                    //.addLast("logging", new LoggingHandler(LogLevel.INFO)) // for debug
                    .addLast("decoder", adapter.getDecoder())   // inbound: decode
                    .addLast("encoder", adapter.getEncoder())   // outbound: encode
                    .addLast("handler", nettyClientHandler);    // business handler
        }
    });
}
整体过程和服务端基本一样:同样需要把编解码器和NettyClientHandler依次添加到ChannelPipeline中。不同之处在于客户端使用的是Bootstrap,并且会设置连接超时时间(最小为3000毫秒);另外doOpen只完成初始化,并不在这里发起连接。
再来看一下NettyClientHandler的实现
/**
 * Invoked for every inbound message on the client side; passes it to the handler
 * together with the NettyChannel wrapper of the connection.
 *
 * @param ctx the context of the channel the message was read from
 * @param msg the inbound message
 * @throws Exception if the handler's {@code received} callback fails
 */
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel wrapped = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // Deliver the message to the handler.
        handler.received(wrapped, msg);
    } finally {
        // Drop the wrapper again if the channel has already disconnected.
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
/**
 * Invoked for every outbound message on the client side. Performs the write via the
 * superclass and then notifies the handler that the message was sent.
 *
 * @param ctx     the context of the channel being written to
 * @param msg     the outbound message
 * @param promise the promise associated with the write
 * @throws Exception if the write or the handler's {@code sent} callback fails
 */
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    // Perform the write before notifying the handler.
    super.write(ctx, msg, promise);

    NettyChannel wrapped = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // Report the sent message to the handler.
        handler.sent(wrapped, msg);
    } finally {
        // Drop the wrapper again if the channel has already disconnected.
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
再看一下NettyChannel这个类,它继承自AbstractChannel,提供了对Channel的操作,看一下具体的实现
/**
 * Writes and flushes a message on the underlying Netty channel.
 *
 * @param message the message to send
 * @param sent    when {@code true}, wait up to the URL's TIMEOUT_KEY value (default
 *                DEFAULT_TIMEOUT) for the write to complete; when {@code false}, return
 *                without waiting
 * @throws RemotingException if the write reports a failure, the wait is interrupted,
 *                           or the write does not complete within the timeout
 */
public void send(Object message, boolean sent) throws RemotingException {
    // Superclass pre-check (per the original notes, this validates the connection state).
    super.send(message, sent);

    boolean success = true; // when not waiting for completion, assume success
    int timeout = 0;
    try {
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            // Block until the write completes or the timeout elapses.
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        // Surface any failure already recorded on the future.
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    // The wait timed out before the write completed.
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + " in timeout(" + timeout + "ms) limit");
    }
}
再看一下AbstractClient的send方法
/**
 * Sends a message through the client's current channel, connecting first when
 * send-time reconnect is enabled and the client is not connected.
 *
 * @param message the message to send
 * @param sent    passed through to the channel's {@code send}
 * @throws RemotingException if connecting fails, or if there is no connected channel
 */
public void send(Object message, boolean sent) throws RemotingException {
    // Reconnect on demand, but only when the feature is switched on.
    if (send_reconnect) {
        if (!isConnected()) {
            connect();
        }
    }

    Channel target = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    boolean usable = target != null && target.isConnected();
    if (!usable) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }

    // Delegate the actual send to the channel.
    target.send(message, sent);
}
再到connect方法
/**
 * Connects to the server under {@code connectLock}. Returns immediately when already
 * connected; otherwise sets up the reconnect task, runs {@code doConnect()}, and
 * resets the reconnect counters on success.
 *
 * @throws RemotingException if the client is still not connected after
 *                           {@code doConnect()}, or if any other error occurs
 */
protected void connect() throws RemotingException {
    connectLock.lock();
    try {
        // Nothing to do when a connection already exists.
        if (isConnected()) {
            return;
        }

        // Make sure the periodic connection check is scheduled, then connect.
        initConnectStatusCheckCommand();
        doConnect();

        // Still not connected after doConnect(): report it as a connect timeout.
        if (!isConnected()) {
            throw new RemotingException(this,
                    "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                            + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
        }

        // Connected: log it at info level.
        if (logger.isInfoEnabled()) {
            logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", channel is " + this.getChannel());
        }

        // Reset the reconnect attempt counter and the "error already logged" flag.
        reconnect_count.set(0);
        reconnect_error_log_flag.set(false);
    } catch (RemotingException e) {
        // Already the right exception type; pass it through untouched.
        throw e;
    } catch (Throwable e) {
        throw new RemotingException(this,
                "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", cause: " + e.getMessage(), e);
    } finally {
        connectLock.unlock();
    }
}
再到initConnectStatusCheckCommand方法
/**
 * Schedules the periodic connection check, unless reconnecting is disabled or a check
 * is already scheduled and has not been cancelled.
 *
 * <p>Each run reconnects when the client is not connected, and otherwise records the
 * current time as the last time the client was seen connected.
 */
private synchronized void initConnectStatusCheckCommand() {
    // A period of 0 (reconnect=false) disables reconnecting.
    int period = getReconnectParam(getUrl());
    if (period <= 0) {
        return;
    }
    // Keep the existing task if one is scheduled and still live.
    if (reconnectExecutorFuture != null && !reconnectExecutorFuture.isCancelled()) {
        return;
    }

    Runnable connectStatusCheckCommand = new Runnable() {
        public void run() {
            try {
                if (isConnected()) {
                    // Connected: remember when we last saw a live connection.
                    lastConnectedTime = System.currentTimeMillis();
                } else {
                    // Not connected: try to reconnect.
                    connect();
                }
            } catch (Throwable t) {
                String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();

                // wait registry sync provider list
                // Log at error level once only, after being disconnected longer than shutdown_timeout.
                boolean disconnectedTooLong = System.currentTimeMillis() - lastConnectedTime > shutdown_timeout;
                if (disconnectedTooLong && !reconnect_error_log_flag.get()) {
                    reconnect_error_log_flag.set(true);
                    logger.error(errorMsg, t);
                    return;
                }

                // Otherwise warn only on every reconnect_warning_period-th failure.
                if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
                    logger.warn(errorMsg, t);
                }
            }
        }
    };

    // First run after `period` ms, then `period` ms after each run finishes.
    reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(
            connectStatusCheckCommand, period, period, TimeUnit.MILLISECONDS);
}
其实就是通过一个定时任务周期性地检查连接状态:如果发现未连接就调用connect()重连,已连接则记录最后一次连接的时间。
再来看一下getReconnectParam方法
/**
 * Resolves the reconnect period from the URL's RECONNECT_KEY parameter.
 *
 * @param url the URL carrying the reconnect setting
 * @return DEFAULT_RECONNECT_PERIOD when the parameter is absent, empty or "true"
 *         (2000 ms according to the original notes); {@code 0} when it is "false";
 *         otherwise the parameter parsed as a non-negative integer
 * @throws IllegalArgumentException if the parameter is neither true/false nor a
 *                                  non-negative integer
 */
private static int getReconnectParam(URL url) {
    String param = url.getParameter(Constants.RECONNECT_KEY);

    // Absent, empty, or explicitly enabled: use the default period.
    if (param == null || param.length() == 0 || "true".equalsIgnoreCase(param)) {
        return Constants.DEFAULT_RECONNECT_PERIOD;
    }
    // Explicitly disabled.
    if ("false".equalsIgnoreCase(param)) {
        return 0;
    }

    int reconnect;
    try {
        reconnect = Integer.parseInt(param);
    } catch (NumberFormatException e) {
        // Keep the parse failure as the cause so the root problem stays visible.
        throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param, e);
    }
    if (reconnect < 0) {
        throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param);
    }
    return reconnect;
}
该方法用于获取重连的时间间隔(毫秒):未配置或配置为true时使用默认值,配置为false时返回0表示关闭重连,其余情况必须是非负整数,否则抛出IllegalArgumentException。
Dubbo的网络通信就分析到这里了。
网友评论