美文网首页bugstac...
Netty源码分析-03 Netty线程池

Netty源码分析-03 Netty线程池

作者: 史圣杰 | 来源:发表于2019-03-26 16:35 被阅读0次

    线程池是一个在多线程场景中运用很广泛的并发框架,需要异步执行或并发执行任务的程序都可以使用线程池。有任务到来时,如果不使用线程池,我们需要不断的创建/销毁线程,还需要对线程进行管理;而使用线程池,直接将任务提交到线程池即可。使用线程池有几个好处:无需重复创建/销毁线程,降低资源消耗;提高程序响应速度;提高线程的可管理性。

    3.1 实现原理

    线程池内部一般包含一个核心线程池,其内部的线程在创建之后一般不会销毁,执行完任务后线程会阻塞等待新任务到来。
    当向线程池提交任务时,线程池会做如下判断:

    • 核心线程池未满,创建线程执行任务
    • 核心线程池已满,若等待队列未满,则加入到等待队列;若等待队列已满但线程池未满,创建新线程执行任务;若等待队列和线程池均已满,则按照指定策略退出/拒绝任务/丢弃任务等。
    线程池执行流程

    了解了实现原理,我们先来自己实现一个线程池,首先定义线程池的接口

    ThreadPool
    线程池的接口里面最重要的方法是execute执行任务

    public interface ThreadPool<Job extends Runnable> {
        //提交一个Job,这个Job需要实现Runnable接口
        void execute(Job job);
        //关闭线程池
        void shutdown();
        //增加工作者线程
        void addWorkers(int num);
        //减少工作者线程
        void removeWorker(int num);
        //得到正在等待执行的任务数量
        int getJobSize();
    }
    

    CommonThreadPool
    在实现线程池时,我们需要定义线程池的大小,以及保存任务的列表jobs,下面是变量定义:

        // 线程池最大限制数
        private static final int MAX_WORKER_NUMBERS = 100;
        // 线程池默认的数量
        private static final int DEFAULT_WORKER_NUMBERS = 1;
        // 线程池最小数量
        private static final int MIN_WORKER_NUMBERS = 1;
        // 工作列表
        private final LinkedList<Job> jobs = new LinkedList<Job>();
    

    在线程池初始化时,我们要将核心线程池进行初始化,创建多个Worker线程,然后启动Worker线程。

    // num 为DEFAULT_WORKER_NUMBERS 默认线程池大小
    private void initializeWokers(int num) {
            // 创建多个线程,加入workers中,并启动
            for (int i = 0; i < num; i++) {
                Worker worker = new Worker();
                workers.add(worker);
                Thread thread = new Thread(worker, "ThreadPool-Worker-"
                        + threadNum.getAndIncrement());
                thread.start();
            }
        }
    

    Worker启动后,一直没有任务,需要阻塞在jobs上(jobs是上面定义的任务列表),Worker等待任务到来后唤醒获取队列中的任务并执行。下面的代码中,如果jobs为空,则线程等待;

    // worker的代码,首先要获取jobs的锁,
    synchronized (jobs) {
                        while (jobs.isEmpty()) {// 如果jobs是空的,则执行jobs.wait,使用while而不是if,因为wait后可能已经为空了,需要继续等待
                            try {
                                jobs.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                                Thread.currentThread().interrupt();// 中断
                                return;// 结束
                            }
                        }
                        job = jobs.removeFirst();// 第一个job
                        if (job != null) {
                            try {
                                job.run();//注意,这里是run而不是start,传入的Job
                            } catch (Exception e) {
                                // 忽略Job执行中的Exception
                                e.printStackTrace();
                            }
                        }
                    }
    

    提交任务时,只需要将任务加入jobs中,然后通知worker线程即可。worker线程获得锁后会取第一个任务执行。执行完毕,若jobs为空,worker线程继续进行休眠等待任务到来。

    @Override
        public void execute(Job job) {
            if (job == null)
                return;
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    

    完整的代码可以查看https://github.com/ssj234/JavaStudy_IO/tree/master/IOResearch/src/net/ssj/pool

    3.2 Java的Executor框架

    Java平台本身提供了Executor框架用来帮助我们使用线程池。

    Executor框架

    Executor框架最核心的类是ThreadPoolExecutor,这是各个线程池的实现类,有如下几个属性:

    • corePool:核心线程池的大小 m
    • maximumPool:最大线程池的大小
    • keepAliveTime: 休眠等待时间
    • TimeUnit unit : 休眠等待时间单位,如微秒/纳秒等
    • BlockingQueue workQueue:用来保存任务的工作队列
    • ThreadFactory: 创建线程的工厂
    • RejectedExecutionHandler:当线程池已经关闭或线程池Executor已经饱和,execute()方法将要调用的Handler

    通过Executor框架的根据类Executors,可以创建三种基本的线程池:

    • FixedThreadPool
    • SingleThreadExecutor
    • CachedThreadPool

    FixedThreadPool

    FixedThreadPool被称为可重用固定线程数的线程池。

    // 获取fixedThreadPool
    ExecutorService fixedThreadPool=Executors.newFixedThreadPool(paramInt);
    
    //内部会调用下面的方法,参数 corePoolSize、maximumPoolSize、keepAliveTime、workQueue
    return new ThreadPoolExecutor(paramInt, paramInt, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    

    FixedTheadPool设置的线程池大小和最大数量一样;keepAliveTime为0,代表多余的空闲线程会立刻终止;保存任务的队列使用LinkedBlockingQueue,当线程池中的线程执行完任务后,会循环反复从队列中获取任务来执行。
    FixedThreadPool适用于限制当前线程数量的应用场景,适用于负载比较重的服务器。

    SingleThreadExecutor

    SingleThreadExecutor的核心线程池数量corePoolSize和最大数量maximumPoolSize都设置为1,适用于需要保证顺序执行的场景

    ExecutorService singleThreadExecutor=Executors.newSingleThreadExecutor();
    
         return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
    

    CachedThreadPool

    CachedThreadPool是一个会根据需要创建新线程的线程池,适用于短期异步的小任务,或负载教轻的服务器。

    ExecutorService cachedThreadPool=Executors.newCachedThreadPool();
    
         return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
    

    SynchronousQueue是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。corePoolSize是0,maximumPoolSize都最大,无界的。keepAliveTime为60秒,空闲线程超过60S会被终止。

    ScheduleThreadPoolExecutor

    ScheduleThreadPoolExecutor和Timer类似,可以设置延时执行或周期执行,但比Timer有更多的功能。Timer和TimerTask只创建一个线程,任务执行时间超过周期会产生一些问题。Timer创建的线程没有处理异常,因此一旦抛出非受检异常,会立刻终止。

    ScheduledThreadPoolExecutor executor=new ScheduledThreadPoolExecutor(5);
    //可以直接执行
    executor.execute(new JobTaskR("executor", 0));
    executor.execute(new JobTaskR("executor", 1));
    
    System.out.println("5S后执行executor3");
    //隔5秒后执行一次,但只会执行一次。
    executor.schedule(new JobTaskR("executor", 3), 5, TimeUnit.SECONDS);
    
    System.out.println("开始周期调度");
    //设置周期执行,初始时6S后执行,之后每2s执行一次
    executor.scheduleAtFixedRate(new JobTaskR("executor", 4), 6, 2, TimeUnit.SECONDS);
    

    scheduleAtFixedRate或者scheduleWithFixedDelay方法,它们不同的是前者以固定频率执行,后者以相对固定延迟之后执行。

    3.3 Netty的EventLoop与线程池

    Netty的事件循环和事件循环组的实现中,类的层级关系比较复杂,其底层是Java线程池的实现,不过在netty的实际使用中还是比较简单的,我们只需要使用如下的代码即可,

    EventLoopGroup bossGroup=new NioEventLoopGroup();
    EventLoopGroup workGroup=new NioEventLoopGroup();
    ServerBootstrap b=new ServerBootstrap();
    b.group(bossGroup,workGroup)//设置事件循环组
    

    Netty的事件循环机制有两个基本接口:EventLoop和EventLoopGroup。前者是事件循环,后者是由多个事件循环组成的组。
    EventLoop自身是一个不断循环执行的线程,以NioEventLoop为例,其继承了SingleThreadEventExecutor,内部的executor是创建NioEventLoop时传入的线程池,用来将run方法放入线程池中执行;此外还包含为一个TaskQueue,netty在处理io过程中的task可以提交到这个队列中,事件循环会不断获取task并执行,因此但其本身也可以看做一个线程池。
    NioEventLoop的run方法中,Nio的事件循环会不断select后获取任务并执行,然后根据ioRatio的设置执行TaskQueue的任务。NioEventLoop的execute方法中,其会将task加入到taskQueue等待事件循环执行。因此,我们可以将NioEventLoop当做一个不断执行的线程池,EventLoopGroup作为线程池组,线程池组的意义是采用给的的策略选取一个EventLoop并提交任务。

    EventLoop的定义如下,其继承了一个顺序执行的线程池接口和EventLoopGroup,也就是说EventLoop之间有父子关系,通过parent();返回任务循环组,通过next()选取一个事件循环。线程池组的register用于将Netty的Channel注册到事件循环中。

    public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
        @Override
        EventLoopGroup parent();
    }
    
    public interface EventLoopGroup extends EventExecutorGroup {
        EventLoop next();
        ChannelFuture register(Channel channel);
    }
    

    NioEventLoopGroup

    NioEventLoopGroup除了处理网络的异步I/O任务,还用于完成异步提交的系统任务。NioEventLoopGroup初始化时,有如下几个参数可以配置,主要用于设置线程池的相关配置。

    • nThreads 子线程池数量
    • Executor executor 用来执行事件循环的线程池
    • chooserFactory :next()时选择线程池的策略
    • selectorProvider 用于打开selector
    • selectStrategyFactory 用来控制select循环行为的策略
    • RejectedExecutionHandlers 线程池执行的异常处理策略
    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                    RejectedExecutionHandlers.reject());
        }
    

    NioEventLoopGroup初始化过程为:

    1. 如果传入的executor 为空,会默认使用ThreadPerTaskExecutor,该线程池针对每个任务会创建一个线程,创建线程方式使用DefaultThreadFactory提供的newThread方法。
    2. 初始化开始,首先会根据创建nThread个子线程池,保存在childrens变量中,创建逻辑比较简单,将初始化NioEventLoopGroup时设置的参数传递给NioEventLoop对象。在创建子线程池NioEventLoop的过程中,如果一旦有失败的,就需要关闭已经创建的所有子线程池并等待这些线程池结束。
    3. 之后,使用chooserFactory创建chooser,用来在next()选择事件循环时从childrens变量选择一个返回。默认使用2的倍数的策略,也可以设置为顺序依次选择。
    4. 向组中所有的事件循环的terminationFuture注册事件,目的是等待所有事件循环结束后将事件循环组的terminatedChildren设置为成功完成。
    5. 最后,将children复制保存为一个只读的集合,保存在变量readonlyChildren中。

    至此,NioEventLoopGroup的初始化过程就结束了。我们可以看到,NioEventLoopGroup主要的用来聚合多个EventLoop,对其进行调度。

    NioEventLoop

    在NioEventLoopGroup的初始化过程中,会创建多个NioEventLoop,NioEventLoop用来执行实际的事件循环,初始化时有如下几个属性:

    • NioEventLoopGroup parent 线程池所在的Group

    • Executor executor 执行任务的线程池,默认是ThreadPerTaskExecutor

    • SelectorProvider selectorProvider 用来打开selector

    • SelectStrategy strategy 用来控制select循环行为的策略

    • RejectedExecutionHandlers 线程池执行的异常处理策略

    • addTaskWakesUp addTask(Runnable)添加任务时是否唤醒线程池,默认是false

    • maxPendingTasks 线程池中等待任务的最大数量

    • scheduledTaskQueue 保存定时任务的QUeue

    • tailTasks :保存任务的Queue,netty选择使用jctools的MpscChunkedArrayQueue,原因是为了提高效率,因为Nio线程池的线程消费者只有一个,就是一直进行的select循环,而生产者可能有多个。具体实现参见 http://blog.csdn.net/youaremoon/article/details/50351929

    事件循环

    • NioEventLoop初始化时,会根据配置参数sun.nio.ch.bugLevelio.netty.selectorAutoRebuildThreshold设置重建selector的阈值,这是为了解决jvm空轮询导致cpu利用率100%的问题。
    • openSelector的目的是打开选择描述符Selector,并对sun.nio.ch.SelectorImpl的实现进行优化,将selectedKeys和publicSelectedKeys属性都修改为SelectedSelectionKeySet类,这个类使用了两个数组,使用空间换时间的方法,设置了两个数组,每次使用其中的一个。
    • 打开Selector之后,在服务器启动后会调用register将选择描述符注册到EventLoopGroup,NioEventLoopGroup中会调用NioEventLoop的register,这样,事件循环中的Selector就注册到了channel上。
    • 在run方法中,会根据selectStrategy调用select方法,收到io事件后使用processSelectedKeys处理,处理完成后执行TaskQueue中的方法。

    提交任务

    NioEventLoop初始化时,会创建/设置其包含的属性,最重要的是打开selector和创建tailTasks两个步骤;这时,由于没有任何任务,NioEventLoop不会启动线程。在netty中,向线程池提交任务可以使用下面的方法:

    EventLoopGroup loop = new NioEventLoopGroup();
    loop.next().submit(Callable<T> task)
    loop.next().submit(Runnable task)
    loop.next().execute(Runnable command);
    

    也可以直接通过EventLoopGroup提交任务,只是EventLoopGroup内部会调用next()后再执行相关的方法。

    EventLoopGroup loop = new NioEventLoopGroup();
    loop.submit(Callable<T> task)
    loop.submit(Runnable task)
    loop.execute(Runnable command);
    

    submit方法的内部会将Callable或Runnable包装后交给execute方法执行。

    // AbstractExecutorService.java
    public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task); // 包装task为 ftask
            execute(ftask);
            return ftask;
        }
    

    execute方法被NioEventLoop的父类SingleThreadEventExecutor覆盖,程序如下:

    public void execute(Runnable task) {
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task); // 添加到任务队列
            } else {
                startThread(); // 启动线程,向EventLoop内部的线程池提交任务,会执行NioEventLoop run
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    
    1. 判断当前线程(提交任务的线程)与当前线程池是同一个线程,也就是说是如果是当前线程池提交的任务,则直接将任务加入线程池队列即可;
    2. 如果不是,则需要启动线程后添加任务。启动线程的过程是,如果内部线程没有启动则启动,向NioEventLoop内部包含的executor提交一个任务,任务内部执行NioEventLoop的run方法也就是事件循环(executor是实际使用的线程池,初始化是传入,默认是ThreadPerTaskExecutor)。
    3. 最后根据addTaskWakesUp标志和任务是否实现了NonWakeupRunnable判断是否需要唤醒,唤醒的方法是提交一个默认的空任务WAKEUP_TASK。

    3.4 事件循环解析

    Nio事件循环在NioEventLoop中,主要功能:

    • 处理网络I/O读写事件
    • 执行系统任务和定时任务

    在主循环中我们可以看到netty对I/O任务和提交到事件循环中的系统任务的调度。


    EventLoop事件循环

    3.4.1 I/O事件

    1. 由于NIO的I/O读写需要使用选择符,因此,netty在NioEventLoop初始化时,会使用SelectorProvider打开selector。在类加载时,netty会从系统设置中读取相关配置参数:
    • sun.nio.ch.bugLevel 用来修复JDK的NIO在Selector.open()的一个BUG
    • io.netty.selectorAutoRebuildThreshold select()多少次数后重建selector
    static {
            int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
            if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
                selectorAutoRebuildThreshold = 0;
            }
            SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
        }
    
    1. NioEventLoop的构造方法中,会调用provider.openSelector()打开Selector;如果设置io.netty.noKeySetOptimization为true,则会启动优化,优化内容是将Selector的selectedKeys和publicSelectedKeys属性设置为可写并替换为Netty实现的集合以提供效率。
    private Selector openSelector() {
            final Selector selector;
            try {
                selector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            if (DISABLE_KEYSET_OPTIMIZATION) {
                return selector;
            }
           //  下面是优化程序,此处省略
           ...
            return selector;
        }
    
    1. NioEventLoop最核心的地方在于事件循环,具体代码在NioEventLoop.java在run方法中
    • 首先根据默认的选择策略DefaultSelectStrategy判断本次循环是否select,具体逻辑为:如果当前有任务则使用selectNow立刻查询是否有准备就绪的I/O;如果当前没有任务则返回SelectStrategy.SELECT,并将wakenUp设置为false,并调用select()进行查询。
     protected void run() {
            for (;;) {  // 事件循环
                try {
                    // select策略
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));  // select()
                            if (wakenUp.get()) {
                                selector.wakeup(); // 唤醒select()的线程
                            }
                        default:
                            // fallthrough
                    }
                .... 后续处理
    
    • select()时需要判断当前是否有scheduledTask(定时任务),如果有则需要计算任务delay的时间,如果定时任务需要立刻执行了,那么必须马上selectNow()并返回,之后执行任务。如果没有scheduledTask,会判断当前是否有任务在等待列表,如果有任务时将wakenUp设置为true并selectNow();如果没有任务,那么会 selector.select(1000); 阻塞等待1s,直到有I/O就绪,或者有任务等待,或需要唤醒时退出,否则,会继续循环,直到前面的几种情况发生后退出。

    • 之后,事件循环开始处理IO和任务。如果查询到有IO事件,会调用processSelectedKeysOptimized(优化的情况下),对SelectionKey进行处理。

    if (ioRatio == 100) {
        try {
                processSelectedKeys();
            } finally {
                runAllTasks();
           }
    } else {
        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();
          } finally {
            final long ioTime = System.nanoTime() - ioStartTime; // io花费的时间
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 按照iorate计算task的时间
    }
    }
    
    
    • processSelectedKeysOptimized处理I/O,主要是NIO的select操作,处理相关的事件。
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
           ......
            try {
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                  
                    ch.unsafe().forceFlush();
                }
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    

    3.4.2 任务处理

    • runAllTasks执行提交到EventLoop的任务,首先从scheduledTaskQueue获取需要执行的任务,加入到taskQueue,然后依次执行taskQueue的任务。
    protected boolean runAllTasks() {
            assert inEventLoop();
            boolean fetchedAll;
            boolean ranAtLeastOne = false;
    
            do {
                fetchedAll = fetchFromScheduledTaskQueue(); // 获取定时任务
                if (runAllTasksFrom(taskQueue)) {
                    ranAtLeastOne = true;
                }
            } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    
            if (ranAtLeastOne) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
            }
            afterRunningAllTasks();
            return ranAtLeastOne;
        }
    
    • ioRatio不为100时,会调用runAllTasks(ioTime * (100 - ioRatio) / ioRatio),首先计算出I/O处理的事件,然后按照比例为执行task分配事件,内部主要逻辑与runAllTasks()主要逻辑相同。

    相关文章

      网友评论

        本文标题:Netty源码分析-03 Netty线程池

        本文链接:https://www.haomeiwen.com/subject/rangvqtx.html