美文网首页Java基础
并发编程(八)线程池详解

并发编程(八)线程池详解

作者: Timmy_zzh | 来源:发表于2021-01-13 22:22 被阅读0次
前言
  • java中线程的创建以及上下文切换是比较消耗性能的,因此引入了偏向锁,轻量级锁等优化技术,目的就是减少用户态和核心态之间的切换频率。
既然线程的创建和销毁非常消耗性能,那有没有可能复用已经被创建好的线程呢?

1.线程池

  • 线程池的本质就是为了节省资源和更好的进行资源管理
    • 线程的创建需要开辟虚拟机栈,本地方法栈,程序计数器等线程私有的内存空间,在线程销毁时需要回收这些系统资源,频繁地创建销毁线程会浪费大量资源。
    • 而通过复用已有线程可以更好地管理和协调线程的工作
  • 线程池主要解决两个问题:
    • 当执行大量异步任务时线程池能够提供很好的性能
    • 线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。
1.1.线程池体系
1.线程池体系.png
源码
// 底层接口Executor,只有一个方法
public interface Executor {
    void execute(Runnable var1);
}

// 接口ExecutorService,抽象了线程池操作方法
public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
    <T> Future<T> submit(Callable<T> var1);
    <T> Future<T> submit(Runnable var1, T var2);
    Future<?> submit(Runnable var1);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}

public abstract class AbstractExecutorService implements ExecutorService {
    public AbstractExecutorService() {
    }
}

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl;
    private static final int COUNT_BITS = 29;
    private static final int CAPACITY = 536870911;
    private static final int RUNNING = -536870912;
    private static final int SHUTDOWN = 0;
    private static final int STOP = 536870912;
    private static final int TIDYING = 1073741824;
    private static final int TERMINATED = 1610612736;
    private final BlockingQueue<Runnable> workQueue;
    private final ReentrantLock mainLock;
    private final HashSet<ThreadPoolExecutor.Worker> workers;
    private final Condition termination;
    private int largestPoolSize;
    private long completedTaskCount;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
    private volatile long keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private static final RejectedExecutionHandler defaultHandler = new ThreadPoolExecutor.AbortPolicy();
    private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
    private final AccessControlContext acc;
    private static final boolean ONLY_ONE = true;
    ...
}
  • 解析
    • Executor是线程池最顶层的接口,在Executor接口中只有一个execute方法,该方法用于执行任务。置语线程的创建,调度等细节由子类实现。
    • 接口ExecutorService继承并拓展了Executor,在ExecutorService内部提供了更全面的任务提交机制以及线程池关闭方法。
    • ThreadPoolExecutor是ExecutorService的默认实现,所谓的线程池机制也大多封装在此类当中,重点分析
    • ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,并实现了ScheduledExecutorService接口
    • ForkJoinPool是一种支持任务分解的线程池,一般要配合可分解任务接口ForkJoinTask来实现

2.线程池创建

Executors
  • 为方便开发着使用线程池,JDK提供了一个线程池的工厂类-Executors,在Executors中定义了多个静态方法,用来创建不同配置的线程池。如下
2.1.newSingleThreadExecutor
  • 创建一个单线程化的线程池,他只会用唯一的工作线程来执行任务,保证所有任务按先进先出的顺序执行。
    //使用单线程线程池提交5次操作任务
    private static void singleThreadPool() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            int taskId = i;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread:" + 
                                       Thread.currentThread().getName() +
                            " ,running task:" + taskId);
                }
            });
        }
    }
日志打印:
Thread:pool-1-thread-1 ,running task:0
Thread:pool-1-thread-1 ,running task:1
Thread:pool-1-thread-1 ,running task:2
Thread:pool-1-thread-1 ,running task:3
Thread:pool-1-thread-1 ,running task:4

源码:
    public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue()));
    }
  • 发现newSingleThreadExecutor创建的线程池中,提交的task任务始终在同一个线程中被执行
2.2.newCachedThreadPool
  • 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
    private static void cacheThreadPool() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            int taskId = i;
            try {
                //Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread:" + 
                                       Thread.currentThread().getName() +
                            " ,running task:" + taskId);
                    try {
                        // 模拟耗时任务
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
日志打印:
Thread:pool-1-thread-1 ,running task:0
Thread:pool-1-thread-1 ,running task:1
Thread:pool-1-thread-1 ,running task:2
Thread:pool-1-thread-1 ,running task:3
Thread:pool-1-thread-1 ,running task:4
    
源码:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, 
                                      TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
  • 耗时任务执行完毕,则会将空闲的线程从新拿过来使用
  • 否则创建新线程执行,且新线程创建的最大数目为Integer.MAX_VALUE个
2.3.newFixedThreadPool
//创建一个固定数目的,可重用的线程池
    private static void newFixedThreadPool() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            int taskId = i;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread:" + 
                                       Thread.currentThread().getName() +
                            " ,running task:" + taskId);
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
日志打印:
> Task :testlib:_05ThreadPool.main()
Thread:pool-1-thread-2 ,running task:1
Thread:pool-1-thread-3 ,running task:2
Thread:pool-1-thread-1 ,running task:0
Thread:pool-1-thread-1 ,running task:3
Thread:pool-1-thread-2 ,running task:5
Thread:pool-1-thread-3 ,running task:4
Thread:pool-1-thread-1 ,running task:6
Thread:pool-1-thread-2 ,running task:7
Thread:pool-1-thread-3 ,running task:8
Thread:pool-1-thread-1 ,running task:9
    
源码:
    public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L,
                                      TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }
  • 上面创建了一个包含固定数量3的线程池,因此虽然向线程池提交了10个任务,但是这10个任务只会被3个线程分配执行。
2.4.newScheduledThreadPool
    private static void newScheduledThreadPool() {
        ScheduledExecutorService scheduledExecutorService = 
            Executors.newScheduledThreadPool(2);
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                Date now = new Date();
                System.out.println("Thread:" + Thread.currentThread().getName()
                                   + " ,timer:" + now.toString());
            }
        },500,500, TimeUnit.MILLISECONDS);

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        scheduledExecutorService.shutdown();
    }
日志打印:
> Task :testlib:_05ThreadPool.main()
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:44 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:44 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:45 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:45 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:46 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:46 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:47 CST 2021
Thread:pool-1-thread-1 ,timer:Thu Jan 07 22:23:47 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:48 CST 2021
Thread:pool-1-thread-2 ,timer:Thu Jan 07 22:23:48 CST 2021
  • 上面的代码,创建了一个线程数量为2的定时任务线程池,通过scheduleAtFixedRate方法,指定每隔500毫秒执行一次任务,并且在5秒钟之后通过shutdown关闭定时任务。

3.线程池执行机制

  • 通过上面的例子可以方法,上面创建线程的静态方法最后都是创建ThreadPoolExecutor对象实例,
3.1.线程池结构
2.线程池结构.png
public class ThreadPoolExecutor extends AbstractExecutorService {
    //根据AtomicInteger 的低29位,计算线程池线程的数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3; //29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程中包含的状态
    //默认状态,接收新任务并处理排队任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    //不接收新任务,但处理排队任务,调用shotdown 会处于该状态
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //不接收新任务,也不处理排队任务,并中断正在执行的任务,调用showdownNow()
    private static final int STOP       =  1 << COUNT_BITS;
    //所有任务都已终止,workerCount为零时,线程会切换到TIDYING状态,并运行terminate方法
    private static final int TIDYING    =  2 << COUNT_BITS;
    //terminate方法完成后线程池切换为该状态
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
    // 线程池重要方法
    //1.计算当前线程池运行状态
    private static int runStateOf(int c) {
        return c & -CAPACITY;
    }

    //2.计算当前线程数量
    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    //3.通过状态和线程数生成ctl
    private static int ctlOf(int rs, int wc) {
        return rs | wc;
    }
    
}
  • 线程池类中重要属性:
    • workers集合:保存所有的核心线程和非核心线程,本质是一个HashSet
    • workQueue队列:等待任务队列,当核心线程的个数达到corePoolSize时,新提交的任务会被先保存在等待队列中,本质上是一个阻塞队列BlockingQueue
    • ctl:AtomicInteger类型,二进制高3位用来标识线程池的状态,低29位用来记录线程池中线程的数量
  • 线程池构造函数参数说明
    • corePoolSize:表示核心线程数量
    • maximumPoolSize:线程池最大能够容纳同时执行的线程,必须大于或等于1。如果和maximumPoolSize相等即是固定大小线程池。
    • keepAliveTime:表示线程池中的线程空闲时间,当空闲时间达到此值时,线程会被销毁直到剩下corePoolSize核心线程数量个线程的时间单位,有MILLISECONDS,SECONDS,MINUTES等
    • workQueue:等待队列,BlockingQueue类型,当请求任务数大于corePoolSize时,任务会被缓存到此阻塞队列中
    • threadFactory:线程工厂,线程池中使用它来创建线程,如果传入的是null,则使用默认工厂类DefaultThreadFactory
    • handler:执行拒绝策略的对象,当workQueue满了之后,并且活动线程数大于maximumPoolSize的时候,线程池通过该策略处理新的请求
3.2.线程池执行流程
3.线程池执行流程.png
  • 当我们调用execute或者submit,将一个任务提交给线程池,线程池收到这个任务请求后,有以下几种处理情况:
  1. 当线程池中运行的线程数量还没有达到corePoolSize大小时,线程池会创建一个新线程执行提交的任务,无论之前创建的线程是否处于空闲状态
  2. 当线程池中运行的线程数量已经达到corePoolSize大小时,线程池会把任务加入到等待队列中,直到某一个线程空闲了,线程池会根据我们设置的等待队列规则,从队列中取出一个新的任务执行
  3. 如果线程数大于corePoolSize数量,但是还没有达到最大线程数manimumPoolSize,并且等待队列已满,则线程池会创建新的线程来执行任务
  4. 最后如果提交的任务,无法被核心线程直接执行,又无法加入等待队列,又无法创建”非核心线程“直接执行,线程池根据拒绝处理器定义的策略处理这个任务。
    • 如果没有在线程池中设置拒绝策略,线程池会抛出RejectedExecutionException 异常,即线程池拒绝接受这个任务。

相关文章

网友评论

    本文标题:并发编程(八)线程池详解

    本文链接:https://www.haomeiwen.com/subject/etapaktx.html