美文网首页
分布式多线程线程池

分布式多线程线程池

作者: 夜阑人儿未静 | 来源:发表于2019-03-07 16:57 被阅读0次

说到多线程,概念性东西就不一一赘述了,首先回顾下线程的创建。

Java线程创建的四种方式

1.继承Thread类,重写run方法

static class ThreadDemo extends Thread{
@Override
public void run() {
//super.run();
//业务代码......
}
}

public static void main(String[] args) {
ThreadDemo thread = new ThreadDemo();
thread.setDaemon(true);
thread.setName("thread_demo");
thread.start();
}

2.实现Runnable接口,重写run方法,实现Runnable接口的实现类的实例对象作为Thread构造函数的target

static class RunnableDemo implements Runnable{
@Override
public void run() {
//业务代码......
}
}

public static void main(String[] args) {
Thread thread = new Thread(new RunnableDemo());
thread.start();
}

3.通过Callable和FutureTask创建线程

public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableDemo callable = new CallableDemo();
FutureTask<Object> futureTask = new FutureTask<>(callable);
new Thread(futureTask)..start();
Object o = futureTask.get();
}

static class CallableDemo implements Callable<Object>{
@Override
public Object call() {
//业务代码......
return null;
}
}

可以看出Callable与Runable的区别在于Callable带有返回值且可以检测线程是否完成

4.通过线程池创建线程

static class ThreadDemo extends Thread{
@Override
public void run() {
//super.run();
//业务代码......
}
}

static class RunnableDemo implements Runnable{
@Override
public void run() {
//业务代码......
}
}

static class CallableDemo implements Callable<Object>{
@Override
public Object call() {
//业务代码......
return null;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);

executorService.execute(new ThreadDemo());

executorService.execute(new RunnableDemo());

FutureTask<Object> futureTask = new FutureTask<>(new CallableDemo());
Future<?> submit = executorService.submit(futureTask);
submit.get();
}

说到线程池,Executor提供了四种线程池

1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
4. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

但安装编码规约插件的同学会发现用Executor创建线程池会爆红提示,当然也给出了解释:

image.png

找到源码点进去一探究竟

image.png

newFixedThreadPool除了设置了核心线程数和最大线程数,其他用的都是默认值。

那来了解下ThreadPoolExecutor的核心参数

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,

RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
  • corePoolSize:核心线程数
    核心线程会一直存活,及时没有任务需要执行
    当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
    设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭

  • queueCapacity:任务队列容量(阻塞队列)
    当核心线程数达到最大时,新任务会放在队列中排队等待执行

  • maxPoolSize:最大线程数
    当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
    当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常

  • keepAliveTime:线程空闲时间
    当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
    如果allowCoreThreadTimeout=true,则会直到线程数量=0

  • allowCoreThreadTimeout:允许核心线程超时

  • rejectedExecutionHandler:任务拒绝处理器
    当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
    当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
    实现RejectedExecutionHandler接口,可自定义处理器

参数设置不当是会出现oom的哦,所以要注意核心参数的默认值

corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()

参数设置了,在饱和的情况下ThreadPoolExecutor的处理顺序是什么样子的呢?

  • 当线程数小于核心线程数时,创建线程。
  • 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  • 当线程数大于等于核心线程数,且任务队列已满
  • 若线程数小于最大线程数,创建线程
  • 若线程数等于最大线程数,抛出异常,拒绝任务

最后分享个自己在项目中常用的线程池创建工具类

@Slf4j
public class LocalThreadPool {

public final static String poolName = "thread_pool";

private volatile static LocalThreadPool singletonPool;

private ThreadPoolExecutor executor;

private ThreadPoolExecutor callable;

public static LocalThreadPool getInstance(){
if(singletonPool == null){
synchronized (LocalThreadPool.class){
if(singletonPool == null){
singletonPool = new LocalThreadPool();
}
}
}
return singletonPool;
}

private LocalThreadPool(){

//runnable
final AtomicInteger runnableId = new AtomicInteger(0);

ThreadFactory runableFactory = new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {

Thread thread = new Thread(r,"thread_pool_executor_"+runnableId);

thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("{}:{}",t.getName(),e);
}
});
return thread;
}
};

//callable
final AtomicInteger callableId = new AtomicInteger(0);

ThreadFactory callableFactory = new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {

Thread thread = new Thread(r,"thread_pool_callable"+callableId);

thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("{}:{}",t.getName(),e);
}
});
return thread;
}
};

executor = new ThreadPoolExecutor(10,20,60,TimeUnit.SECONDS,

new LinkedBlockingQueue<>(20),runableFactory,new RejectedExecutionHandler(){

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(!executor.isShutdown()){
r.run();
log.info("caller run runnable");
}
}
});
callable = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20),
callableFactory, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(!executor.isShutdown()){
r.run();
log.info("caller run callable");
}
}
});
}
public void execute(Runnable r){
executor.execute(r);
}
public <T> Future<T> submit(Callable<T> c){
return callable.submit(c);
}
}

用起来非常之方便

public static void main(String[] args) {
Future<Object> submit = LocalThreadPool.getInstance().submit(new Callable<Object>() {
@Override
public Object call() {
return null;
}
});
LocalThreadPool.getInstance().execute(new Runnable() {
@Override
public void run() {
//业务代码......
}
});
}

相关文章

  • Thread

    队列 线程锁 多线程,线程池 队列 多线程爬虫示例 多线程 自定义线程 线程池

  • Java:线程池Executors.newFixedThread

    摘要:Java,多线程,线程池 多线程编程和线程池概述 (1)多线程程序: 计算机可以实现多任务 ( multit...

  • 10.3多线程详解

    Java高级-多线程 多线程创建 多线程通讯 线程池 1.多线程创建 thread/runnable图:继承Thr...

  • Springboot | 线程池的学习,多线程池配置示例

    一、线程和进程,线程的生命周期二、单线程和多线程三、线程池的概念四、线程池的使用五、多线程池配置示例 一、线程和进...

  • 分布式多线程线程池

    说到多线程,概念性东西就不一一赘述了,首先回顾下线程的创建。 Java线程创建的四种方式 1.继承Thread类,...

  • 反射、注解与依赖注入总结

    上一篇【线程、多线程与线程池总结】中主要记录线程、多线程相关概念,侧重于线程的Future使用与线程池的操作;同样...

  • 源码分析之ThreadPoolExecutor

    线程池在多线程编程的中可谓是个利器,使用线程池会大大提高多线程的效率。原因是使用线程池相对于new Thread有...

  • Java面试题——多线程

    Java面试题——多线程 1,什么是线程池? 线程池是多线程的一种处理方式,处理过程中将任务提交给线程池,任务执行...

  • ZooKeeper分布式专题(七)-- 使用zookeeper实

    ZooKeeper分布式专题与Dubbo微服务入门 zookeeper实现分布式锁 什么多线程 多线程为了能够提高...

  • 月薪2w以上的java程序员面试都会问的问题

    多线程相关问题 实现多线程有哪些方式?有什么异同? 线程的生命周期 线程池常用的有哪些? 线程池的工作原理 如何启...

网友评论

      本文标题:分布式多线程线程池

      本文链接:https://www.haomeiwen.com/subject/shchpqtx.html