美文网首页Java知识点并发Arch架构-OS系统
线程池,常见的四种线程池和区别

线程池,常见的四种线程池和区别

作者: 维特无忧堡 | 来源:发表于2018-08-08 23:02 被阅读470次

简述

  为了彻底了解线程池的时候,我们需要弄清楚线程池创建的几个参数

  • corepollsize : 核心池的大小,默认情况下,在创建线程池后,每当有新的任务来的时候,如果此时线程池中的线程数小于核心线程数,就会去创建一个线程执行(就算有空线程也不复用),当创建的线程数达到核心线程数之后,再有任务进来就会放入任务缓存队列中
  • Maximumpoolsize : 线程池中最多可以创建的线程数
  • keeplivetime : 线程空闲状态时,最多保持多久的时间会终止。默认情况下,当线程池中的线程数大于corepollsize 时,才会起作用 ,直到线程数不大于 corepollsize 。
  • workQuque: 阻塞队列,用来存放等待的任务
  • rejectedExecutionHandler :任务拒绝处理器(这个注意一下),有四种

(1)abortpolicy丢弃任务,抛出异常
(2)discardpolicy拒绝执行,不抛异常
(3)discardoldestpolicy 丢弃任务缓存队列中最老的任务
(4)CallerRunsPolicy 线程池不执行这个任务,主线程自己执行。

1、newFixedThreadPool 定长线程池

一个有指定的线程数的线程池,有核心的线程,里面有固定的线程数量,响应的速度快。正规的并发线程,多用于服务器。固定的线程数由系统资源设置。核心线程是没有超时机制的,队列大小没有限制,除非线程池关闭了核心线程才会被回收。

2、newCachedThreadPool 可缓冲线程池

只有非核心线程,最大线程数很大,每新来一个任务,当没有空余线程的时候就会重新创建一个线程,这边有一个超时机制,当空闲的线程超过60s内没有用到的话,就会被回收,它可以一定程序减少频繁创建/销毁线程,减少系统开销,适用于执行时间短并且数量多的任务场景。

3、ScheduledThreadPool 周期线程池

创建一个定长线程池,支持定时及周期性任务执行,通过过schedule方法可以设置任务的周期执行

4、newSingleThreadExecutor 单任务线程池

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,每次任务到来后都会进入阻塞队列,然后按指定顺序执行。

关键源码解读

1、execute方法
 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
       //判断当前线程个数是否小于corePoolSize(核心线程数),如果小于的话,就再创建一个线程,每个线程都被封装成一个Worker,
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
       //如果大于核心线程的话就尝试把任务加入缓存队列,这里增加了状态出现异常的确认判断,
       //如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果队列放不了,只能采用默认的拒绝服务策略了,
        else if (!addWorker(command, false))
            reject(command);
    }

源码中出现ctl的次数比较多,那么这是个什么呢?
我们可以看看它的定义

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
方法
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

它实质上就是一个线程安全的32位的Integer,用前三位表示线程池的状态,后29位来表示线程的个数,所以在计算个数的时候用到了workerCountOf,忽略前三位带来的影响。

2、Worker中的run方法

Worker就是对线程的封装,线程池中维护了一个HashSet<Worker>的一个集合来存储工作线程,每次addWork的时候就往这个里面加,因为HashSet是不安全的,所以加了ReentrantLock来做同步

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

其实这个操作很简单,就是一个while不断的去getTask,获得任务之后,就依次执行
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
(发现每次执行任务的时候都加了锁,有点奇怪,这里我还要看一下)
那么假设任务队列中没有了呢?那这里就用到了我们定义的keeplivetime ,在getTask中有这样一段代码,

Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
也就是当超过keeplivetime 没有拿到就会返回null,这个时候循环就会截止,这个线程Wo也就会结束,所以说keepAliveTime指的是最长的poll时间

相关文章

网友评论

    本文标题:线程池,常见的四种线程池和区别

    本文链接:https://www.haomeiwen.com/subject/bcclvftx.html