美文网首页
Netty创建NioEventLoopGroup简单分析

Netty创建NioEventLoopGroup简单分析

作者: 梦想实现家_Z | 来源:发表于2020-03-27 23:26 被阅读0次
NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);

进来看看构造函数:

public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor)null);
    }

好吧,直接看最根本的:

public NioEventLoopGroup(int nThreads) {
        super(nThreads, (Executor)null, new Object[]{SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE, RejectedExecutionHandlers.reject()});
    }

现在进入父类:MultithreadEventLoopGroup:

// 获取“io.netty.eventLoopThreads”的值,没有的话,就设置成可用的处理器数量的2倍
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
     // 没有设置线程数的话,就是默认为可用的处理器数量的2倍
         super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

再深入一步,进入MultithreadEventExecutorGroup:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

好吧,最终的实现在这里:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
            // 用来创建终止监听器(不是重点)
        this.terminatedChildren = new AtomicInteger();
            // 用来处理终止事件(不是重点)
        this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
            // 保证nThreads参数是合理的
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        } else {
            if (executor == null) {
                // 注意这个
                // 创建一个任务执行器,因为executor确实就是null,所以会调用这一步。
                executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
            }
            // 初始化EventExecutor数组长度,也就是说,NioEventLoopGroup会创建一个指定线程个数的EventExecutor数组,不难猜想,一个线程对应数组中的一个EventExecutor
            this.children = new EventExecutor[nThreads];

            int j;
            // 下面就是给EventExecutor数组的每一个位置放上实例对象
            for(int i = 0; i < nThreads; ++i) {
                boolean success = false;
                boolean var18 = false;

                try {
                    var18 = true;
                    // 注意这个
                    // 这里就是给每一个索引位置赋值
                    this.children[i] = this.newChild((Executor)executor, args);
                    success = true;
                    var18 = false;
                } catch (Exception var19) {
                    throw new IllegalStateException("failed to create a child event loop", var19);
                } finally {
                    if (var18) {
                        if (!success) {
                            int j;
                            // 失败的话,就挨个“优雅关闭”
                            for(j = 0; j < i; ++j) {
                                this.children[j].shutdownGracefully();
                            }
                            // 还要挨个检查一下是否所有的EventExecutor里面的任务都已经完成
                            for(j = 0; j < i; ++j) {
                                EventExecutor e = this.children[j];

                                try {
                                    // 如果没有完成
                                    while(!e.isTerminated()) {
                                        // 等待所有任务完成
                                        e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                                    }
                                } catch (InterruptedException var20) {
                                    Thread.currentThread().interrupt();
                                    break;
                                }
                            }
                        }

                    }
                }
                // 下面这一段的逻辑和上面一样,也是判断创建失败的情况
                if (!success) {
                    for(j = 0; j < i; ++j) {
                        this.children[j].shutdownGracefully();
                    }

                    for(j = 0; j < i; ++j) {
                        EventExecutor e = this.children[j];

                        try {
                            while(!e.isTerminated()) {
                                e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException var22) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
            // 好了,EventExecutor数组准备好了,现在把它装在一个分发器上面,分发规则由this.chooser来决定
            this.chooser = chooserFactory.newChooser(this.children);
            // 这个很明显,创建了一个监听器
            FutureListener<Object> terminationListener = new FutureListener<Object>() {
                public void operationComplete(Future<Object> future) throws Exception { 
                    // 这个很好理解,就是看terminatedChildren增长后的位置是否和数组的长度一致,说白了,就是判断是否终止
                    if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
                        MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
                    }

                }
            };
            EventExecutor[] var24 = this.children;
            j = var24.length;
            // 把上面创建的监听器给每一个EventExecutor,监听终止事件
            for(int var26 = 0; var26 < j; ++var26) {
                EventExecutor e = var24[var26];
                e.terminationFuture().addListener(terminationListener);
            }
            // 把EventExecutor数组放进一个不可修改的集合中
            Set<EventExecutor> childrenSet = new LinkedHashSet(this.children.length);
            Collections.addAll(childrenSet, this.children);
            this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    }

关于上面的一个chooser:

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() {
    }

    // 根据EventExecutor的数量来决定选用哪种分发器,
    // 2的n次方就用PowerOfTwoEventExecutorChooser
    // 否则就用GenericEventExecutorChooser
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        return (EventExecutorChooser)(isPowerOfTwo(executors.length) ? new DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser(executors) : new DefaultEventExecutorChooserFactory.GenericEventExecutorChooser(executors));
    }
  
    // 判断是不是2的n次方
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }
        // 主要是这个方法,不是2的n次方就通过取模的方式获取下一个NioEventLoop
        public EventExecutor next() {
            return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)];
        }
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }
        // 2的n次方,就通过相与,相对比取模效率高,建议使用的时候数量设置为2的n次方
        public EventExecutor next() {
            return this.executors[this.idx.getAndIncrement() & this.executors.length - 1];
        }
    }
}
好吧,总结一下,new NioEventLoopGroup(n)就是创建了一个n个元素的EventExecutor数组。大概就是这样: image-20200327230759313.png

EventExecutor是个啥呢?

咱们先来看看ThreadPerTaskExecutor是个啥:

// 利用ThreadFactory创建一个ThreadPerTaskExecutor
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = (ThreadFactory)ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }
        // 正儿八经地执行还是交给了threadFactory创建出来的线程,说白了,就是“外包”给ThreadFactory去创建线程执行
    public void execute(Runnable command) {
        this.threadFactory.newThread(command).start();
    }

threadFactory就是这个DefaultThreadFactory的实例对象:

public Thread newThread(Runnable r) {
                // 这里就是对jdk原生的Thread做了一层包装,反正还是一个Thread
        Thread t = this.newThread(FastThreadLocalRunnable.wrap(r), this.prefix + this.nextId.incrementAndGet());

        try {
            if (t.isDaemon() != this.daemon) {
                t.setDaemon(this.daemon);
            }

            if (t.getPriority() != this.priority) {
                t.setPriority(this.priority);
            }
        } catch (Exception var4) {
        }

        return t;
    }

protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(this.threadGroup, r, name);
    }

所以说,这个ThreadPerTaskExecutor就是一个专门执行任务的执行器

现在,咱们可以回到如何创建EventExecutor了:

this.children[i] = this.newChild((Executor)executor, args);

在MultithreadEventExecutorGroup中,newChild方法是一个抽象方法:

protected abstract EventExecutor newChild(Executor var1, Object... var2) throws Exception;

所以咱们还是要回到子类NioEventLoopGroup上:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            // 如果args里面有四个参数,那么最后一个参数就是EventLoopTaskQueueFactory,否则,就没有设置EventLoopTaskQueueFactory。根据上下文可以知道,args只有三个元素,所以queueFactory=null
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory)args[3] : null;
            // 调用构造函数创建一个NioEventLoop,说明NioEventLoop是EventExecutor的子类
        return new NioEventLoop(this, executor, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2], queueFactory);
    }

进一步:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
            // 这个可以不看,断言后赋值
        this.provider = (SelectorProvider)ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
            // 这个可以不看,断言后赋值
        this.selectStrategy = (SelectStrategy)ObjectUtil.checkNotNull(strategy, "selectStrategy");
            //需要注意,这是优化点
        NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
            // 这是优化后的Selector
        this.selector = selectorTuple.selector;
            // 这是jdk的Selector
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

进父类构造函数看看:

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
            // 这个tailTasks=null
        this.tailTasks = (Queue)ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    }

再进一步:

// 这就是一个线程池,它的作用就是从任务队列中不断地取出任务执行。详细分析可了解线程池的工作原理
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.threadLock = new CountDownLatch(1);
        this.shutdownHooks = new LinkedHashSet();
        this.state = 1;
        this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = (Queue)ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
至此,我们可以得出NioEventLoop就是一个干活的线程池,但是它又不是一个普通的线程池,它是一个专门处理监听selector事件的线程池。 image-20200327222418277.png

也就是说,new NioEventLoopGroup(2)就是创建了一个线程池数组,里面有两个线程池。

下面来看一下优化点:

private NioEventLoop.SelectorTuple openSelector() {
        final AbstractSelector unwrappedSelector;
        try {
            unwrappedSelector = this.provider.openSelector();
        } catch (IOException var7) {
            throw new ChannelException("failed to open a new selector", var7);
        }

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new NioEventLoop.SelectorTuple(unwrappedSelector);
        } else {
            Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                public Object run() {
                    try {
                        return Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
                    } catch (Throwable var2) {
                        return var2;
                    }
                }
            });
            if (maybeSelectorImplClass instanceof Class && ((Class)maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
                final Class<?> selectorImplClass = (Class)maybeSelectorImplClass;
                final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
              // 下面是通过反射修改原生selector中的两个属性值:selectedKeys和publicSelectedKeys
              // 将本来是HashSet的数据结构,修改成了数组
              // 目的就是当io事件变多时,HashSet数据量会变大,也意味着更容易产生hash冲突导致产生链表
              // 链表的add操作的时间复杂度相对于数组的操作来说,就不是一倍两倍了
                Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                    public Object run() {
                        try {
                            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                            if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                                long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                                long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
                                if (selectedKeysFieldOffset != -1L && publicSelectedKeysFieldOffset != -1L) {
                                    PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                                    PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                                    return null;
                                }
                            }

                            Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                            if (cause != null) {
                                return cause;
                            } else {
                                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                                if (cause != null) {
                                    return cause;
                                } else {
                                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                                    return null;
                                }
                            }
                        } catch (NoSuchFieldException var7) {
                            return var7;
                        } catch (IllegalAccessException var8) {
                            return var8;
                        }
                    }
                });
                if (maybeException instanceof Exception) {
                    this.selectedKeys = null;
                    Exception e = (Exception)maybeException;
                    logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
                    return new NioEventLoop.SelectorTuple(unwrappedSelector);
                } else {
                    this.selectedKeys = selectedKeySet;
                    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
                    return new NioEventLoop.SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
                }
            } else {
                if (maybeSelectorImplClass instanceof Throwable) {
                    Throwable t = (Throwable)maybeSelectorImplClass;
                    logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
                }

                return new NioEventLoop.SelectorTuple(unwrappedSelector);
            }
        }
    }

看看优化的本质,在SelectedSelectionKeySet中:

// 时间复杂度就是O(1),常数级别
public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        } else {
                // 直接后面追加
            this.keys[this.size++] = o;
            if (this.size == this.keys.length) {
                // 扩容
                this.increaseCapacity();
            }
            return true;
        }
    }

private void increaseCapacity() {
            // 不够就翻倍
        SelectionKey[] newKeys = new SelectionKey[this.keys.length << 1];
        System.arraycopy(this.keys, 0, newKeys, 0, this.size);
        this.keys = newKeys;
    }

以上就是我对于new NioEventLoopGroup()操作的简单分析。

相关文章

网友评论

      本文标题:Netty创建NioEventLoopGroup简单分析

      本文链接:https://www.haomeiwen.com/subject/objduhtx.html