美文网首页Java 杂谈
Java不魔改默认线程池机制来实现可伸缩的线程池

Java不魔改默认线程池机制来实现可伸缩的线程池

作者: 大侠陈 | 来源:发表于2019-04-18 09:18 被阅读0次

java的线程池提供设置基本大小和最大大小两个参数来实现可伸缩的线程池

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)

可java的默认实现有时候却并不是我们想要的,在这个默认实现中,线程池创建之初,线程池会创建 corePoolSize 个数量的线程,这些线程不停的处理 workQueue 中的任务, 当 workQueue 被填满,当前所有线程全部处于忙碌状态,已经无法腾出时间来处理新任务,线程池才会创建新的线程来处理任务, 也就是说,如果workQueue未被填满,线程池是不会创建新线程的。 那么,当 workQueue 是个无界队列时,maximumPoolSize 参数是无效的。 而当 workQueue 是有界队列时,假如即使 maximumPoolSize 个线程也无法满足任务处理的速率,则那些未能被处理的任务将被饱和策略退回,有时候这并不符合我们的预期。 所以, 真正的能满足我们需求的可伸缩线程池需要我们自己实现。

在这里, 我讲解两种方法

第一种比较简单,只需要单纯的设置默认线程池的参数即可实现

public class CustomThreadPool implements Executor {
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    ThreadPoolExecutor threadPoolExecutor;

    public CustomThreadPool(int nThreads, int keepAliveTime) {
        threadPoolExecutor = new ThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.SECONDS, queue);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
    }

    @Override
    public void execute(@NotNull Runnable command) {
        threadPoolExecutor.execute(command);
    }
}

我们通过默认线程池构造函数初始化一个线程池, 将 corePoolSize 和 maximumPoolSize 的值设置成相同, 然后再设置空闲线程的回收时间,这样我们便创建了一个固定大小的线程池。 然而,此时线程池空闲的线程是无法被回收的,因为可以被回收的线程只能是 maximumPoolSize 减去 corePoolSize 个数量的线程, 此时这个数量为0,自然也就不会有线程被回收。

但是我们可以

  threadPoolExecutor.allowCoreThreadTimeOut(true); 

通过设置这个值来取消上面的限制, 当allowCoreThreadTimeOut 设置为true,无关 corePoolSize 的大小, 只要满足回收条件就都可以被回收, 如果所有线程都空闲,则所有线程都会被回收。

测试代码如下

CustomThreadPool customThreadPool = new CustomThreadPool(10,60);

int count = 0;

while (count < 200) {
  customThreadPool.execute(() -> {
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("hello world");
  });
  count++;
}

这种方法的好处是简单,坏处是线程的创建和回收频率比较高

第二种方法克服了第一种方法存在的问题,可是需要一条额外的线程来管理任务

public class ElasticityThreadPool implements Executor {
    SynchronousQueue<Runnable> tasks = new SynchronousQueue<>();
    LinkedBlockingQueue<Runnable> buffTasks = new LinkedBlockingQueue<>();

    ThreadPoolExecutor threadPoolExecutor;

    public ElasticityThreadPool(int  , int maximumPoolSize, int keepAliveTime) {
        threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, tasks);
        threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        Thread managerThread = new Thread(() -> {
            while (true) {
                try {
                    Runnable buffTask = buffTasks.take();
                    try {
                        threadPoolExecutor.execute(buffTask);
                    } catch (RejectedExecutionException e) {
                        buffTasks.offer(buffTask);
                    }
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
        managerThread.setName("ElasticityThreadPool-TaskManager");
        managerThread.start();
    }

    @Override
    public void execute(Runnable runnable) {
        buffTasks.offer(runnable);
    }
}

这个线程池使用了有界队列,而且是一个极端有界队列 SynchronousQueue , 当 corePoolSize 数量个线程全都忙碌时,新的线程将会被创建。而当 maximumPoolSize 个线程也无法满足任务的处理时, 饱和策略将会发挥作用

threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

线程池会拒绝执行任务,并抛出 RejectedExecutionException 异常。

我们从代码中看到,还存在一个名叫 buffTasks 的误解队列, 此队列的用作任务的中转, 所有提交给线程池的任务事先都会被置入此队列

public void execute(Runnable runnable) {
  buffTasks.offer(runnable);
}

而管理线程则不断的从此队列中读取任务,并提交给真正的线程池

Thread managerThread = new Thread(() -> {
  while (true) {
    try {
      Runnable buffTask = buffTasks.take();
      try {
        threadPoolExecutor.execute(buffTask);
      } catch (RejectedExecutionException e) {
        buffTasks.offer(buffTask);
      }
    } catch (InterruptedException e) {
      break;
    }
  }
});
managerThread.setName("ElasticityThreadPool-TaskManager");
managerThread.start();
}

当线程池因为饱和而无法处理的任务通过饱和策略退回是,管理线程会再次将之存入 buffTasks, 以待后续处理

try {
    threadPoolExecutor.execute(buffTask);
} catch (RejectedExecutionException e) {
    buffTasks.offer(buffTask);
}

这里因为没有设置

allowCoreThreadTimeOut

所以被回收的线程按照线程池默认的机制处理,线程池总会保持 corePoolSize 个线程的随时就绪

线程池使用方法如下

ElasticityThreadPool threadPool = new ElasticityThreadPool(2, 10, 60);

int count = 0;
while (count < 200) {
  threadPool.execute(() -> {
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("hello world");
  });
  count++;
}

这种方法的好处是实现了真正的可伸缩,坏处是多使用了一个额外的线程

两种方法的孰优孰劣看具体的使用场景而定。

相关文章

  • Java不魔改默认线程池机制来实现可伸缩的线程池

    java的线程池提供设置基本大小和最大大小两个参数来实现可伸缩的线程池 可java的默认实现有时候却并不是我们想要...

  • java 实现自定义线程池

    java 实现自定义线程池 定义线程池接口 线程池接口的默认实现 示例摘抄于《Java并发变成的艺术》4.4.3线...

  • ThreadPoolExecutor 源代码解析(base jd

    ThreadPoolExecutor 是java线程池的默认实现。本文从源代码的角度来解析线程池,后续会出一个系列...

  • java并发编程锁机制

    之前系列文章都在叙述java线程池的设计以及实现机制,没有涉及java并发编程的锁机制,这是因为锁机制与线程池是...

  • java基础-多线程

    java线程池的实现 ThreadPoolExecutor java线程池几个参数 corePoolSize当线程...

  • 线程以及java线程池实现分享

    线程以及java线程池实现分享 线程简介 JDK线程池的工作原理 JDK线程池的实现细节 1.线程简介-由来 1....

  • ThreadPoolExecutor线程池原理

    本文参考Java线程池---addWorker方法解析Java线程池ThreadPoolExecutor实现原理线...

  • 线程池工作机制与原理

    书接上文, Java线程池 。接下来记录一下线程池的工作机制和原理 线程池的两个核心队列: 线程等待池,即线程队列...

  • 线程池的原理

    参考 深入Java源码理解线程池原理 线程池是对CPU利用的优化手段 线程池使用池化技术实现,替他的实现还有连接池...

  • ExecutorService

    ExecutorService扩展和实现Executor。 java 线程池的5种状态 RUNNING 线程池...

网友评论

    本文标题:Java不魔改默认线程池机制来实现可伸缩的线程池

    本文链接:https://www.haomeiwen.com/subject/dlzawqtx.html