美文网首页
java线程池核心方法

java线程池核心方法

作者: 何甜甜在吗 | 来源:发表于2018-03-29 14:58 被阅读0次

    与线程池相关的核心类为ThreadPoolExecutor,但是在实际应用中,我们并不会调用这个类来使用线程池,而是使用Executors,Executors类封装了生成几种不同线程池的方法。

    线程池的用法

    ExecutorService es = Executors.newCachedThreadPool();
    es.execute(Runnable run);
    

    所以从execute(Runnable command)方法讲起,这个方法会将任务提交给一个线程进行执行
    execute(Runnable command)方法源码:

    //调用execute方法会将线程提交到线程池中  
     public void execute(Runnable command) {
            //如果任务为null,则抛出异常 
            if (command == null)
                throw new NullPointerException();
        
            //1.如果当前线程数量小于corePoolSize的大小
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {  
                //addWorker方法会进一步进行检查,因为有可能其他线程对线程池的状态做了改变
                //addWorker方法中进行两个检查1)线程池的状态 2)当前线程数量是否超过corePoolSize的大小
                //检查完成以后如果都否合要求,创建一个Worker,new Worker(Runnable r),如果条件满足,会将这个worker加入到HashSet<Worker>中去,并且返回true
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //2.如果corePoolSize已经满了,则需要加入到阻塞队列
            //会进行一个判断,线程池的状态以及是否可以往阻塞队列中继续添加runnable
            if (isRunning(c) && workQueue.offer(command)) {  
                int recheck = ctl.get();
                //在进行一次判断,这个判断主要是为了有其他线程调用了shutDown或者shutDownNow方法,这时候如果再有任务就会拒绝执行
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //如果此时队列已满,则会采取相应的拒绝策略
            //addWorker中第二参数boolean core,如果false,则边界为maxmunSize,如果为true,则边界为corePoolSize
            else if (!addWorker(command, false))
                reject(command);
        }
    

    execute(Runnable command)方法战略三步走
    1.往corePoolSize中加入任务进行执行
    2.当corePoolSize满时往阻塞队列中加入任务
    3.阻塞队列满时并且maximumPoolSize已满,则采取相应的拒绝策略

    因为execute(Runnable command)方法没有加锁,所以做了很多相同的判断,因为很有可能这个线程在执行execute方法时有其他线程已经完成了execute方法并且改变了线程池的状态(比如可能因为其他线程的执行导致corePoolSize已满,或者其他线程调用了shutDown()或者shutDownNow()方法拒绝在接受新任务)
    可以看出execute(Runnable command)方法中最核心的方法是addWorker(Runnable firstTask, boolean core)
    先看addWorker(Runnable firstTask, boolean core)方法的源码:

     private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                //返回线程池的状态
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //打破死循环的关键
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //为什么叫addWorker方法,从这里就可以看出,创建了一个Worker对象,并且Runnable是其里面的一个字段
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    //正真执行到往corePoolSize添加任务时会进行一个加锁操作
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
                        //在加锁之后还会进行一个判断
                        //判断1.线程池是否在running 2.任务是否被移除
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            //往corePoolSize中加入任务
                            workers.add(w);
                            int s = workers.size();
                            //调整曾经线程池拥有最大线程数的大小
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            //改变状态
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //运行该任务,t.start方法最终会调用native void start0()方法 
                        t.start();
                        //改变状态
                        workerStarted = true;
                    }
                }
            } finally {
                //如果加入失败
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    

    从代码中看出其实corePoolSize中就是维护了一个HashSet<Worker>
    并且在真正往corePoolSize加入任务时会进行一个加锁,加完锁还会做一个验证
    很多细节还待以后继续补充,现在学习源码的功力还不够

    相关文章

      网友评论

          本文标题:java线程池核心方法

          本文链接:https://www.haomeiwen.com/subject/eremcftx.html