美文网首页Java 杂谈互联网科技java知识
Java线程池ThreadPoolExecutor实现原理剖析

Java线程池ThreadPoolExecutor实现原理剖析

作者: 像程序一样思考 | 来源:发表于2018-10-11 17:22 被阅读3次

在Java中,使用线程池来异步执行一些耗时任务是非常常见的操作。最初我们一般都是直接使用new Thread().start的方式,但我们知道,线程的创建和销毁都会耗费大量的资源,关于线程可以参考之前的一片博客Java线程那点事儿, 因此我们需要重用线程资源。

当然也有其他待解决方案,比如说coroutine, 目前Kotlin已经支持了,JDK也已经有了相关的提案:Project Loom, 目前的实现方式和Kotlin有点类似,都是基于ForkJoinPool,当然目前还有很多限制,以及问题没解决,比如synchronized还是锁住当前线程等。

继承结构

继承结构看起来很清晰,最顶层的Executor只提供了一个最简单的void execute(Runnable command)方法,然后是ExecutorService,ExecutorService提供了一些管理相关的方法,例如关闭、判断当前线程池的状态等,另外不同于Executor#execute,ExecutorService提供了一系列方法,可以将任务包装成一个Future,从而使得任务提交方可以跟踪任务的状态。而父类AbstractExecutorService则提供了一些默认的实现。

构造器

ThreadPoolExecutor的构造器提供了非常多的参数,每一个参数都非常的重要,一不小心就容易踩坑,因此设置的时候,你必须要知道自己在干什么。

publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler) {if(corePoolSize<0||maximumPoolSize<=0||maximumPoolSize

corePoolSize、 maximumPoolSize。线程池会自动根据corePoolSize和maximumPoolSize去调整当前线程池的大小。当你通过submit或者execute方法提交任务的时候,如果当前线程池的线程数小于corePoolSize,那么线程池就会创建一个新的线程处理任务, 即使其他的core线程是空闲的。如果当前线程数大于corePoolSize并且小于maximumPoolSize,那么只有在队列"满"的时候才会创建新的线程。因此这里会有很多的坑,比如你的core和max线程数设置的不一样,希望请求积压在队列的时候能够实时的扩容,但如果制定了一个无界队列,那么就不会扩容了,因为队列不存在满的概念。

keepAliveTime。如果当前线程池中的线程数超过了corePoolSize,那么如果在keepAliveTime时间内都没有新的任务需要处理,那么超过corePoolSize的这部分线程就会被销毁。默认情况下是不会回收core线程的,可以通过设置allowCoreThreadTimeOut改变这一行为。

workQueue。即实际用于存储任务的队列,这个可以说是最核心的一个参数了,直接决定了线程池的行为,比如说传入一个有界队列,那么队列满的时候,线程池就会根据core和max参数的设置情况决定是否需要扩容,如果传入了一个SynchronousQueue,这个队列只有在另一个线程在同步remove的时候才可以put成功,对应到线程池中,简单来说就是如果有线程池任务处理完了,调用poll或者take方法获取新的任务的时候,新提交的任务才会put成功,否则如果当前的线程都在忙着处理任务,那么就会put失败,也就会走扩容的逻辑,如果传入了一个DelayedWorkQueue,顾名思义,任务就会根据过期时间来决定什么时候弹出,即为ScheduledThreadPoolExecutor的机制。

threadFactory。创建线程都是通过ThreadFactory来实现的,如果没指定的话,默认会使用Executors.defaultThreadFactory(),一般来说,我们会在这里对线程设置名称、异常处理器等。

handler。即当任务提交失败的时候,会调用这个处理器,ThreadPoolExecutor内置了多个实现,比如抛异常、直接抛弃等。这里也需要根据业务场景进行设置,比如说当队列积压的时候,针对性的对线程池扩容或者发送告警等策略。

看完这几个参数的含义,我们看一下Executors提供的一些工具方法,只要是为了方便使用,但是我建议最好少用这个类,而是直接用ThreadPoolExecutor的构造函数,多了解一下这几个参数到底是什么意思,自己的业务场景是什么样的,比如线程池需不需要扩容、用不用回收空闲的线程等。

publicclassExecutors{/*    * 提供一个固定大小的线程池,并且线程不会回收,由于传入的是一个无界队列,相当于队列永远不会满    * 也就不会扩容,因此需要特别注意任务积压在队列中导致内存爆掉的问题*/publicstaticExecutorServicenewFixedThreadPool(intnThreads) {returnnewThreadPoolExecutor(nThreads, nThreads,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue());    }/*    *  这个线程池会一直扩容,由于SynchronousQueue的特性,如果当前所有的线程都在处理任务,那么    *  新的请求过来,就会导致创建一个新的线程处理任务。如果线程一分钟没有新任务处理,就会被回    *  收掉。特别注意,如果每一个任务都比较耗时,并发又比较高,那么可能每次任务过来都会创建一个线    *  程*/publicstaticExecutorServicenewCachedThreadPool() {returnnewThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,newSynchronousQueue());    }}

源码分析

既然是个线程池,那就必然有其生命周期:运行中、关闭、停止等。ThreadPoolExecutor是用一个AtomicInteger去的前三位表示这个状态的,另外又重用了低29位用于表示线程数,可以支持最大大概5亿多,绝逼够用了,如果以后硬件真的发展到能够启动这么多线程,改成AtomicLong就可以了。

状态这里主要分为下面几种:

RUNNING: 表示当前线程池正在运行中,可以接受新任务以及处理队列中的任务

SHUTDOWN: 不再接受新的任务,但会继续处理队列中的任务

STOP: 不再接受新的任务,也不处理队列中的任务了,并且会中断正在进行中的任务

TIDYING: 所有任务都已经处理完毕,线程数为0,转为为TIDYING状态之后,会调用terminated()回调

TERMINATED: terminated()已经执行完毕

同时我们可以看到所有的状态都是用二进制位表示的,并且依次递增,从而方便进行比较,比如想获取当前状态是否至少为SHUTDOWN等,同时状态之前有几种转换:

RUNNING -> SHUTDOWN。调用了shutdown()之后,或者执行了finalize()

(RUNNING 或者 SHUTDOWN) -> STOP。调用了shutdownNow()之后会转换这个状态

SHUTDOWN -> TIDYING。当线程池和队列都为空的时候

STOP -> TIDYING。当线程池为空的时候

IDYING -> TERMINATED。执行完terminated()回调之后会转换为这个状态

privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING,0));privatestaticfinalintCOUNT_BITS=Integer.SIZE-3;privatestaticfinalintCAPACITY=(1<=s;    }privatestaticbooleanisRunning(intc) {returnc

下面是比较核心的字段,这里workers采用的是非线程安全的HashSet, 而不是线程安全的版本,主要是因为这里有些复合的操作,比如说将worker添加到workers后,我们还需要判断是否需要更新largestPoolSize等,workers只在获取到mainLock的情况下才会进行读写,另外这里的mainLock也用于在中断线程的时候串行执行,否则如果不加锁的话,可能会造成并发去中断线程,引起不必要的中断风暴。

privatefinalReentrantLockmainLock=newReentrantLock();privatefinalHashSetworkers=newHashSet();privatefinalConditiontermination=mainLock.newCondition();privateintlargestPoolSize;privatelongcompletedTaskCount;

核心方法

拿到一个线程池之后,我们就可以开始提交任务,让它去执行了,那么我们看一下submit方法是如何实现的。

publicFuturesubmit(Runnabletask) {if(task==null)thrownewNullPointerException();RunnableFutureftask=newTaskFor(task,null);        execute(ftask);returnftask;    }publicFuturesubmit(Callabletask) {if(task==null)thrownewNullPointerException();RunnableFutureftask=newTaskFor(task);        execute(ftask);returnftask;    }

这两个方法都很简单,首先将提交过来的任务(有两种形式:Callable、Runnable )都包装成统一的 RunnableFuture,然后调用execute方法,execute可以说是线程池最核心的一个方法。

publicvoidexecute(Runnablecommand) {if(command==null)thrownewNullPointerException();intc=ctl.get();/*            获取当前worker的数目,如果小于corePoolSize那么就扩容,            这里不会判断是否已经有core线程,而是只要小于corePoolSize就会直接增加worker*/if(workerCountOf(c)

这里要特别注意下防止队列失败的逻辑,不同的队列丢任务的逻辑也不一样,例如说无界队列,那么就永远不会put失败,也就是说扩容也永远不会执行,如果是有界队列,那么当队列满的时候,会扩容非core线程,如果是SynchronousQueue,这个队列比较特殊,当有另外一个线程正在同步获取任务的时候,你才能put成功,因此如果当前线程池中所有的worker都忙着处理任务的时候,那么后续的每次新任务都会导致扩容, 当然如果worker没有任务处理了,阻塞在获取任务这一步的时候,新任务的提交就会直接丢到队列中去,而不会扩容。

上文中多次提到了扩容,那么我们下面看一下线程池具体是如何进行扩容的:

privatebooleanaddWorker(RunnablefirstTask,booleancore) {        retry:for(;;) {intc=ctl.get();//获取当前线程池的状态intrs=runStateOf(c);/*                如果状态为大于SHUTDOWN, 比如说STOP,STOP上文说过队列中的任务不处理了,也不接受新任务,                因此可以直接返回false不扩容了,如果状态为SHUTDOWN并且firstTask为null,同时队列非空,                那么就可以扩容*/if(rs>=SHUTDOWN&&!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))returnfalse;for(;;) {intwc=workerCountOf(c);/*                    若worker的数目大于CAPACITY则直接返回,                    然后根据要扩容的是core线程还是非core线程,进行判断worker数目                    是否超过设置的值,超过则返回*/if(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))returnfalse;/*                    通过CAS的方式自增worker的数目,成功了则直接跳出循环*/if(compareAndIncrementWorkerCount(c))breakretry;//重新读取状态变量,如果状态改变了,比如线程池关闭了,那么就跳到最外层的for循环,//注意这里跳出的是retry。c=ctl.get();//Re-read ctlif(runStateOf(c)!=rs)continueretry;//else CAS failed due to workerCount change; retry inner loop}        }booleanworkerStarted=false;booleanworkerAdded=false;Workerw=null;try{//创建Workerw=newWorker(firstTask);finalThreadt=w.thread;if(t!=null) {finalReentrantLockmainLock=this.mainLock;                mainLock.lock();try{/*                        获取锁,并判断线程池是否已经关闭*/intrs=runStateOf(ctl.get());if(rslargestPoolSize)//更新largestPoolSizelargestPoolSize=s;                        workerAdded=true;                    }                }finally{                    mainLock.unlock();                }if(workerAdded) {//若Worker创建成功,则启动线程,这么时候worker就会开始执行任务了t.start();                    workerStarted=true;                }            }        }finally{if(!workerStarted)//添加失败addWorkerFailed(w);        }returnworkerStarted;    }privatevoidaddWorkerFailed(Workerw) {finalReentrantLockmainLock=this.mainLock;        mainLock.lock();try{if(w!=null)                workers.remove(w);            decrementWorkerCount();//每次减少worker或者从队列中移除任务的时候都需要调用这个方法tryTerminate();        }finally{            mainLock.unlock();        }    }

这里有个貌似不太起眼的方法tryTerminate,这个方法会在所有可能导致线程池终结的地方调用,比如说减少worker的数目等,如果满足条件的话,那么将线程池转换为TERMINATED状态。另外这个方法没有用private修饰,因为ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,而ScheduledThreadPoolExecutor也会调用这个方法。

finalvoidtryTerminate() {for(;;) {intc=ctl.get();/*                如果当前线程处于运行中、TIDYING、TERMINATED状态则直接返回,运行中的没                什么好说的,后面两种状态可以说线程池已经正在终结了,另外如果处于SHUTDOWN状态,                并且workQueue非空,表明还有任务需要处理,也直接返回*/if(isRunning(c)||runStateAtLeast(c,TIDYING)||(runStateOf(c)==SHUTDOWN&&!workQueue.isEmpty()))return;//可以退出,但是线程数非0,那么就中断一个线程,从而使得关闭的信号能够传递下去,//中断worker后,worker捕获异常后,会尝试退出,并在这里继续执行tryTerminate()方法,//从而使得信号传递下去if(workerCountOf(c)!=0) {                interruptIdleWorkers(ONLY_ONE);return;            }finalReentrantLockmainLock=this.mainLock;            mainLock.lock();try{//尝试转换成TIDYING状态,执行完terminated回调之后//会转换为TERMINATED状态,这个时候线程池已经完整关闭了,//通过signalAll方法,唤醒所有阻塞在awaitTermination上的线程if(ctl.compareAndSet(c, ctlOf(TIDYING,0))) {try{                        terminated();                    }finally{                        ctl.set(ctlOf(TERMINATED,0));                        termination.signalAll();                    }return;                }            }finally{                mainLock.unlock();            }//else retry on failed CAS}    }/**    * 中断空闲的线程*@paramonlyOne*/privatevoidinterruptIdleWorkers(booleanonlyOne) {finalReentrantLockmainLock=this.mainLock;        mainLock.lock();try{for(Workerw:workers) {//遍历所有worker,若之前没有被中断过,//并且获取锁成功,那么就尝试中断。//锁能够获取成功,那么表明当前worker没有在执行任务,而是在//获取任务,因此也就达到了只中断空闲线程的目的。Threadt=w.thread;if(!t.isInterrupted()&&w.tryLock()) {try{                        t.interrupt();                    }catch(SecurityExceptionignore) {                    }finally{                        w.unlock();                    }                }if(onlyOne)break;            }        }finally{            mainLock.unlock();        }    }

Worker

下面看一下Worker类,也就是这个类实际负责执行任务,Worker类继承自AbstractQueuedSynchronizer,AQS可以理解为一个同步框架,提供了一些通用的机制,利用模板方法模式,让你能够原子的管理同步状态、blocking和unblocking线程、以及队列,具体的内容之后有时间会再写,还是比较复杂的。这里Worker对AQS的使用相对比较简单,使用了状态变量state表示是否获得锁,0表示解锁、1表示已获得锁,同时通过exclusiveOwnerThread存储当前持有锁的线程。另外再简单提一下,比如说CountDownLatch, 也是基于AQS框架实现的,countdown方法递减state,await阻塞等待state为0。

privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{/** Thread this worker is running in.  Null if factory fails.*/finalThreadthread;/** Initial task to run.  Possibly null.*/RunnablefirstTask;/** Per-thread task counter*/volatilelongcompletedTasks;Worker(RunnablefirstTask) {            setState(-1);//inhibit interrupts until runWorkerthis.firstTask=firstTask;this.thread=getThreadFactory().newThread(this);        }/** Delegates main run loop to outer runWorker*/publicvoidrun() {            runWorker(this);        }protectedbooleanisHeldExclusively() {returngetState()!=0;        }protectedbooleantryAcquire(intunused) {if(compareAndSetState(0,1)) {                setExclusiveOwnerThread(Thread.currentThread());returntrue;            }returnfalse;        }protectedbooleantryRelease(intunused) {            setExclusiveOwnerThread(null);            setState(0);returntrue;        }publicvoidlock()        { acquire(1); }publicbooleantryLock()  {returntryAcquire(1); }publicvoidunlock()      { release(1); }publicbooleanisLocked() {returnisHeldExclusively(); }voidinterruptIfStarted() {Threadt;if(getState()>=0&&(t=thread)!=null&&!t.isInterrupted()) {try{                    t.interrupt();                }catch(SecurityExceptionignore) {                }            }        }    }

注意这里Worker初始化的时候,会通过setState(-1)将state设置为-1,并在runWorker()方法中置为0,上文说过Worker是利用state这个变量来表示锁的状态,那么加锁的操作就是通过CAS将state从0改成1,那么初始化的时候改成-1,也就是表示在Worker启动之前,都不允许加锁操作,我们再看interruptIfStarted()以及interruptIdleWorkers()方法,这两个方法在尝试中断Worker之前,都会先加锁或者判断state是否大于0,因此这里的将state设置为-1,就是为了禁止中断操作,并在runWorker中置为0,也就是说只能在Worker启动之后才能够中断Worker。

另外线程启动之后,其实就是调用了runWorker方法,下面我们看一下具体是如何实现的。

finalvoidrunWorker(Workerw) {Threadwt=Thread.currentThread();Runnabletask=w.firstTask;        w.firstTask=null;        w.unlock();//调用unlock()方法,将state置为0,表示其他操作可以获得锁或者中断workerbooleancompletedAbruptly=true;try{/*                首先尝试执行firstTask,若没有的话,则调用getTask()从队列中获取任务*/while(task!=null||(task=getTask())!=null) {                w.lock();/*                    如果线程池正在关闭,那么中断线程。*/if((runStateAtLeast(ctl.get(),STOP)||(Thread.interrupted()&&runStateAtLeast(ctl.get(),STOP)))&&!wt.isInterrupted())                    wt.interrupt();try{//执行beforeExecute回调beforeExecute(wt, task);Throwablethrown=null;try{//实际开始执行任务task.run();                    }catch(RuntimeExceptionx) {                        thrown=x;throwx;                    }catch(Errorx) {                        thrown=x;throwx;                    }catch(Throwablex) {                        thrown=x;thrownewError(x);                    }finally{//执行afterExecute回调afterExecute(task, thrown);                    }                }finally{                    task=null;//这里加了锁,因此没有线程安全的问题,volatile修饰保证其他线程的可见性w.completedTasks++;                    w.unlock();//解锁}            }            completedAbruptly=false;        }finally{//抛异常了,或者当前队列中已没有任务需要处理等processWorkerExit(w, completedAbruptly);        }    }privatevoidprocessWorkerExit(Workerw,booleancompletedAbruptly) {//如果是异常终止的,那么减少worker的数目if(completedAbruptly)//If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();finalReentrantLockmainLock=this.mainLock;        mainLock.lock();try{//将当前worker中workers中删除掉,并累加当前worker已执行的任务到completedTaskCount中completedTaskCount+=w.completedTasks;            workers.remove(w);        }finally{            mainLock.unlock();        }//上文说过,减少worker的操作都需要调用这个方法tryTerminate();/*            如果当前线程池仍然是运行中的状态,那么就看一下是否需要新增另外一个worker替换此worker*/intc=ctl.get();if(runStateLessThan(c,STOP)) {/*                如果是异常结束的则直接扩容,否则的话则为正常退出,比如当前队列中已经没有任务需要处理,                如果允许core线程超时的话,那么看一下当前队列是否为空,空的话则不用扩容。否则话看一下                是否少于corePoolSize个worker在运行。*/if(!completedAbruptly) {intmin=allowCoreThreadTimeOut?0:corePoolSize;if(min==0&&!workQueue.isEmpty())                    min=1;if(workerCountOf(c)>=min)return;//replacement not needed}            addWorker(null,false);        }    }privateRunnablegetTask() {booleantimedOut=false;//上一次poll()是否超时了for(;;) {intc=ctl.get();intrs=runStateOf(c);//若线程池关闭了(状态大于STOP)//或者线程池处于SHUTDOWN状态,但是队列为空,那么返回nullif(rs>=SHUTDOWN&&(rs>=STOP||workQueue.isEmpty())) {                decrementWorkerCount();returnnull;            }intwc=workerCountOf(c);/*                如果允许core线程超时 或者 不允许core线程超时但当前worker的数目大于core线程数,                那么下面的poll()则超时调用*/booleantimed=allowCoreThreadTimeOut||wc>corePoolSize;/*                获取任务超时了并且(当前线程池中还有不止一个worker 或者 队列中已经没有任务了),那么就尝试                减少worker的数目,若失败了则重试*/if((wc>maximumPoolSize||(timed&&timedOut))&&(wc>1||workQueue.isEmpty())) {if(compareAndDecrementWorkerCount(c))returnnull;continue;            }try{//从队列中抓取任务Runnabler=timed?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();if(r!=null)returnr;//走到这里表明,poll调用超时了timedOut=true;            }catch(InterruptedExceptionretry) {                timedOut=false;            }        }    }

关闭线程池

关闭线程池一般有两种形式,shutdown()和shutdownNow()

publicvoidshutdown() {finalReentrantLockmainLock=this.mainLock;        mainLock.lock();try{            checkShutdownAccess();//通过CAS将状态更改为SHUTDOWN,这个时候线程池不接受新任务,但会继续处理队列中的任务advanceRunState(SHUTDOWN);//中断所有空闲的worker,也就是说除了正在处理任务的worker,其他阻塞在getTask()上的worker//都会被中断interruptIdleWorkers();//执行回调onShutdown();//hook for ScheduledThreadPoolExecutor}finally{            mainLock.unlock();        }        tryTerminate();//这个方法不会等待所有的任务处理完成才返回}publicListshutdownNow() {Listtasks;finalReentrantLockmainLock=this.mainLock;        mainLock.lock();try{            checkShutdownAccess();/*                不同于shutdown(),会转换为STOP状态,不再处理新任务,队列中的任务也不处理,                而且会中断所有的worker,而不只是空闲的worker*/advanceRunState(STOP);            interruptWorkers();            tasks=drainQueue();//将所有的任务从队列中弹出}finally{            mainLock.unlock();        }        tryTerminate();returntasks;    }privateListdrainQueue() {BlockingQueueq=workQueue;ArrayListtaskList=newArrayList();/*            将队列中所有的任务remove掉,并添加到taskList中,            但是有些队列比较特殊,比如说DelayQueue,如果第一个任务还没到过期时间,则不会弹出,            因此这里通过调用toArray方法,然后再一个一个的remove掉*/q.drainTo(taskList);if(!q.isEmpty()) {for(Runnabler:q.toArray(newRunnable[0])) {if(q.remove(r))                    taskList.add(r);            }        }returntaskList;    }

从上文中可以看到,调用了shutdown()方法后,不会等待所有的任务处理完毕才返回,因此需要调用awaitTermination()来实现

publicbooleanawaitTermination(longtimeout,TimeUnitunit)        throwsInterruptedException{longnanos=unit.toNanos(timeout);finalReentrantLockmainLock=this.mainLock;        mainLock.lock();try{for(;;) {//线程池若已经终结了,那么就返回if(runStateAtLeast(ctl.get(),TERMINATED))returntrue;//若超时了,也返回掉if(nanos<=0)returnfalse;//阻塞在信号量上,等待线程池终结,但是要注意这个方法可能会因为一些未知原因随时唤醒当前线程,//因此需要重试,在tryTerminate()方法中,执行完terminated()回调后,表明线程池已经终结了,//然后会通过termination.signalAll()唤醒当前线程nanos=termination.awaitNanos(nanos);            }        }finally{            mainLock.unlock();        }    }

一些统计相关的方法

publicintgetPoolSize() {finalReentrantLockmainLock=this.mainLock;        mainLock.lock();try{//若线程已终结则直接返回0,否则计算works中的数目//想一下为什么不用workerCount呢?returnrunStateAtLeast(ctl.get(),TIDYING)?0:workers.size();        }finally{            mainLock.unlock();        }    }publicintgetActiveCount() {finalReentrantLockmainLock=this.mainLock;        mainLock.lock();try{intn=0;for(Workerw:workers)if(w.isLocked())//上锁的表明worker当前正在处理任务,也就是活跃的worker++n;returnn;        }finally{            mainLock.unlock();        }    }publicintgetLargestPoolSize() {finalReentrantLockmainLock=this.mainLock;        mainLock.lock();try{returnlargestPoolSize;        }finally{            mainLock.unlock();        }    }//获取任务的总数,这个方法慎用,若是个无解队列,或者队列挤压比较严重,会很蛋疼publiclonggetTaskCount() {finalReentrantLockmainLock=this.mainLock;        mainLock.lock();try{longn=completedTaskCount;//比如有些worker被销毁后,其处理完成的任务就会叠加到这里for(Workerw:workers) {                n+=w.completedTasks;//叠加历史处理完成的任务if(w.isLocked())//上锁表明正在处理任务,也算一个++n;            }returnn+workQueue.size();//获取队列中的数目}finally{            mainLock.unlock();        }    }publiclonggetCompletedTaskCount() {finalReentrantLockmainLock=this.mainLock;        mainLock.lock();try{longn=completedTaskCount;for(Workerw:workers)                n+=w.completedTasks;returnn;        }finally{            mainLock.unlock();        }    }

总结

这篇博客基本上覆盖了线程池的方方面面,但仍然有非常多的细节可以深究,比如说异常的处理,可以参照之前的一篇博客:深度解析Java线程池的异常处理机制 ,另外还有AQS、unsafe等可以之后再单独总结。、

相关文章

网友评论

  • Java耕耘者:你好!我们是Java耕耘者专注于程序员Java开发公众号“Java这点事”。我们很赞赏你的文章,希望能获得转载授权。授权后,你的文章将会在公众号“Java这点事”、发布。我们会注明来源和作者姓名。
    非常感谢~~~

本文标题:Java线程池ThreadPoolExecutor实现原理剖析

本文链接:https://www.haomeiwen.com/subject/qnsfgftx.html