线程池再探

作者: 小鱼嘻嘻 | 来源:发表于2018-01-20 17:23 被阅读12次
    线程池源码之execute

    execute:

     public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
           //workerCountOf 获取线程数量,isRunning获取线程池状态
            int c = ctl.get();
            //如果小于核心线程池数量直接添加到线程池里
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //大于核心线程数量,加入队列,会做一个dobbule check,
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //再次尝试添加线程,失败的话执行拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }
    

    总体来说就是:

    • 如果线程的数量小于线程池的核心线程数直接创建线程执行;
    • 如果加入线程池失败,说明超过核心线程数量尝试加入阻塞队列;
    • 如果加入阻塞队列失败了,尝试重新加入线程池如果也失败了就说明超过了最大线程数量,执行拒绝策略。

    addWorker:

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // check 线程池的状态和阻塞队列是否为空
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
                //这段for循环的代码含义是:往线程池里面添加一个线程
                for (;;) {
                    int wc = workerCountOf(c);
                    // check 线程数量是否超出
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 添加成功直接退出循环,添加失败的话再次尝试
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
            //这段代码的含义是:把当前线程加入到hashSet里面,加入成功后启动线程,加入失败的话,从set里面删除,并且尝试停止线程池
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //把线程封装成Worker,worker是继承了Aqs的
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                          int rs = runStateOf(ctl.get());
                        // check 线程池状态
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                           //加入到set里
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    //加入成功启动线程
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                //加入失败,删除线程,尝试停止线程池
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    总体来说分为两步:

    • 第一步:尝试给线程池的数量加一,失败继续尝试,成功退出。
    • 第二步:线程池数量成功加一之后,采用加锁的方式把线程加入到set里,然后启动线程,如果启动失败删除线程,尝试停止线程池。

    来看一下work对象:

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable{
          
            final Thread thread;
            Runnable firstTask;
            volatile long completedTasks;
            //构造方法
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            public void run() {
                runWorker(this);
            }
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    
    • 首先,这个work对象实现了runnable接口,就有了线程的属性。
    • 其次,这个work继承了AbstractQueuedSynchronizer就有了锁的功能
    • 最后,我们来看一下runWorker这个方法。

    runWorker:

    //运行当前线程
     final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
          
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        //执行前操作
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //运行当前线程
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                           //执行后操作
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    getTask:

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
            // 主要功能就是从队列里面获取线程
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
                // 获取任务
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
    线程池源码之shutdown

    先来搞清楚几个概念:
    interrupt():为当前线程打上停止的标记
    interrupted():测试当前是否中断。此方法具有清除功能
    isInterrupted(): 测试线程是否中断。此方法不会清除

    shutdown:停止线程池,优雅停掉,等队列里面的线程执行完再停掉。

    public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                //将线程池的状态设置为SHUTDOWN
                advanceRunState(SHUTDOWN);
              //为所有线程打上停止标记
                interruptIdleWorkers();
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
        }
    //为所有线程打上停止标记
    private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    
    // 尝试tryTerminate
    final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            terminated();
                        } finally {
                            ctl.set(ctlOf(TERMINATED, 0));
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
    
    线程池源码之shutdownNow

    shutdownNow:比较粗暴的停掉,不会等阻塞队列执行完再停掉。

      public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                //把线程池状态改为STOP
                advanceRunState(STOP);
                interruptWorkers();
                // 把队列的任务取出来
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
        }
    //把队列的任务放入到list里面返回
      private List<Runnable> drainQueue() {
            BlockingQueue<Runnable> q = workQueue;
            ArrayList<Runnable> taskList = new ArrayList<Runnable>();
            q.drainTo(taskList);
            if (!q.isEmpty()) {
                for (Runnable r : q.toArray(new Runnable[0])) {
                    if (q.remove(r))
                        taskList.add(r);
                }
            }
            return taskList;
        }
    // 打上停止的标记
    void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
    

    相关文章

      网友评论

        本文标题:线程池再探

        本文链接:https://www.haomeiwen.com/subject/kfqtaxtx.html