美文网首页
线程池简易实现和线程池源码

线程池简易实现和线程池源码

作者: wxxhfg | 来源:发表于2021-07-08 20:41 被阅读0次

线程池简单实现

线程池简要图示
package Thread;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadPoolDemo {
    //1.阻塞队列
    private BlockQueue<Runnable> taskQueue;

    //2.核心线程数
    private  int coreSize;

    //3.获取任务的超时时间
    private long timeout;


    //4.时间转换
    private TimeUnit timeUnit;

    //5、线程集合
    HashSet<Worker> workers = new HashSet<>();


    //6.拒绝策略
    private RejectPolicyDemo<Runnable> rejectPolicy;


    public ThreadPoolDemo(int coreSize, long timeout, TimeUnit timeUnit , int queueCap , RejectPolicyDemo<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<Runnable>(queueCap);
        this.rejectPolicy = rejectPolicy;
    }

    class  Worker extends  Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            //1.当task 不为空,执行任务
            //2.当task 执行完毕,再接着从任务队列获取任务继续执行
//            while(task != null || (task = taskQueue.take()) != null){  //该策略会死等 ,就算线程池为空,也会一直等待
            while(task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
                try{
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null; //没有任务了 将task置为null
                }
            }

            synchronized (workers){
                workers.remove(this);
            }

        }
    }




    //执行任务
    public  void excute(Runnable task){
        //如果任务数没有超过 coresize 时 ,直接交给worker对象执行
        //如果超过了 coresize时 ,将任务加入到阻塞队列中
      synchronized (workers){

          if (workers.size() < coreSize) {
              Worker worker = new Worker(task);
              workers.add(worker);
              worker.start();
          }
          else {
              //可选择阻塞队列满足后 选择拒绝策略
//              taskQueue.put(task);


              //1.死等
              //2 带超时等待
              //3 让调用者放弃任务
              //4 让调用者抛出异常
              //5 让调用者自己执行任务

              taskQueue.tryPut(rejectPolicy , task);

          }
      }
    }

    public static void main(String[] args) {
        ThreadPoolDemo threadPool = new ThreadPoolDemo(2 , 1000 , TimeUnit.MILLISECONDS , 100 ,
                (queue, task)->{
                    //1.死等
//                    queue.put(task);
                    //2 带超时等待
//                    queue.offer(task , 500 ,TimeUnit.MILLISECONDS );
                    //3 让调用者放弃任务
//                    System.out.println("放弃任务");
                    //4 让调用者抛出异常
//                    throw new RuntimeException("运行失败"+task); 抛出异常后 后续的任务不会再执行
                    //5 让调用者自己执行任务
                task.run();
                });
    }

}


@FunctionalInterface
interface  RejectPolicyDemo<T>{
    void reject(BlockQueue<T>  queue , T task);
}

class BlockQueue<T> {
    public BlockQueue() {

    }

    public BlockQueue(int capcity) {
        this.capcity = capcity;
    }

    // 1.阻塞队列
    private Deque<T> queue = new ArrayDeque<>();

    //2.锁
    private ReentrantLock lock = new  ReentrantLock();

    //3. 生产者变量
    private Condition fullWaitSet = lock.newCondition();

    //4.消费者变量
    private Condition emptyWaitSet = lock.newCondition();

    //5.容量
    private  int capcity;


    //带超时的阻塞获取
    public  T poll(long timeout , TimeUnit unit){
        lock.lock();
        try {
            //将timeout 统一转换成 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if(nanos <= 0){
                        //超时
                        return null;
                    }
                    //防止虚假唤醒,返回的是剩余时间
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }
        finally {
            lock.unlock();
        }

    }


    //阻塞获取
    public T take(){
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }
        finally {
            lock.unlock();
        }
    }

    //带超时时间的阻塞添加
    public boolean offer(T task , long timeout , TimeUnit unit){
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);

            while(capcity == queue.size()){
                try {
                    if(nanos <= 0){
                        return false;
                    }
                    fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;

        } finally {
            lock.unlock();
        }
    }

    //阻塞添加
    public  void put(T element){
        lock.lock();
        try {
            while(capcity == queue.size()){
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(element);
            emptyWaitSet.signal();

        } finally {
            lock.unlock();
        }
    }



    public int size(){
        lock.unlock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicyDemo<T> rejectPolicy, T task) {
        lock.lock();
        try {
            //判断队列是否已满
            if(queue.size() == capcity){
                rejectPolicy.reject(this , task);
            }else { //有空闲
                queue.addLast(task);
                emptyWaitSet.signal();
            }

        }finally {
            lock.unlock();
        }


    }
}

源码

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量,ThreadPoolExecutor 类中的线程状态变量如下:

    // Integer.SIZE 值为 32 
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
状态名 高三位 接受新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y 接收新任务,同时处理任务队列中的任务
SHUTDOWN 000 N Y 不接受新任务,但是处理任务队列中的任务
STOP 001 N N 中断正在执行的任务,同时抛弃阻塞队列中的任务
TIDYING 010 - - 任务执行完毕,活动线程为0时,即将进入终结阶段
TERMINATED 011 - - 终结状态

从数字上面比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示

  • 使用一个数来表示两个值的主要原因时:可以通过一次cas同时更改两个属性的值

构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造参数解释:

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
    • maximumPoolSize - corePoolSize = 救急线程数
  • keepAliveTime:救急线程空闲时的最大生存时间
  • unit:时间单位
  • workQueue:阻塞队列(存放任务)
    • 有界阻塞队列 ArrayBlockingQueue
    • 无界阻塞队列 LinkedBlockingQueue
    • 最多只有一个同步元素的队列 SynchronousQueue
    • 优先队列 PriorityBlockingQueue
  • threadFactory:线程工厂(给线程取名字)
  • handler:拒绝策略

救急线程在核心线程和阻塞队列都放不下了才会使用

工作方式:

  1. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  2. 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入 workQueue 队列排 队,直到有空闲的线程。
  3. 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线 程来救急。
  4. 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 下面的前 4 种实现,其它著名框架也提供了实现
    1. ThreadPoolExecutor.AbortPolicy 让调用者抛出RejectedExecutionException 异常,这是默认策略
    2. ThreadPoolExecutor.CallerRunsPolicy 让调用者运行任务
    3. ThreadPoolExecutor.DiscardPolicy 放弃本次任务
    4. ThreadPoolExecutor.DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
    5. Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
    6. Netty 的实现,是创建一个新线程来执行任务
    7. ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
    8. PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
  5. 当高峰过去后,超过 corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。


    jdk线程池决拒绝策略

总览

线程池类结构
其他工具类

ThreadPoolExecutor

ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等等方法。
我们经常会使用Executors这个工具类来快速构造一个线程池,对于初学者而言,这种工具类是很有用的,开发者不需要关注太多的细节,只要知道自己需要一个线程池,仅仅提供必需的参数就可以了,其他参数都采用作者提供的默认值。



public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

这两个方法都会进行使用ThreadPoolExecutor来创建一个ThreadPoolExecutor实例(具体可见前面构造方法)

Doug Lea 采用一个 32 位的整数来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数(即使只有 29 位,也已经不小了,大概 5 亿多,现在还没有哪个机器能起这么多线程的吧)。

核心方法 execute()

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
      
        int c = ctl.get();
 // 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,
    // 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask)
        if (workerCountOf(c) < corePoolSize) {
         // 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了
        // 至于执行的结果,到时候会包装到 FutureTask 中。
        // 返回 false 代表线程池不允许提交任务
            if (addWorker(command, true))
                return;
           // 前面说的那个表示 “线程池状态” 和 “线程数” 的整数
            c = ctl.get();
        }

      // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了

   // 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
     // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
       // 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
        // 到这里,我们知道了,这块代码的真正意图是:担心任务提交到队列中了,但是线程都关闭了
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

 // 如果 workQueue 队列满了,那么进入到这个分支
    // 以 maximumPoolSize 为界创建新的 worker,
    // 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

四个拒绝策略的具体实现

// 只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

// 不管怎样,直接抛出 RejectedExecutionException 异常
// 这个是默认的策略,如果我们构造线程池的时候不传相应的 handler 的话,那就会指定使用这个
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

// 不做任何处理,直接忽略掉这个任务
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

// 这个相对霸道一点,如果线程池没有被关闭的话,
// 把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

Executor生成不一样的连接池

  • 生成一个固定大小的线程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列。

过程分析:刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。

  • 生成只有一个线程的固定线程池,这个更简单,和上面的一样,只要设置线程数为 1 就可以了:
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • 生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程(如果这个线程当前没有任务)的线程池:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue。于 corePoolSize 是 0,那么提交任务的时候,直接将任务提交到队列中。

总结

  • java线程池七大属性

corePoolSize,
maximumPoolSize,
workQueue,
keepAliveTime,
unit,
threadFactory,
rejectedExecutionHandler

  • 线程池中的线程创建时机
  • 如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务;
  • 如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务;
  • 如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。
  • 任务执行过程中发生异处理

如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。然后会启动一个新的线程来代替它。

  • 执行拒绝策略的时机
  • workers 的数量达到了 corePoolSize(任务此时需要进入任务队列),任务入队成功,与此同时线程池被关闭了,而且关闭线程池并没有将这个任务出队,那么执行拒绝策略。这里说的是非常边界的问题,入队和关闭线程池并发执行,读者仔细看看 execute 方法是怎么进到第一个 reject(command) 里面的。
  • workers 的数量大于等于 corePoolSize,将任务加入到任务队列,可是队列满了,任务入队失败,那么准备开启新的线程,可是线程数已经达到 maximumPoolSize,那么执行拒绝策略。

参考

https://www.javadoop.com/post/java-thread-pool

相关文章

网友评论

      本文标题:线程池简易实现和线程池源码

      本文链接:https://www.haomeiwen.com/subject/drbtpltx.html