Netty Source Code Analysis - NioEventLoop Event Processing

Author: 晴天哥_王志 | Published: 2020-04-04 21:06

Series

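Netty Source Code Analysis - Bootstrap Server
Netty Source Code Analysis - Bootstrap Client
Netty Source Code Analysis - ChannelHandler
Netty Source Code Analysis - EventLoop Class Hierarchy
Netty Source Code Analysis - The register Process
Netty Source Code Analysis - NioEventLoop Event Processing
Netty Source Code Analysis - The accept Process
Netty Source Code Analysis - ByteBuf
Netty Source Code Analysis - Sticky and Split Packets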

Introduction

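  • This article analyzes how the NioEventLoop event-processing thread is started and how NioEventLoop processes I/O events.

  • Thread startup: the first time a NioSocketChannel is registered with a NioEventLoop, eventLoop.execute() is called and control enters the NioEventLoop. The call chain execute => startThread => doStartThread (all inherited from SingleThreadEventExecutor) ends by submitting an event-processing task to the NioEventLoop's executor. That task handles both I/O events and queued tasks such as channel registration.

  • Event processing: the event-processing thread obtains the ready selectedKeys and iterates over them one by one. Each selected key carries its channel object as an attachment, so processing continues inside that channel.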

NioEventLoop Thread Startup

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    protected abstract class AbstractUnsafe implements Unsafe {

        public final void register(EventLoop eventLoop, final ChannelPromise promise) {

            // Some code omitted
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // SingleThreadEventExecutor#execute starts the worker thread if it is not running yet
                    // The Runnable passed here is the registration task that the worker thread will run
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    // Failure handling omitted
                }
            }
        }
    }
}
  • AbstractChannel.AbstractUnsafe#register registers the channel with the eventLoop, which here is a NioEventLoop.
  • eventLoop.execute() invokes the execute method that NioEventLoop inherits from its parent class SingleThreadEventExecutor.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private volatile Thread thread;
    private final Queue<Runnable> taskQueue;

    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

    public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            // Add the task to the queue
            addTask(task);
        } else {
            // Start the worker thread that backs this NioEventLoop
            startThread();
            // Add the task to the queue
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
    }

    // Adds the task to taskQueue
    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }


    // Starts the thread that backs this NioEventLoop
    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }
    // Starts the NioEventLoop's task-processing thread through the executor
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // Call run() to process the NioEventLoop's events
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // Code omitted
                }
            }
        });
    }
}
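  • SingleThreadEventExecutor#execute calls two methods internally: addTask and startThread.
  • addTask adds a task to the NioEventLoop's task queue, for example the task that registers a Channel with the NioEventLoop.
  • startThread calls doStartThread to start the NioEventLoop's processing thread; it does so by submitting a Runnable through the NioEventLoop's executor.execute().
  • The run() method of that Runnable calls SingleThreadEventExecutor.this.run(), which enters the event-processing loop.
  • Here SingleThreadEventExecutor.this refers to the NioEventLoop instance, so the method executed is NioEventLoop#run.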
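
The lazy startup described above can be sketched outside Netty. The class below is a simplified illustration, not Netty's implementation: LazyStartLoopSketch, runDemo and the daemon thread-per-task executor are hypothetical names introduced here, and shutdown handling is left out. It shows only the core idea: a compare-and-set on a state field guarantees that the worker thread is started once, however many callers submit tasks.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for SingleThreadEventExecutor.
final class LazyStartLoopSketch {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Executor executor;
    private volatile Thread thread;

    LazyStartLoopSketch(Executor executor) {
        this.executor = executor;
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread(); // no-op after the first successful call
        }
        taskQueue.add(task);
    }

    private void startThread() {
        // Only the caller that wins the CAS submits the loop to the executor.
        if (state.get() == ST_NOT_STARTED
                && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            executor.execute(() -> {
                thread = Thread.currentThread();
                runLoop();
            });
        }
    }

    private void runLoop() {
        try {
            for (;;) {
                taskQueue.take().run(); // blocks until a task arrives
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits three tasks and returns the set of threads that ran them.
    static Set<Thread> runDemo() {
        Executor threadPerTask = r -> {
            Thread t = new Thread(r, "loop-sketch");
            t.setDaemon(true); // lets the JVM exit; the sketch has no shutdown
            t.start();
        };
        LazyStartLoopSketch loop = new LazyStartLoopSketch(threadPerTask);
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            loop.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

Calling LazyStartLoopSketch.runDemo() should return a set containing a single thread, which is not the calling thread: all three tasks run on the one lazily started worker.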

EventLoop Event Processing

NioEventLoop#run

public final class NioEventLoop extends SingleThreadEventLoop {

    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // Process I/O events first
                        processSelectedKeys();
                    } finally {
                        // Then run tasks
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        // Process I/O events first
                        processSelectedKeys();
                    } finally {
                        // Then run tasks, for a time budget derived from ioRatio
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }

            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}
  • processSelectedKeys, called from NioEventLoop#run, handles the NioEventLoop's I/O events.
  • runAllTasks, called from NioEventLoop#run, runs the NioEventLoop's queued tasks.
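
When ioRatio is below 100, the run method above gives tasks a time budget of ioTime * (100 - ioRatio) / ioRatio. The helper below merely restates that expression so the arithmetic can be checked; IoRatioBudgetSketch and taskBudgetNanos are hypothetical names, not part of Netty.

```java
// Hypothetical helper that restates the budget expression from NioEventLoop#run.
final class IoRatioBudgetSketch {

    // ioRatio is the percentage of loop time intended for I/O (1..99 here;
    // the value 100 is handled by a separate branch in run()).
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

For instance, if I/O took 8 ms, an ioRatio of 50 gives tasks 8 ms as well, an ioRatio of 80 gives them 2 ms, and an ioRatio of 20 gives them 32 ms.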

NioEventLoop#processSelectedKeys

public final class NioEventLoop extends SingleThreadEventLoop {

    private SelectedSelectionKeySet selectedKeys;

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            selectedKeys[i] = null;
            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }

                selectAgain();
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
}
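  • NioEventLoop#processSelectedKeys handles the NioEventLoop's events and delegates to either processSelectedKeysOptimized or processSelectedKeysPlain.
  • NioEventLoop#processSelectedKeysOptimized handles events when the selector's key set has been replaced by the optimized one (selectedKeys != null).
  • NioEventLoop#processSelectedKeysPlain handles events from a selector that has not been optimized.
  • NioEventLoop#processSelectedKey does the actual event handling; the AbstractNioChannel is obtained from the SelectionKey's attachment.
  • A single Selector may have thousands of Channels registered. The combination of SelectionKey and attachment identifies exactly the channel on which an event occurred, and that channel's unsafe object then takes over processing.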
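
The attachment lookup can be tried with plain JDK NIO. The sketch below is an illustration under assumptions: ReadyHandler is a hypothetical stand-in for AbstractNioChannel, and a java.nio.channels.Pipe replaces a socket so that no network is needed. The loop follows the same shape as processSelectedKeysPlain: take the key, remove it from the selected set, then dispatch on its attachment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class AttachmentDispatchSketch {

    // Hypothetical handler type standing in for AbstractNioChannel.
    interface ReadyHandler {
        String onReady(SelectionKey key) throws IOException;
    }

    // Registers a pipe with an attachment, makes it readable, and dispatches once.
    static List<String> runOnce() {
        List<String> handled = new ArrayList<>();
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                ReadyHandler handler = key -> {
                    ByteBuffer buf = ByteBuffer.allocate(16);
                    int n = ((Pipe.SourceChannel) key.channel()).read(buf);
                    return "read " + n + " byte(s)";
                };
                // The third argument is the attachment stored on the key.
                source.register(selector, SelectionKey.OP_READ, handler);

                sink.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
                selector.select();

                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey k = it.next();
                    it.remove(); // the JDK never clears the selected set itself
                    Object a = k.attachment();
                    if (a instanceof ReadyHandler) {
                        handled.add(((ReadyHandler) a).onReady(k));
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return handled;
    }
}
```

Because the handler travels with the key, the loop never has to search for the channel that became ready, which is the point the last bullet makes.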

NioEventLoop#processSelectedKey

public final class NioEventLoop extends SingleThreadEventLoop {

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        // Code omitted
        try {
            int readyOps = k.readyOps();
            // Handle the OP_CONNECT event
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
            // Handle the OP_WRITE event
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            // Handle OP_READ and OP_ACCEPT events
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
}
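  • NioEventLoop#processSelectedKey dispatches on the type of event.
  • OP_CONNECT is the connect event, OP_WRITE the write event, OP_READ the read event, and OP_ACCEPT the accept event.
  • AbstractNioChannel#unsafe returns an AbstractNioChannel.NioUnsafe object, which carries on processing the event.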
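
The bit tests in processSelectedKey can be isolated into a small function. The sketch below uses the real SelectionKey constants, but ReadyOpsDispatchSketch and its method names are hypothetical, and the returned strings only label the unsafe call that the excerpt above would make.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the order of checks in processSelectedKey.
final class ReadyOpsDispatchSketch {

    // Returns the names of the unsafe calls made for a given readyOps value.
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is also treated as a read, as in the excerpt above.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    // Clears OP_CONNECT from an interest set, as done before finishConnect().
    static int withoutConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }
}
```

Since readyOps is a bitmask, one key can trigger several branches in a single pass: a key that is both connectable and writable leads to finishConnect followed by forceFlush.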

SingleThreadEventExecutor#runAllTasks

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        // Fetch a pending task from the NioEventLoop's taskQueue
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            // Run the task, catching any exception it throws
            safeExecute(task);
            runTasks ++;

            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }


    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }
}
  • runAllTasks takes pending tasks from the NioEventLoop's taskQueue and executes them.
  • runAllTasks is still executed by the NioEventLoop's event-processing thread.
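
One detail of runAllTasks(long timeoutNanos) is easy to miss: the deadline is only checked when (runTasks & 0x3F) == 0, that is, after every 64th task, presumably to avoid reading the clock after each task. The sketch below reproduces just that pattern; TimedTaskRunnerSketch is a hypothetical name, and System.nanoTime() is used in place of ScheduledFutureTask.nanoTime().

```java
import java.util.Queue;

// Hypothetical, simplified version of the deadline loop in runAllTasks(long).
final class TimedTaskRunnerSketch {

    // Runs queued tasks until the queue is empty or the budget is spent.
    // Returns the number of tasks that were run.
    static int runTasks(Queue<Runnable> queue, long timeoutNanos) {
        Runnable task = queue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = System.nanoTime() + timeoutNanos;
        int ran = 0;
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                // A failing task must not stop the loop.
            }
            ran++;
            // Read the clock only after every 64th task.
            if ((ran & 0x3F) == 0 && System.nanoTime() - deadline >= 0) {
                break;
            }
            task = queue.poll();
            if (task == null) {
                break;
            }
        }
        return ran;
    }
}
```

A consequence of this design is that the budget is approximate: with 200 queued tasks and a budget of zero, 64 tasks still run before the first deadline check stops the loop.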

NioEventLoop#openSelector

public final class NioEventLoop extends SingleThreadEventLoop {

    private SelectedSelectionKeySet selectedKeys;

    private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
                // ensure the current selector implementation is what we can instrument.
                !((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", selector, t);
            }
            return selector;
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    selectedKeysField.setAccessible(true);
                    publicSelectedKeysField.setAccessible(true);

                    selectedKeysField.set(selector, selectedKeySet);
                    publicSelectedKeysField.set(selector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
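                } catch (RuntimeException e) {
                    // JDK 9 can throw InaccessibleObjectException here; Netty compiles against
                    // an older JDK, so the type is checked by name
                    if ("java.lang.reflect.InaccessibleObjectException".equals(e.getClass().getName())) {
                        return e;
                    } else {
                        throw e;
                    }
                }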
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
        } else {
            selectedKeys = selectedKeySet;
            logger.trace("instrumented a special java.util.Set into: {}", selector);
        }

        return selector;
    }
}
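  • An instance of SelectedSelectionKeySet, selectedKeySet, is created.
  • Reflection is used to replace the selectedKeys and publicSelectedKeys fields of the selector with selectedKeySet.
  • The last step is just as important: selectedKeys = selectedKeySet.
  • From then on the HashSet that would normally hold the ready keys is no longer in use; the more efficient selectedKeySet takes its place, and it is the selectedKeys field used by processSelectedKeys. Events of interest are read directly from selectedKeys, and when the Selector detects such events while polling it puts the keys directly into selectedKeys.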
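
To see why such a set can be cheaper than a HashSet, here is a minimal sketch of an array-backed, double-buffered set with a flip() method. It is an illustration under assumptions, not a copy of Netty's SelectedSelectionKeySet: ArrayKeySetSketch and drain are hypothetical names, and the element type is generic so the class can be exercised without a real selector.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical sketch: appends go into one array while the other is consumed.
final class ArrayKeySetSketch<E> extends AbstractSet<E> {
    private Object[] keysA = new Object[4];
    private Object[] keysB = new Object[4];
    private int sizeA;
    private int sizeB;
    private boolean usingA = true;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (usingA) {
            keysA[sizeA++] = e; // plain append, no hashing
            if (sizeA == keysA.length) {
                keysA = Arrays.copyOf(keysA, sizeA * 2); // keep room for the terminator
            }
        } else {
            keysB[sizeB++] = e;
            if (sizeB == keysB.length) {
                keysB = Arrays.copyOf(keysB, sizeB * 2);
            }
        }
        return true;
    }

    // Returns the filled array, null-terminated, and switches writers to the other one.
    Object[] flip() {
        if (usingA) {
            usingA = false;
            keysA[sizeA] = null;
            sizeB = 0;
            return keysA;
        } else {
            usingA = true;
            keysB[sizeB] = null;
            sizeA = 0;
            return keysB;
        }
    }

    @Override
    public int size() {
        return usingA ? sizeA : sizeB;
    }

    @Override
    public Iterator<E> iterator() {
        throw new UnsupportedOperationException();
    }

    // Walks a flipped array until the null terminator, clearing slots as it goes.
    static int drain(Object[] ready) {
        int n = 0;
        for (int i = 0; ready[i] != null; i++) {
            ready[i] = null;
            n++;
        }
        return n;
    }
}
```

The drain loop has the same shape as processSelectedKeysOptimized: index into the array, stop at the first null, and null out each slot after use so the array can be reused.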
