美文网首页
java线程池执行原理

java线程池执行原理

作者: 紫色红色黑色 | 来源:发表于2019-12-03 23:24 被阅读0次

描述

JDK提供的工具类生成的线程池会造成内存溢出,所以需要自己定义线程池。

public static void main(String[] args) {

    // 线程最大数为Integer.MAX_VALUE
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);

    // 队列的最大数为Integer.MAX_VALUE
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
}

测试代码

public static void main(String[] args) {
    AtomicInteger i = new AtomicInteger(0);

    /**
     * corePoolSize:核心线程数
     * maximumPoolSize:最大线程数
     * keepAliveTime:大于核心线程数下,被回收的线程空闲时间
     * workQueue:阻塞队列
     * threadFactory:线程工厂
     * rejectedExecutionHandler:拒绝策略
     *
     */
    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4,
            3000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2), runnable -> {

        Thread thread = new Thread(runnable);
        thread.setName("pool-test-" + i.getAndIncrement());
        return thread;
    }, new ThreadPoolExecutor.AbortPolicy());

    /**
     * 提交任务流程:一个Thread去执行一个Runnable,
     * 当Runnable数超过corePoolSize时,将Runnable放入BlockQueue中,
     * 当BlockQueue超过容量并且线程数还没超过maximumPoolSize时,就创建Thread执行该提交的Runnable
     * 当BlockQueue超过容量,线程数超过maximumPoolSize时,就执行拒绝策略
     */
    for (int j = 0; j < 7; j++) {
        executor.execute(()->{
            System.out.println("action");
        });
    }

}

线程池执行流程

提交任务流程

1.如果正在运行的线程数小于核心线程数,就创建一个线程去执行该任务;
2.如果正在运行的线程数等于核心线程数,就将该任务放入阻塞队列中;
3.如果核心线程满额、阻塞队列满额,而正在运行的线程数小于最大线程数,就创建一个线程去执行该任务;
4.如果核心线程满额、阻塞队列满额,最大线程数满额,就执行拒绝策略。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 入参为true表示以核心线程数为运行线程数的边界
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 检查是否调用shutDown,任务入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 入参为false表示以最大线程数为运行线程数的边界
    else if (!addWorker(command, false))
        reject(command);
}

任务执行流程

ThreadPoolExecutor中Worker封装了执行载体(Thread)和第一个执行任务(Runnable)。并使用HashSet来保存Worker。上述代码中addWorker()中会调用Thread.start()去启动线程。

Thread.start()->Worker.run()->ThreadPoolExecutor.runWorker()->firstTask.run()
调用堆栈
private final class Worker 
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    final Thread thread;
    Runnable firstTask;

    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        // 以Worker为参数生成Thread
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
}

1.执行第一个任务;
2.从队列中取出任务,如果当前线程大于核心线程数,队列中没有任务会阻塞线程keepAliveTime时间,阻塞后还是没有任务则线程销毁;
3.如果当前线程不大于核心线程数,队列中没有任务会一直阻塞线程,直到取出任务为止;
4.从队列中取出任务后执行。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        /**
        * 先执行第一个Runnable,然后从队列中取出Runnable执行
        * 队列中取出Runnable时,
        *      如果线程数大于核心线程数,队列中没有任务时。线程会阻塞keepAliveTime时间,阻塞后还是没有任务,则线程销毁
        *      如果线程数不大于核心线程数,队列中没有任务时,会一直阻塞直到取出为止
        */
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 执行任务前扩展
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行任务h后扩展
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 销毁线程
        processWorkerExit(w, completedAbruptly);
    }
}

相关文章

网友评论

      本文标题:java线程池执行原理

      本文链接:https://www.haomeiwen.com/subject/oefzvctx.html