美文网首页
java多线程-8-线程池

java多线程-8-线程池

作者: 宠辱不惊的咸鱼 | 来源:发表于2019-09-30 09:08 被阅读0次

    问题

    • Worker怎么维持?
      • run方法中有个while循环,getTask时阻塞在阻塞队列的消费端
    • Worker怎么失效?
      • getTask中,超过核心池的线程会返回null,然后run中的while被打破,run结束了,自然就被回收了

    ThreadPoolExecutor

    构造函数

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    状态参数

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 29个1
    
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; // 111 + 29个0
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000 + 29个0
    private static final int STOP       =  1 << COUNT_BITS; // 001 + 29个0
    private static final int TIDYING    =  2 << COUNT_BITS; // 010 + 29个0
    private static final int TERMINATED =  3 << COUNT_BITS; // 011 + 29个0
    
    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; } // 取c前3位
    private static int workerCountOf(int c)  { return c & CAPACITY; }  // 取c后29位
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    • ctl初始值:111 + 29个0

    execute

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 第1次时workerCountOf(c)为0
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    
    // Worker是个AQS,也是个Runnable
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // 大于SHUTDOWN || 等于SHUTDOWN && firstTask不为空 || 等于SHUTDOWN && workQueue为空,返回false
            // 含义是:仅在等于SHUTDOWN时,会接着消耗workQueue
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false; // 超CAPACITY || 超size,返回false
                if (compareAndIncrementWorkerCount(c)) // 111 + 29个0 -> 111 + 28个0 + 1
                    break retry; // +1成功,直接break
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); // Worker extends AQS implements Runnable
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock; // 非公平ReentrantLock
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // workers:HashSet<Worker>
                        int s = workers.size();
                        if (s > largestPoolSize) // int largestPoolSize
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    
    • 最后Worker构造函数这里可以看出thread的参数其实是Worker自己,而真实任务则是firstTask属性
    • 那么,上面t.start()那里,其实执行的是Worker的run方法

    run

    public void run() {
        runWorker(this); // this是Worker
    }
    
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) { // firstTask不为空,或者workQueue不为空,轮询执行任务
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    • execute -> addWorker(ctl+1) -> t.start -> run -> runWorker -> task -> getTask -> workQueue.take处阻塞
    • corePoolSize未满前,新来的任务都去新增Worker,因为之前的Worker都阻塞在workQueue
    • corePoolSize满时,新来的任务进workQueue;之前阻塞在take的Worker去消耗任务
    • workQueue也满了,启用maximumPoolSize
    • 超出corePoolSize的Worker,在消耗完workQueue后,会进入processWorkerExit,把自己移出workers,即自失效

    ThreadFactory

    // default
    Executors.defaultThreadFactory()
    
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    
    • new个Thread
    • 设置成非daemon
    • 设置成普通优先级
    • 线程池中的线程时由线程工厂创建并返回的

    reject策略

    • 发生在maximumPoolSize依旧被耗光时
    • 内置4种,默认AbortPolicy
    • AbortPolicy - 简单粗暴抛异常
    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() {}
    
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
        }
    }
    
    • DiscardPolicy - 丢弃任务,啥也不干
    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() {}
    
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
    • DiscardOldestPolicy - 把workQueue中最老的command扔掉,把新的执行execute
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() {}
    
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    
    • CallerRunsPolicy - 调用线程池方法的线程同步执行run方法
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() {}
    
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    

    线程池状态

    • RUNNING
      • 创建后的初始状态
      • 可接收新任务
      • 可处理workQueue任务
    • SHUTDOWN
      • RUNNING -> shutdown() -> SHUTDOWN
      • 不接收新任务
      • 可处理workQueue任务
    • STOP
      • RUNNING | SHUTDOWN -> shutdownNow() -> STOP
      • 不接收新任务
      • 不处理workQueue任务
      • 中断正在处理的任务
    • TIDYING
      • SHUTDOWN -> workQueue 和 执行中变空 -> TIDYING
      • STOP -> 执行中变空 -> TIDYING
      • 标识:workerCountOf(ctl) == 0
      • 会执行terminated()方法做后续处理,默认为空
    • TERMINATED
      • TIDYING -> terminated() -> TERMINATED
      • 线程池彻底终止

    相关文章

      网友评论

          本文标题:java多线程-8-线程池

          本文链接:https://www.haomeiwen.com/subject/yzrcpctx.html