美文网首页
Java 线程池实现原理

Java 线程池实现原理

作者: Jk_zhuang | 来源:发表于2023-05-05 23:53 被阅读0次

概述

现在机器基本都是多核的,开启多线程可以有效地增加系统的吞吐量和性能,如下是开启一个线程最简单的方式

new Thread(new Runnable() {
    @Override
    public void run() {
        // do something
    }
}).start();

这个线程使用完后,就会被系统所回收。线程虽是轻量级的,但其创建、关闭依然需要花费时间、资源。当任务粒度不大的时候,大量创建线程会得不偿失。而且当线程数超过核心数,创建后的线程还会处于等待状态。

因此线程的数量最好是能控制的,且能够复用,线程池即能满足需求。下面看看创建一个线程池,并提交一个任务去执行的方式

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
    @Override
    public void run() {
        // do something
    }
});

实现原理

上面创建了一个只拥有一个线程的线程池,并提交一个任务去执行。线程池的创建可以使用线程池工厂 Executors 里面的 new... 系列方法,含义如下

Executors.newSingleThreadExecutor(); // 创建一个只有单个线程的线程池。任务提交后,若这个线程空闲,则执行,否则加入队列,待线程空闲后执行
Executors.newFixedThreadPool(numbersOfThread); // 创建一个有numbersOfThread个线程的线程池。任务提交后,若有空闲线程,则执行,否则加入队列,待有线程空闲后执行
Executors.newCachedThreadPool(); // 创建一个有无限个线程的线程池。任务提交后,若有空闲线程,则执行,否则创建新的线程去执行
Executors.newSingleThreadScheduledExecutor(); // 在newSingleThreadExecutor之上扩展了在给定时间执行某任务的功能
...

查看线程工厂 Executors new... 方法的实现,我们可以看到最终都是调用了 ThreadPoolExecutor 的构造方法

public ThreadPoolExecutor(int corePoolSize, // 核心线程数量
                          int maximumPoolSize, // 最大线程数量
                          long keepAliveTime, // 当线程数量超过核心线程数量时,其余线程保活时间
                          TimeUnit unit, // 时间单位
                          BlockingQueue<Runnable> workQueue, // 任务队列
                          ThreadFactory threadFactory, // 线程工厂
                          RejectedExecutionHandler handler // 任务提交失败时的执行策略
)

这里先说下任务提交后,线程池的执行策略

(1) 当线程池的实际线程数量小于corePoolSize时,则优先创建核心线程;
(2) 若大于等于corePooSize,则将新的任务加入等待队列;
(3) 若加入队列失败,并且线程数小于maximumPoolSize,则创建新的线程执行任务;
(4) 否则执行拒绝策略。

下面我们看看具体的源码

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // workerCountOf:当前线程总数
            if (addWorker(command, true)) // 创建一个核心线程并执行当前提交任务
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 把任务加到任务队列里面
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 创建一个非核心线程并执行任务
            reject(command);
    }

接下来我们看看 Worker 工作线程的执行过程

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable // 实现了Runnable接口
    {

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this); // 用一开始设置的线程工厂创建了线程,addWorker之后会调用这个线程的start方法启动线程
        }

        public void run() {
            runWorker(this); // 执行工作线程
        }

        final void runWorker(Worker w) {
            try {
                while (task != null || (task = getTask()) != null) { // 如果当前任务未执行,则先执行当前任务;否则去任务队列里面拿任务执行,拿不到任务时,这个线程就退出了
                    ...
                    try {
                        beforeExecute(wt, task); //任务执行前的回调
                        Throwable thrown = null;
                        try {
                            task.run(); // 任务执行
                        } finally {
                            afterExecute(task, thrown); // 任务执行完的回掉
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }

接下来看看 getTask() 获取任务的过程

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 要等待 or 阻塞

            try {
                Runnable r = timed ?
                    // 从队列里面拿任务,如果队列为空,最长等待时间为 keepAliveTime
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 
                    // 从队列里面拿任务,如果队列为空,则阻塞,直到队列有新任务添加为止
                    workQueue.take(); 
                    // 所以线程池的核心线程为什么不会销毁、
                    // 非核心线程为什么能存活 keepAliveTime 时间,
                    // 超时后会被回收,是不是豁然开朗了?
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

可以看到,线程池实现的相关特性,主要是通过 getTask() 时从任务队列里面拿任务的等待阻塞来实现的

至于线程工厂,这里不再赘述。

下面最后再看看拒绝策略的种类

(1) AbortPolicy:直接抛异,阻止系统正常工作
(2) CallerRunsPolicy:只要线程池未关闭,直接在调用者线程执行当前任务
(3) DiscardOledesPolicy:丢弃最老的请求,尝试重新提交任务
(4) DiscardPolicy:默默丢弃当前提交的任务
或者可以自己实现RejectedExecutionHandler接口

至此,线程池的大体实现基本就清晰了

相关文章

  • ThreadPoolExecutor线程池原理

    本文参考Java线程池---addWorker方法解析Java线程池ThreadPoolExecutor实现原理线...

  • 美团一面总结

    线程池的实现原理 参考: Java线程池实现原理及其在美团业务中的实践[https://tech.meituan....

  • 线程以及java线程池实现分享

    线程以及java线程池实现分享 线程简介 JDK线程池的工作原理 JDK线程池的实现细节 1.线程简介-由来 1....

  • JAVA线程池常见用法及其原理

    JAVA线程池常见用法及其原理 JAVA线程池常见用法: 1.代码实现 import lombok.extern....

  • 线程池的原理

    参考 深入Java源码理解线程池原理 线程池是对CPU利用的优化手段 线程池使用池化技术实现,替他的实现还有连接池...

  • 面试题2019年7月

    线程池原理 参考:Java 线程池原理分析 线程池工作原理:1、线程数量小于 corePoolSize,直接创建新...

  • Java中线程池,你真的了解会用吗

    在《深入源码分析Java线程池的实现原理》这篇文章中,我们介绍过了Java中线程池的常见用法以及基本原理。 在文中...

  • JAVA线程及线程池使用

    参考如下文章进行整理: JAVA四种线程池的使用 Java线程池实现原理及其在美团业务中的实践 JAVA几种线程使...

  • 分析Java线程池Callable任务执行原理

    Java并发编程源码分析系列: 分析Java线程池的创建 分析Java线程池执行原理 上一篇分析了线程池的执行原理...

  • 线程池源码解读

    深入分析java线程池的实现原理 ps: 用一个AtomicInteger记录 线程池状态和其中的线程个数, 其中...

网友评论

      本文标题:Java 线程池实现原理

      本文链接:https://www.haomeiwen.com/subject/oxexsdtx.html