Netty源码解析

作者: 丶含光 | 来源:发表于2020-10-11 23:23 被阅读0次

    Netty版本4.0.29.Final,以构造客户端连接服务端的角度来追踪源码

    一 创建Netty事件循环组

    NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    

    NioEventLoopGroup的构造器中会调用父类MultithreadEventLoopGroup的构造器

    1. SelectorProvider.provider()返回运行JVM的操作系统提供的provider
    2. 如果没有指定线程数量,默认为两倍核数
        // NioEventLoopGroup
        public NioEventLoopGroup() {
            this(0);
        }
        public NioEventLoopGroup(int nThreads) {
            this(nThreads, null);
        }
        public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
            this(nThreads, threadFactory, SelectorProvider.provider());
        }
        public NioEventLoopGroup(
                int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
            super(nThreads, threadFactory, selectorProvider);
        }
    
        // MultithreadEventLoopGroup
        private static final int DEFAULT_EVENT_LOOP_THREADS;
    
        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }
    
        protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
        }
    

    在父类MultithreadEventExecutorGroup的构造器中

    1. 创建默认的线程工厂,后续用来创建线程
    2. 创建一个SingleThreadEventExecutor数组,数组大小为两倍核数,保存在属性children上
    3. 创建选择器保存在属性chooser上
    4. 在newChild方法中创建EventExecutor填充到数组上
    5. newChild是父类MultithreadEventExecutorGroup提供的一个抽象方法,由于当前是在创建NioEventLoopGroup对象,具体实现还是由NioEventLoopGroup完成。
    6. 可以看到NioEventLoopGroup中完成了NioEventLoop的创建
        // MultithreadEventExecutorGroup
        protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (threadFactory == null) {
                threadFactory = newDefaultThreadFactory();
            }
    
            children = new SingleThreadEventExecutor[nThreads];
            if (isPowerOfTwo(children.length)) {
                chooser = new PowerOfTwoEventExecutorChooser();
            } else {
                chooser = new GenericEventExecutorChooser();
            }
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(threadFactory, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
        }
    
        // NioEventLoopGroup
        @Override
        protected EventExecutor newChild(
                ThreadFactory threadFactory, Object... args) throws Exception {
            return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
        }
    

    二 创建Netty事件循环

    1. 将一开始创建的provider保存在属性provider上
    2. openSelector中返回操作系统提供的selector(多路复用器)
    3. 如果未禁用优化(默认未禁用),使用反射将操作系统返回的selector中的属性selectedKeys和publicSelectedKeys改为可写,并用Netty的SelectedSelectionKeySet代替
    4. 最后再看父类SingleThreadEventExecutor构造器
        // NioEventLoop
        NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
            super(parent, threadFactory, false);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            provider = selectorProvider;
            selector = openSelector();
        }
    
        private Selector openSelector() {
            final Selector selector;
            try {
                selector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            if (DISABLE_KEYSET_OPTIMIZATION) {
                return selector;
            }
    
            try {
                SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
                Class<?> selectorImplClass =
                        Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
    
                // Ensure the current selector implementation is what we can instrument.
                if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
                    return selector;
                }
    
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                selectedKeysField.setAccessible(true);
                publicSelectedKeysField.setAccessible(true);
    
                selectedKeysField.set(selector, selectedKeySet);
                publicSelectedKeysField.set(selector, selectedKeySet);
    
                selectedKeys = selectedKeySet;
                logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
            } catch (Throwable t) {
                selectedKeys = null;
                logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
            }
    
            return selector;
        }
    
    1. 事件循环组保存在属性parent上。对于客户端来说,这里的parent还是开始创建的NioEventLoopGroup(服务端可以有两个具有父子关系的事件循环组)
    2. 使用线程工厂创建线程,封装了匿名任务(后续会分析该任务),保存在属性thread上(未启动)
    3. 使用NioEventLoop的newTaskQueue创建一个无锁并发的单消费者多生产者队列,保存在属性taskQueue上
        // SingleThreadEventExecutor
        protected SingleThreadEventExecutor(
                EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
    
            this.parent = parent;
            this.addTaskWakesUp = addTaskWakesUp;
    
            thread = threadFactory.newThread(new Runnable() {
                @Override
                public void run() {
                    // 暂时不关注任务内逻辑 省略
                }
            });
    
            taskQueue = newTaskQueue();
        }
    
        // NioEventLoop
        @Override
        protected Queue<Runnable> newTaskQueue() {
            // This event loop never calls takeTask()
            return PlatformDependent.newMpscQueue();
        }
    
        // PlatformDependent
        public static <T> Queue<T> newMpscQueue() {
            return new MpscLinkedQueue<T>();
        }
    

    上面已经完成了NioEventLoop的创建,并保存在NioEventLoopGroup的数组属性上。而关于NioEventLoop在具备线程池能力时,在何时启动已经创建的线程呢?在SingleThreadEventExecutor::execute中可以找到答案

    1. 当任务被提交到NioEventLoop以后,首先判断提交任务的线程与thread属性保存的线程是否是同一个
    2. 如果不是同一个,startThread中根据维护的STATE_UPDATER来判断当前NioEventLoop是否已经启动过,如果没有,使用CAS更新该NioEventLoop状态为已经启动
    3. 然后启动该线程,后续如果再执行该方法,由于线程已经启动,方法内什么都不会做
    4. 再回到execute中,addTask将任务生产到之前创建的队列中
        // SingleThreadEventExecutor
        @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread();
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
        // AbstractEventExecutor
        @Override
        public boolean inEventLoop() {
            return inEventLoop(Thread.currentThread());
        }
    
        // SingleThreadEventExecutor
        @Override
        public boolean inEventLoop(Thread thread) {
            return thread == this.thread;
        }
        // SingleThreadEventExecutor
        private void startThread() {
            if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    schedule(new ScheduledFutureTask<Void>(
                            this, Executors.<Void>callable(new PurgeTask(), null),
                            ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
                    thread.start();
                }
            }
        }
        // SingleThreadEventExecutor
        protected void addTask(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
            if (isShutdown()) {
                reject();
            }
            taskQueue.add(task);
        }
    

    而关于NioEventLoop内线程启动后的逻辑,可以在创建该线程时看到有向线程提交一个任务。根据任务内SingleThreadEventExecutor.this.run()可以定位到创建线程提交任务的NioEventLoop::run

            // SingleThreadEventExecutor
            thread = threadFactory.newThread(new Runnable() {
                @Override
                public void run() {
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        for (;;) {
                            int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
                        // Check if confirmShutdown() was called at the end of the loop.
                        if (success && gracefulShutdownStartTime == 0) {
                            logger.error(
                                    "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                    "before run() implementation terminates.");
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.release();
                                if (!taskQueue.isEmpty()) {
                                    logger.warn(
                                            "An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                                }
    
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
    

    NioEventLoop::run是Netty的核心所在。它是NioEventLoop的线程执行的唯一任务。方法内无限循环,阻塞等待IO事件或队列中的任务

    1. 判断NioEventLoop的队列中是否有任务
    2. 如果有,执行一次selectNow,这会导致多路复用器上准备就绪的事件被分配到selectedKeys上(jdk nio基础,操作系统介入)
    3. 如果没有,执行select(oldWakenUp);,阻塞oldWakenUp时间后,开始下一个循环(阻塞期间等待多路复用器上准备就绪的事件被分配到selectedKeys上)
    4. 根据设置的IO事件的执行比例(默认50%),计算出执行队列任务的最长执行时间(先记录IO事件执行的总耗时,然后根据比例,计算出执行队列任务的最长执行时间)
        // NioEventLoop
        @Override
        protected void run() {
            for (;;) {
                boolean oldWakenUp = wakenUp.getAndSet(false);
                try {
                    if (hasTasks()) {
                        selectNow();
                    } else {
                        select(oldWakenUp);
    
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        processSelectedKeys();
                        runAllTasks();
                    } else {
                        final long ioStartTime = System.nanoTime();
    
                        processSelectedKeys();
    
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
    
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Unexpected exception in the selector loop.", t);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    
    执行IO事件
    1. 在有IO事件就绪时,处理for循环本次循环取出的这些事件
    2. 这里有看到unsafe类的介入,对于客户端而言,可能存在的IO事件有建立连接的应答,或者服务端主动发送消息给客户端
    3. 交由unsafe来处理的对应方法分别为unsafe::finishConnect和unsafe::read
    4. 对unsafe的分析将在后面完成,这里暂时不展开
        // NioEventLoop
        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized(selectedKeys.flip());
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    
        private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
            for (int i = 0;; i ++) {
                final SelectionKey k = selectedKeys[i];
                if (k == null) {
                    break;
                }
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys[i] = null;
    
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    for (;;) {
                        if (selectedKeys[i] == null) {
                            break;
                        }
                        selectedKeys[i] = null;
                        i++;
                    }
    
                    selectAgain();
                    // Need to flip the optimized selectedKeys to get the right reference to the array
                    // and reset the index to -1 which will then set to 0 on the for loop
                    // to start over again.
                    //
                    // See https://github.com/netty/netty/issues/1523
                    selectedKeys = this.selectedKeys.flip();
                    i = -1;
                }
            }
        }
    
        private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    
    执行队列任务

    关于队列中的任务从哪里来?一是源码内部,启动时会直接提交任务到队列中;二是可以直接取出channel中的NioEventLoop向其提交任务;三是使用Channel写数据时,都是以任务的形式提交到队列中。与Channel绑定的NioEventLoop循环时会消费提交到队列中任务并执行,后续在分析unsafe时会一并提及。

    1. IO事件执行完成后,回到NioEventLoop::run中,在runAllTasks中开始执行队列中的任务
    2. pollTask从队列中取出一个任务,接着在for循环中执行该任务,并重复取出任务、执行任务的过程
    3. 退出for循环的条件有两个,一是队列中没有需要执行的任务,二是到了执行定时任务的时间
        // NioEventLoop
        protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            if (task == null) {
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                try {
                    task.run();
                } catch (Throwable t) {
                    logger.warn("A task raised an exception.", t);
                }
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
    

    三 客户端Channel类型

    客户端Bootstrap配置引导时,需要指定Channel类型,后续会使用反射创建该Channel类型实例。

    1. 假设以Channel类型为NioSocketChannel来跟踪源码
    2. 一系列构造器之间的调用,将JDK原生的SocketChannel作为属性保存在父类AbstractChannel的parent属性上
    3. 创建NioSocketChannelUnsafe实例保存在父类AbstractChannel属性unsafe上
    4. 创建管道DefaultChannelPipeline保存在父类AbstractChannel属性pipeline上
    5. 管道DefaultChannelPipeline中默认拥有head和tail两个ChannelHandler,构成只有头尾两节点的双向链表。
        // NioSocketChannel
        public NioSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
        public NioSocketChannel(SocketChannel socket) {
            this(null, socket);
        }
        public NioSocketChannel(Channel parent, SocketChannel socket) {
            super(parent, socket);
            config = new NioSocketChannelConfig(this, socket.socket());
        }
        // AbstractNioByteChannel
        protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
            super(parent, ch, SelectionKey.OP_READ);
        }
        // AbstractNioChannel
        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                    }
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }
        // AbstractChannel
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            unsafe = newUnsafe();
            pipeline = new DefaultChannelPipeline(this);
        }
        // NioSocketChannel
        @Override
        protected AbstractNioUnsafe newUnsafe() {
            return new NioSocketChannelUnsafe();
        }
        // DefaultChannelPipeline
        public DefaultChannelPipeline(AbstractChannel channel) {
            if (channel == null) {
                throw new NullPointerException("channel");
            }
            this.channel = channel;
    
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }
    

    四 客户端建立连接

    bootstrap.connect()
    

    客户端完成NioEventLoopGroup和Bootstrap的创建及对Bootstrap进行相关的设置后,使用Bootstrap尝试与服务端建立连接。在同步与异步之间,推荐使用异步来监听连接事件的结果。

    1. 一些必须的参数校验
    2. 由于是与服务器建立连接,localAddress值为null
    3. doConnect中先调用initAndRegister异步创建通道及注册
    4. 无论是使用isDone还是使用监听器,都是在通道创建完成后,使用doConnect0进行连接
        // Bootstrap
        public ChannelFuture connect() {
            validate();
            SocketAddress remoteAddress = this.remoteAddress;
            if (remoteAddress == null) {
                throw new IllegalStateException("remoteAddress not set");
            }
    
            return doConnect(remoteAddress, localAddress());
        }
    
        private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            final ChannelPromise promise = channel.newPromise();
            if (regFuture.isDone()) {
                doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
            } else {
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                    }
                });
            }
    
            return promise;
        }
    

    AbstractBootstrap是Bootstrap的父类,initAndRegister中完成了通道的创建、初始化以及注册

    1. 使用Bootstrap中默认的BootstrapChannelFactory,根据设置的channel,使用反射创建channel实例
    2. 假设使用上述NioSocketChannel,当然也可以是别的channel类型
    3. init负责对创建的channel初始化
    4. 初始化完成后异步完成注册
        // AbstractBootstrap
        final ChannelFuture initAndRegister() {
            final Channel channel = channelFactory().newChannel();
            try {
                init(channel);
            } catch (Throwable t) {
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regFuture;
        }
    
    Channel初始化
    1. init方法还是交由子类Bootstrap实现
    2. 取出channel上的管道,将客户端自定义的handler添加进去,底层其实是对双向链表的变更,如DefaultChannelPipeline::addLast0所示,这时会形成head、自定义ChannelInitializer、tail三个节点
    3. 对channel的option和attr进行设置,以支持将自定义参数配置到channel上
        // Bootstrap
        @Override
        @SuppressWarnings("unchecked")
        void init(Channel channel) throws Exception {
            ChannelPipeline p = channel.pipeline();
            p.addLast(handler());
    
            final Map<ChannelOption<?>, Object> options = options();
            synchronized (options) {
                for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
                    try {
                        if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                            logger.warn("Unknown channel option: " + e);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to set a channel option: " + channel, t);
                    }
                }
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
            }
        }
        // DefaultChannelPipeline
        private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
            checkMultiplicity(newCtx);
    
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
    
            name2ctx.put(name, newCtx);
    
            callHandlerAdded(newCtx);
        }
    
    Channel注册

    group()取到开始配置的NioEventLoopGroup,register在父类MultithreadEventLoopGroup中

    1. next()内借助之前生成的chooser的next()方法,选出MultithreadEventExecutorGroup中child数组上的一个NioEventLoop
    2. 接着变成NioEventLoop::register方法,该方法来自父类SingleThreadEventLoop
    3. 在SingleThreadEventLoop中再转换为channel上unsafe::register,关于unsafe暂时先不展开介绍。
        // MultithreadEventLoopGroup
        @Override
        public ChannelFuture register(Channel channel) {
            return next().register(channel);
        }
        // MultithreadEventExecutorGroup
        @Override
        public EventExecutor next() {
            return chooser.next();
        }
        // GenericEventExecutorChooser
        private final class GenericEventExecutorChooser implements EventExecutorChooser {
            @Override
            public EventExecutor next() {
                return children[Math.abs(childIndex.getAndIncrement() % children.length)];
            }
        }
        // SingleThreadEventLoop
        @Override
        public ChannelFuture register(Channel channel) {
            return register(channel, new DefaultChannelPromise(channel, this));
        }
        @Override
        public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
            if (channel == null) {
                throw new NullPointerException("channel");
            }
            if (promise == null) {
                throw new NullPointerException("promise");
            }
    
            channel.unsafe().register(this, promise);
            return promise;
        }
    
    Channel管道初始化

    虽说暂时不引入unsafe类逻辑,但在unsafe::register中,通道注册完成后会调用管道的fireChannelRegistered方法,进而执行自定义的ChannelInitializer,最终形成完整的管道。
    目前管道上链表有三个节点:head、自定义ChannelInitializer、tail(规定自头向尾的流向为In,自尾向头的为Out)
    它们的类型都是AbstractChannelHandlerContext,每个Context上都持有对应的ChannelHandler

    1. 首先head执行父类方法fireChannelRegistered,该方法找到以In流向的下一个AbstractChannelHandlerContext
    2. 然后在invokeChannelRegistered中找到Context持有的Handler,执行Handler::channelRegistered
    3. 自定义的ChannelInitializer属于ChannelInboundHandler的子类,流向为In。其channelRegistered方法执行时,调用被重写的initChannel方法
    4. initChannel方法内一般定义的是业务自身往管道添加Handler的逻辑ChannelPipeline::addLast
    5. addLast之前自定义ChannelInitializer已经有分析过,目的是将ChannelHandler封装为Context添加到管道上,并根据实现的接口标记节点流向,最终形成完整的双向链表
    6. 最后回到ChannelInitializer::channelRegistered中,将自定义的ChannelInitializer移出,因为它只需要在channel初始化时发挥作用,不需要存在于实际的管道中
        // DefaultChannelPipeline
        @Override
        public ChannelPipeline fireChannelRegistered() {
            head.fireChannelRegistered();
            return this;
        }
        // AbstractChannelHandlerContext
        @Override
        public ChannelHandlerContext fireChannelRegistered() {
            final AbstractChannelHandlerContext next = findContextInbound();
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRegistered();
            } else {
                executor.execute(new OneTimeTask() {
                    @Override
                    public void run() {
                        next.invokeChannelRegistered();
                    }
                });
            }
            return this;
        }
    
        private void invokeChannelRegistered() {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        }
    
        // ChannelInitializer
        @Override
        @SuppressWarnings("unchecked")
        public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            ChannelPipeline pipeline = ctx.pipeline();
            boolean success = false;
            try {
                initChannel((C) ctx.channel());
                pipeline.remove(this);
                ctx.fireChannelRegistered();
                success = true;
            } catch (Throwable t) {
                logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
            } finally {
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
                if (!success) {
                    ctx.close();
                }
            }
        }
    
    Channel连接

    至此,Channel的初始化及注册已经完成。回到Bootstrap::doConnect中,在Channel注册这个异步任务完成后,开始真正的连接服务端

    1. 获取到Channel绑定的NioEventLoop,提交任务到线程池
    2. 任务被触发时,使用channel进行连接
    3. 使用channel上的管道pipeline进行连接,从tailf开始,以Out的流向,找到下一个Out节点,执行其invokeConnect
    4. 最终流到head节点,head内继续使用unsafe与服务端进行连接的建立
        // Bootstrap类
        private static void doConnect0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        if (localAddress == null) {
                            channel.connect(remoteAddress, promise);
                        } else {
                            channel.connect(remoteAddress, localAddress, promise);
                        }
                        promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    
        // AbstractChannel
        @Override
        public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
            return pipeline.connect(remoteAddress, promise);
        }
         // DefaultChannelPipeline
        @Override
        public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
            return tail.connect(remoteAddress, promise);
        }
        // AbstractChannelHandlerContext
        @Override
        public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
            return connect(remoteAddress, null, promise);
        }
    
        @Override
        public ChannelFuture connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    
            if (remoteAddress == null) {
                throw new NullPointerException("remoteAddress");
            }
            if (!validatePromise(promise, false)) {
                // cancelled
                return promise;
            }
    
            final AbstractChannelHandlerContext next = findContextOutbound();
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeConnect(remoteAddress, localAddress, promise);
            } else {
                safeExecute(executor, new OneTimeTask() {
                    @Override
                    public void run() {
                        next.invokeConnect(remoteAddress, localAddress, promise);
                    }
                }, promise, null);
            }
    
            return promise;
        }
    
        private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
            try {
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }
    
        // HeadContext
        @Override
        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }
    

    五 客户端发送数据

    channel.write("test");
    

    channel连接建立完成后,可以使用channel写数据

    1. 使用channel写数据时,从channel开始,经过管道,找到tail开始执行写数据逻辑
    2. tail作为双向链表的尾部,使用自尾向头的Out流向,找到下一个Context,直到最终流到head节点
    3. head节点转化为unsafe::write,在后面分析
        // AbstractChannel
        @Override
        public ChannelFuture write(Object msg) {
            return pipeline.write(msg);
        }
    
        // DefaultChannelPipeline
        @Override
        public ChannelFuture write(Object msg) {
            return tail.write(msg);
        }
    
        // AbstractChannelHandlerContext
        @Override
        public ChannelFuture write(Object msg) {
            return write(msg, newPromise());
        }
        @Override
        public ChannelFuture write(final Object msg, final ChannelPromise promise) {
            if (msg == null) {
                throw new NullPointerException("msg");
            }
    
            if (!validatePromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return promise;
            }
            write(msg, false, promise);
    
            return promise;
        }
        private void write(Object msg, boolean flush, ChannelPromise promise) {
    
            AbstractChannelHandlerContext next = findContextOutbound();
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeWrite(msg, promise);
                if (flush) {
                    next.invokeFlush();
                }
            } else {
                int size = channel.estimatorHandle().size(msg);
                if (size > 0) {
                    ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
                    // Check for null as it may be set to null if the channel is closed already
                    if (buffer != null) {
                        buffer.incrementPendingOutboundBytes(size);
                    }
                }
                Runnable task;
                if (flush) {
                    task = WriteAndFlushTask.newInstance(next, msg, size, promise);
                }  else {
                    task = WriteTask.newInstance(next, msg, size, promise);
                }
                safeExecute(executor, task, promise, msg);
            }
        }
        private void invokeWrite(Object msg, ChannelPromise promise) {
            try {
                ((ChannelOutboundHandler) handler()).write(this, msg, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }
    
        // HeadContext
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }
    

    写数据不会直接发送数据到服务端,而是会缓存起来,直到调用flush才会完成数据的最终发送。flush逻辑与写数据一样,channel到pipeline,tail到head。最终交由unsafe类来完成

    channel.flush();
    
        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            unsafe.flush();
        }
    

    六 总结

    以客户端建立连接发送数据流程为例(服务端大部分逻辑相似),总结Netty的工作流程:

    1. 声明NioEventLoopGroup(事件循环组),内部会创建两倍核数大小的数组,并为数组上的每一个位置分配NioEventLoop(事件循环),NioEventLoop内包含一个未启动线程和一个任务队列
    2. 声明Bootstrap(引导),指定事件循环组、远程地址、option、attr、channel类型和自定义的ChannelInitializer
    3. 使用Bootstrap发起建立连接请求
    • 使用反射根据指定的channel类型创建channel实例
    • 对channel实例进行初始化,配置自定义ChannelInitializer到channel管道中,设置option和attr
    • 从NioEventLoopGroup维护的数组中选出NioEventLoop,借助NioEventLoop开始channel的注册逻辑
    • 这个过程会激活NioEventLoop内创建的线程。该线程进而启动一个无限for循环,开始接收多路复用器上就绪的IO事件,同时执行NioEventLoop内队列上的任务
    • 接着channel会与该NioEventLoop绑定,并将channel注册到多路复用器上
    • channel注册完成后,执行为channel指定的ChannelInitializer,最终在channel上形成完整的管道配置
    • 再设置channel为readPending的状态
    • 待channel准备就绪后,使用channel建立与服务端的连接,发起建立连接的请求,由NioEventLoop完成
    • 处理服务端返回的建立请求应答,IO事件由NioEventLoop完成
    1. Bootstrap在连接建立完成后返回channel实例,使用channel实例完成数据的发送。同时与channel绑定的NioEventLoop内的线程,监听并处理多路复用器上准备就绪的IO事件(接收服务端数据应答或服务端主动发起的数据)

    相关文章

      网友评论

        本文标题:Netty源码解析

        本文链接:https://www.haomeiwen.com/subject/hcqhpktx.html