美文网首页
java线程池执行原理

java线程池执行原理

作者: 紫色红色黑色 | 来源:发表于2019-12-03 23:24 被阅读0次

    描述

    JDK提供的工具类生成的线程池会造成内存溢出,所以需要自己定义线程池。

    public static void main(String[] args) {
    
        // 线程最大数为Integer.MAX_VALUE
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
    
        // 队列的最大数为Integer.MAX_VALUE
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    }
    

    测试代码

    public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger(0);
    
        /**
         * corePoolSize:核心线程数
         * maximumPoolSize:最大线程数
         * keepAliveTime:大于核心线程数下,被回收的线程空闲时间
         * workQueue:阻塞队列
         * threadFactory:线程工厂
         * rejectedExecutionHandler:拒绝策略
         *
         */
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4,
                3000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2), runnable -> {
    
            Thread thread = new Thread(runnable);
            thread.setName("pool-test-" + i.getAndIncrement());
            return thread;
        }, new ThreadPoolExecutor.AbortPolicy());
    
        /**
         * 提交任务流程:一个Thread去执行一个Runnable,
         * 当Runnable数超过corePoolSize时,将Runnable放入BlockQueue中,
         * 当BlockQueue超过容量并且线程数还没超过maximumPoolSize时,就创建Thread执行该提交的Runnable
         * 当BlockQueue超过容量,线程数超过maximumPoolSize时,就执行拒绝策略
         */
        for (int j = 0; j < 7; j++) {
            executor.execute(()->{
                System.out.println("action");
            });
        }
    
    }
    

    线程池执行流程

    提交任务流程

    1.如果正在运行的线程数小于核心线程数,就创建一个线程去执行该任务;
    2.如果正在运行的线程数等于核心线程数,就将该任务放入阻塞队列中;
    3.如果核心线程满额、阻塞队列满额,而正在运行的线程数小于最大线程数,就创建一个线程去执行该任务;
    4.如果核心线程满额、阻塞队列满额,最大线程数满额,就执行拒绝策略。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // 入参为true表示以核心线程数为运行线程数的边界
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 检查是否调用shutDown,任务入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 入参为false表示以最大线程数为运行线程数的边界
        else if (!addWorker(command, false))
            reject(command);
    }
    

    任务执行流程

    ThreadPoolExecutor中Worker封装了执行载体(Thread)和第一个执行任务(Runnable)。并使用HashSet来保存Worker。上述代码中addWorker()中会调用Thread.start()去启动线程。

    Thread.start()->Worker.run()->ThreadPoolExecutor.runWorker()->firstTask.run()
    
    调用堆栈
    private final class Worker 
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
    
        final Thread thread;
        Runnable firstTask;
    
        Worker(Runnable firstTask) {
            setState(-1);
            this.firstTask = firstTask;
            // 以Worker为参数生成Thread
            this.thread = getThreadFactory().newThread(this);
        }
    
        public void run() {
            runWorker(this);
        }
    }
    

    1.执行第一个任务;
    2.从队列中取出任务,如果当前线程大于核心线程数,队列中没有任务会阻塞线程keepAliveTime时间,阻塞后还是没有任务则线程销毁;
    3.如果当前线程不大于核心线程数,队列中没有任务会一直阻塞线程,直到取出任务为止;
    4.从队列中取出任务后执行。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            /**
            * 先执行第一个Runnable,然后从队列中取出Runnable执行
            * 队列中取出Runnable时,
            *      如果线程数大于核心线程数,队列中没有任务时。线程会阻塞keepAliveTime时间,阻塞后还是没有任务,则线程销毁
            *      如果线程数不大于核心线程数,队列中没有任务时,会一直阻塞直到取出为止
            */
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                        runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 执行任务前扩展
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 执行任务h后扩展
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 销毁线程
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    相关文章

      网友评论

          本文标题:java线程池执行原理

          本文链接:https://www.haomeiwen.com/subject/oefzvctx.html