美文网首页
6-ExecutorService

6-ExecutorService

作者: 鹏程1995 | 来源:发表于2020-02-11 10:58 被阅读0次

    概述

    引入

    我们之前记录了Executor接口,它是用来将子任务的创建和启动两个操作进行解耦而出现的,我们在上一节中的“使用思路“构建了n个子线程用来不停的执行到来的任务。但是我们的例子使用并不方便,比如:

    1. 我们只提供无返回结果的子任务执行,如果有结果穿出就得自己在子任务中做变量同步
    2. 只提供了执行接口,并利用守护线程和用户线程的关系完成退出,没有提供标准的线程池的关闭功能
    3. 只有单独执行,没有批量操作

    我们本次介绍的新的接口ExecutorService也是基于这些方面对Executor进行了拓展。

    摘要

    本文主要介绍ExecutorService的各个接口的设计意图,并对上一节我们设计的例子进行了进一步的完善。

    类介绍

    类定位

    ExecutorService是线程池框架的一个重要的借口,它定义了一个作为线程池的实现类必须要实现的几个方法。

    注意

    在实现ExecutorService接口时要注意线程安全,此类是有可能同时被多个线程操作的,比如同时加入任务,同时调用关闭等等。

    源码解读

    提交任务

    /**
     * 提交一个有返回值的任务,并返回一个可以检测此任务状态和取消此任务的 Future 。
     * 此函数只是提交任务,具体是直接执行还是排队等着就看各自实例的实现了。【有点像 Executors.execute()】
     *
     * 
     * @param task 任务
     * @param <T> 返回值类型
     * @return 代表此任务的 Future
     * @throws RejectedExecutionException 拒绝执行此任务时抛出
     * @throws NullPointerException 入参为空时抛出
     */
    <T> Future<T> submit(Callable<T> task);
    
    /**
     * 提交一个没有返回值的任务,和完成此任务后返回的值,其他和上面的一样
     *
     */
    <T> Future<T> submit(Runnable task, T result);
    
    /**
     * 返回的结果是 null ,其他的和上面一样
     *
     */
    Future<?> submit(Runnable task);
    

    批量提交任务

    /**
     * 批量提交任务并进行执行、等待。
     * 注意这里不同上面单次提交任务,这里提交后要等待任务全部执行完成后才能返回,
     * 如果你主线程中有其他的并行操作而且追求调用效率,可以自己用循环调用 submit ,如果图方便的话还是可以直接
     * 调用这个的。
     * 入参的集合结构不能变,入参的集合结构不能变,入参的集合结构不能变,重要的话说三次
     *
     * @param tasks 任务集合
     * @param <T> 任务集返回值类型
     * @return 一串代表任务的 Future ,和入参集合的遍历顺序相同
     * @throws InterruptedException 等待执行结果时遇到了线程中断时抛出,未完成的任务会被取消
     * @throws NullPointerException 入参集合为 null 或者入参集合中有元素为 null时抛出
     * @throws RejectedExecutionException 如果有任务被拒绝执行时抛出
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    
    /**
     * 批量执行任务、并限时等待。
     * 时间到达时未完成的任务会被取消。其他的和上面函数相同
     *
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    
    /**
     * 批量执行任务,返回第一个完成执行的任务,并取消其他的任务。
     *
     * 其他的和上面的 invokeAll() 一样 
     *
     * @throws ExecutionException 如果提交的所有任务都没有执行成功
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    
    /**
     * 增加了限时要求。
     *
     *
     * @throws TimeoutException 在给定时间内没有一个执行成功时抛出
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
    

    状态检测

    /**
     * 判断线程池是否被关闭【不接受新任务】
     */
    boolean isShutdown();
    
    /**
     * 判断线程池是否已经终结【不接受新任务而且现有的所有任务都结束,可以安全回收此实例的那种】
     */
    boolean isTerminated();
    

    关闭

    /**
     * 开始进行有序的关闭,不允许再提交任务,现有的任务会继续执行。
     *
     * 此方法会立刻返回,你需要调用 awaitTermination() 等待现有任务执行完
     *
     *
     * @throws SecurityException 【后面再详细了解吧】
     */
    void shutdown();
    
    /**
     * 尝试终止所有正在执行的任务,停止等待的任务,然后把等待的任务列表返回。
     *
     * 终止正在执行的任务我们仅做了线程中断,至于子线程什么时候反应,要多久反应,
     * 此方法不会阻塞等待,如果有需要就用 awaitTerminantion() 阻塞主线程等待
     *
     * 如果有子线程对中断不反应,继续执行他的任务,我们也没办法,我们只能告诉它“该停了”,
     * 其他的也没什么用了。
     *
     * @return 提交后未执行的任务
     * @throws SecurityException 
     */
    List<Runnable> shutdownNow();
    

    使用示例

    示例代码——MyExecutorService

    package com.gateway.concurrent.pool;
    
    import java.util.*;
    import java.util.concurrent.*;
    
    public class MyExecutorService implements ExecutorService {
    
        private final Thread[] threads = new Thread[POOL_SIZE];
    
        private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();
    
        private volatile boolean ifAcceptNewTask = true;
    
        private volatile boolean ifTryToStopThread = false;
    
        private final static int POOL_SIZE = 5;
    
        private final static long SLEEP_UNIT = 1000;
    
        public MyExecutorService() {
            for (int i=0;i<POOL_SIZE;i++){
                threads[i] = new Thread(()->{
                    while (true){
                        // 取任务
                        try {
                            Runnable runnable = null;
                            runnable = tasks.take();
                            runnable.run();
                        } catch (InterruptedException e) {
                            // 取任务时被中断
                            e.printStackTrace();
                            if (ifTryToStopThread){// 要停止线程了
                                System.out.println("此线程收到结束中断,退出执行");
                                break;
                            }
    
                        }
    
                    }
    
                });
                threads[i].start();
            }
        }
    
        @Override
        public void shutdown() {
            // 不接受新任务
            this.ifAcceptNewTask = false;
    
        }
    
        @Override
        public List<Runnable> shutdownNow() {
            this.ifAcceptNewTask = false;
            this.ifTryToStopThread=true;
    
            List<Runnable> unExecutedTasks = new ArrayList<>();
            tasks.drainTo(unExecutedTasks);
    
            for (int i =0;i<POOL_SIZE;i++){
                if (threads[i].isAlive()){
                    threads[i].interrupt();
                }
            }
            return unExecutedTasks;
        }
    
        @Override
        public boolean isShutdown() {
            return this.ifAcceptNewTask;
        }
    
        @Override
        public boolean isTerminated() {
            for (int i=0;i<POOL_SIZE;i++){
                if (threads[i].isAlive()){
                    return false;
                }
            }
            return true;
        }
    
        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            Long deadLine = System.currentTimeMillis()+unit.toNanos(timeout);
            while (!isTerminated()){
                // 超时
                if (System.currentTimeMillis() > deadLine){
                    return false;
                }
                // 等待
                if (System.currentTimeMillis() - deadLine > SLEEP_UNIT){
                    Thread.currentThread().sleep(SLEEP_UNIT);
                }
            }
            return true;
        }
    
        @Override
        public <T> Future<T> submit(Callable<T> task) {
          // TODO 这里加一下对线程池状态的判断,不要直接加
            FutureTask<T> temp = new FutureTask<>(task);
            try {
                tasks.put(temp);
            } catch (InterruptedException e) {
                e.printStackTrace();
    
                // 被打断干脆就不等了,其实讲真估计也不会到这一步,
                // 毕竟LinkedBlockingQueue是无界到,也不会满
                throw new RejectedExecutionException("任务队列已满,阻塞等待时被打断",e);
            }
    
            return temp;
        }
    
        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return this.submit(()->{
                task.run();
                return result;
            });
        }
    
        @Override
        public Future<?> submit(Runnable task) {
            return this.submit(task,null);
        }
    
        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
            if (Objects.isNull(tasks)){
                throw new NullPointerException("入参不能为空");
            }
    
            // 创建任务
            Iterator<Callable<T>> iterator = (Iterator<Callable<T>>) tasks.iterator();
            List<Future<T>> result = new ArrayList<>();
    
            while (iterator.hasNext()){
                result.add(this.submit(iterator.next()));
            }
    
            // 等待任务完成
            Iterator<Future<T>> resultIterator = result.iterator();
            while (resultIterator.hasNext()){
                try {
                    resultIterator.next().get();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
    
            return result;
        }
    
        // 思路差不多,不写了
        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
            return null;
        }
    
        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
            return null;
        }
    
        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return null;
        }
    
        @Override
        public void execute(Runnable command) {
            if (Objects.isNull(command)){
                throw new NullPointerException("入参不能为空");
            }
            try {
                this.tasks.put(command);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // 被打断干脆就不等了,其实讲真估计也不会到这一步,
                // 毕竟LinkedBlockingQueue是无界到,也不会满
                throw new RejectedExecutionException("任务队列已满,阻塞等待时被打断",e);
            }
    
        }
    
    
    }
    
    

    示例代码——Main函数

    package com.gateway.concurrent.pool;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    public class Pool1 {
        public static void main(String args[]){
            MyExecutorService myExecutorService = new MyExecutorService();
            List<Future<String>> futureTasks = new ArrayList<>();
            for (int i = 0; i < 40; i++) {
                futureTasks.add(myExecutorService.submit(new TestCalss("任务" + i)));
            }
    
            // 主线程继续做一些别的事
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
    
            for (int i = 0; i < futureTasks.size(); i++) {
                try {
                    System.out.println(futureTasks.get(i).get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    System.out.println("等待过程中被打断,重新等待这个任务结束");
                    i--;
    
                } catch (ExecutionException e) {
                    e.printStackTrace();
                    System.out.println("任务执行失败");
                }
            }
    
            // 关闭线程池
            myExecutorService.shutdown();
    
            try {
                // 如果可以正常关闭,最好
                // 当然,我们自己写到代码,MyExecutorService 是肯定不能正常关闭的
                // 我们这里的等待时间是给任务队列中尚未执行的任务的
                if (myExecutorService.awaitTermination(1000, TimeUnit.NANOSECONDS)){
                    return;
                }
    
                myExecutorService.shutdownNow();
    
                // 我们这里的等待时间是给线程池中的线程停止正在执行的任务的
                if (myExecutorService.awaitTermination(1000, TimeUnit.NANOSECONDS)){
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    
        static class TestCalss implements Callable<String> {
    
            private String name;
    
            public TestCalss(String name) {
                this.name = name;
            }
    
            @Override
            public String call() throws Exception {
                Long sleepTime = (long) (Math.random() * 1000);
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return name + " 完成,用时 " + sleepTime + " 毫秒";
            }
        }
    
    }
    

    核心逻辑介绍

    没啥说的,个人感觉相比于上一个版本,最大的进步就是:

    1. 可以返回值了
    2. 可以主动控制线程池的关闭回收了【这次我们没有将线程池中的线程设置为守护线程,你能正常运行完程序就表示你成功关闭了线程池里的五条线程】

    使用思路

    使用更加简便,我们一般都是作为线程池的使用者的,所以在大多数情况下不是很关系线程池的内部逻辑,我们调用的思路是:

    1. 创建/获得一个线程池实例
    2. 任务入线程池
    3. 获得任务结果
    4. 完成所有操作后关闭线程池

    扩展

    参考文献

    相关文章

      网友评论

          本文标题:6-ExecutorService

          本文链接:https://www.haomeiwen.com/subject/wjgtfhtx.html