美文网首页
线程池实现

线程池实现

作者: loading_17 | 来源:发表于2018-06-26 23:34 被阅读0次

    线程池核心参数

    1. corePoolSize
      核心线程数,当有任务提交的时候,便会创建一个线程,如果创建的线程数量达到核心线程数,后续任务便会放入阻塞队列中。可以使用prestartAllCoreThreads()提前创建所有核心线程数。
    2. maximumPoolSize
      最大线程数。当阻塞队列已满,后续还有任务进入的时候,便继续创建线程,最大上限是maximumPoolSize。
    3. keepAliveTime
      表示线程的存活时间。当空闲线程数大于corePoolSize时所存活的时间。
    4. unit
      存活时间的单位
    5. workQueue
      等待被执行任务的阻塞队列
      ArrayBlockingQueue:数组形式的有界阻塞队列,FIFO
      LinkedBlockingQueue:链表形式无界阻塞队列,FIFO
      SynchronousQueue:不存储元素的阻塞队列,插入操作必须等待另外一个线程调用remove操作。
      priorityBlockingQueue:优先队列
    6. ThreadFactory
      线程工厂,可以自定义线程名。
    7. RejectedExecutionHandler
      队列满之后的处理策略
      AbortPolicy:直接抛异常
      CallerRunsPolicy:使用调用线程来执行任务,也就是主线程
      DiscardOldestPolicy:抛弃最前的任务并执行当前任务
      DiscardPolicy:直接丢弃任务

    线程池种类

    1. newFixedThreadPool
      使用LinkedBlockingQueue作为阻塞队列,corePoolSize与maximumPoolSize相等。
    2. newSingleThreadExecutor
      线程池只有一个线程,使用LinkedBlockingQueue。
    3. newCacheThreadPool
      corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,使用SynchronousQueue作为阻塞队列。
    4. newScheduledThreadPool
      通过new ScheduledThreadPoolExecutor(corePoolSize)的方式进行创建。maximunPoolSize为Integer.MAX_VALUE,使用DelayedWorkQueue。

    这里先说说几种线程池的状态

    1. RUNNING:能够接受新任务,线程池一旦创建就处于running状态。
    2. SHUTDOWN:不接受新任务,但会处理已添加的任务。调用shutdown接口时,由running状态变成shutdown。
    3. STOP:不接受新任务,不处理已经被添加的任务。并且会中断正在处理的任务。调用shutdownnow后状态为stop
    4. TIDYING:所有任务终止。在处于tidying状态后会执行terminated。
    5. TERMINATED

    在ThreadPoolExecutor中,可以看到使用3位表示线程池状态,29位表示线程数。

    看下下面的实例

    public class ExecutorTest {
        public static void main(String[] args) {
            Executor executor = Executors.newFixedThreadPool(15);
            for(int i=0;i < 30;i++){
                executor.execute(new Test());
            }
    
        }
    
        static class Test implements Runnable{
    
            public void run() {
                System.out.println(Thread.currentThread().getId()+" "+Thread.currentThread().getName());
            }
        }
    }
    

    跟进execute看看。

       public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    /*        
    *  1.  如果少于corePoolSize的线程在运行,则尝试对第一个任务开启新
    *  线程。对addWorker的调用会自动检测runState和workerCount,以防
    *  止在不该添加线程的时候添加线程出现错误警告。
    *  2.  如果一个任务能够成功入队,仍然需要对是否增加线程做双重检查
    *  或者可能从进入该方法后线程池就shutdown了。所以需要对state做
    *  再次检查,如果有必要的话一旦pool停止,就需要回滚入队操作,或
    *  者当没有在没有任何线程的情况下去创建一个新的。
    *  3.  如果我们不能让任务入队,我们将尝试增加一个线程。如果失败
    *  了,将尝试新增一个线程。如果失败了,便知道pool处于shut down
    *  或者饱和状态,必须要拒绝任务。
    */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    
    1. 首先计算当前的线程数,如果小于核心线程数,则执行addWorker增加线程数。如果没有则进入下面一个判断。
    2. 判断线程是否处于running状态,如果是则尝试offer这个任务,offer是会直接返回true或者false。如果入队成功则做二次检查,并继续判断如果不处于running状态就需要删除刚才加入的任务,并调用reject方法。如果处于running,并且线程数为0则进行addWorker操作。这里为什么addWorker后面的参数是false到具体方法再讲。
    3. 如果不是running状态或者入队失败,则尝试添加任务,如果再失败则采取拒绝策略。

    那么来看看addWorker代码

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    1. 先判断pool的状态,如果大于等于shutdown,或者等于shutdown的时候有任务传入或是workQueue为空都会直接返回fasle。shutdown不接受新任务,如果队列为空肯定也是要返回false,不为空的话shutdown状态需要把剩余的任务处理完。
    2. 接着一个循环判断线程数,capacity是2的29次方-1,也就是最大线程数。注意到core参数,会传入true或者是false,这里用来选择使用core还是maximum来判断。如果core是true,wc大于等于核心数,则添加失败。如果core是false,则wc与maximum做判断。所以如果wc小于核心线程则跳出循环去创建线程。
    3. 创建线程通过可重入锁和Worker类配合进行。Worker类继承AbstractQueuedSynchronizer并实现了Runnable接口。

    代码中有new Worker(Task),先看看构造

            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    

    这里使用newThread,将worker自己传入,因为其实现了Runnable,所以启动线程的时候会调用其run方法,而其run方法中调用了runWoker。

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    这里通过Worker加锁的方式,在run方法之前做一个自定义的beforeExecute处理,接着执行任务的run方法,最后执行afterExecute。

    再看上述判断中的getTask

        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    这里有个allowCoreThreadTimeout,这是线程池提供是否允许核心线程Timeout的一个设置属性。如果允许或者wc大于核心线程数,则time为true,使用workQueue.poll来获取任务,如果队列为空则会返回null,因为是允许超时所以会等待keepalivetime。如果是false,则使用workQueue.take,如果为空则会阻塞。

    相关文章

      网友评论

          本文标题:线程池实现

          本文链接:https://www.haomeiwen.com/subject/zrrpyftx.html