netty源码分析之服务端

作者: 隔壁老王的隔壁啊 | 来源:发表于2017-10-24 15:05 被阅读29次

    一、前言

    上篇简单分析了下future、promise,由于这个在netty源码中用的实在太多,不得不提前了解下,这篇就开始分析源码了,先从server端开始~~~

    二、源码分析

    首先简单贴下server端的代码,这里只贴了核心代码

    public void startServer() throws Exception {
            // 只监听一个端口,因此只需要设置成线程数为1即可
            EventLoopGroup boss = new NioEventLoopGroup(1);
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(boss, worker);
                // 底层实现1、首先创建ReflectiveChannelFactory2、调用反射机制创建
                // NioServerSocketChannel,同时还会创建pipeline,默认创建DefaultChannelPipeline
                b.channel(NioServerSocketChannel.class);
                b.childHandler(nettyTelnetInitializer);
                // 服务器绑定端口监听,bind()操作异步的
                ChannelFuture f = b.bind(SERVER_PORT);
                // 这个函数将会调用ChannelInitializer的initChannel(ChannelHandlerContext)方法
                // 只有在这个方法执行之后,server才算是绑定了端口
                f = f.sync();
                // 监听服务器关闭监听
                f.channel().closeFuture().sync();
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    

    在startServer()方法中,首先创建了两个NioEventLoopGroup类,由于netty是基于reactor线程模型的,这里两个类也分别代表boss和workers。(至于什么是reactor线程模型,下下篇再介绍吧)

    紧接着创建ServerBootstrap调用group()方法,

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }
    @SuppressWarnings("unchecked")
    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return (B) this;
    }
    

    再然后设置channel,由于是服务的,因此设置NioServerSocketChannel,底层源码

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }
    
        this.channelFactory = channelFactory;
        return (B) this;
    }
    

    由此可见,默认ChannelFactory是ReflectiveChannelFactory。再看它的构造函数

    // 这里的clazz很明显就是NioServerSocketChannel,主要为了后面的反射使用
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }
    

    设置完一些参数之后之后,接着就是注册和绑定了.绑定操作在AbstractBootstrap的doBind()

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 初始化channel和注册channel,都是异步操作
        final ChannelFuture regFuture = initAndRegister();
        // 获取channel
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            // 为Future添加一个监听器,当Future完成之后调用该方法,同时,当该方法执行之后,将会
            // 调用sync()方法中的wait()之后的代码
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        // 当注册成功之后,设置字段,表示已注册
                        promise.registered();
                        // 开始绑定ip和端口号
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    // 初始化和注册
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 通过反射创建channel
            channel = channelFactory.newChannel();
            // 初始化channel
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // 注册channel
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
    
        return regFuture;
    }
    //ServerBootstrap类执行初始化
    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
        
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        // 获取对应channel的管道,在设置channel的时候就初始化了,默认是DefaultChannelPipeline
        ChannelPipeline p = channel.pipeline();
    
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        // 为该管道添加一个handler
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
    
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // 添加ServerBootstrapAcceptor,当客户端开始连接的时候,将会调用ServerBootstrapAcceptor的channelRead()方法
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
    // 注册操作
    // 在上面的注册方法最后调用如下
    // MultithreadEventLoopGroup类
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    // 关于这里的next()是如何获取EventExecutor就不点进去看了,主要是通过EventExecutorChooser来获取.
    
    
    // 接着来到这里了SingleThreadEventLoop类
    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    // AbstractUnsafe类
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // .......略
        AbstractChannel.this.eventLoop = eventLoop;
    
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
    
    private void register0(ChannelPromise promise) {
        try {
            // .......略
            doRegister();
            // .......略
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    
    // AbstractNioChannel类
    // OK,到了这里终于看见了selectKey了,这就是真正执行注册了
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 获取SelectionKeyImpl对象
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
    //AbstractSelectableChannel类
    public final SelectionKey register(Selector sel, int ops,
                                           Object att)
            throws ClosedChannelException
        {
            synchronized (regLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                if ((ops & ~validOps()) != 0)
                    throw new IllegalArgumentException();
                if (blocking)
                    throw new IllegalBlockingModeException();
                SelectionKey k = findKey(sel);
                if (k != null) {
                    k.interestOps(ops);
                    k.attach(att);
                }
                if (k == null) {
                    // New registration
                    synchronized (keyLock) {
                        if (!isOpen())
                            throw new ClosedChannelException();
                        // 到了这里终于到了nio的selector了~~~
                        k = ((AbstractSelector)sel).register(this, ops, att);
                        addKey(k);
                    }
                }
                return k;
            }
        }
    // 绑定
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
    
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    // 绑定IP/端口号
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    

    ok,到这里,channel的初始化和注册都完成了,DefaultChannelPromise,接着我们看最后一个方法吧.

    // 此时的f是:DefaultChannelPromise的子类:PendingRegistrationPromise
    f = f.sync()
    //DefaultChannelPromise类
    @Override
    public ChannelPromise sync() throws InterruptedException {
        super.sync();
        return this;
    }
    //DefaultPromise<V>类
    @Override
    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed();
        return this;
    }
    

    这里的await方法比较重要,我们看源码,前面的不重要,最重要的是wait()方法,这个是object的方法,也就是说到了这里它会
    进入"睡眠"状态,直到future 完成.这个我们可以在

    @Override
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }
    
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
    
        checkDeadLock();
    
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }
    

    三、总结

    本来准备客户端服务放在一起分析的,不过可能占用篇幅过大,就分成两篇了.
    总的来说,server端过程如下:
    1、初始化channel
    2、NioEventLoop类的regist()方法,将channel注册到selector
    3、同步,就是调用sync()方法,该方法必须调用
    虽然总结的比较简单,但是源码还是挺复杂的.
    下篇就就分析client端的了~

    相关文章

      网友评论

        本文标题:netty源码分析之服务端

        本文链接:https://www.haomeiwen.com/subject/wjuauxtx.html