美文网首页Netty源码分析系列
Netty源码分析系列--10. Channel注册到Event

Netty源码分析系列--10. Channel注册到Event

作者: ted005 | 来源:发表于2018-11-06 23:50 被阅读44次

Channel的注册到EventLoop

前文中介绍了服务端ServerBootStrap绑定端口号时,很重要的一个方法是initAndRegister,当Channel初始化完成后,会进行注册

ChannelFuture regFuture = config().group().register(channel);
  • group方法返回NioEventLoopGroupbossGroup

  • NioEventLoopGroup继承了MultithreadEventLoopGroup, 其中有register方法。

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

进入子类SingleThreadEventLoop

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

可以看到,Channel被包装为ChannelPromise,它持有ChannelEventLoop
最终进入AbstractUnsaferegister方法:

private void register0(ChannelPromise promise) {
        try {
            
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            
            //1.
            doRegister();
            neverRegistered = false;
            registered = true;

            //2.
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            
            //3.
            pipeline.fireChannelRegistered();
            
            if (isActive()) {
                if (firstRegistration) {
                    //4.
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
  1. AbstractUnsafe中的doRegister()方法为空,进入它的子类AbstractNioChannel:
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // java NIO
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
  • javaChannel方法返回一个SelectableChannel对象,然后调用register方法,注册到Selector
  • eventLoop().unwrappedSelector()的类型是Selector
  • 可以看到Channel的注册流程最终使用Java NIO实现。
  1. 注册完成后,依次调用Handler的回调方法。

Channel、EventLoop的关系

  • EventLoop继承了EventExecutor,可以提交和执行Runnable任务

  • 一个EventLoopGroup中会有多个EventLoop

  • NioEventLoop的父类SingleThreadEventExecutor中持有Thread对象,它负责处理EventLoop中所有的I/O事件

  • 一个Channel只会注册到一个EventLoop

  • 一个EventLoop上可以注册多个Channel

  • 由于EventLoop由单线程处理I/O事件,因此在ChannelHandler的回调方法中不能执行耗时较长的任务,否则会阻塞其他Channel中事件的处理。

  • 因此通常在ChannelHandler的回调方法中新启线程池,在新的线程中执行耗时任务。或者在Handler添加到ChannelPipeline时,指定事件执行组EventExecutorGroupNetty源码分析系列--8. Channel和ChannelPipeline中有介绍)

    Channel和EventLoop.png

如下Channel注册时的代码示例:

if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
        logger.warn(
                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, t);
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

当方法inEventLoop()返回true时,直接执行调用register0,否则作为任务Task提交到EventLoop中稍后执行。

方法inEventLoop()的实现在NioEventLoop的父类SingleThreadEventExecutor中,如下:

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

SingleThreadEventExecutor的父类AbstractEventExecutor把当前线程对象传入:

@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

综上可以看到,inEventLoop()的逻辑就是判断当前线程是否是NioEventLoopSingleThreadEventExecutor)所持有的那个Thread对象。如果不是,就提交给NioEventLoop,让任务稍后由持有的那个Thread对象执行。

相关文章

网友评论

    本文标题:Netty源码分析系列--10. Channel注册到Event

    本文链接:https://www.haomeiwen.com/subject/ayzpxqtx.html