美文网首页
线程池- AbstractExecutorService

线程池- AbstractExecutorService

作者: zhanglbjames | 来源:发表于2017-05-11 21:42 被阅读0次

    前言

    AbstractExecutorService实现了ExecutorService和Executor接口的基本方法,ThreadPoolExecute和ForkJoinPool继承AbstractExecutorService就可以减少实现的复杂度,接口适配器模式
    类继承结构
    1. Executor接口

    Executor的存在用来实现异步框架(将任务和任务的执行分开,不同于Thread将任务和执行绑定在一起),即将任务提交和任务如何执行分开,Executor正是用来提交任务的。 void execute(Runnable command)用于提交没有返回值的任务

    public interface Executor {
        void execute(Runnable command);
    }
    
    1. ExecutorService接口

    ExecutorService接口继承自Executor 接口,并且提供了对任务执行过程的管理操作,为了Executor提供各种管理服务而存在的,拓展了提交有返回值的任务的submit()方法

    public interface ExecutorService extends Executor {
    // 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
    boolean awaitTermination(long timeout, TimeUnit unit);
    // 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
    // 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
    // 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks);
    // 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
    // 如果此执行程序已关闭,则返回 true。
    boolean isShutdown();
    // 如果关闭后所有任务都已完成,则返回 true。
    boolean isTerminated();
    // 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
    void shutdown();
    // 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
    List<Runnable> shutdownNow();
    // 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
    <T> Future<T> submit(Callable<T> task);
    // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
    Future<?> submit(Runnable task);
    // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
    <T> Future<T> submit(Runnable task, T result);
    }
    
    1. AbstractExecutorService是对ExecutorService接口的默认实现

    AbstractExecutorService 源码详解

    invokeAny方法实现
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                  boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (tasks == null)
                throw new NullPointerException();
            int ntasks = tasks.size();
            if (ntasks == 0)
                throw new IllegalArgumentException();
            // 含有结果的Future队列
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
            // 将本对象作为Executor创建ExecutorCompletionService对象
            ExecutorCompletionService<T> ecs =
                new ExecutorCompletionService<T>(this);
    
            try {
                // 记录可能抛出的执行异常
                ExecutionException ee = null;
                // 初始化超时时间
                final long deadline = timed ? System.nanoTime() + nanos : 0L;
                Iterator<? extends Callable<T>> it = tasks.iterator();
    
                // 确定在主循环之前开始一个任务
                futures.add(ecs.submit(it.next()));
                --ntasks;
                int active = 1; // 记录正在执行的任务数量
    
                for (;;) {
                    // 获取并移除下一个将要完成的任务的结果表示,如果没有任何表示则返回null
                    Future<T> f = ecs.poll();// 底层调用队列的poll方法(非阻塞)
                    if (f == null) { // 没有结果表示
                        if (ntasks > 0) { //如果还有剩余的任务,则提交下一个任务
                            --ntasks;
                            futures.add(ecs.submit(it.next())); 
                            ++active;
                        }
                        // 出现这种情况说明,已经有任务完成,并返回结果表示,但是
                        // 捕获到了异常,则跳出主循环,进行异常的抛出
                        else if (active == 0) 
                            break;
                        else if (timed) { // 超时获取结果表示
                            f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                            if (f == null)
                                throw new TimeoutException();
                            nanos = deadline - System.nanoTime();
                        }
                        else // 阻塞获取结果表示
                            f = ecs.take();
                    }
                    if (f != null) { //含有结果表示
                        --active;
                        try {
                            return f.get(); // 返回结果
                        } catch (ExecutionException eex) {
                            ee = eex;
                        } catch (RuntimeException rex) {
                            ee = new ExecutionException(rex);
                        }
                    }
                }
                // 
                if (ee == null)
                    ee = new ExecutionException();
                throw ee;
    
            } finally { // 最后取消所有提交的任务
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
            }
        }
        // 对doInvokeAny的封装,实现无超时等待的版本
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            try {
                return doInvokeAny(tasks, false, 0);
            } catch (TimeoutException cannotHappen) {
                assert false;
                return null;
            }
        }
         // 对doInvokeAny的封装,实现超时等待的版本
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return doInvokeAny(tasks, true, unit.toNanos(timeout));
        }
    

    通过对私有内部实现doInvokeAny的封装,实现对外的无超时等待的版本和超时等待的两个版本,通过ExecutorCompletionService类来实现对所有提交的任务执行完成时返回结果的存储和获取

    1. invokeAny方法没有对task进行显示地包装,但是通过ExecutorCompletionService的submit()方法提交任务时,实际上是调用newTaskFor()方法对任务进行了包装为RunnableFuture对象,然后调用了本对象的execute()方法提交任务,并返回异步计算结果对象
    // ExecutorCompletionService 的submit方法
        public Future<V> submit(Callable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new QueueingFuture(f));
            return f;
        }
    
    1. 使用了ExecutorCompletionService对象对任务执行完成时结果的存取,隐含了对任务是否完成的判断;所以对返回结果就不用通过isDone()方法判断是否任务已经完成了
      ExecutorCompletionService部分源码如下:
        // 阻塞队列:用来存储已经完成的任务
        private final BlockingQueue<Future<V>> completionQueue;
    
        /**
         * 拓展了 FutureTask 在完成时将任务入队功能
         */
        private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            // 此方法在FutureTask任务run方法完成时调用,这里是将完成的任务入队
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }
    
        public Future<V> poll() {
            return completionQueue.poll();
        }
    
        public Future<V> poll(long timeout, TimeUnit unit)
                throws InterruptedException {
            return completionQueue.poll(timeout, unit);
        }
    
    invokeAll方法
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false; // 所有任务是否完成的标志
            try {
                // 对所有任务进行包装,并提交任务,并将返回的结果添加到futures集合中
                for (Callable<T> t : tasks) { 
                    RunnableFuture<T> f = newTaskFor(t);
                    futures.add(f);
                    execute(f);
                }
                // 对所有结果进行判断或者阻塞等待结果返回
                for (int i = 0, size = futures.size(); i < size; i++) {
                    Future<T> f = futures.get(i);
                    if (!f.isDone()) { // 如果任务没有完成
                        try {
                            f.get(); // 则阻塞等待结果返回,并压制异常
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        }
                    }
                }
                // 当所有任务已经完成了(不管是正常完成还是异常完成,
                // 如发生CancellationException、ExecutionException ),
                // 则将完成标志设为true,并返回结果集合
                done = true; 
                return futures;
            } finally {
                if (!done) // 如果发生中断异常InterruptedException 则取消已经提交的任务
                    for (int i = 0, size = futures.size(); i < size; i++)
                        futures.get(i).cancel(true);
            }
        }
    
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false; 
            try {
                for (Callable<T> t : tasks)
                    futures.add(newTaskFor(t));
    
                final long deadline = System.nanoTime() + nanos;
                final int size = futures.size();
    
                for (int i = 0; i < size; i++) {
                    execute((Runnable)futures.get(i));
                    // 在添加执行任务时超时判断,如果超时则立刻返回futures集合
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L)
                        return futures;
                }
                // 每次对结果进行判断时都进行超时判断
                for (int i = 0; i < size; i++) {
                    Future<T> f = futures.get(i);
                    if (!f.isDone()) { // 判断超时
                        if (nanos <= 0L)
                            return futures;
                        try {
                            f.get(nanos, TimeUnit.NANOSECONDS);
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        } catch (TimeoutException toe) {
                            return futures;
                        }
                        nanos = deadline - System.nanoTime(); // 更新剩余时间
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)
                    for (int i = 0, size = futures.size(); i < size; i++)
                        futures.get(i).cancel(true);
            }
        }
    
    

    invokeAll方法也实现了无超时和超时两个版本

    1. 无超时版本首先将所有任务包装后提交给本对象的执行器(调用execute)执行,并将返回的结果添加到futures集合中,然后对futures集合进行遍历判断,是否已经完成,如果没有完成则使用get方法阻塞等待结果的返回,并压制了一些异常;并在finally模块对标志进行检查取消已经提交的任务
    2. 超时版本和无超时版本基本一致,但是加了超时逻辑。在2个地方增加了超时判断1) 在添加执行任务时超时判断,如果超时则立刻返回futures集合;** 2) **每次对结果进行判断时都进行超时判断。
    方法总览

    相关文章

      网友评论

          本文标题:线程池- AbstractExecutorService

          本文链接:https://www.haomeiwen.com/subject/kzqatxtx.html