美文网首页
JDK并发组件-ThreadPoolExecutor

JDK并发组件-ThreadPoolExecutor

作者: zhangsean | 来源:发表于2020-01-04 13:33 被阅读0次

结构和参数

字段 代码 含义
COUNT_BITS Integer.SIZE - 3 30
CAPACITY (1 << COUNT_BITS) - 1 00011111_11111111_11111111_11111111
RUNNING -1 << COUNT_BITS 11100000_00000000_00000000_00000000
SHUTDOWN 0 << COUNT_BITS 00000000_00000000_00000000_00000000
STOP 1 << COUNT_BITS 00100000_00000000_00000000_00000000
TIDYING 2 << COUNT_BITS 01000000_00000000_00000000_00000000
TERMINATED 3 << COUNT_BITS 01100000_00000000_00000000_00000000
//取最高的3位
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//取低29位
private static int workerCountOf(int c)  { return c & CAPACITY; }

//控制位状态 + 线程数量得出来的 ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
//c < s
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
//c >= s
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
//是否是RUNNING状态
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
//尝试增加worker数
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

//尝试减少worker数
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

//减少worker数
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

过程

execute

提交任务

  1. 如果运行的线程数比核心线程数少,开启新的线程
  2. 如果核心线程数满了提交到队列, 提交之后检查线程池是否已经关闭、工作线程的数量;如果已经关闭,则拒绝发送请求;如果工作线程数量为0,则创建新的线程。
  3. 如果提交到队列也失败,那么尝试添加新的线程。如果线程数大于最大的线程数,拒绝提交。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
  
    //1
    //if条件成功之后,addWorker过程可能遇到cas操作失败,核心线程已经被填满的情况
    //这时候就会失败,变成提交队列
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
  
    //2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
  
    //3
    else if (!addWorker(command, false))
        reject(command);
}

addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
      
        // 1. RUNNING状态可以提交任何任务
        // 2. 大于SHUTDOWN不能提交任务
        // 3. 正在SHUTDOWN并且工作队列有任务,可以提交空任务
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            
            //当前任务数已经超过系统限制
            //(核心任务)当前任务数超过核心线程数,则失败
            //(非核心任务)当前任务数超过最大任务数,则失败
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
          
            //cas增加任务数量
            if (compareAndIncrementWorkerCount(c))
                break retry;
          
            //cas增加任务数失败,导致失败的原因可能是状态变化,或者任务数量变化;
            //状态变化,要重新执行retry过程
            //任务数量变化,直接执行添加任务过程
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
  
        //这时已经添加了worker数

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
      
        //创建worker(同时也新建了线程)
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                //RUNNING状态可以提交任务
                //SHUTDOWN状态可以提交空任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //添加之启动这个线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

worker线程运行

  1. 核心线程不断拉取queue中的内容 (可设置超时)
  2. queue中无任务时核心线程阻塞
  3. 超过queue大小时,创建额外线程,额外线程运行完任务后就结束,保证线程只有核心线程在循环

runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //获取不到task就退出
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

getTask

private Runnable getTask() {
    boolean timedOut = false; 

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //SHUTDOWN并且workQueue为空
        //>SHUTDOWN
        //两种情况需要退出
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //是否允许线程超时
        //核心线程超时,需要设置allowCoreThreadTimeOut
        //非核心线程自带超时功能
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //非核心线程,执行完就退出
        //超时并且队列为空,则退出
        //超时并且有至少一个工作线程,则退出
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //设置了超时 => 超时获取
            //未设置超时 => 阻塞获取
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            
            //超时
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

相关文章

  • JDK并发组件-ThreadPoolExecutor

    结构和参数 字段代码含义COUNT_BITSInteger.SIZE - 330CAPACITY(1 << COU...

  • JDK并发组件-FutureTask

    属性 方法 run和get都是同步的方法,实现异步,需要在新的Thread中运行run Get get 带超时版本...

  • JDK并发组件分析-概述

    内容 https://www.jianshu.com/p/a6e558b23cab FairSync Nonfai...

  • 并发:ThreadPoolExecutor

    参考博客链接: https://link.juejin.im参考链接:https://juejin.im/entr...

  • ThreadPoolExecutor

    参考文章:Java并发学习之线程池ThreadPoolExecutor的小结并发番@ThreadPoolExecu...

  • spring 线程池和java线程池

    jdk线程池就是使用jdk线程工具类ThreadPoolExecutor 创建线程池spring线程池就是使用自己...

  • jdk并发组件分析(1)-ReentrantLock

    公平锁 上锁 acquire 上锁过程lock()->acquire(1) acquire方法会tryAcquir...

  • 线程池

    线程池的文章:JDK线程池(一):体系结构JDK线程池(二):ThreadPoolExecutor深入分析java...

  • jdk1.8 ThreadPoolExecutor

    菜鸟一枚。理解不对的地方,望能直接指出~ 1、线程池的5种状态 RUNNING:运行中,正常接受新的任务SHUTD...

  • 线程池

    并行:多核cpu同时处理多件事并发:伪并行 继承关系:ThreadPoolExecutor->ExecutorSe...

网友评论

      本文标题:JDK并发组件-ThreadPoolExecutor

      本文链接:https://www.haomeiwen.com/subject/rjwxactx.html