美文网首页
Netty服务启动流程分析

Netty服务启动流程分析

作者: 王凯_6a8b | 来源:发表于2019-01-24 17:22 被阅读0次

服务启动

服务启动可分为以下几步:
①:Boostrap.bind()
②:创建NioServerSocketChannel
③:将NioServerSocketChannel注册到EventLoopGroup(这里的EventLoopGroup指的是我们前面服务创建时指定的bossGroup)
下面结合源码进行分析:

ServerBoostrap-bind源码分析
//bind(port)最终调用doBind方法
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}
//在doBind方法,主要看其第一行代码,intAndRegister方法:初始化NioServerSocketChannel并注册
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    .....
}
/**
 * 服务端NioServerSocketChannel的创建、初始化、注册
 * @return
 */
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //这里的newChannel其实就是对我们之前channel方法传入的class进行实例化,即NioServerSocketChannel的创建
        channel = channelFactory().newChannel();
        //NioServerSocketChannle的初始化
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    //服务端Channel-NioServerSocketChannel的注册到EventLoop上
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
NioServerSocketChannel的创建
/**
* NioServerSocketChannel的创建,服务端socketChannel的创建包含的内容:
*       实例化的时候注册OP_Accept事件
*       默认pipeline由DefaultChannelPipeline提供,其默认只包含HeadContext和TailContext两个不含ChannelHandler的AbstratChannelHandlerContext组成。
*/
/**
 * NioServerSocketChannel默认的构造函数
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
 * 最终调用的构造器函数,可发现其①首先注册了OP_Accept事件,具体的操作发生在父类AbstractNioChannel的构造函数中
 * 而创建默认的Pipeline的动作则发生在父类AbstractChannel的构造函数中
 * ②然后初始化了一些服务端的基本配置
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//AbstractNioChannel父类中:Channel感兴趣的事件
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;//感兴趣的事件,后面,将Channel注册到EventLoop上的时候会用到
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
//AbstractChannel父类中:创建默认的Pipeline
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

NioServerSocketChannel的初始化

NioServerSocketChannel的初始化,除了初始化一些参数外,还为pipeline添加一个ChannelInitializer的handler,这个特殊的handler会在channel register的时候被调用,然后回调initChannel方法,回调完成之后,会将当前channel从所属的pipeline中移除。

NioServerSocketChannel的初始化
//ServerBoostrap.init方法
//这里需重点关注最后为NioServerSocketChannel添加的Channelhandler,当OP_Accept事件发生的时候,NioSocketChannel的初始化会调用这里
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }
 
    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
 
    ChannelPipeline p = channel.pipeline();
 
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
    //重点
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            //将我们之前添加的ChannelHanndler添加到SocketChannel的初始化中
            if (handler != null) {
                pipeline.addLast(handler);
            }
 
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

上述初始化代码中为pipeline添加了一个重要的类 ServerBootstrapAcceptor 这个类是专门负责处理IO监听事件的。它的里面提供了一个重要的方法channelRead,这个方法做了两件事:

  1. 为接收到的Channel添加我们在服务启动前设置的自定义的handler。
  2. 将接收到的channel注册到childGroup所在的EventLoopGroup中
NioServerSocketChannel的ChannelHandler
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    //省略部分代码...
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
        child.pipeline().addLast(childHandler);
        setChannelOptions(child, childOptions, logger);
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    //省略部分代码...
}

NioServerSocketChannel的注册

/**
 * NioServerSocketChannel的注册,这里的group是我们创建ServerBoostrap传入的第一个group即bossGroup
 */
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
  
    //next方法用于顺序返回group中的一个EventLoop,因为我们创建bossGroup的时候只指定了一个EventLoop,所以其实这里只有一个Eventloop可供选择
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
}
//最终的注册会调用channel自身的register方法
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    ....
    channel.unsafe().register(this, promise);
    return promise;
}
//看一下channel自身的register方法,发现其会判断调用当前register的线程是否就是支撑当前EventLoop的线程
//如果是,则直接进行register,否则封装成一个任务,等待EventLoop之后进行调度执行
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
//在register0方法中,channel将自身注册到了EventLoop上Selector上,只不过注册的ops是0,表示只注册,不监听任何事件
//而真正的注册OP_Accept事件发生在,java原生的ServerSocketChannel绑定了localAddress之后,
public final void bind(final SocketAddress localAddress, final ChannelPromise){
 
    boolean wasActive = isActive();
    try {
        //java原生接口ServerSocketChannle的bind方法
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    //这里修改了NioServerSocketChannle注册的事件,将0->改为了感兴趣的事件即OP_Accpet事件
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
}

相关文章

网友评论

      本文标题:Netty服务启动流程分析

      本文链接:https://www.haomeiwen.com/subject/fjdujqtx.html