美文网首页
netty源码分析(二)

netty源码分析(二)

作者: 无聊之园 | 来源:发表于2019-05-16 19:20 被阅读0次

前面已经做好了一些初始化工作了。

ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();

public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
public ChannelFuture bind(SocketAddress localAddress) {
        // 校验工作,只是校验一下有没有初始化什么的,不管,
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

private ChannelFuture doBind(final SocketAddress localAddress) {
     // 初始化以及注册,这个方法会做很多时间,看下文
    final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
final ChannelFuture initAndRegister() {
        /**
前面serverBootstrap的channel()方法传入了NioServerSocketChannel类型,初始化了channel工厂,这里用这个工厂创建NioServerSocketChannel实例,
创建NioServerSocketChannel实例会调用NioServerSocketChannel的构造方法,看下文
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
*/
        final Channel channel = channelFactory.newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

创建NioServerSocketChannel实例。

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
              // nio的ServerSocketChannel的open方法也是这么创建ServerSocketChannel对象的,
// 所以provider是nio用来创建channel的底层类
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
// 创建了ServerSocketChannel后,创建NioServerSocketChannel
public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
// NioServerSocketChannel的ch变量维护了nio的ServerSocketChannel,
// ServerSocketChannel配置成非堵塞
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        // unsafe其实是NioSocketChannelUnsafe类,里面的方法都是
        // doWrite、doRead等操作nio的包装类
        unsafe = newUnsafe();
        // 管道,我们自定义的handler往管道里添加
        pipeline = newChannelPipeline();
    }

Pipeline初始化的时候,就设置号了head和tail,结构为:head《=》tail双向链表。
Head实现了ChannelOutBoundHandler和ChannelInboudhandler,所以是输入和输出处理器,
tail则只实现了ChannelInboudhandler,所以是输入处理器

protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
// pipeline,维护了一个链表
protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        // tail是ChannelInboundHandler接口实现类,也就是In类型handler
        tail = new TailContext(this);
        // head是ChannelOutboundHandler、ChannelInboundHandler实现类,
//也就是in类型和out类型handler
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
 final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler 
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler

创建NioServerSocketChannel总结:NioServerSocketChannel内部开启了ServerSocketChannel,并设置为非堵塞,维护了一个pipline管道,管道有head和tail,和unsafe操作nio读写等的操作类。

回到initAndRegister方法

 final ChannelFuture initAndRegister() {
     // 前面分析了,这里的channel是NioServerSocketChannel,里面维护了pipling等很多东西
     final Channel channel = channelFactory.newChannel();
        try {
            // 现在分析这里,这里也很重要,看下文
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
// 这里,往管道里添加了一个ChannelInitializer,
// ChannelInitializer是inboudhandler输入处理器,它的
// channelRegister方法会调用initChannel方法,在channel 
// 注册到select后,会传播调用pipeLine的一个一个
// inboudHandler的channelRegister方法,所以会调用到这里
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    // 这里把我们自己传入的handler,LoggingHandler放入了管道
                    pipeline.addLast(handler);
                }
                // 还放入了这个东高西,这个东西很重要,用来处理和客户端连接的,是inboud处理器,看下文
                pipeline.addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }

ServerBootstrapAcceptor这个类,NioServerSocketChannel关联的select的线程,一直在轮询接受客户端连接,接受了连接之后,就是这里在进行分发给child EventLoopGroup线程池处理。

  ServerBootstrapAcceptor(
                EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;
        }
@Override
        // 这是NioServerSocketChannel的管道里的Handler,这里只监听客户端连接进来的事件,客户端的读、写会分发给child EventLoopGroup来处理
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
          // 这里只监听客户端连接进来的事件 ,所以这里的channel,是客户端刚建
// 立的NioSocketChannel
          final Channel child = (Channel) msg;
            // 和客户端建立的NioSocketChannel的管道,把我们传入的
// child handler:MyServerInitializer 放进去,child handler传入的handler是真正
// 处理SocketChannel读和写等交互操作的
            child.pipeline().addLast(childHandler);
            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
            try {
                // 把和客户端建立的SocketChanel,注册进childGroup线程池,线程池
// 会选一个EventLoop,然后EventLoop启动线程,循环select 监听读写事件,进行读写操作,一个channel对应一个线程
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

到这里initAndRegister方法里的init就分析完了,总结一下:创建了NioServerSocketChannel之后,调用init方法,会往NioServerSocketChannel的管道里放一个ChannelInitializer,ChannelInitializer的初始化方法initChannel会把我们的传入的LogHandler传入进去,并且还加入一个处理客户端连接的handler:ServerBootstrapAcceptor,ServerBootstrapAcceptor是真正对和客户端连接的SocketChannel进行处理的,处理的方式是:SocketChannel的管道会添加我们传入的自定义childHandler(这个handler是我们自己处理业务的),之后把SocketChanel,注册进childGroup线程池,由childGroup线程池分发线程处理, childGroup会选一个EventLoop,然后EventLoop启动线程,循环select,交互操作。

再总结:一个server启动,boss eventLoopGroup只会启动一个线程,监听NioServerSocketChannel注册的select,监听客户端的连接时间,和客户端建立连接之后,从child EventLoopGroup选一个eventLoop,启动一个线程,eventLoop内部维护了一个select,child eventLoop启动的线程循环select,然后分队对read和write等事件进行处理。

一个socketChannel对应一个线程,单线程读写,一个SocketChannel维护一个自己的select,boss eventLoopGroup其实只是开启了一个线程,循环监听接受客户端连接的select(ServerSocketChanel自己维护的select)。

有点绕。。。 不要紧,继续看,看完之后,回头就理解。

init分析完了,分析ChannelFuture regFuture = group().register(channel);

        ChannelFuture regFuture = group().register(channel);

// MultithreadEventLoopGroup类
 @Override
    public ChannelFuture register(Channel channel) {
          // next()从group EventLoopGroup,通过前面说的选择器,选择一个EventLoop,执行register注册。
        return next().register(channel);
    }
// SingleThreadEventLoop类
 @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

@Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
// NioServerSocketChannel注册到这个eventLoop
 @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            // channel和eventLoop关联起来
            AbstractChannel.this.eventLoop = eventLoop;
            // inEventLoop方法判断,当前线程,和eventLoop内部开启的线程是否是
//同一个,这里eventLoop还没有开启线程呢,不进入这里
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // 这个eventLoop是SingleThreadEventLoop对象。
//eventLoop的execute方法就好像线程池execute方法一样,提交一个任务,让线
// 程池运行。看下文
                    eventLoop.execute(new OneTimeTask() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
 @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        boolean inEventLoop = inEventLoop();
     // 如果是nioEventLoop自己线程里面,则放入队列,否则,启动新线程。
  // 这里,eventLoop里面还没有线程,所以不会进入这里
     if (inEventLoop) {
            addTask(task);
        } else {
            // 启动线程
            startThread();
            // 把任务放入eventLoop的队列
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }
private void doStartThread() {
        assert thread == null;
   // 这个EventLoop里的executor是ThreadPerTaskExecutor
    // 这里真这个启动线程,异步。
     executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 这个线程的任务就是跑NioEventLoop,也就是
// SingleThreadEventExecutor的run方法
                    SingleThreadEventExecutor.this.run();
                    success = true;
                }
    }

新启动的线程,轮询select,接受客户端的连接

// 轮询select
protected void run() {
        for (;;) {
            try {
// hasTask有任务,则进入default分支,没任务则进入select分支
// 因为这里,主线程已经把注册任务放入了队列,所以这里队列是有任务的
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
// 这里会一直select轮询,接收客户端的连接                        
                      select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // 有任务
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                // ioRatio 是io和run task的比值,io指的的监听select用户的连接io,
// ioRatio为100,则说明,一定先select接受用户的连接,之后,才处理任务。默
//认是50,则说明,select用户连接的时间和run task的时间是一半一半,即使任
// 务队列中还有任务没有执行完,也返回轮询select接受用户连接
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();
                  // 处理selectKey,
                  // 第一次selectedKeys为null,所以这里什么都没做
// 后面执行了select.select()方法后,selectedKeys不为null,则这里会做很多事情
                  processSelectedKeys();
                    final long ioTime = System.nanoTime() - ioStartTime;
                  // 第一次其实进入这里会做很多事情
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
        }
    }

前面说了,启动线程之后,addTask(task)往单线程池里添加了注册任务,这里会取出任务执行

protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        // 取出任务
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            try {
              // 执行,也就之前addTask的register0任务,下文分解
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            // 再取
            task = pollTask();
          // 没有任务则跳出  
          if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
              // 这里是把Server channel注册到现在的Group()里的一个eventloop的selector中
              // channel.register(select,0,this)
                doRegister();
                neverRegistered = false;
                registered = true;
                safeSetSuccess(promise);
              // 这里调用pipeline的传播channelRegister方法,会
// 调用一个一个inboud handler的channelRegistered,最后调
// 用到了前面说的channelIniiter的channelRegister方法,然后
// channelRegister方法调用ChannelInitializer方法,然后把我
// 们自定义的handler添加到pipeline,还添加了
// ServerBootstrapAcceptor,用来分发channel连接的
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
@Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 这里,channel.register(select,0,this)注册
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
             ...
        }
    }
private void processSelectedKeys() {
      // selectedKeys 就是第一章里,新建EventLoopGroup中new Child初始化
// EventLoop的时候,openSelect初始化select的时候,nio的select会通过反射把
// selectedKeys对象设置进去,这样Select的select()堵塞的时候,这里的
// selectKeys中的对象就会发生变化。
        if (selectedKeys != null) {
            // new EventLoop的时候,selectedKeys 初始化过了,进入这里
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

前面select.select()轮询,接收到了selectionKey之后,这里进行处理

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // selectKey有值则进入这里
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys[i] = null;
            // channel register select 的时候会通过:k.attach(att);设置了channnel
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                // 进入这里
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }

                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See https://github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
           // k 不合法,不会进入这里
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
        
        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
             // 接受客户端连接accept时间是这里进行的处理
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 新建的连接,在这里,会传播调用fireChannelRead
//方法,最后,调用到ServerBootstrapAcceptor类的
//channelRead方法,然后把这个channel注册到childGroup线
//程池中的一个EventLoop的select上去,然后这个childCroup
//的线程循环select
                unsafe.read();
                // channel关闭了
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                // 来了写事件,则强迫刷调缓冲区
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // 这里会回调监听器的operationComplete方法,说明已经连接成功了
                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
@Override
        public void read() {
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // 这里一个个调用pipeLine的handle对象,这里会调用前面说的
// ServerHandleAccept,来对客户端连接的SocketChannel分发到child Group线
// 程池。
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
                }
    }

 @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        // head为管道的head,前面说了head是inbound也是outbound,read属于输
// 入,所以会一个一个调用所有的inboud的read的方法,从head开始调
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        // 传入的参数ext为head, executor返回的channel对应的EventLoop
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            
            next.invokeChannelRead(m);
        } 
    }
// 调用head的channelRead方法,然后fireChannelRead调用管道下一个in类型handler
 private void invokeChannelRead(Object msg) {
        if (isAdded()) {
            try {
                // handle返回我们设置的logHandle,调用logHandler的channelRead发方法
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                // 一旦出现异常,则一个一个调用inboud的exceptionCaught,机制和read一样
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
// logHandler的channelRead执行了自己的逻辑之后,ctx.fireChannelRead(msg)往下传递,找下一个handle。
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "RECEIVED", msg));
        }
        ctx.fireChannelRead(msg);
    }

@Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
      // 找到下一个in类型的handler,然后执行,如此下去
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }
// 找到下一个inboud
private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

也就是说:添加的handler有,ChannelInboundHandler,和ChannelOutboundHandler,输入则一个一个调用ChannelInboundHandler,输出则一个一个调用ChannelOutboundHandler,这里说的输入是说接收数据,输出说写数据出去,出现了异常也是看是输入还是输出的时候发生的异常,然后调用ChannelInboundHandler还是ChannelOutboundHandler的异常处理。
输入的调用的顺序是:head-》-》往下,输出的时候是Tail=》=》一个一个往上。

差不多了,还有一点点细节:比如,register方法是怎么注册的,bind是怎么bind的,但是不重要,整个个流程分析了。
其他的细节问题,碰到问题,再看看,也能看明白。

总结:
一个server启动,boss eventLoopGroup只会启动一个线程,监听NioServerSocketChannel注册的select,监听客户端的连接事件,和客户端建立连接之后,从child EventLoopGroup选一个eventLoop,启动一个线程,eventLoop内部维护了一个select,把这个新建立的连接注册到child EventLoopGroup中的eventLoop的select,child eventLoop启动的线程循环select,然后分队对read和write等事件进行处理。

一个channel维护一个pipeline, 一个eventLoop对应一个select,channel会注册到eventLoop中去,也就是channel对应一个线程,对应一个select,但是这个eventloop可能被许多个channel注册了,也就是,这个select可能监听了许多个channel,也就是:select 和 线程是一对一,select 和 channel是一对多,线程和channel是一对多。

单线程读写channel,所以没有并发问题,但是如果一个channel堵塞了,可能就会影响到和这个channel共用select,共用线程的其他channel得不到处理。

而且:比如 EventLoopGroup bossstrap = new NioEventLoopGroup();
这种写法,其实可以可以这么写: EventLoopGroup bossstrap = new NioEventLoopGroup(1);,省的初始化多个EventLoop,当然空间也占不了多少。

对比nio:nio是一个select监听所有的客户端连接,和读写事件等,netty是一个select只监听客户端连接,但是多个select只监听读写等事件。
所以说,单从select和多线程方面,netty不会比nio效率高太多,nio一个select监听到事件之后,使用线程池处理的话,性能和netty不会差很多,不过nio可能会存在多线程操作的锁的问题。
使用netty最大的必要性是:netty比nio简单,工作量小,自带很多功能,二nio要实现这些功能要重新写,然后netty稳定,修复了nio的一些bug。

相关文章

网友评论

      本文标题:netty源码分析(二)

      本文链接:https://www.haomeiwen.com/subject/svpiaqtx.html