美文网首页安卓开发Java
Java8源码阅读 - Executor、ExecutorSer

Java8源码阅读 - Executor、ExecutorSer

作者: Mhhhhhhy | 来源:发表于2020-06-29 14:11 被阅读0次

    Executor

    public interface Executor {
        void execute(Runnable command);
    }
    

    Executor抽象提供了一种将任务提交与每个任务的运行机制(包括线程使用、调度)分离的方法,即Runnable代表任务,execute处理调度的逻辑;

    static class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) {
            System.out.println("ThreadPerTaskExecutor - execute"); 
            new Thread(r).start();
        }
    }
    
    static class DirectExecutor implements Executor {
        public void execute(Runnable r) {
            System.out.println("DirectExecutor - execute");
            r.run();
        }
    }
    
    class SerialExecutor implements Executor {
        final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
        final Executor executor;
        Runnable active;
        SerialExecutor(Executor executor) {
            this.executor = executor;
        }
        public synchronized void execute(final Runnable r) {
            System.out.println("task offer ... ");
            tasks.offer(new Runnable() {
                public void run() {
                    System.out.println("SerialExecutor - execute ... ");
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }
        protected synchronized void scheduleNext() {
            if ((active = tasks.poll()) != null) {
                System.out.println("schedule next task ");
                executor.execute(active);
            }
        }
    }
    
    public static void main(String[] args) {
        // SerialExecutor executor = new SerialExecutor(new ThreadPerTaskExecutor());
        SerialExecutor executor = new SerialExecutor(new DirectExecutor());
        executor.execute(() -> { System.out.println(Thread.currentThread()); });
        executor.execute(() -> { System.out.println(Thread.currentThread()); });
    }
    
    #输出
    #task offer ... 
    #schedule next task 
    #DirectExecutor - execute
    #SerialExecutor - execute ... 
    #Thread[main,5,main]
    #task offer ... 
    #schedule next task 
    #DirectExecutor - execute
    #SerialExecutor - execute ... 
    #Thread[main,5,main]
    

    官方文档上提供的示例,演示了Executor抽象的简单使用方法,示例想体现的除了异步同步任务外,我觉得更加重要的是任务执行和任务调度分离的思想;

    ExecutorService

    public interface ExecutorService extends Executor {
        // 启动有序关闭,在此过程中执行以前提交的任务,但不接受任何新任务。
        void shutdown();
        // 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
        List<Runnable> shutdownNow();
        // executor是否被shutdown
        boolean isShutdown();
        // 如果所有任务都完成关闭则返回true
        boolean isTerminated();
        //  阻塞直到所有任务在关闭请求后完成执行,或超时发生后,或当前线程中断后;
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        // 提交任务,返回
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        Future<?> submit(Runnable task);
        // 执行一个集合的任务,返回一个列表其中包含所有任务完成时的状态和结果(可以是异常结果)。
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
        // 执行给定的任务,返回已成功完成的任务的结果(即不抛出异常),如果有的话。在正常或异常返回时,未完成的任务将被取消。
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    ExecutorService提供了对每个Executor跟踪、管理一个或者多个异步任务的进度,也可以关闭服务来终止service接收新任务和回收资源;

    void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // 禁止提交新任务
        try {
            //等待现有任务终止
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // 取消当前正在执行的任务
                // 等待一段时间,等待任务响应被取消
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) 
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            // 如果当前线程处于中断状态,重试shutdown
            pool.shutdownNow();
            // 保存中断状态
            Thread.currentThread().interrupt();
        }
    }
    

    文档中演示了一个终止服务的示例,分两个阶段关闭一个 ExecutorService,首先需要调用shutdown来拒绝传入的任务,然后调用shutdownNow(如果需要的话)来取消任何延迟的任务;

    CompletionService

    public interface CompletionService<V> {
        Future<V> submit(Callable<V> task);
        Future<V> submit(Runnable task, V result);
        Future<V> take() throws InterruptedException;
        Future<V> poll();
        Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
    }
    

    CompletionService旨在提供一个异步任务产生执行和已完成任务结果的解耦服务,即会有一个队列储存已完成的任务结果的合集,任务的提交和执行不会阻塞获取结果操作;

    ExecutorCompletionService

    public class ExecutorCompletionService<V> implements CompletionService<V> {
        private final Executor executor;
        private final AbstractExecutorService aes;
        private final BlockingQueue<Future<V>> completionQueue;
        ... 
    }
    

    ExecutorCompletionServiceCompletionService的实现类,内部有个阻塞队列储存的是完成任务的结果;

    // 注意这个queue的默认长度为Integer.MAX_VALUE
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
    
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }
    

    构造器默认使用LinkedBlockingQueue,那么很容易就联想到该阻塞队列的特性:

    • 是有界的双端队列
    • 提供阻塞和非阻塞方法获取已完成的任务
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
    public Future<V> poll() {
        return completionQueue.poll();
    }
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
    
    • 当完成任务的队列满了,新完成的任务结果会被抛弃
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        // FutureTask提供的钩子
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
    

    注意这里的doneFutureTask完成或者异常或者cancel都会被调用;

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }
    

    若构造器中传进来的ExecutorAbstractExecutorService的子类,那么newTaskFor就会交由子类来决定FutureTask的类型,达到定制扩展的效果;

    void solve(Executor e, Collection<Callable<Result>> solvers) 
            throws InterruptedException, ExecutionException {
        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
        for (Callable<Result> s : solvers) 
            ecs.submit(s);
            int n = solvers.size();
            for (int i = 0; i < n; ++i) { 
                Result r = ecs.take().get(); // 这里会抛出中断异常
                if (r != null) 
                    use(r);
            } 
    }
    

    官方提供的用法示例,第一个比较简单,遍历任务集合中每个任务执行后获取结果,是一种顺序执行的过程,执行 -> 获取结果 -> 执行 -> 获取结果,当然执行任务过程可以是异步的,如果遇到中断异常会停止任务;

    相关文章

      网友评论

        本文标题:Java8源码阅读 - Executor、ExecutorSer

        本文链接:https://www.haomeiwen.com/subject/ndpmahtx.html