线程池的使用

作者: AnonyPer | 来源:发表于2019-04-08 16:11 被阅读3次

Java线程池

前言

平时使用Java线程时,习惯于直接new Thread来创建一个线程,这种创建线程的方法随手就出来了,满足于我们一般的使用场景,但是每次new Thread都会产生一个对象,同时我们也知道这样的线程缺乏有效的管理,由此java推出了线程池,相对于直接new Thread,线程池有以下优点:

  • 降低资源消耗

线程池可利用已存在线程,少了对象的创建,减少了性能开销

  • 可有效管理线程

控制线程数、控制执行规则,控制并发数

Java线程池源码的介绍

前面大概介绍了线程池的优点,那么java中关于线程池的类有哪些?

[图片上传失败...(image-977979-1554711086155)]

接口Executor

java中线程池的接口为Executor,该接口提供了一个void execute(Runnable var1);方法,接受一个Runnable的任务。

package java.util.concurrent;

public interface Executor {
    void execute(Runnable var1);
}

子类接口ExecutorService

Executor接口的子类ExecutorService封装了更多的方法:

public interface ExecutorService extends Executor {
    void shutdown();//关闭命令,不再接受新的任务 如果还有未执行完的任务,则执行完之后之后关闭

    List<Runnable> shutdownNow();//停止正在执行的任务,不再处理正在等待的任务

    boolean isShutdown();//是否关闭,在调用shutdown之后返回true,其他返回false

    boolean isTerminated();//立即判断 调用shutdown之后,在线程都执行完后返回true,其他返回false

    //在指定时间后,判断是否已经关闭,如果关闭返回true,否则返回false
    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;

    //以下是提交线程的方法
    
    <T> Future<T> submit(Callable<T> var1);

    <T> Future<T> submit(Runnable var1, T var2);

    Future<?> submit(Runnable var1);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}

抽象子类AbstractExecutorService

ExecutorService接口的抽象实现类AbstractExecutorService,实现了ExecutorService接口中的提交线程的相关方法:

public abstract class AbstractExecutorService implements ExecutorService {
    public Future<?> submit(Runnable var1) {
        if(var1 == null) {
            throw new NullPointerException();
        } else {
            RunnableFuture var2 = this.newTaskFor(var1, (Object)null);
            this.execute(var2);
            return var2;
        }
    }

    public <T> Future<T> submit(Runnable var1, T var2) {
        if(var1 == null) {
            throw new NullPointerException();
        } else {
            RunnableFuture var3 = this.newTaskFor(var1, var2);
            this.execute(var3);
            return var3;
        }
    }

    public <T> Future<T> submit(Callable<T> var1) {
        if(var1 == null) {
            throw new NullPointerException();
        } else {
            RunnableFuture var2 = this.newTaskFor(var1);
            this.execute(var2);
            return var2;
        }
    }
    ...
}

常用的实现子类ThreadPoolExecutor

抽象类AbstractExecutorService的子类ThreadPoolExecutor是我们常用到的一个类,具体实现了上面提到的未实现的方法,并提供了几种构造函数以及额外的判断方式,比如线程池大小、队列等,源码如下:


package java.util.concurrent;

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
        
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    public void execute(Runnable var1) {
        ...
    }

    public void shutdown() {
        ...
    }

    public List<Runnable> shutdownNow() {
        ...
    }

    public boolean isShutdown() {
        ...
    }

    public boolean isTerminating() {
        ...
    }

    public boolean isTerminated() {
        ...
    }

    public boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException     {
        ...
    }

    ...
}

ThreadPoolExecutor的构造函数有四个,最终调用的构造方法如下:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
,里面的含义如下:

  • corePoolSize 核心线程池大小
  • maximumPoolSize 线程池最大容量

如果运行的线程少于corePoolSize,新添加任务时会创建新线程来处理任务,不管是否有空闲线程;如果运行的线程大于corePoolSize但是比maximumPoolSize少时,只有队列(workQueue)满的时候才会创建新线程;

ps:默认提交任务时创建线程,当调用线程池的prestartAllCoreThreads的方法时,会提前创建好corePoolSize个基本线程

  • keepAliveTime 线程池没有线程执行时,保持存活时间

如果线程池中线程大于corePoolSize,多出的线程在执行完任务后会等待keepAliveTime时间后就会被销毁。

  • unit 存活时间单位
  • workQueue 任务队列

运行的线程数大于等于corePoolSize时,新的请求就会放在该队列中。队列的种类有以下几种:

1、ArrayBlockingQueue:这是一个基于数据结构的有界阻塞队列,按照先进先出(FIFO)原则进行排序。

2、LinkedBlockingQueue:基于链表结构的队列,按照先进先出(FIFO)原则进行排序,效率比ArrayBlockingQueue高。(对比理解ArrayList和LinkedList)

3、SynchronousQueue:不存储元素的队列,插入操作需要等待另一个线程调用了移除操作,否则插入操作就处于阻塞状态。

4、PriorityBlockingQueue:有优先级的无限阻塞队列,每次都返回优先级最高的元素。

  • threadFactory 创建线程工厂
  • handler 线程拒绝策略

拒绝策略会在请求队列满了,且创建的线程超过maximumPoolSize数,新请求加入的线程就会被拒绝。拒绝的策略有以下几种:

1、AbortPolicy 直接抛出异常(默认的拒绝策略

2、CallerRunsPolicy 只用调用者所在线程来运行任务

3、DiscardOldestPolicy 丢弃队列里面最近的一个任务,并执行当前任务

4、DiscardPolicy 不处理,丢弃掉

同时我们也可以通过实现RejectedExecutionHandler接口来自定义策略。

以上可见,我们可以根据ThreadPoolExecutor这个类的构造方法来new 一个线程池,我们采用第一个构造方法来构造一个线程池,构造之前我们先看一下系统默认的构造线程工厂和拒绝策略:

线程工厂 Executors.defaultThreadFactory()


public static ThreadFactory defaultThreadFactory() {
    return new Executors.DefaultThreadFactory();//调用该方法进行实例化
}


static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager var1 = System.getSecurityManager();
            this.group = var1 != null?var1.getThreadGroup():Thread.currentThread().getThreadGroup();
            this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        public Thread newThread(Runnable var1) {//新创建线程的方法
            Thread var2 = new Thread(this.group, var1, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
            if(var2.isDaemon()) {//设置守护线程
                var2.setDaemon(false);
            }

            if(var2.getPriority() != 5) {//设置优先级
                var2.setPriority(5);
            }

            return var2;
        }
    }

拒绝策略:defaultHandler

private static final RejectedExecutionHandler defaultHandler = new ThreadPoolExecutor.AbortPolicy();

public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() {
        }

        public void rejectedExecution(Runnable var1, ThreadPoolExecutor var2) {//直接抛出异常
            throw new RejectedExecutionException("Task " + var1.toString() + " rejected from " + var2.toString());
        }
    }

创建线程池

public class ThreadPoolTest {
    public static void main(String[] args) {
        //2个核心数 最大5个 线程队列长度为2
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 5000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(200); //控制依次加入执行
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            threadPoolExecutor.submit(new MyRunnable(i));
        }

    }

    static class MyRunnable implements Runnable {
        int index;

        public MyRunnable(int index) {
            this.index = index;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " " + index + ": 执行了");
            try {
                Thread.sleep(2500);//防止执行过快,看不出效果
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

执行结果:

pool-1-thread-1 0: 执行了
pool-1-thread-2 1: 执行了
pool-1-thread-3 2: 执行了
pool-1-thread-4 5: 执行了
pool-1-thread-5 6: 执行了
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2503dbd3 rejected from java.util.concurrent.ThreadPoolExecutor@4b67cf4d[Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.test.java.ThreadPoolTest.main(ThreadPoolTest.java:21)
pool-1-thread-1 3: 执行了
pool-1-thread-2 4: 执行了

以上可以看出,0、1、2达到核心线程池最大数后,3、4被放入队列中,然后新加5、6时线程队列已满,但还没有达到最大线程上限,所以重新创建了两个线程,7、8、9等直接被拒绝。
线程池提交任务执行流程

创建线程池

线程池的创建,每次实例化需要的参数太多,为了更简便的使用线程池,java封装了一个线程池的静态工厂类Executors,该类以静态方法提供了四种常用的创建线程池方法:

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int var0) {
    return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}

可以看到,传入的参数即为核心线程数,最大线程数核心线程数一致。LinkedBlockingQueue()的方法如下:

public LinkedBlockingQueue() {
    this(2147483647);//int 最大值
}

public LinkedBlockingQueue(int var1) {
    this.count = new AtomicInteger();
    this.takeLock = new ReentrantLock();
    this.notEmpty = this.takeLock.newCondition();
    this.putLock = new ReentrantLock();
    this.notFull = this.putLock.newCondition();
    if(var1 <= 0) {
        throw new IllegalArgumentException();
    } else {
        this.capacity = var1;
        this.last = this.head = new LinkedBlockingQueue.Node((Object)null);
    }
}

核心线程数和最大线程数一样,阻塞队列长度几乎不限。适用于任务数量有限,但是每一个任务都需要长时间运行的情况。

public class ThreadPoolTest {
    public static void main(String[] args) {

        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            threadPool.submit(new MyRunnable(i));
        }
        threadPool.shutdown();

    }

    static class MyRunnable implements Runnable {
        int index;

        public MyRunnable(int index) {
            this.index = index;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " " + index + ": 执行了");
            try {
                Thread.sleep(2500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

//执行结果
pool-1-thread-1 0: 执行了
pool-1-thread-2 1: 执行了
pool-1-thread-3 2: 执行了
pool-1-thread-4 3: 执行了
pool-1-thread-5 4: 执行了
pool-1-thread-1 5: 执行了
pool-1-thread-2 6: 执行了
pool-1-thread-3 7: 执行了
pool-1-thread-4 8: 执行了
pool-1-thread-5 9: 执行了

Process finished with exit code 0 线程创建了5个 在4、5中间停顿了2秒,前面的线程执行完毕,后面的才执行。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}

public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
}

核心线程数为0,线程总长度几乎无限,存在时间为60秒,适合于任务密集,但是执行时间较短的场景。

//将上述newFixedThreadPool(5)改成newCachedThreadPool();
ExecutorService threadPool = Executors.newCachedThreadPool();

//执行结果
pool-1-thread-1 0: 执行了
pool-1-thread-2 1: 执行了
pool-1-thread-3 2: 执行了
pool-1-thread-4 3: 执行了
pool-1-thread-5 4: 执行了
pool-1-thread-6 5: 执行了
pool-1-thread-7 6: 执行了
pool-1-thread-8 7: 执行了
pool-1-thread-9 8: 执行了
pool-1-thread-10 9: 执行了

Process finished with exit code 0 //创建了10个线程

newSingleThreadScheduledExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
}

线程个数为1,适用于多个任务排队执行的场景。

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int var0) {
    return new ScheduledThreadPoolExecutor(var0);
}

public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
    return new ScheduledThreadPoolExecutor(var0, var1);
}


//ScheduledThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService{
    public ScheduledThreadPoolExecutor(int var1, ThreadFactory var2) {
        super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2);
    }
}


//ScheduledExecutorService
public interface ScheduledExecutorService extends ExecutorService {
    ScheduledFuture<?> schedule(Runnable var1, long var2, TimeUnit var4);

    <V> ScheduledFuture<V> schedule(Callable<V> var1, long var2, TimeUnit var4);

    ScheduledFuture<?> scheduleAtFixedRate(Runnable var1, long var2, long var4, TimeUnit var6);

    ScheduledFuture<?> scheduleWithFixedDelay(Runnable var1, long var2, long var4, TimeUnit var6);
}

实现了ScheduledExecutorService,可以创建一个有日程安排功能的线程池,比如:

  • 推迟5秒执行:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
//第一个参数是任务 第二个参数时推迟时间 第三个是时间单位
scheduledExecutorService.schedule(new MyRunnable(1),5L,TimeUnit.SECONDS);
  • 每隔一秒执行一次
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
//第一个参数是任务 第二个参数是初始推迟时间 第三个参数是周期频率 第四个参数是时间单位(2、3个参数的)
scheduledExecutorService.scheduleAtFixedRate(new MyRunnable(1), 2, 5, TimeUnit.SECONDS);
  • 上个任务执行结束后5秒再次执行
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
//第一个参数是任务 第二个参数是初始推迟时间 第三个参数是结束后时长 第四个参数是时间单位(2、3个参数的)
scheduledExecutorService.scheduleWithFixedDelay(new MyRunnable(1), 1, 1, TimeUnit.SECONDS);

线程池的监控和管理

线程池关闭

在介绍线程池源代码的时候,也介绍了线程池的关闭方法以及每一个方法对应的特点:

  • showdown()

调用该方法后,线程池停止接受任务新的任务,并等待已提交的任务(正在执行、处于等待的)执行完毕后关闭线程池。

  • shutdownNow()

调用该方法会强制关闭线程池,会取消正在运行的以及处于等待队列的任务,该方法返回等待队列的任务列表,会触发InterruptedException的异常。

  • isShutdown();

判断是否调用showdown关闭,在调用shutdown之后返回true,其他返回false

  • isTerminated();

立即判断线程池是否关闭 调用shutdown之后,在线程都执行完后返回true,其他返回false

  • awaitTermination(long timeout, TimeUnit unit)

该方法是检测线程池是否关闭,在调用shutDown后,等待一段时间后去判断线程池是否关闭,返回true则意味着已关闭,false则意味着为关闭。如果在等待的过程中,线程池关闭了,则立即返回true。

线程池的监控

除了关闭之外,线程池还有一些监控的方法:

  • getActiveCount

获取活动线程的数量

  • getPoolSize

获取线程池的数量

  • getLargestPoolSize

获取线程池曾经最大线程数量,如果线程池满过,那么该数值等于线程池最大值

  • getTaskCount

线程池中需要执行的任务数量

  • getCompletedTaskCount

线程池中已经执行完成的任务数量,小于等于taskCount

  • beforeExecute(可重写)

线程执行前的方法

  • afterExecute(可重写)

线程执行后的方法

  • terminated(可重写)

线程关闭后的方法

以上就是线程池的基本用法!

相关文章

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • java----线程池

    什么是线程池 为什么要使用线程池 线程池的处理逻辑 如何使用线程池 如何合理配置线程池的大小 结语 什么是线程池 ...

  • spring 线程池和java线程池

    jdk线程池就是使用jdk线程工具类ThreadPoolExecutor 创建线程池spring线程池就是使用自己...

  • Spring @Async开启异步任务

    定义线程池 使用线程池

  • 八、线程池剖析

    一、前置问题 线程的状态转换 为什么要使用线程池 线程池的继承体系 线程池使用的场景 线程数的设置规则 线程池的状...

  • 1203-AsyncTask详解一:线程池的基本设置

    AsyncTask的内部使用线程池处理并发,要了解它是怎样使用线程池的,那要先了解线程池的基本设置 线程池的基本参...

  • ExecutorService shutdown()和shutd

    ExecutorService是我们经常使用的线程池,当我们使用完线程池后,需要关闭线程池。ExecutorSer...

  • java 线程池使用和详解

    线程池的使用 构造方法 corePoolSize:线程池维护线程的最少数量 maximumPoolSize:线程池...

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • Android中的线程池

    前言 提到线程池,我们先说一下使用线程池的好处。使用线程池的优点可以概括为:1、重复使用线程池中的线程,避免因为线...

网友评论

    本文标题:线程池的使用

    本文链接:https://www.haomeiwen.com/subject/mdttvqtx.html