美文网首页Java知识点并发Arch架构-OS系统
线程池,常见的四种线程池和区别

线程池,常见的四种线程池和区别

作者: 维特无忧堡 | 来源:发表于2018-08-08 23:02 被阅读470次

    简述

      为了彻底了解线程池的时候,我们需要弄清楚线程池创建的几个参数

    • corepollsize : 核心池的大小,默认情况下,在创建线程池后,每当有新的任务来的时候,如果此时线程池中的线程数小于核心线程数,就会去创建一个线程执行(就算有空线程也不复用),当创建的线程数达到核心线程数之后,再有任务进来就会放入任务缓存队列中
    • Maximumpoolsize : 线程池中最多可以创建的线程数
    • keeplivetime : 线程空闲状态时,最多保持多久的时间会终止。默认情况下,当线程池中的线程数大于corepollsize 时,才会起作用 ,直到线程数不大于 corepollsize 。
    • workQuque: 阻塞队列,用来存放等待的任务
    • rejectedExecutionHandler :任务拒绝处理器(这个注意一下),有四种

    (1)abortpolicy丢弃任务,抛出异常
    (2)discardpolicy拒绝执行,不抛异常
    (3)discardoldestpolicy 丢弃任务缓存队列中最老的任务
    (4)CallerRunsPolicy 线程池不执行这个任务,主线程自己执行。

    1、newFixedThreadPool 定长线程池

    一个有指定的线程数的线程池,有核心的线程,里面有固定的线程数量,响应的速度快。正规的并发线程,多用于服务器。固定的线程数由系统资源设置。核心线程是没有超时机制的,队列大小没有限制,除非线程池关闭了核心线程才会被回收。

    2、newCachedThreadPool 可缓冲线程池

    只有非核心线程,最大线程数很大,每新来一个任务,当没有空余线程的时候就会重新创建一个线程,这边有一个超时机制,当空闲的线程超过60s内没有用到的话,就会被回收,它可以一定程序减少频繁创建/销毁线程,减少系统开销,适用于执行时间短并且数量多的任务场景。

    3、ScheduledThreadPool 周期线程池

    创建一个定长线程池,支持定时及周期性任务执行,通过过schedule方法可以设置任务的周期执行

    4、newSingleThreadExecutor 单任务线程池

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,每次任务到来后都会进入阻塞队列,然后按指定顺序执行。

    关键源码解读

    1、execute方法
     public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
           //判断当前线程个数是否小于corePoolSize(核心线程数),如果小于的话,就再创建一个线程,每个线程都被封装成一个Worker,
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
           //如果大于核心线程的话就尝试把任务加入缓存队列,这里增加了状态出现异常的确认判断,
           //如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //如果队列放不了,只能采用默认的拒绝服务策略了,
            else if (!addWorker(command, false))
                reject(command);
        }
    

    源码中出现ctl的次数比较多,那么这是个什么呢?
    我们可以看看它的定义

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    线程池状态
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;
    方法
    private static int workerCountOf(int c) { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    它实质上就是一个线程安全的32位的Integer,用前三位表示线程池的状态,后29位来表示线程的个数,所以在计算个数的时候用到了workerCountOf,忽略前三位带来的影响。

    2、Worker中的run方法

    Worker就是对线程的封装,线程池中维护了一个HashSet<Worker>的一个集合来存储工作线程,每次addWork的时候就往这个里面加,因为HashSet是不安全的,所以加了ReentrantLock来做同步

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    其实这个操作很简单,就是一个while不断的去getTask,获得任务之后,就依次执行
    beforeExecute(wt, task);
    task.run();
    afterExecute(task, thrown);
    (发现每次执行任务的时候都加了锁,有点奇怪,这里我还要看一下)
    那么假设任务队列中没有了呢?那这里就用到了我们定义的keeplivetime ,在getTask中有这样一段代码,

    Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
    也就是当超过keeplivetime 没有拿到就会返回null,这个时候循环就会截止,这个线程Wo也就会结束,所以说keepAliveTime指的是最长的poll时间

    相关文章

      网友评论

        本文标题:线程池,常见的四种线程池和区别

        本文链接:https://www.haomeiwen.com/subject/bcclvftx.html