美文网首页
ThreadPoolExecutor线程池源码和典型问题

ThreadPoolExecutor线程池源码和典型问题

作者: 唯爱_0834 | 来源:发表于2020-06-19 15:11 被阅读0次

    Executor框架

    • 主要有三个部分组成
      1. 任务: 包括被执行任务需要实现的接口:Runnable接口或Callable接口
      2. 任务的执行:任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
      3. 异步计算的结果: 包括接口Future和实现Future接口的FutureTask类
    • 简单的连接上述
      1. Executor是一个接口,是Executor框架的基础,将任务提交与任务执行分离开来
      2. ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务
      3. ScheduledThreadPoolExecutor是一个实现类,在给定的延迟后运行命令,或者定期执行命令(比Timer更灵活,功能更加强大)
      4. Future接口和实现它的FutureTask类,代表异步计算的结果
      5. Runnable接口和Callable接口实现类都可以被任务给执行

    线程队列

    • 主要包括: ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue
    ArrayBlockingQueue : 基于数组结构的有界阻塞队列(FIFO) 源码附录在文章最下
    • 一个有界缓存队列,可以指定缓存队列大小,当其存储满后,加入队列失败,会开启新线程执行,但是当线程达到最大线程数时,在加入就会抛出异常RejectExecutorExcation
    PriorityBlockingQueue : 基于数组结构的优先级的无限阻塞队列
    • 拥有优先级的同ArrayBlockIngQueue在offer实现上可2倍的自动扩容
    • 不存在超容量情况,所以不会像ArrayBlockingQueue一样抛出异常
    • 通过构造函数传入对象来判断优先级,所以传入对象必须实现comparable接口
    • 源码分析在文章最下,在算法中运用较多,比如查找链表中前K大值等
    LinkedBlockingQueue : 基于链表结构的阻塞队列(FIFO)
    • 一个无界缓存等待队列,当线程数量达到核心线程数时,剩余任务都会添加进来等待,即最大线程数无效
    SynchronousQueue : 不存储元素的阻塞队列(FIFO先进先出公平,或LIFO非公平)
    • 没有容量,是无缓存等待队列,一个不存储任务的队列,会直接将任务交给消费者,并且必须等队列中添加元素被消费后才能继续添加新元素
    • 拥有公平(FIFO)和非公平策略,非公平会导致一些数据永远不会被消费
    • 使用该队列一般要求非线程数设置为无界,避免线程拒绝执行操作

    Executor的运用

    1. Executors.callable(Runnable task):将Runnable对象封装为一个Callable对象
    2. ExecutorService.execute(Runnable task):执行一个没有返回值得任务
    3. ExecutorService.submit(Runnable或Callable task ):返回一个实现Future接口的对象FutureTask,
      1. 主线程可以执行FutureTask.get()方法等待任务执行完成获取接口
      2. 也可以调用FutureTask.cancel()来取消此任务的执行
    ThreadPoolExecutor源码分析
    1. 首先创建线程池最终调取方法是:
          public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue) {
          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
               Executors.defaultThreadFactory(), defaultHandler);
      }
      //调取这个方法
      public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler) 
      
      • corePoolSize : 核心线程数,创建线程池后默认线程为0,当有任务来时才会创建对应的一个线程,直到达到corePoolSize大小,就会将在到达的任务放到缓存队列中,除非调用了预创建线程才会在没有任务到达之前就创建对应的线程数;
      • maximumPoolSize: 最大线程数,线程池最多能创建的线程数,当队列满了以后再进来的任务会在创建一个新的线程
      • keepAliveTime : 线程没有任务执行时最多保持多久时间会终止.默认只有线程数大于corePoolSize时才起作用,针对的是最大线程数时创建的线程.有方法让线程数在核心线程之内也起作用,直到线程数为0
      • unit : 保持时间的参数单位
      • workQueue: 一个阻塞队列,用来存储等待执行的任务,当任务满了就会调用maximumPoolSize创建新线程
      • threadFactory : 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程命名或者设置优先级等操作:通过implements ThreadFactory可以自定义线程工厂
      • handler : 便是当拒绝处理任务时的策略(当阻塞队列满了,且达到最大线程数后再来任务会调取),通过implements RejectedExecutionHandler可自定义自己的拒绝策略
        1. AbortPolicy : 默认方式,直接抛出异常
        2. CallerRunsPolicy : 只用调用者所在线程来运行任务,异步任务变成了同步执行了,比如主线程调用的execute方法则拒绝策略会将线程池拒绝的任务交给主线程执行了
        3. DiscardOldestPolicy: 丢弃队列中对头的那个任务(最早添加进来的)并执行当前任务
        4. DiscardPolicy: 不处理,丢弃掉
    ThreadPoolExecutor : FixedThreadPool, SingleThreadExecutor, CachedThreadPool
    FixedTHreadPool(默认阻塞队列无界==> OOM)
    1. 创建一个固定线程数量nThreads的线程,核心线程跟最大线程数量一致都为nThreads
      public static ExecutorService newFixedThreadPool(int nThreads) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>());
      }
      
    2. 使用无界队列LinkedBlockingQueue: 使得最大线程数,跟超时时间都是无意义的,根据线程池运行步骤,队列不可能被填满,所以未执行shutdown()或shutdownNow()方法的运行中的FixedThreadPool不会拒绝任务
    3. 使用与为了满足资源管理的需求而限制当前线程数量,用于负载比较重的服务器
    • 使用场景: 适用于处理CPU密集型任务,确保CPU在长期被工作线程使用下,尽可能少的分配线程,一半是N(CPU) + 1
    SingleThreadExecutor(默认阻塞队列无界==> OOM)
    • 创建使用单个线程的SingleThreadExecutor
      public static ExecutorService newSingleThreadExecutor() {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>()));
      }
      
    • 同上FixedThreadPool使用无限队列保存任务,因此最大及核心线程数都是1
    • 使用与需要保证顺序的执行各个人物,并且在任意时间点,不会有多个线程时活动的
    • 使用场景: 适用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行
    CachedThreadPool(默认核心线程0,最大线程i.MAX,synchronousQueue==>OOM)
    1. 创建一个大小无界的线程池,适用于执行很多的短期异步人物的小程序,或者负载比较轻量的服务器
      public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
      }
      
    2. 分析: corePoolSize 设置为0 ,核心线程数为空,而最大线程数设置为Inter.MAX_VALUE,即非核心线程为无限的,同时空闲线程等待新任务最长为60s后被终止
    3. 同时: 使用没有容量的SynchronousQueue作为线程池的工作队列,这意味着如果主线程提交任务速度高于线程池中线程处理任务的速度,CachedThreadPool将会不断创建新线程,最终耗尽CPU和内存资源
    4. 执行步骤:
      1. 执行execute方法,首先执行SynchronousQueue的offer方法提交任务,并查询线程池中是否有空闲线程来执行其中的poll方法来移除任务,如果有,则配对成功,将任务交给这个空闲队列
      2. 否则,配对失败,将会创建一个新线程去处理任务
      3. 当线程池中线程空闲时,会执行synchronousQueue的poll方法等到其提交新任务,如果超过60s依然没有提交,则这个线程就会终止
      4. 由于非核心线程无界,所以一旦提交任务速度 > 线程池处理速度就会不断的创建新线程
      5. 因此:使用与每次提交任务都会有线程立刻进行处理的,大量,耗时少的任务,长时间保持空闲的CachedThreadPool将不会使用任何资源
      6. 其实就是主线程调用 offer方法跟没有容量的阻塞队列的poll方法是否适配,如果适配就使用该空闲线程,如果不适配就从新创建线程执行任务
    • 使用场景: 用于并发执行大量短期的小任务
    ThreadPoolExecutor
    • 重点:重中之重: 线程池工作的过程:
          public void execute(Runnable command) {
          if (command == null)
              throw new NullPointerException();
          /*
           * Proceed in 3 steps:
           *
           * 1. If fewer than corePoolSize threads are running, try to
           * start a new thread with the given command as its first
           * task.  The call to addWorker atomically checks runState and
           * workerCount, and so prevents false alarms that would add
           * threads when it shouldn't, by returning false.
           *
           * 2. If a task can be successfully queued, then we still need
           * to double-check whether we should have added a thread
           * (because existing ones died since last checking) or that
           * the pool shut down since entry into this method. So we
           * recheck state and if necessary roll back the enqueuing if
           * stopped, or start a new thread if there are none.
           *
           * 3. If we cannot queue task, then we try to add a new
           * thread.  If it fails, we know we are shut down or saturated
           * and so reject the task.
           */
          int c = ctl.get();
          //如果当前线程数小于核心线程数,则创建线程并执行当前任务
          if (workerCountOf(c) < corePoolSize) {
              if (addWorker(command, true))
                  return;
              c = ctl.get();
          }
          //如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列
          if (isRunning(c) && workQueue.offer(command)) {
              int recheck = ctl.get();
              if (! isRunning(recheck) && remove(command))
                  reject(command);
              else if (workerCountOf(recheck) == 0)
                  addWorker(null, false);
          }
          //如果线程池内不处于运行或者任务无法放入队列,并且当前线程数小于最大允许的线程数量,则创建一个线程执行任务,非核心线程
          else if (!addWorker(command, false))
              reject(command);  //抛出RejectedExecutionException异常,对于有界队列而言才会有这个
      }
      
      1. 如果正在运行的线程数量小于corePoolSize:核心线程数,则马上创建线程运行这个任务
      2. 如果正在运行的线程数量大于或等于核心线程数,马上将这个任务放入队列,注意每种线程队列实现的offer方法不同哦!
      3. 如果队列满了,而且正在运行的线程数量小于最大线程数量,则会创建非核心线程立刻运行这个任务
      4. 如果队列满了,正在运行的线程数量大于或等于允许的最大线程数量,那么线程池就会抛出RejectExecutorException
    • 切记:
      1. 新建完线程则当这个线程完成任务时,他会从队列中取下一个任务来执行;
      2. 当一个线程无事可做,即为队列为null,超过一定时间keepAliveTime自定义超时回收时间,当设置为0配合的是无界队列使用,线程池会判断,如果当前运行的线程数大于核心线程数,那么这个线程就会被停掉回收,所以所有任务完成后,线程池会收缩到corePoolSize的大小
    • 通过以上分析可以看到他们源码中都调用了ThreadPoolExecutor来创建一个线程池
       public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue) {
          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
               Executors.defaultThreadFactory(), defaultHandler);
      }
      //实际调用对象
      public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler)
      
    • 分析源码中的参数
      1. corePoolSize: 核心线程池大小
      2. maximumPoolSize: 最大线程池的大小
      3. BlockingQueue: 用来暂时保存任务的工作队列
      4. RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和 时(达到了最大线程池大小且工作队列已满),execute()方法将要调用的Handler

    ScheduledThreadPoolExecutor

    • 继承自ThreadPoolExecutor:主要用来在给定的延迟之后运行任务或者定期执行任务(功能与Timer类似)
    • 线程池的特点:
      public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
      return new ScheduledThreadPoolExecutor(corePoolSize);
      }
      
      public ScheduledThreadPoolExecutor(int corePoolSize) {
          super(corePoolSize, Integer.MAX_VALUE,
                DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                new DelayedWorkQueue());
      }
      
      private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
      
      • 最大线程数为Integer.MAX_VALUE
      • 阻塞队列为DelayedWorkQueue
        ScheduledThreadPoolExecutor添加任务的另外两个方法:
      1. scheduleAtFixedRate: 按某种速率周期执行
      2. scheduleWithFixedDelay: 在某个延迟后执行
      • 两种方法的内部实现都是创建了一个ScheduledFutureTask对象封装了任务的延迟执行时间及执行周期,并调用decorateTask()方法转成RunnableScheduledFuture对象,然后添加到延迟队列中。
    1. DelayQueue: 封装了一个优先级队列,会对队列中的ScheduledFutureTask进行排序,两个任务执行time不同时,time小的先执行,否则比较添加到队列中的ScheduledFutureTask的顺序号sequenceNumber,先提交的先执行
    2. 工作机制为:
      1. 调用上面两个方法添加一个任务
      2. 线程池中的线程从DelayQueue中取任务
      3. 然后执行任务
    3. 执行步骤为:
      1. 线程从DelayQueue中获取time大于等于当前时间的ScheduledFutureTask , DelayQueue.take()
      2. 执行完后修改这个task的time为下次被执行时间
      3. 然后在把这个task放回队列中DelayQueue.add()
    4. 使用场景: 用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景

    作业

    1. 自定义创建一个能够根据加入顺序得到最后一个数据的线程池,前面的没有完成就抛弃
    2. 自定义设置一个先达到最大队列工作,后缓存队列的线程池
    • 1的解题思路
      1. 由于是顺序加入,如果前面有等待任务,则新加入的任务将会替换掉之前的任务,我们只要最后一个任务,则使用线程数为1,且等待队列也为1,同时拒绝策略为DiscardOldestPolicy()即可
      2. 当第一次添加任务执行,第二次添加加入队列,如果此时任务3来到将会替换2,直到下一次任务到来,则最终执行的是最后一次添加的任务
    • 2的解题思路:
      1. 默认当工作队列满了无法入队才会扩容线程池,我们可以重写队列的offer方法,造成队列已满假象
      2. 在扩容达到最大线程以后会触发拒绝策略,此时我们可以将任务真正的插入到缓存队列中
      • 解答:使用LinkedTransferQueue.tryTransfer() :如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则返回false,并且不进入队列。
      /**
       * 设置一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
       * LinkedTransferQueue采用一种预占模式。
       * 意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,
       * 然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,
       * 并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为“匹配”方式。
       */
      public class MyLinkedTransferQueue extends LinkedTransferQueue<Runnable> {
      
          @Override
          public boolean offer(Runnable e) {
              return tryTransfer(e); // 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则返回false,并且不进入队列。
          }
      }
      
      /**
       * 使用静态内部类创建一个单例的线程池
       */
      public class ExecutorFactory {
      
          private static class Holder{
              private static ThreadPoolExecutor executors = new ThreadPoolExecutor(1, 4 , 60 , TimeUnit.SECONDS ,
                      new MyLinkedTransferQueue() ,
                      new ThreadNameFactory(),
                      new CustomizeRejectHandler());
          }
      
      
          public static ThreadPoolExecutor getInstance(){
              return Holder.executors ;
          }
      }
      
      
      
      /**
       * 自定义拒绝策略,用于当前队列任务满以后的操作
       */
      public class CustomizeRejectHandler implements RejectedExecutionHandler {
      
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //重写一个DisCardOldestPolicy
      //        if (!executor.isShutdown()){ //当线程池没有终止时,弹出队列中最早添加一个任务并执行当前任务
      //            executor.getQueue().poll();
      //            executor.execute(r);
      //        }
      
              try {
                  executor.getQueue().put(r); //当拒绝是添加到阻塞队列中
              } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
              }
          }
      }
      
      
    1. 单机上一个线程正在处理任务,如果突然断电了怎么办(正在处理和阻塞队列里的请求怎么处理)
      答: 可以对正在处理和阻塞队列的任务做事务管理或对阻塞队列中的任务持久化处理,并且当断电或系统崩溃,操作无法进行时,可以通过回溯日志的方式来撤销正在处理的已经执行成功的操作,然后重新执行整个阻塞队列
    2. 为什么不建议在代码中直接使用Executors创建线程池,而是推荐通过ThreadPoolExecutor方式创建?
    3. 答:不适用是可以明确的让我们知道线程池的运行规则,避免使用工具类的包装而不够直观内部机制导致潜在难以发现的问题,比如,使用newSingleThreadPool和FixedThreadPool创建线程池由于默认最大线程为Max而导致如果处理时间过长,任务过多而导致的OOM就很难发现问题

    多线程中的问题

    CountDownLatch与CyclicBarrier区别
    • 这两个类都可以实现一组线程在到达某个条件之前进行等待,内部都有一个计数器,当计数器的值不断减为0时所有阻塞的线程都将会被唤醒
    • CountDownlatch计数器由使用者控制,线程调用await()只是将自己阻塞而不会减少计数器的值,只有当调用countDown()方法才会减一,直到为0时,唤醒await(),并且只能拦截一轮
    • CyclicBarrier : cyclic 循环 , Barrier:栅栏 ,顾名思义表示可以实现循环拦截,其中的计数器由自己控制,在CyclicBarrier中线程调用await()不仅会将自己阻塞还会降计数器减1.直到为0时唤醒所有的阻塞队列,而后会重置新一轮的拦截
    线程池的选择使用
    • 高并发,任务执行时间短的业务怎样使用线程池?并发不高,任务执行长的业务怎么使用线程池?并发高,业务执行时间长的业务怎么使用线程池?
      1. 高并发,任务执行时间短,线程池线程数量可以设置成CPU+1,目的是减少线程上下文切换
      2. 并发不高,任务执行时间长的业务区别看待:
        1. 假设业务长时间几种在IO操作,就是IO密集型任务,由于IO操作并不占用CPU资源,这个时候就尽可能让CPU运转,可以加大线程池中的线程数量,让CPU不至于停下来等待IO操作,从而处理更多的业务
        2. 假设任务长时间集中在计算操作上,就是计算密集型任务,同一一致,线程池的线程数量设置少一些,减少线程上下文的切换
      3. 并发高,业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,增加服务器是第二步,至于线程池的设置,设置参考其他有关线程池的文章。最后,业务执行时间长的问题,也可能需要分析一下,看看能不能使用中间件对任务进行拆分和解耦。
    jstack追踪异常代码
    • Windows 下使用 jps,jstack ,及procexp.exe工具
    1. JPS常用命令整理
      • JPS是1.5提供的一个显示当前所有Java进程pid命令
      1. jps : 列出pid和Java主类名
      2. jps -l : 列出pid和Java主类全名
      3. jps -lm : 列出pid,主类全程和应用程序参数
      4. jps -v : 列出pid和JVM参数
    2. Jstack常用命令整理
      • jstack是JVM自带的一种堆栈追踪工具
      • jstack pid(通过jps获得的pid) 打印线程堆栈
    线程中断机制
    • java的线程中断机制是一种协作机制,线程会不时的检测中断标识位,以判断是否应该被中断(值是否为true), 主要有三个方法
      1. interrupt() : 每个线程都有个boolean类型的中断标志,当使用该方法时会被标记为true
      2. isInterrupted() : 判断线程是否被中断
      3. interrupted() : 清除中断标志,并返回原状态
    • 当使用中断时,被中断的线程并不会立刻停止做事,而是在合适的时机终止
      • 机制一: 如果该线程处在可中断状态下sleep,join,wait等,则该线程会被立刻唤醒,同时收到一个interruptedException,,如果是IO则资源会被关闭
      • 机制二:如果该线程处于不可中断状态下,即没有调用上方机制一的api,处于运行时的进程,则只是设置一下中断标志位,其他事情都不会发生,如果此后线程调用阻塞api,则会马上跳出,并抛出InterruptedExecption异常,接下来事情就跟一一致了,如果不调用阻塞api,则线程会一致运行下去.则在运行的代码中可以轮询中断标志位,看它是否被请求停止正在做的事情,可以通过isInterrupted()来读取,同时也可以通过一个interrupted()来读取和清楚标记位
    public class InterruptedTest extends Thread{
    
        public static void main(String[] args) {
            InterruptedTest interruptedTest = new InterruptedTest();
            interruptedTest.start();
        }
    
    
        @Override
        public void run() {
            super.run();
            MyThread myThread = new MyThread();
    
            myThread.start();
    
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //开始中断
            myThread.interrupt();
    
    
        }
    
    
        class MyThread extends Thread{
    
            @Override
            public void run() {
                super.run();
                //判断是否标记为中断:注意被异常捕获以后中断标记位会被重置为false,需要再次抛出方可中断
                while (!Thread.currentThread().isInterrupted()){
    
                    System.out.println("我是正在运行...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) { //sleep是会有中断标记的因此会抛出interruptedException: 中断标志位会被清除,如果想让上方while退出,必须再次手动设置标志位
                        e.printStackTrace();
                        System.out.println("我收到了异常中断标记" + Thread.currentThread().isInterrupted()); // false:已经被异常重置了
                        interrupt(); //再次抛出
                        System.out.println("我收到了再次抛出中断标记" + Thread.currentThread().isInterrupted()); //true,可以正常中断
                    }
                }
            }
        }
    }
    
    
    线程池线程复用原理
    • 我们自己创建的线程都知道,他只能start()执行一次,一旦执行完毕或被中断即走terminated终止状态结束线程了,那为何身处线程池中的线程却可以复用一致执行呢?你难道没有这样的疑问吗?
    • 通过上面的分析,我们知道线程池的运行流程图!


      image
    • 一旦一个任务提交我们就会判断其后流程:
      1. 直接申请线程执行任务
      2. 加入到缓存队列中等待线程执行
      3. 执行拒绝策略
    • 带着上方的疑问,我们根据源码详细分析线程池运行机制: 如何维护自身状态,如何管理任务,如何管理线程!
    ThreadPoolExecutor
    • 首先看他的继承关系图


      image
    1. 顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
    2. ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行
    3. AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可
    4. ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务
    线程池的生命周期管理
    • 线程池运行的状态,是伴随着线程池的运行在内部维护的,由一个变量维护两个状态值(很常见): 运行状态(runState)和线程数量(workCount)
    //使用高3位保存线程池的运行状态runState(总共5种3位足够了),低29位保存workerCount有效线程数量:使用同一位的原子类不用对两个变量操作时需要加锁操作了,直接使用一个原子类即可
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        //五种线程池状态
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
        private static int workerCountOf(int c)  { return c & CAPACITY; } //计算当前线程数量
        private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成一个ctl合成数量是一个原子类
    
    • 注意:线程池运行状态同线程运行状态不同的哦
      • Running: 能接受新提交的任务,并且能够处理阻塞队列中的任务
      • Shutdown : 关闭状态,不能接受新提交的任务,但却可以继续处理阻塞队列中的任务
      • stop : 不能接受新任务,也不处理缓存队列中的任务,会中断正在处理任务的线程
      • Tidying: 所有任务都终止了,workerCount有效线程数为0
      • Terminated: 在terminated()方法执行完后进入该状态,终止状态


        image
    任务调度
    • 当用户提交一个任务会通过Executor.execute()方法执行,他的步骤上方已经总结过了,这里在重复说一下:
      1. 首先检查线程池运行状态,如果不是Running,直接拒绝,线程池要保证在Running状态下执行任务
      2. 如果workerCount(下面使用wc) < corePoolSize,则创建新线程执行提交任务
      3. 如果wc >= corePoolSize, 核心线程池满了,阻塞队列未满,则添加到队列中
      4. 如果wc >= corePoolSize && wc < maximumPoolSize ,且缓存队列满了,则直接创建新线程执行提交任务
      5. 如果wc > maximumPoolSize,并且线程队列满了,则根据拒绝策略具体执行该任务,默认是抛异常
      private final BlockingQueue<Runnable> workQueue; //线程阻塞队列
      public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) { //计算当前运行线程数小于核心线程数,对应上方2的情况
                if (addWorker(command, true)) //创建新的线程执行任务
                    return;
                c = ctl.get();
            }
            //执行这里说明过了2那种情况,就看3的情况,阻塞队列可以正常添加任务
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get(); //添加成功了
                if (! isRunning(recheck) && remove(command)) //如果添加以后不再是运行状态,则移除刚才加入队列的任务
                    reject(command);  //执行拒绝策略
                else if (workerCountOf(recheck) == 0) //线程池运行状态且wc运行线程数为0时
                    addWorker(null, false); //创建新线程执行阻塞队列中的任务
            }
            else if (!addWorker(command, false)) //如果队列满了,可以正常走addWorker创建非核心线程即上方4, 如果false则是5
                reject(command); //走拒绝策略
        }
    
    • 增加新线程执行任务addWorker()
     private final HashSet<Worker> workers = new HashSet<>(); //线程池中的所有工作线程
    private boolean addWorker(Runnable firstTask, boolean core) {
    //根据当前状态,判断是否添加成功,上方执行方法中的addWorker两个参数firstTask = null ,core = true /false 具体分析
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c); //获取运行状态
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && //状态 > shutDown 表示此时已经不再接受任务
                //shutdown状态不接受新任务,但可以执行已经加入队列中的任务,所以当进入shutdown状态,且传进来的任务为null时,并且任务队列不为null时,是允许添加新线程的,把这个条件取反就是不允许
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty())) 
                    return false;
    
                for (;;) { //使用CAS操作避免加锁
                    int wc = workerCountOf(c); //获取工作线程
                    if (wc >= CAPACITY || 
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false; //大于线程最大容量2的29次方量(所以newCacheExecutor并不能得到Integer.MAX_Value的),或者大于最大允许线程量则不能添加啦
                    if (compareAndIncrementWorkerCount(c)) //可添加就CAS操作线程数+1,成功说明可添加
                        break retry; //break跳出retry对应的循环,执行循环后面的添加worker逻辑
                    c = ctl.get();  // Re-read ctl 重新读取状态
                    if (runStateOf(c) != rs) 
                        continue retry; //状态改变了,跳到外层循环继续重新执行循环
                    // else CAS failed due to workerCount change; retry inner loop
                    //在内存层循环中不停的尝试CAS操作增加线程数
                }
            }
            //找了上方break retry可以正常使用CAS新增线程数
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask); //通过Worker包装runnable任务,稍后我们分析
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock(); //加锁
                    try {
                    
                        int rs = runStateOf(ctl.get());
                        //如果线程池状态rs < Shutdown即只能是Running
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) { //或者shutDown状态但是没有新任务
                            if (t.isAlive()) // 线程已经启动,并且当前没有任何异常的话,则是true,否则为false
                                throw new IllegalThreadStateException(); //我还没有启动呢
                            workers.add(w); //正常添加到线程池中workers工作线程
                            int s = workers.size();
                            if (s > largestPoolSize) //largestPoolSize:记录着线程池中出现过最大线程数量
                                largestPoolSize = s;
                            workerAdded = true; //可以正常工作的标记
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) { //如果正常工作,则开启线程任务
                        t.start();
                        workerStarted = true; //开始工作标记
                    }
                }
            } finally {
                if (! workerStarted) //该任务没有开始,则添加到失败
                    addWorkerFailed(w); 
            }
            return workerStarted;
        }
    
    • 通过上面分析,新增线程去完成任务主要通过worker类去完成的
    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable //实现了Runnable接口,因此t.start()执行的就是worker的run方法啊
             {
         
            final Thread thread;
           
            Runnable firstTask;
            
            volatile long completedTasks;
    
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);  //创建thread(this:Worker) ,则t.start()调用worker的run,同时原来的Runnable被封装为Worker的属性firstTask
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
            
        //getThreadFactory即为ThreadPoolExecutor创建thread工厂(实现ThreadFactory)可修改Thread名称,优先级等操作实现的
        public ThreadFactory getThreadFactory() {
            return threadFactory;
        }
    
    • 所以上方addWorker添加工作线程的t.start()方法调用的就是runWorker
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask; //这个就是我们执行线程池executor.execute()方法时候的runnable
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
            //如果task不为null,并且从workQueue中获取任务不为null,则会一直执行下去
                while (task != null || (task = getTask()) != null) { //task是需要执行的任务,不一定是刚刚添加的那个了,这样其实worker线程并没有完成工作,自然也就不会销毁了
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) || //检查线程状态,若线程池处于中断状态,调用interrupt将线程中断
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt(); //中断线程
                    try {
                        beforeExecute(wt, task); //可以在任务真正执行之前做点啥,空实现
                        Throwable thrown = null;
                        try {
                            task.run(); //执行execute()方法中的run方法,在t.start()线程内,这只是一个方法执行哈!
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown); //线程之后可以做啥,空实现
                        }
                    } finally {
                        task = null;
                        w.completedTasks++; //该线程执行完成任务+1
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    • 重点逻辑是while循环,当我们第一次创建worker并执行任务后,并没有结束线程,而是通过while循环调用getTask()方法从阻塞队列中去task继续调用task.run()执行任务,注意这里run()只是一个普通的方法调用,并不是start()哦!运行线程就是Worker线程中
    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 对应ShutDown虽然不添加任务,但是可以执行阻塞队列中的,Stop以后就不能子在执行任务了
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null; //返回null,停止执行任务
                }
    
                int wc = workerCountOf(c);
    
                // allowCoreThreadTimeOut 表示是否允许核心线程超时销毁,默认false不销毁.若设置成true,核心线程也会销毁的
                //只有正在工作的线程数大于核心线程数才会为true,佛足额返回false
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //
            
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                //如果timed为true(wx > 核心线程),通过poll取任务,如果为false,通过take取任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //这两个参数就是创建线程池中保存时间量
                        workQueue.take();
                    if (r != null) //如果有任务就退出死循环,返回任务交给上方的worker线程运行
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
    • 通过以上分析:不改变allowCoreThreadTimeOut默认前提下,若wc > 核心线程数,则通过poll从队列中取任务,如果wc <= 核心线程数,则通过take取任务
    • 则poll()方法同take()区别是啥呢?
      • 他们都是阻塞队列中的方法:常用的 ArrayBlockingQueue,LinkedBlockingQueue,PriorityQueue,SynchronizedQueue,中的方法
    //ArrayBlockingQueue
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0) { //是否队列中的元素个数为0,说明空队列
                    if (nanos <= 0L) //等待时间到了,队列中还未有数据加入,则返回null,
                        return null;
                    /**
                    * 调用该方法的前提是,当前线程已经成功获得与该条件对象绑定的重入锁,否* * 则调用该方法时会抛出IllegalMonitorStateException。
                    * nanosTimeout指定该方法等待信号的的最大时间(单位为纳秒)。若指定时间* * 内收到signal()或signalALL()则返回nanosTimeout减去已经等待的时间;
                    *若指定时间内有其它线程中断该线程,则抛出InterruptedException并清除当前线程的打断状态;
                    * 若指定时间内未收到通知,则返回0或负数。 
                    */
                    nanos = notEmpty.awaitNanos(nanos);  //每次signal唤醒重新等待
                }
                return dequeue(); //如果有元素取出
            } finally {
                lock.unlock();
            }
        }
    //如果poll超时返回null,则回调到
    f ((wc > maximumPoolSize || (timed && timedOut)) //true
                    && (wc > 1 || workQueue.isEmpty())) { //队列也是空的,走进去
                    if (compareAndDecrementWorkerCount(c)) //CAS可以减少c的个数
                        return null; //返回了null,该线程不能再上方的while循环中继续获取就结束线程啦,非核心线程就over啦,嘿嘿!
                    continue;
                }
    
    
    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)  //不能使用if,避免虚假唤醒
                    notEmpty.await();  //一旦count队列为空,会一致await阻塞在这里的,直到workQueue.offer()添加元素时唤醒
                return dequeue(); //取出队头元素
            } finally {
                lock.unlock();
            }
        }
    
    • 综上: 当核心线程在while循环中运行调用getTask获取task任务时,如果此时队列中没有数据则走缓存阻塞队列的take方法,会被notEmpty.await()阻塞,那这个阻塞又是何时被唤醒的呢?
    • 当然是下一个任务达到的时候也就是调用execute的时候添加一个新的任务Task
    //这个就是调用当前核心线程已经满了,则添加到阻塞队列中,
    //刚刚上方的核心线程在等待任务,添加以后肯定就调用notEmpty.signal()唤醒等待线程取任务执行啦
    if (isRunning(c) && workQueue.offer(command)) 
    
    • 我们来验证一下我们的想法:workQueue就是选择的队列,这里看ArrayBlockingQueue,当然对于其他队列也是相同的
     public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock; //获取锁,跟上方加锁时同一把锁
            lock.lock();
            try {
                if (count == items.length)
                    return false; //如果当前队列已满,不能再加入了false
                else {
                    enqueue(e); //正常添加到队列中
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
       
    //enqueue添加到数组循环队列中后调用notEmpty.signal()唤醒一个await线程取任务开始工作啦!
    private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }
    
    • 通过生成-消费者模式,将execute加入队列的任务通知等待的核心线程取阻塞队列中的任务开始执行!

    补充一下阻塞队列的源码分析

    ArrayDeque: 底层使用循环数组实现双向队列

    • 局部变量
      transient Object[] elements; //当前存储数组
      transient int head; //其始头结点
      transient int tail; //尾结点 
      private static final int MIN_INITIAL_CAPACITY = 8; //默认数组大小为 8
      
      • 注意由于是循环数组,所以头结点不一定比尾结点小哦,也可能比尾结点大有size - 1 -> 0 的位置
      • 由于需要判断数组中数据是否为空,还是已经填满,所以会有一个节点浪费掉,这是因为:
      1. 如果head = tail 无法判断他们是空的还是满的数组
      2. 如果空出一位tail表示当前需要填充数据的位置,则当head =tail时表示空,head = (tail + 1) % size 相等,表示当前数组满了,需要扩容
      • 扩容方法也参考HashMap的扩容以 2的N次方,则上面的 %可以表示为 head = (tail + 1) & (size - 1)性能更好
    • 因此: ArrayDeque默认数组大小为 8 ,然后以 2倍扩容,不满会向上补,比如 创建一个14的数组实际创建的是 大小 16的循环数组
    add方法
    • 源码为:不能存储null
      public void addLast(E e) {
          if (e == null)
              throw new NullPointerException();
          elements[tail] = e; //注意这里是先添加在判断是否扩容,通过上面局部变量的分析可知肯定有一个空余的位置,在这里正好填满数组以后再判断是否需要扩容,判断方法也同上2的情形一致
          if ( (tail = (tail + 1) & (elements.length - 1)) == head)
              doubleCapacity(); //两倍的扩容数组
      }
      
      //我们查看扩容代码
      private void doubleCapacity() {
          assert head == tail; //断言tail增加以后是否跟head一致,数组完全填满
          int p = head; 
          int n = elements.length;
          int r = n - p; // 分两部分来复制,n分成 从0->p - 1 复制p个数据 和从p->n 复制(n - p)个数据
          int newCapacity = n << 1; //扩容两倍
          if (newCapacity < 0)
              throw new IllegalStateException("Sorry, deque too big");
          Object[] a = new Object[newCapacity];
          //将原来数组的起始位置至数组末尾全部复制到新数组0 -> n - p - 1处,此时不用管尾结点的位置,全部复制,多余的复制会存储成默认值的,不用在意(这里不会出现的,因为上方的断言,数组数据已经填满了)
          //分开copy是为了避免头尾节点过度的问题,不管你们在哪儿,我都是从头复制到最尾部,在从数组0位置copy到头部节点处即可全部复制
          System.arraycopy(elements, p, a, 0, r); 
          //在赋值从原来数组的0->p -1的位置上的数据到 新数组的 n-p -> (n - 1)处,即把原来旧数组全部复制到新数组从0其实到n -1结尾处
          System.arraycopy(elements, 0, a, r, p);
          elements = a; //拷贝后重新赋值给数组
          head = 0; //重新设置起始头结点为 0 ,尾结点为n处的null
          tail = n;
      }
      
    remove 删除
    • remove方法最终调取pollFirst() 删除头部数据 或 pollLast() 删除尾部数据
      
       public E pollFirst() {
          int h = head;
          @SuppressWarnings("unchecked")
          E result = (E) elements[h];
          
          if (result == null)
              return null;
          // r如果当前头部不为则赋值为null,并将头head增加 1,且判断是否越界超过整个数组大小了
          elements[h] = null;     // Must null out slot
          head = (h + 1) & (elements.length - 1);
          return result;
      }
      
      public E pollLast() {
      //尾结点删除就是 -1 是否小于了0,如果tail = 0 ,则t = -1 & 15 (-1的补码为 32位的1 & 15 = 15设置成最后一位数组的值删除并且尾结点为null的指向最后一位)
          int t = (tail - 1) & (elements.length - 1);
          @SuppressWarnings("unchecked")
          E result = (E) elements[t];
          if (result == null)
              return null;
          elements[t] = null; //删除循环数组中最后一位
          tail = t;
          return result;
      }
      
    修改和查找
    • 就是普通的数组查找,无非是垮了个(size -1 ) -> 0而已

    PriorityQueue: 使用堆得优先级队列

    • PriorityQueue一个基于优先级的无界队列,优先级队列的元素按照其自然顺序进行排序或者根据自定义提供的Comparator进行排序
      • 比如对于VIP和普通用户的请求进行优先级排序处理请求等;
    • 不允许使用null及不可比较的对象(没有实现Comparable接口的对象)
    • 非线程安全的,但是可以使用PriorityBlockingQueue用于多线程环境
    • PriorityQueue队列头是排序规则中最小的那个元素,如果多个元素都是最小值则随机挑选一个
    • 常用方法及时间复杂度(后文分析)
      • peek() 返回队首元素 O(1)
      • element(); //返回队头元素(不删除) O(1)
      • poll() 返回队首元素,队首元素出队列 O(log(N))
      • add() 添加元素 O(log(N))
      • offer(E e)将指定的元素插入此优先级队列。不能添加null元素。 O(log(N))
      • isEmpty() 判断是否为空
    构造函数
    • 查看构造函数
      //默认数组大小 11
       private static final int DEFAULT_INITIAL_CAPACITY = 11;
      
      //存储数组
      transient Object[] queue; // non-private to simplify nested class access
      
      //数组的长度
      private int size = 0;
      
      //队列比较器,为null使用默认比较器
      private final Comparator<? super E> comparator;
      
      //Fail-Fast标记,用于多线程      
      transient int modCount = 0; // non-private to simplify nested class access
      
      
    实现原理
    • 通过二叉小顶堆实现,可以用一颗完全二叉树表示(任意一个非叶子节点的权值,都不大于去左右子节点的权值),也就意味着可以通过数组作为其底层实现(数组实现简单,同时不会占用无用内存,数组中间不会有空余位置)
      [图片上传失败...(image-8ca912-1592548755621)]
    • 观察上方我们可以得到父子节点之间的关系
      1. 左子点下标: 当前父节点下标 * 2 + 1 ;
      2. 右子点下标: 当前父节点下标 * 2 + 2 ;
      3. 父节点下标: (当前子节点 - 1) / 2 ; //不论是左子节点还是右子节点
    • 通过上方公式,我们可以很方便计算出某个节点的父节点以及子节点的下标,这就是为何使用数组来存储堆的原因
    添加 add/offer
    • 两者区别主要是插入失败处理不同,add插入失败抛异常,而offer插入失败返回false,对于PriorityQueue两者没啥差别的
      [图片上传失败...(image-6e9980-1592548755621)]
    • 当新加入元素破坏了最小顶堆,就需要跟它父节点/祖父节点..设置根节点做调整,只是交换子节点跟父节点即可
      public boolean offer(E e) {
          if (e == null)
              throw new NullPointerException();
          modCount++;
          int i = size;
          if (i >= queue.length) //如果当前数组已经满了,就扩容其大小
              grow(i + 1); //扩容函数如果数组长度小于64就两倍扩容,否则增长1.5倍
          size = i + 1;
          if (i == 0) //如果当前数组为空,则在第一位添加数据
              queue[0] = e;
          else
              siftUp(i, e); //开始在叶子结点添加并调节数据,插入位置是数组的最后一位,也就是最下方叶子结点的最右边的那个位置
          return true;
      }
      
      
      //grow扩容函数
      private void grow(int minCapacity) {
          int oldCapacity = queue.length;
          // Double size if small; else grow by 50%
          int newCapacity = oldCapacity + ((oldCapacity < 64) ? 
                                           (oldCapacity + 2) :
                                           (oldCapacity >> 1));
          // overflow-conscious code
          if (newCapacity - MAX_ARRAY_SIZE > 0)
              newCapacity = hugeCapacity(minCapacity);
          queue = Arrays.copyOf(queue, newCapacity); //复制到新的数组中
      }
      
      
      //seftUp()函数
      private void siftUp(int k, E x) {
          if (comparator != null) //自定义的比较器
              siftUpUsingComparator(k, x);
          else //默认比较器
              siftUpComparable(k, x);
      }
      
      //我们主要看自定义比较器的方法
      private void siftUpUsingComparator(int k, E x) {
          while (k > 0) { //k最开始是数组的最后一位,我们开始跟他的父节点进行比较
              int parent = (k - 1) >>> 1; //根据上方公式第三条得到其父节点下标 k>0的位置肯定有父节点,不用担心下面的数组越界问题
              Object e = queue[parent]; //获取父节点的数据
              if (comparator.compare(x, (E) e) >= 0) //根据自定义比较器确定两者关系,如果x比父节点e大,则表明插入位置正确,退出循环
                  break;
              queue[k] = e; //否则将父节点的值赋值给当前子节点的位置,父节点值并没有改变,只是位置信息赋值给了当前k,comparator比较的时候还是拿到插入值x同父节点值比较,跟当前节点值无关的
              k = parent;
          }
          queue[k] = x; //上方公式获取到k应该插入的位置,对其赋值即可,k只赋值了这一次哦
      }
      
      
      • 加入元素可能破坏小顶堆性质,需要进行调整,过程为: 从K指定的位置开始,将x追层与当前点的parent进行比较并交换,知道满足x>= queue[parent]:比较可以是元素的自然顺序,也可以是自定义比较器
    获得队首元素
    • element()和peek() : 获取但不删除队首元素,也就是队列中权值最小的那个元素,区别是前者失败抛出异常,后者返回null,由于采用最小堆,且队首元素在下标为0处,因此直接返回queue[0]即可
      public E peek() {
          return (size == 0) ? null : (E) queue[0];
      }
      
    remove和poll
    • 获取并删除队首元素,区别是remove失败抛出异常,poll返回null,由于队首删除会改变最小堆结构,因此需要维护小顶堆的性质,需要调节堆
      [图片上传失败...(image-d67d9e-1592548755621)]
      public E poll() {
          if (size == 0)
              return null;
          int s = --size; //查找数组最后一位数据 x ,对其调用siftDown方法
          modCount++;
          E result = (E) queue[0];
          E x = (E) queue[s];
          queue[s] = null; //删除以后数组长度 -1 ,最后一位设置成null
          if (s != 0)  //s不是队首元素则开始调整堆
              siftDown(0, x);
          return result;
      }
      
      //调整堆的函数,无非就是跟其孩子节点进行比较
      private void siftDown(int k, E x) { //注意:此时 k 为 0 ,x为当前数组末尾那个值也就是上图中的数字 9 
          if (comparator != null) //是否存在自定义的比较器
              siftDownUsingComparator(k, x);
          else
              siftDownComparable(k, x);
      }
      
      //自定义比较器
      private void siftDownUsingComparator(int k, E x) {
          int half = size >>> 1; //这里注意理解: size是减后的数组大小,由于堆中父节点 < 子节点特性,因此,对于最下层,也就是图片中跟9同级的节点是没有必要在考虑的,同样9如果有祖父节点则右侧叔节点也是没有必要考虑的,对于上方的b图 size = 10 ,右移1位为 5就是数据15的位置,则15的父节点必然小于15所以已经找到交换过了,当15的父节点为根节点时,下方对左右有比较的,已经覆盖整个特点了,因此设置成 size / 2 更加快速合理
          while (k < half) {  //循环比较,找到数组中最小数字存放到堆顶元素
              int child = (k << 1) + 1; //第一次找到k的左子树
              Object c = queue[child];
              int right = child + 1; //找到k的右子树
              if (right < size &&
                  comparator.compare((E) c, (E) queue[right]) > 0) //首先比较左右子树找到他们的最小值,以便下面跟父节点比较是否交换
                  c = queue[child = right]; //如果右子树更小,设置给临时变量c
              if (comparator.compare(x, (E) c) <= 0) //比较当前最后一位数据x,跟他的子树中最小的数据,如果比他小,则找到了这个存放x的位置,如果比他大,则交换值
                  break;
              queue[k] = c; //将比x小的子树存放到父节点中
              k = child; //设置k的下标为当前孩子节点位置,注意child已经在比较左右子树时根据他们大小设置到最小子树上了,因此在往下查找
          }
          queue[k] = x; //找到了当前合适的父节点位置K,x的值比他的左右子树都大,则这个位置就是存放x的位置
      }
      
      
      
    remove(Object o) :删除指定元素
    • 遍历数组找到第一个满足条件的元素下标(如果有多个,只删除一个的),删除会改变队列结构,需要进行调整,分为两种情况:
      1. 删除的数组最后一位元素,则直接删除即可
      2. 删除的不是最后一个元素,则从删除点开始以最后一个元素为参考调用一次siftDown(),可能导致不满足父节点小于该删除节点,还需要上虑调用添加是的siftUp()将添加走一遍
    • 很多人对第二个不理解,简单分析一下源码,删除操作实际调用removeAt(int i)
      [图片上传失败...(image-53d895-1592548755621)]


      image
        private E removeAt(int i) { //i为数组中从0开始查找第一个满足.equeal(o)的元素下标
          // assert i >= 0 && i < size;
          modCount++;
          int s = --size; //找到数组末尾数据,如果是第一种情况直接删除即可
          if (s == i) // removed last element
              queue[i] = null;
          else {  //如果是第二种情况
              E moved = (E) queue[s]; //首先保存移除数组末尾元素,然后将数组最后一位置空,即删除任何数据,其实就是删除最未那个,这点同红黑树删除一致
              queue[s] = null;
              siftDown(i, moved); //再次调用siftDown,此时i是需要删除的元素下标,moved是数组最后一位的数据,其实上方的poll弹出堆顶元素,只是一个特殊的移除下标为0的数据而已
              //对于上方的图片只是一种特殊情况4正好是9的祖父节点,如果需要移除的是10也没有关系,我们依然删除下标10的数据,将数据9跟10的两个子节点15 跟11比较,发现都小于则将9放到下标2的位置再将10返回即可,只要满足子节点大于父节点即可
              if (queue[i] == moved) {  //上方调整过的数据,如果发现当前删除节点数据直接填充了比如删除的是11,这个时候填充9到下标6的位置,你发现 queue[6] = movie = 9下方的都是大于9的,但是父节点并不能保证小于9哦,这个时候就需要向添加时候一样,向上过滤直到满足所有下方数据都小于父节点即2跟6交换后2位置9,6位置10
                  siftUp(i, moved);
                  if (queue[i] != moved)
                      return moved;
              }
          }
          return null;
      }
      
    注意:无论是队列还是栈都是可以用链表或者数组实现的,基本上所有数据都是这两种形式
    • 很多人会说,我学习ArrayDeque跟PriorityQueue数据结构有啥用呢?你大概是忘记了一个重要的东西线程池
    • 线程池中缓存队列有主要有四种:
      1. ArrayBlockingQueue(int i): 它是ArrayDeque的线程安全版本,基本使用一致,无非就是加了锁保证线程安全性,还有一个它是有界的i是数组的大小,不可更改,ArrayDeque是可变的
      2. LinkedBlockingQueue(int i): i可要可不要,如果不要就是不固定大小的链表,如果有则为指定大小的链表,这个简单就不分析了,跟链表一致的
      3. PriorityBlockingQueue(int i): i可有可无,意义同上,是PriorityQueue的线程安全类,依据设置优先级弹出队列中的任务给线程池使用
      4. SynchronizedQueue() : 没有空间的队列,不缓存任何数据滴,来多少任务创建多少线程

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor线程池源码和典型问题

          本文链接:https://www.haomeiwen.com/subject/oekvxktx.html