美文网首页
Dubbo源码分析(十) 网络通信

Dubbo源码分析(十) 网络通信

作者: skyguard | 来源:发表于2018-11-13 09:55 被阅读0次

之前我们说过,Dubbo是用Netty实现的网络通信,下面我们就来分析一下Dubbo使用Netty的具体实现。
先来看一下服务端的实现。先看一下NettyServer的实现

protected void doOpen() {
    // 设置日志工厂
    NettyHelper.setNettyLoggerFactory();

    // 实例化 ServerBootstrap
    bootstrap = new ServerBootstrap();

    // 创建线程组
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    // 创建 NettyServerHandler 对象
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    // 设置 `channels` 属性
    channels = nettyServerHandler.getChannels();

    bootstrap
            // 设置它的线程组
            .group(bossGroup, workerGroup)
            // 设置 Channel类型
            .channel(NioServerSocketChannel.class) // Server
            // 设置可选项
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            // 设置责任链路
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {
                    // 创建 NettyCodecAdapter 对象
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder()) // 解码
                            .addLast("encoder", adapter.getEncoder())  // 解码
                            .addLast("handler", nettyServerHandler); // 处理器
                }
            });

    // 服务器绑定端口监听
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}

就是先创建一个ServerBootStrap,然后创建NettyServerHandler,把编解码器添加到ChannelPipeline上,然后把NettyServerHandler也添加上去,然后绑定端口,启动服务端。
再来看一下NettyServerHandler的实现

  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 交给下一个节点处理
    ctx.fireChannelActive();

    // 创建 NettyChannel 对象
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // 添加到 `channels` 中
        if (channel != null) {
            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
        }
        // 提交给 `handler` 处理器。
        handler.connected(channel);
    } finally {
        // 移除 NettyChannel 对象,若已断开
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        handler.received(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    // 发送消息
    super.write(ctx, msg, promise);
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // 提交给 `handler` 处理器。
        handler.sent(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

NettyCodecAdapter,管理Netty的编解码器

  /**
 * Netty 编码器
 */
private final ChannelHandler encoder = new InternalEncoder();
/**
 * Netty 解码器
 */
private final ChannelHandler decoder = new InternalDecoder();

再来看一下客户端的实现

protected void doOpen() {
    // 设置日志工厂
    NettyHelper.setNettyLoggerFactory();

    // 创建 NettyClientHandler 对象
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);

    // 实例化 ServerBootstrap
    bootstrap = new Bootstrap();
    bootstrap
            // 设置它的线程组
            .group(nioEventLoopGroup)
            // 设置可选项
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            // 设置 Channel类型
            .channel(NioSocketChannel.class);

    // 设置连接超时时间
    if (getTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
    }

    // 设置责任链路
    bootstrap.handler(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) {
            // 创建 NettyCodecAdapter 对象
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder", adapter.getDecoder()) // 解码
                    .addLast("encoder", adapter.getEncoder()) // 编码
                    .addLast("handler", nettyClientHandler); // 处理器
        }
    });
}

基本过程和服务端基本一样,只不过客户端会设置超时时间,也是需要把编解码器添加到ChannelPipeline中,然后把NettyClientHandler也添加进去。
再来看一下NettyClientHandler的实现

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        handler.received(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    super.write(ctx, msg, promise);
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        handler.sent(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

再看一下NettyChannel这个类,它继承自AbstractChannel,提供了对Channel的操作,看一下具体的实现

 public void send(Object message, boolean sent) throws RemotingException {
    // 检查连接状态
    super.send(message, sent);

    boolean success = true; // 如果没有等待发送成功,默认成功。
    int timeout = 0;
    try {
        // 发送消息
        ChannelFuture future = channel.writeAndFlush(message);
        // 等待发送成功
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        // 若发生异常,抛出
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    // 发送失败,抛出异常
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

再看一下AbstractClient的send方法

public void send(Object message, boolean sent) throws RemotingException {
    // 未连接时,开启重连功能,则先发起连接
    if (send_reconnect && !isConnected()) {
        connect();
    }
    // 发送消息
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    channel.send(message, sent);
}

再到connect方法

protected void connect() throws RemotingException {
    // 获得锁
    connectLock.lock();
    try {
        // 已连接,
        if (isConnected()) {
            return;
        }
        // 初始化重连线程
        initConnectStatusCheckCommand();
        // 执行连接
        doConnect();
        // 连接失败,抛出异常
        if (!isConnected()) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
        // 连接成功,打印日志
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", channel is " + this.getChannel());
            }
        }
        // 设置重连次数归零
        reconnect_count.set(0);
        // 设置未打印过错误日志
        reconnect_error_log_flag.set(false);
    } catch (RemotingException e) {
        throw e;
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                + ", cause: " + e.getMessage(), e);
    } finally {
        // 释放锁
        connectLock.unlock();
    }
}

再到initConnectStatusCheckCommand方法

private synchronized void initConnectStatusCheckCommand() {
    //reconnect=false to close reconnect
    // 获得获得重连频率,默认开启。
    int reconnect = getReconnectParam(getUrl());
    // 若开启重连功能,创建重连线程
    if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
        // 创建 Runnable 对象
        Runnable connectStatusCheckCommand = new Runnable() {
            public void run() {
                try {
                    // 未连接,重连
                    if (!isConnected()) {
                        connect();
                    // 已连接,记录最后连接时间
                    } else {
                        lastConnectedTime = System.currentTimeMillis();
                    }
                } catch (Throwable t) {
                    // 超过一定时间未连接上,才打印异常日志。并且,仅打印一次。
                    String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
                    // wait registry sync provider list
                    if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
                        if (!reconnect_error_log_flag.get()) {
                            reconnect_error_log_flag.set(true);
                            logger.error(errorMsg, t);
                            return;
                        }
                    }
                    // 每一定次发现未重连,才打印告警日志。
                    if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
                        logger.warn(errorMsg, t);
                    }
                }
            }
        };
        // 发起定时任务
        reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
    }
}

其实是通过一个定时任务来检测是否连接成功了
再来看一下getReconnectParam方法

private static int getReconnectParam(URL url) {
    int reconnect;
    String param = url.getParameter(Constants.RECONNECT_KEY);
    if (param == null || param.length() == 0 || "true".equalsIgnoreCase(param)) {
        reconnect = Constants.DEFAULT_RECONNECT_PERIOD; // 默认 2000 毫秒
    } else if ("false".equalsIgnoreCase(param)) {
        reconnect = 0;
    } else {
        try {
            reconnect = Integer.parseInt(param);
        } catch (Exception e) {
            throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param);
        }
        if (reconnect < 0) {
            throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param);
        }
    }
    return reconnect;
}

获取重连的频率
Dubbo的网络通信就分析到这里了。

相关文章

网友评论

      本文标题:Dubbo源码分析(十) 网络通信

      本文链接:https://www.haomeiwen.com/subject/dcbcfqtx.html