Java并发之线程池

作者: 第四单元 | 来源:发表于2018-12-17 22:40 被阅读6次

    一.使用线程池的好处

    与“为每一个任务分配一个线程”相比,线程池有一些好处。

    • 重用已经创建的线程,减少了创建、销毁线程的开销。
    • 任务到达时,可能线程池中已经有创建好的线程供使用了,避免了等待线程创建的时间开销。

    二.Java线程池实现原理

    在Java中创建线程池可以使用Executors提供的四个静态方法创建适用于特定情况的几种线程池。但这些构造方法还是根据需求直接传入特定参数实例化了ThreadPoolExecutor类。所以,我们要想从原理上理解线程池,还是要先学习一下ThreadPoolExecutor的构造方法,看看都有哪些参数。这些参数其实可以理解为线程池的配置信息,根据自己的需求传入不同的参数就能构造出不同的线程池。

    ThreadPollExector类有4个构造方法,其它三个的参数较少,使用了一些但归根结底是传入的7个关键参数决定了这个线程池是什么样的。所以关键是这七个参数。

    2.1 ThreadPoolExecutor构造方法的7个参数

    方法声明如下:

     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler);
    

    corePoolSize :
    核心线程数。当线程池刚刚创建时线程数是0,这时如果每来一个任务就会创建一个新的线程,直到已创建的线程数等于核心线程数后就不再创建了,而是把新任务放入阻塞队里去。

    关于初始线程数:如果调用了prestartAllCoreThreads()会直接创建出corePoolSize个线程;如果调用了prestartCoreThread()则会提前创建出一个线程。

    关于线程存活时间:核心线程不受keepAliveTime的影响,创建后会一直存在,直到线程池关闭。除非调用了allowCoreThreadTimeOut(true)方法

    这里的疑问是,如果现有线程是有空闲的,但没达到核心线程数,来了新任务会创建新的线程吗?
    答:这个疑问如果理解了线程池的工作过程就不会问了。详见下面的线程池工作流程。

    maximumPoolSize
    最大线程数。是线程池最多允许存在的线程总数。如果当前线程数已经达到corePoolSize。那么就将任务放入队列,如果队列也满了。就判断一下当前存在线程数是否小于maximumPoolSize,如果是,则创建新的线程执行任务。这里创建的线程处于核心线程池外,受keepAliveTime的影响,如果空闲到达执行时间就会销毁。

    *keepAliveTime
    控制非核心线程在空闲状态下的存活时间。如果调用allowCoreThreadTimedOut方法,核心线程也可以受它的影响。

    unit
    keepAliveTime的时间单位,有秒、毫秒、分钟、小时、天等。

    workQueue
    指定缓存队列

    threadFactory
    线程工厂,用来创建线程。

    handler
    表示当前拒绝处理任务时的策略,当缓存队列已满,线程数也达到maximumPoolSize时就按指定的策略处理新提交的任务。
    主要有以下四种取值:

    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    2.2 线程池工作流程

    • 当有新任务提交给线程池时,首先检查一下当前线程数是否达到了corePoolSize,如果没达到则新创建线程,将当前任务作为该线程的第一个任务执行。
    • 如果已经到达了corePoolSize或超过了,则将任务放入缓存队列。
    • 如果缓存队列已满,则判断当前线程数是否已经达到了maximumPoolSize,如果没到达则创建新线程执行任务
    • 如果线程数达到了maximumPoolSize,则使用拒绝策略处理

    2.3 ThreadPoolExecutor源码分析

    2.3.1 线程池状态

    线程池有running,shutdown,stop,tidying,termienated几种状态。在jdk1.8的实现中,复用了一个AtomicInteger对象来同时存储线程状态和当前线程数。具体代码不做展开,理解这种做法即可。

    2.3.2 参数介绍
    • ReentrantLock mainLook 一个锁,添加工作线程时要先获取锁
    • HashSet<Worker> workers 存储工作线程
    • int largesetPoolSize 记录曾经达到的最大线程数
    • long completedTaskCount 记录已经完成的任务数量
      其他还有构造方法传入的7个参数也都有相应的属性进行保存
    2.3.3 execute(Runnable command)方法分析

    通过这个方法将任务提交给线程池执行。这个方法基本是2.2节线程池工作流程执行的。

        public void execute(Runnable command) {
            if (command == null) //空指针异常
                throw new NullPointerException();
            //获取clt,这个AtomicInteger对象中存储着当前线程数和线程运行状态
            int c = ctl.get(); 
            //如果当前线程数小于核心线程数则执行添加线程动作
            if (workerCountOf(c) < corePoolSize) {
                //addWorker的第二个参数表示添加的是否是核心线程,这里是true
                if (addWorker(command, true))
                    return;
                //添加后重新获取状态值,因为线程数已经有变化了,线程池状态也可能变了
                c = ctl.get();
            }
            //执行到这里,说明添加核心线程不成功,可能是数量达到corePoolSize或线程池shutdown了
            //如果线程池还在运行尝试将任务加入缓冲队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //如果添加到队列后,线程池停止运行了,将任务从队列移除
                if (! isRunning(recheck) && remove(command))
                    reject(command);  //移除成功后使用拒绝策略处理任务
                else if (workerCountOf(recheck) == 0)  //分析1:如果核心线程池为空,添加一个非核心线程,处理队列中可能的任务
                    addWorker(null, false);
            }
            else if (!addWorker(command, false)) //队列满了,启动非核心线程执行任务
                reject(command);  //非核心线程启动失败,执行拒绝策略
        }
    
    2.3.4 addWorker(Runnable,Boolean)分析

    addWorker方法用来给线程池中添加线程。第二个参数表示添加的是否是核心线程。下面来看一下它是怎么工作的吧!

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                //分析1:注意这个if语句是和2.3.3中分析1相呼应的。
               //调用shutdown()将空闲线程interrupt,正在执行的线程继续执行,将状态设为shutdown
              //调用shutdownNow()将所有线程中断,不管有没有执行完。
              //如果shutdown()后,核心线程都关闭了,队列中却还有元素,2.3.3分析1就添加了新的非核心线程处理,就是这里的!(rs==SHUTDOWN&&……)这种情况
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                  //超过执行线程数,返回false
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                  //cas将线程计数加1,失败后重试
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                  //线程状态发生变化重新循环
                    if (runStateOf(c) != rs) 
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
            
            //下面是添加线程的过程
            boolean workerStarted = false; //标识线程是否启动
            boolean workerAdded = false; //标识线程是否被添加
            Worker w = null;
            try {
                w = new Worker(firstTask);   //新生成worker,线程池中的线程用它表达
                final Thread t = w.thread;      //获取实际的线程
                if (t != null) {  
                   //添加线程要先获取锁
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        //获取当前状态
                        int rs = runStateOf(ctl.get());
    
                        //不是shutdown状态或者,处于shutdown状态但添加的tash是null,属于shutdown下添加线程处理队列中剩余任务的情况
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);  //添加到workers,workers持有线程
                            int s = workers.size();  
                            if (s > largestPoolSize) //记录最大线程数记录
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();   //启动线程执行
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)  //没成功
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    2.3.5 worker的执行

    添加线程后,worker就开始执行了,在它的执行方法run里会直接调用tast的run方法,执行要干的事情。
    执行完成之后会去队列获取新的任务执行。
    如果没有新任务执行呢?
    Worker是ThreadPoolExecutor的一个内部类,它的实现了Runnable方法。这里有个问题是Runnable需要传递给Thread才能执行。Worker是如何做到的呢?
    原来,Worker类持有了一个Thread类型的变量thread,并在初始化时使用Worker本身初始化了thread
    Worker的构造方法:

     Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    

    Worker的run方法调用了runWoker(this)。Worker还继承了AQS,这一块还有待学习。
    第一次启动会执行初始化传进来的任务firstTask;然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
          //第一次时执行外部传过来的task,后面从getTask获取,getTask从队列获取任务执行,
          //如果队列为空则等待keepAliveTime这么长的时间
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    2.3.5 其他内容

    下一步应该分析getTask是如何从队列获取任务的了,这里不再展开讲了。
    此外,还应该有队列的实现和选择问题,拒绝策略的具体等,目前先不做如此多的分析了,待未来时机成熟了再完善。

    二.使用Executors创建具有默认配置的线程池

    java.util.concurrent.Executors中提供了4个静态方法,可以用来创建具有指定特性的线程池。都是实例化了ThreadPoolExecutor对象。

    2.1 newFixedThreadPool()方法

    返回一个带缓存的线程池,该池在必要的时候创建线程,在线程空闲60s之后终止线程.
    下面看一下这个方法的源码实现:

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    2.2 newFixedThreadPool

    创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,指导达到线程池的最大数量,这是线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    2.3newScheduledThreadPool

    创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer

        public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }
    

    2.4newSingleThreadExecutor

    是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    相关文章

      网友评论

        本文标题:Java并发之线程池

        本文链接:https://www.haomeiwen.com/subject/pryckqtx.html