美文网首页
Netty中的NioEventLoopGroup类实例化

Netty中的NioEventLoopGroup类实例化

作者: small瓜瓜 | 来源:发表于2019-06-15 20:04 被阅读0次
    NioEventLoopGroup类的继承结构

    首先我们用无参构造方法创建NioEventLoopGroup的实例

    NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    

    1. NioEventLoopGroup源码

        public NioEventLoopGroup() {
            this(0);
        }
      
        public NioEventLoopGroup(int nThreads) {
            this(nThreads, (Executor) null);
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor) {
            this(nThreads, executor, SelectorProvider.provider());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor) {
            this(nThreads, executor, SelectorProvider.provider());
        }
    
        public NioEventLoopGroup(
                int nThreads, Executor executor, final SelectorProvider selectorProvider) {
            this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    

    通过上面的一系列的构造方法的调用,可以看出
    nThreads = 0
    executor = null
    selectorProvider = SelectorProvider.provider()
    selectStrategyFactory=DefaultSelectStrategyFactory.INSTANCE
    ejectedExecutionHandlers.reject()
    最后传入上面的参数,调用其父类MultithreadEventLoopGroup的构造方法

    2. MultithreadEventLoopGroup源码

        private static final int DEFAULT_EVENT_LOOP_THREADS;
        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }
    

    从这个构造方法中,进行了对nThreads的三目运算,因为nThreads=0,所以nThreads被赋值为DEFAULT_EVENT_LOOP_THREADS,我们来看看这个值多少呢?
    具体的赋值语句是
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2))
    NettyRuntime.availableProcessors() * 2 的结果是本机处理器核数的两倍,在默认情况下,系统中不存在以"io.netty.eventLoopThreads"为key的键值对。所以最终结果为核数的两倍。(如果手动设置"io.netty.eventLoopThreads"的值,这最后的结果是取设置的值和1中最大的值)
    下一个该注意的点:这里构造方法通过一个可变长的参数,将
    selectorProvider
    selectStrategyFactory
    ejectedExecutionHandlers.reject()
    都转成了Object类型

    注意:这里是一个重点,NioEventLoopGroup默认初始化多少线程数:处理器核数的两倍
    我们再接着看调用的父类的构造方法

    3. MultithreadEventExecutorGroup源码(这段代码较长,详细分析就直接在源码中了)

        private final EventExecutor[] children;
        private final Set<EventExecutor> readonlyChildren;
        private final AtomicInteger terminatedChildren = new AtomicInteger();
        private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
        private final EventExecutorChooserFactory.EventExecutorChooser chooser;
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    /*
    nThreads = 两倍的处理器核数
    executor = null
    selectorProvider = SelectorProvider.provider()
    selectStrategyFactory=DefaultSelectStrategyFactory.INSTANCE
    ejectedExecutionHandlers.reject()
    */
            this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
        }
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
    // nThreads 大于等于1
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    // executor 等于 null
            if (executor == null) {
    // 下面为executor进行初始化
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    // children初始化
            children = new EventExecutor[nThreads];
    // 通过for循环为children赋值
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
    // 这里我们来看看,newChild方法做了什么,在本类中,该方法是抽象的,实现类为NioEventLoopGroup
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    // 这里生成了一个EventExecutorChooser 实例,我看看newChooser方法吧
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
        
        protected ThreadFactory newDefaultThreadFactory() {
    // 将本类的Class对象传入
            return new DefaultThreadFactory(getClass());
        }
    
        protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
    

    DefaultThreadFactory源码

        private static final AtomicInteger poolId = new AtomicInteger();
        private final AtomicInteger nextId = new AtomicInteger();
        private final String prefix;
        private final boolean daemon;
        private final int priority;
        protected final ThreadGroup threadGroup;
    
        public DefaultThreadFactory(Class<?> poolType) {
            this(poolType, false, Thread.NORM_PRIORITY);
        }
        public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
            this(toPoolName(poolType), daemon, priority);
        }
        
        public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
            this(poolName, daemon, priority, System.getSecurityManager() == null ?
                    Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
        }
    
        public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
    /*
    poolName = "multithreadEventExecutorGroup"
    daemon = false
    priority = Thread.NORM_PRIORITY (也就是5,一般等级)
    threadGroup线程组对象
    */
            if (poolName == null) {
                throw new NullPointerException("poolName");
            }
            if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
                throw new IllegalArgumentException(
                        "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
            }
    // 下面就是一些简单的赋值操作了,设置好了这个线程工厂的线程前缀名称,域,优先级,线程组
            prefix = poolName + '-' + poolId.incrementAndGet() + '-';
            this.daemon = daemon;
            this.priority = priority;
            this.threadGroup = threadGroup;
        }
    
        public static String toPoolName(Class<?> poolType) {
    // poolType = MultithreadEventExecutorGroup.class
            if (poolType == null) {
                throw new NullPointerException("poolType");
            }
    // poolName为"MultithreadEventExecutorGroup",字符串长度大于1,所以直接进入第三个判断分支
            String poolName = StringUtil.simpleClassName(poolType);
            switch (poolName.length()) {
                case 0:
                    return "unknown";
                case 1:
                    return poolName.toLowerCase(Locale.US);
                default:
    // 通过这段代码,主要是将"MultithreadEventExecutorGroup"第一个字母转成小写,所以最后的返回为"multithreadEventExecutorGroup"
                    if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
                        return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
                    } else {
                        return poolName;
                    }
            }
        }
    
        @Override
        public Thread newThread(Runnable r) {
            Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
            try {
                if (t.isDaemon() != daemon) {
                    t.setDaemon(daemon);
                }
    
                if (t.getPriority() != priority) {
                    t.setPriority(priority);
                }
            } catch (Exception ignored) {
                // Doesn't matter even if failed to set.
            }
            return t;
        }
    

    StringUtil源码

    private static final char PACKAGE_SEPARATOR_CHAR = '.';
    public static String simpleClassName(Class<?> clazz) {
    /* 
    clazz = MultithreadEventExecutorGroup.class
    所以className = "io.netty.util.concurrent.MultithreadEventExecutorGroup"
    最后返回的 className.substring(lastDotIdx + 1)为
    "MultithreadEventExecutorGroup"
    */
            String className = checkNotNull(clazz, "clazz").getName();
            final int lastDotIdx = className.lastIndexOf(PACKAGE_SEPARATOR_CHAR);
            if (lastDotIdx > -1) {
                return className.substring(lastDotIdx + 1);
            }
            return className;
        }
    

    ThreadPerTaskExecutor源码

    // 一个线程工厂,可以直接通过execute执行任务
    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
    // DefaultThreadFactory
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
    

    NioEventLoopGroup源码

    @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    /*
    executor 为 ThreadPerTaskExecutor的实例
    args[0] = SelectorProvider.provider()
    args[1] = DefaultSelectStrategyFactory.INSTANCE
    args[2] = ejectedExecutionHandlers.reject()
    */
    // 这里创建了一个NioEventLoop实例
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }
    

    NioEventLoop源码

        public final class NioEventLoop extends SingleThreadEventLoop {
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
    
        private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
    
        private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
                SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
    
        private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
        private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
    
        private final IntSupplier selectNowSupplier = new IntSupplier() {
            @Override
            public int get() throws Exception {
                return selectNow();
            }
        };
    
        // Workaround for JDK NIO bug.
        //
        // See:
        // - http://bugs.sun.com/view_bug.do?bug_id=6427854
        // - https://github.com/netty/netty/issues/203
        static {
            final String key = "sun.nio.ch.bugLevel";
            final String bugLevel = SystemPropertyUtil.get(key);
            if (bugLevel == null) {
                try {
                    AccessController.doPrivileged(new PrivilegedAction<Void>() {
                        @Override
                        public Void run() {
                            System.setProperty(key, "");
                            return null;
                        }
                    });
                } catch (final SecurityException e) {
                    logger.debug("Unable to get/set System Property: " + key, e);
                }
            }
    
            int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
            if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
                selectorAutoRebuildThreshold = 0;
            }
    
            SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
                logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
            }
        }
    
        /**
         * The NIO {@link Selector}.
         */
        private Selector selector;
        private Selector unwrappedSelector;
        private SelectedSelectionKeySet selectedKeys;
    
        private final SelectorProvider provider;
    
        /**
         * Boolean that controls determines if a blocked Selector.select should
         * break out of its selection process. In our case we use a timeout for
         * the select method and the select method will block for that time unless
         * waken up.
         */
        private final AtomicBoolean wakenUp = new AtomicBoolean();
    
        private final SelectStrategy selectStrategy;
    
        private volatile int ioRatio = 50;
        private int cancelledKeys;
        private boolean needsToSelectAgain;
          NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
    
            provider = selectorProvider;
    // 这里我们可以看到,这个是java.nio.channels.Selector,其实我们注册的ClannelHandler,就是注册在这里
            final SelectorTuple selectorTuple = openSelector();
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }
        
        private SelectorTuple openSelector() {
            final Selector unwrappedSelector;
            try {
                unwrappedSelector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            if (DISABLE_KEY_SET_OPTIMIZATION) {
                return new SelectorTuple(unwrappedSelector);
            }
    
            Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        return Class.forName(
                                "sun.nio.ch.SelectorImpl",
                                false,
                                PlatformDependent.getSystemClassLoader());
                    } catch (Throwable cause) {
                        return cause;
                    }
                }
            });
    
            if (!(maybeSelectorImplClass instanceof Class) ||
                // ensure the current selector implementation is what we can instrument.
                !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
                if (maybeSelectorImplClass instanceof Throwable) {
                    Throwable t = (Throwable) maybeSelectorImplClass;
                    logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
                }
                return new SelectorTuple(unwrappedSelector);
            }
    
            final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
            Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                        if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                            // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                            // This allows us to also do this in Java9+ without any extra flags.
                            long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                            long publicSelectedKeysFieldOffset =
                                    PlatformDependent.objectFieldOffset(publicSelectedKeysField);
    
                            if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                                PlatformDependent.putObject(
                                        unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                                PlatformDependent.putObject(
                                        unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                                return null;
                            }
                            // We could not retrieve the offset, lets try reflection as last-resort.
                        }
    
                        Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                        if (cause != null) {
                            return cause;
                        }
                        cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                        if (cause != null) {
                            return cause;
                        }
    
                        selectedKeysField.set(unwrappedSelector, selectedKeySet);
                        publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                        return null;
                    } catch (NoSuchFieldException e) {
                        return e;
                    } catch (IllegalAccessException e) {
                        return e;
                    }
                }
            });
    
            if (maybeException instanceof Exception) {
                selectedKeys = null;
                Exception e = (Exception) maybeException;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
                return new SelectorTuple(unwrappedSelector);
            }
            selectedKeys = selectedKeySet;
            logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
            return new SelectorTuple(unwrappedSelector,
                                     new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
        }
    

    DefaultEventExecutorChooserFactory源码

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
            if (isPowerOfTwo(executors.length)) {
                return new PowerOfTwoEventExecutorChooser(executors);
            } else {
                return new GenericEventExecutorChooser(executors);
            }
        }
    
        private static boolean isPowerOfTwo(int val) {
            return (val & -val) == val;
        }
    
        private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            @Override
            public EventExecutor next() {
                return executors[idx.getAndIncrement() & executors.length - 1];
            }
        }
    
        private static final class GenericEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            GenericEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            @Override
            public EventExecutor next() {
                return executors[Math.abs(idx.getAndIncrement() % executors.length)];
            }
        }
    

    就不再深入下去了,本文就到这里吧。

    相关文章

      网友评论

          本文标题:Netty中的NioEventLoopGroup类实例化

          本文链接:https://www.haomeiwen.com/subject/awyjfctx.html