美文网首页
线程池execute源码分析

线程池execute源码分析

作者: __y | 来源:发表于2021-11-22 14:19 被阅读0次

    execute是我们提交一个线程的时候,线程池执行的我们运行线程的一个api
    ThreadPool 有几个比较重要的参数会涉及到这个源码的操作

    • corePoolSize:核心线程数;
    • maxMumPoolSize:最大线程数
    • BlockingQueue:阻塞队列
    
     public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            // 获得ctl参数;其中高3位表示线程的状态。后面29位表示线程的数量
            int c = ctl.get();
          // 如果线程数少于核心线程数,就创建一个worker(本质也是一个线程,执行我们的传入的线程),
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
    
    // 这里校验isRunning.. 如果我们关闭线程池的时候,这里是需要做判断的
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    
    
    
    // 添加一个worker。firstTask 是我们传进来需要执行的线程,core:是为了后面判断的时候用coolPoolSize还是 maximumPoolSize
    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
              // 这里是异常情况,如果是线程池shoutdown了,就直接退出了
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
            // 获得线程数
                    int wc = workerCountOf(c);
    // 这里判断是不是大于了最大的线程数,如果我们是第一个线程,核心线程>1,不会走这个判断,                
    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
    // 少于核心线程数,把工作线程数+1
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
          
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
              //    新建一个wokrer
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
                    // 这里和上面的校验一样,做了双层校验
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                          // 新增一个wokrer到HashSet集合中
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
    //   如果添加成功的话,就让worker里面的Thread启动执行我们传进去的线程代码
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    那么我们的线程里面run方法里面的代码在哪执行,然后后面又是怎么从队列里面拿出下一个任务出来执行的呢?
    所有的秘密都在wokrer中构造方法中

      Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
    // 这里会创建一个线程(将当前worker,因为wokrer实现了Runnable方法)
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
    

    runWorker是真正将线程跑起来的方法

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                // 如果是第一个线程的话,task不为null。那么就执行下面的代码
                // 这是一个循环。当我们第一个任务执行完后,task会被赋值为null,然后就开始从队列面拿任务执行,如果task不为null的话就继续执行任务,不断循环直到task为null(队列为空)
                while (task != null || (task = getTask()) != null) {
                    w.lock();
    
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                        // 调用我们传入的线程任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
            // 如果队列为空,这种情况就会跑到这里
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    
    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
              // 如果线程数大于核心线程数,或者allowCoreThreadTimeOut  为true 这里才为true
              // allowCoreThreadTimeOut  默认是false。如果为false的话,则会常驻核心线程
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                  // 如果是true的话,就执行从队列里面poll,当前线程会等待指定的keepAliveTime时间,如果超时则返回为null。
    // 否则的话就是take(生产者,消费者模型,如果有任务了就执行,没有任务就阻塞.. 这样就保证了核心线程数是不会销毁的)
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    线程池如何保证核心线程数。
    队里为空的时候,task就会返回null。runWorker就会跳出循环执行processWorkerExit方法

    
     private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
              // 这里会移除一个worker
                completedTaskCount += w.completedTasks;
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    
            tryTerminate();
    
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
        // 这里重新将wokrer加进去,保证核心线程数不变
                addWorker(null, false);
            }
        }
    

    相关文章

      网友评论

          本文标题:线程池execute源码分析

          本文链接:https://www.haomeiwen.com/subject/mkxzsltx.html