netty源码分析-线程池

作者: 数齐 | 来源:发表于2017-09-09 21:23 被阅读241次

    众所周知,netty是一款性能非常出色的nio框架,作为dubbo等众多优秀项目底层的数据传输框架,研究吃透它,对于我们今后的开发是绝对有益无害的,所以从今天开始我们就研究netty。本次分析基于netty4,请诸位看官自行下载jar包及源码。好了,我们今天说一下netty的线程池。
    我们经常会看到netty的代码中有下面这一句。

    EventLoopGroup workerGroup = new NioEventLoopGroup();
    

    简单的new了一个事件的处理组(也没看官方怎么解释这个概念的,自己定义了一下吧,勿喷)。但他里面所作的事情却远不止看到的这么简单。这也是我们阅读源码的一个准则,不要忽略每一个你认为的不起眼的代码,也许他的作用是举足轻重的。他的具体实现

    ### io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
    /**
     * Create a new instance using the default number of threads, the default {@link ThreadFactory} and
     * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup() {
        this(0);
    }
    
    ### io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    
    ### MultithreadEventLoopGroup.java:39
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    

    ### 表示代码出自的类及方法名,因为他们的相关性很大,所以我就放到同一个代码块中,防止思维跳跃太大,大家跟不上节奏。虽然现在初始化的时候设置了线程数为0,但是并不是最后的结果,经过了诸多的构造函数的调用及父类构造函数的引用,在这里做了一个转化,当为0时,会取DEFAULT_EVENT_LOOP_THREADS的值,而他的值,他取的是,如果设置io.netty.eventLoopThreads的值就取这个值,没有设置的话,会取默认值可用核数的两倍,同1比较去一个最大的进行赋值。最后我们到达了最好的构造函数

    ### io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
    
    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
    
        if (executor == null) {   //如何executor为空,那么设置默认执行器为ThreadPerTaskExecutor
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    
        children = new EventExecutor[nThreads];   //MultithreadEventExecutorGroup是一个总的管理的类,具体和线程相关的都交给他的children进行处理,是一个EventExecutor的数组
    
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);   //初始化每一个EventExecutor的实例,下面会有详细解释
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
    
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
    
        chooser = chooserFactory.newChooser(children);  //创建线程的选择器,选择是有哪个线程来处理
    
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
    
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);   //将这些子处理器设置为只读,不能添加
    }
    
    

    在上面的代码中都有关键步骤的注释,总的来说就是最后的脏活累活都不是这个Group干的,都交给自己内部的children来干,都交给EventExecutor来干,我们看一下这个EventExecutor是如何实例化的

    ### io.netty.channel.nio.NioEventLoopGroup#newChild
    
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    
    ### io.netty.channel.nio.NioEventLoop#NioEventLoop
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
    

    EventExecutor是使用NioEventLoop进行初始化的,这个NioEventLoop是SingleThreadEventLoop的子类,所以super调用的是SingleThreadEventLoop的构造方法

    
    /**
     * Create a new instance
     *
     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
     * @param executor          the {@link Executor} which will be used for executing
     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
     *                          executor thread
     * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
     * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
     */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);    //设置任务的处理队列,后续任务会添加到这里面
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
    

    OK,每一个处理子事件处理器都会有一个任务的队列。目前为止线程池的初始化就告一段落了,感觉没过瘾,咱们就下一篇见。

    相关文章

      网友评论

        本文标题:netty源码分析-线程池

        本文链接:https://www.haomeiwen.com/subject/rppajxtx.html