美文网首页
线程池 ThreadPoolExecutor

线程池 ThreadPoolExecutor

作者: Wi1ls努力努力再努力 | 来源:发表于2019-05-12 15:03 被阅读0次
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    

    参数说明

    参数 说明
    corePoolSize 核心线程数,也指线程池的基本大小。当任务提交时,即时有其他空闲线程也会创建一个新线程执行任务,直到线程数等于 corePoolSize
    maximumpoolSize 线程池最大数量。如果队列满并且创建的线程数小于最大线程数,则会创建新的现场执行任务。对于无界队列无用
    keepAliveTime 工作线程活动保持时间。当工作线程空闲后,在 keepAliveTime 内没有重新执行任务,则会被销毁
    unit keepAliveTime的单位
    workQueue 任务队列,用以保存等待执行的任务的阻塞队列
    threadFactory 先创创建工厂
    handler 饱和策略,当队列和线程池满后,说明线程池处于饱和状态。

    • 常用的阻塞队列

      • ArrayBlockingQueue
      • LinkedBlockingQueue
      • SynchronousQueue
      • PriorityBlockingQueue
    • 常用的饱和策略

      • AbortPolicy
      • CallerRunsPolicy
      • DiscardOldestPolicy
      • DiscardPolicy

    常见的线程池,用线程池工具类 Executors 创建

    线程池 corePoolSize maximumPoolSize keepAliveTime unit workQueue threadFactory handler
    FixedThreadPool 自定义 等于 corePoolSize 0 MILLISECONDS LinkedBlockingQueue Executors$DefaultThreadFactory AbortPolicy
    SingleThreadExecutor 1 1 0 MILLISECONDS LinkedBlockingQueue 自定义或Executors$DefaultThreadFactory AbortPolicy
    CachedThreadPool 0 Integer.MAX_ VALUE 60 SECONDS SynchronousQueue 自定义或Executors$DefaultThreadFactory AbortPolicy

    默认的构造函数

    @Executors
    public static ExecutorService newFixThreadPool(int nThreads);
    public static ExecutorService newFixThreadPool(int  nThreads, ThreadFactory threadFactory)
    
    public static ExecutorService newSingleThreadExecutor();
    public static ExecutorService newSingleThreadExecutor(ThreadFactory factory);
    
    public static ExecutorService newCachedThreadPool();
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
    

    FixThread 介绍:
    从参数构造可以看出来 FixThread 内的线程全都是核心线程,使用的队列是无界阻塞队列 LinkedBlockingQueue,因此 FixThread 会从无界队列中反复获取任务交由核心线程执行


    SingleThreadExecutor:
    其核心线程数和最大线程数被设置为 1,任务队列使用无界队列 LinkedBlockingQueue,因此该线程池会反复从 LinkedBlockingQueue 中获取任务来执行,每次执行一个


    CachedThreadPool:
    其核心线程数为 0,最大线程数为Integer.MAX_VALUE,并且 keepAliveTime 设置为 60 秒,使用的是没有容量的 SynchronousQueue,因此每提交一个任务,如果没有多余的线程可以使用,那么每次都会创建一个线程去执行任务。于是乎如果任务的提交速度远大于任务的执行速度,那么会因为创建过多线程而耗尽 CPU。


    线程池:
    无论是 FixThread 还是 SingleThreadExecutor 还是 CacherThreadPool,其内部都是ThreadPoolExecutor对象,参数介绍在上面已经说明了。当执行一个任务时,需要调用 execute(Runnable)@ThreadPoolExecutor

    public void execute(Runnable command){
        if(command == null){
            throw new NullPointException();
        }
            //默认值是 e0000000
        int c= ctl.get();
            //从 workerCountOf(c)返回 c&1ffffffff,因此线程最大为 0x1fffffff
        if(workerCountOf(c) < corePoolSize){
                    //当线程数小于 corePoolSize,则
            if(addWorker(command, true)){
                c = ctl.get();
            }
        }
            //如果线程池是 Running 的并且任务入队成功
        if(isRunning(c) && workQueue.offer(command)){
            int recheck = ctl.get();
                    //如果线程池非 Running 并且任务出队成功,则执行拒绝策略
            if(!isRunning(recheck) && remove(command)){
                reject(command);
            }else if(workerCountOf(recheck) == 0){
                            //
                addWorker(null,false);
            }
        }else if(!addWorker(command, flase)){
            reject(command);
        }
    }
    

    看一些参数说明

      //初始值是 e0000000,最高位表示线程池状态,低 7 位表示线程数,
      //如 e 代表 RUNNING,0 代表 SHUTDOWN,2代表 STOP,4代表 TIDYING,6代表 TERMINATED
      private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        //29
        private static final int COUNT_BITS = Integer.SIZE - 3;
        //1fffffff
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        0xe0000000
        private static final int RUNNING    = -1 << COUNT_BITS;
        0x0
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        0x20000000
        private static final int STOP       =  1 << COUNT_BITS;
        0x40000000
        private static final int TIDYING    =  2 << COUNT_BITS;
        0x6000000
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    private boolean addWorker(Runnable firstTask, boolean core){
        retry:
        for(;;){
            int c= ctl.get();
            int rs=runStateOf(c);
    
            //runstateOf()是做 mask 操作,取出线程池状态
            //mask是 e00000000,
            //SHUTWODN是非 RUNNING状态的最小值,如果>=SHUTDOWN表示线程池非 RUNNING
            if(rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                    firstTask == null &&
                    ! workQueue.isEmpty())){
                return false;
            }
    
            for(;;){
                int wc = wokerCountOf(c);
                //线程池当前线程数量大于线程池容量 或
                //如果是用核心线程执行且大于核心线程数阈值
                //或是用非核心线程执行且大于非核心线程数阈值
                if(wc >= CAPACITY
                    || wc>=(core? corePoolSize : maximumPoolSize)){
                    return false;
                }
                //CAS 增加线程池线程数,则否重试
                if(compareAndIncrementWorkerCount(c)){
                    break retry;
                }
                //? re-check
                c = ctl.get();
                if(runStateOf(c) != rs){
                    continue retry;
                }
            }
        }
        //以下是为了增加工作线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try{
            //将 Runnable 封装为 Worker,内部用线程工厂类创建线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if(t!=null){
                final RenntrantLock mainLock = this.mainLock;
                //锁
                mainLock.lock();
                try{
                    int rs = runStateOf(ctl.get());
    
                    if(rs < SHUTDOWN
                        ||(rs == SHUTDOWN && firstTask == null)){
                        if(t.isAlive()){
                            throw new IllegalThreadStateException();
                        }
                        workers.add(w);
                        int s = workers.size();
                        if(s>largestPoolSize){
                            largestPoolSize = s;
                        }
                        workerAdded = true;
                    }
                }finally{
                    main.unlock();
                }
                if(workerAdded){
                                    //开始工作
                    t.start();
                    workerStarted = true;
                }
            }
        }finally{
            if(!workStarted){
                addWorkerFailed(w);
            }
        }
        return workerStated;
    }
    
    @ThreadPoolExecutor$Worker
    Worker(Runnable firstTask){
      setState(-1);
      this.firstTask = firstTask;
      this.thread=getThreadFactory().newThread(this);
    }
    //当在 addWorker()中调用 Worker.thread.start()则会调用 run()@Worker
    public void run(){
      runWorker(this);
    }
    
    
    final void runWorker(Worker w){
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        try{
            //循环取任务并且执行
            while(task !=null || (task = getTask) !=null){
                w.lock();
                if((runStateAtLeast(ctl.get(),STOP)
                    ||(Thread.interrupted() && 
                    runStateAtLeast(ctl.get(),STOP)))
                    &&!wt.isInterrupted()){
                    wt.interrupt();
                }
                try{
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try{
                        task.run();
                    }catch(Exception x){
                        throw x;
                    }finally {
                        afterExecute(task, thrown);
                    }
                }finally{
                    task = null
                    w.completedTasks++;
                    w.unlock;
                }
            }   
            completedAbruptly = false;
        }finally{
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    //从任务队列取出任务
    private Runnable getTask(){
        boolean timeOut = false;
        for(;;){
            int c = ctl.get();
            int rs = runStateOf(c);
    
            if(rs>= SHUTDOWN && (rs >= STOP)||workQueue.isEmpty()){
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
    
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            if((wc>maximumPoolSize ||(timed && timedOut))
                &&(wc > 1 || workQueue.isEmpty())){
                    if(compareAndDecrementWorkerCount(c)){
                        return null;
                    }
                    continue;
            }
    
            try{
                Runnable r  = timed ?
                    workQueue.pool(keepALiveTime, TimeUnit.NANOSECOND):
                    workQueue.take();
                if(r !=null){
                    return r;
                }
                timeOut = true;
            }catch(InterruptedException retry){
                timeOut = false;
            }
        }
    }
    
    

    。。。

    相关文章

      网友评论

          本文标题:线程池 ThreadPoolExecutor

          本文链接:https://www.haomeiwen.com/subject/fduxaqtx.html