美文网首页
Android线程池学习笔记(一)

Android线程池学习笔记(一)

作者: sollian | 来源:发表于2018-05-02 11:52 被阅读24次

    Android线程学习笔记学习了线程源码,Android Future学习笔记学习了Future体系,接下来我们就可以学习一下线程池技术了,同样基于api26。

    前言

    线程的创建是很简单的,那为什么还要使用线程池呢?下面罗列几个原因:

    1. 线程的创建是比较耗费资源的,如果每个异步作业都启动新的线程,会造成资源的极大浪费。有些手机(也可能是所有的)对于线程数量是有控制的,超过最大数量,会使进程崩溃。
    2. 线程创建比较耗费时间,会降低程序的响应速度。
    3. 对每个线程进行单独管理,比较复杂。

    基于以上几点,我们来学习一下线程池技术。

    开始

    线程池继承关系如下:


    线程池继承关系图:绿色为接口,黄色为抽象类,蓝色为实现类

    线程池工场类
    Executors

    下面依次学习。

    Executor

    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
    

    最顶层的接口,只有一个execute方法,接收一个Runnable参数。该接口的作用是将任务的提交和运行进行解耦,调用者无需关心任务是如何运行的,以及线程使用和调度的细节。
    通常情况下,Executor用来替代显示的创建线程,比如,下面的伪代码来替代new Thread(new RunnableTask()).start()

    Executor executor = anExecutor();
    executor.execute(new RunnableTask1());
    executor.execute(new RunnableTask2());
    

    然而,Executor并不强制要求异步调用,也可以在提交任务的线程直接调用:

    class DirectExecutor implements Executor {
        public void execute(Runnable r) {
            r.run();
        }
    }
    

    不过,大多数情况下,我们会另起一个线程:

    class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) {
            new Thread(r).start();
        }
    }
    

    许多Executor的实现会对任务的调度添加一些限制。下面的代码是一个组合Executor,外层将任务顺序交给内层去执行:

    class SerialExecutor implements Executor {
        final Queue<Runnable> tasks = new ArrayDeque<>();
        final Executor executor;
        Runnable active;
    
        SerialExecutor(Executor executor) {
            this.executor = executor;
        }
    
        public synchronized void execute(final Runnable r) {
            tasks.add(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }
    
        protected synchronized void scheduleNext() {
            if ((active = tasks.poll()) != null) {
                executor.execute(active);
            }
        }
    }
    

    ExecutorService

    ExecutoreService扩展了Executor接口,提供了终止任务的方法,以及Future接口相关的方法来追踪异步任务的进度。

    ExecutorService可以关闭,关闭后不再接收新的任务,同时也会释放占有的资源。

    下面是一个网络服务的例子,线程池用来处理网络请求。用到了工厂类Executors来创建线程池:

    class NetworkService implements Runnable {
        private final ServerSocket serverSocket;
        private final ExecutorService pool;
    
        public NetworkService(int port, int poolSize) throws IOException {
            serverSocket = new ServerSocket(port);
            pool = Executors.newFixedThreadPool(poolSize);
        }
    
        public void run() { // run the service
             try {
                for (;;) {
                    pool.execute(new Handler(serverSocket.accept()));
                }
            } catch (IOException ex) {
                pool.shutdown();
            }
        }
    }
    
    class Handler implements Runnable {
        private final Socket socket;
    
        Handler(Socket socket) {
            this.socket = socket; 
        }
    
        public void run() {
            // read and service request on socket
        }
    }
    

    ExecutorService接口新添了下列方法:

    void shutdown()

    调用该方法,ExecutorService不再接收新的任务,已提交的任务会继续执行。
    调用一次后,再次调用没有效果。
    该方法调用后立即返回,不会等待已提交的任务执行完毕。

    List<Runnable> shutdownNow()

    该方法会尝试停止所有正在执行的任务,未被执行的任务将永不会再执行,并返回未被执行的任务的列表。
    该方法不会等待正在执行的任务终结。
    该方法不保证正在执行的任务一定会被终结。

    boolean isShutdown()

    判断ExecutorService是否关闭

    boolean isTerminated()

    ExecutorService关闭后,所有任务终结,返回ture。
    注意,只有在调用shutdown或者shutdownNow后,才有可能返回ture。

    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException

    等待所有任务终结。ExecutorService关闭后,调用该方法来阻塞当前线程,直到所有任务终结,或者超时,或者当前线程中断。
    所有任务终结返回true,否则否会false。

    下面是一个使用的例子:

    void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
    

    <T> Future<T> submit(Callable<T> task)

    <T> Future<T> submit(Runnable task, T result)

    Future<?> submit(Runnable task)

    这三个方法类似,都是在提交任务后返回一个Future对象,除参数不同外,Future.get的返回值也有区别:
    第一个返回值为T类型
    第二个返回值为result
    第三个返回值为null

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException

    这两个方法都是提交一组任务,这里只看方法二,对方法一的理解可以参考方法二。

    1. 返回值列表和参数列表一一对应
    2. 该方法是阻塞方法,会等待所有任务执行完毕,或者达到超时条件。任务执行完毕包含任务中断或者抛出异常的情况。
    3. 发生超时或者其他未知的异常,会取消所有还未执行的任务。

    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException

    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

    invokeAny和invokeAll的不同之处在于,invokeAll需要等待所有任务执行完毕或者超时,invokeAny只要有一个任务成功执行完毕(非异常终止或者中断,而是真正执行完毕)就返回该任务结果,其他任务则取消,或者超时后取消其他任务。

    AbstractExecutorService

    AbstractExecutorService是对ExecutorService的实现,上述对ExecutorService方法的介绍就是参照的AbstractExecutorService。
    这里只针对invokeAny的实现,介绍一下CompletionService接口。

    CompletionService

    CompletionService<V>

    CompletionService将任务的生产,和对任务结果的消费进行了解耦。生产者只关心任务的提交,消费者只关心已完成的任务结果的处理。CompletionService可以用来管理异步的I/O,比如一个程序只关心对输入的文件进行读操作,另一个程序则负责对已经读取到的结果进行处理,其中读取的顺序和处理的顺序不应定一致。

    CompletionService依赖一个单独的Executor来实际执行任务,而CompletionService本身只用来管理一个处理结果的队列。这个队列中处理结果的顺序和任务队列的顺序未必一致,先处理完的任务,其结果靠前,后处理完的结果靠后。

    Future<V> submit(Callable<V> task)

    Future<V> submit(Runnable task, V result)

    和ExecutorService的submit方法类似,用来提交任务。只不过对于返回值,在这里可以不处理。

    Future<V> take() throws InterruptedException

    阻塞方法。返回并移除最先执行完的结果,如果没有,则等待。

    Future<V> poll()

    非阻塞方法。返回并移除最先执行完的结果,如果没有,则返回null。

    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException

    阻塞方法。添加了超时时间。其余同上。

    ExecutorCompletionService

    CompletionService的唯一实现类。内部使用BlockingQueue<Future>来保存任务的执行结果,执行完的任务,其结果会顺序添加到该队列中。它的构造方法public ExecutorCompletionService(Executor executor)需要传入一个Executor作为实际任务的实际执行者。

    下面举一个使用的例子:
    假如针对一个问题需要做多个运算,每个运算会返回一个特定的Result,并且可以并行执行,每个Result都是一个非空的值方法use(Result r)用来消费结果,则可以如下编写代码:

    void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException {
        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
        for (Callable<Result> s : solvers)
            ecs.submit(s);
        int n = solvers.size();
        for (int i = 0; i < n; ++i) {
            Result r = ecs.take().get();
            if (r != null)
                use(r);
        }
    }
    

    又比如你只想得到第一个非空的结果,然后取消剩余任务的执行:

    void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
        int n = solvers.size();
        List<Future<Result>> futures = new ArrayList<>(n);
        Result result = null;
        try {
            for (Callable<Result> s : solvers)
                futures.add(ecs.submit(s));
    
            for (int i = 0; i < n; ++i) {
                try {
                    Result r = ecs.take().get();
                    if (r != null) {
                        result = r;
                        break;
                    }
                } catch (ExecutionException ignore) {
                }
            }
        } finally {
            for (Future<Result> f : futures)
                f.cancel(true);
        }
    
        if (result != null)
            use(result);
    }
    

    invokeAny就是上例的情形,不过做了优化,如下:

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
    
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
    
        ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
        ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.
        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            //设置超时情况下的截止时间
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();
    
            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));//先提交第一个任务
            --ntasks;//还未提交的任务数
            int active = 1;//当前活跃的任务数 ntasks+active=任务总数
    
            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {//已提交的任务还没有一个处理完
                    if (ntasks > 0) {//还有未提交的任务
                        --ntasks;
                       futures.add(ecs.submit(it.next()));//提交一个还未提交的任务
                        ++active;
                    } else if (active == 0) {//所有任务处理完毕
                        break;
                    } else if (timed) {//检查是否超时
                        //此时所有任务都已提交,所以直接调用阻塞方法,看超时时间内能否返回结果
                        f = ecs.poll(nanos, NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    } else {//不设超时限制,则直接调用阻塞方法来获取结果。
                        f = ecs.take();
                    }
                }//if(f==null)
    
                if (f != null) {//f!=null说明有任务已经返回结果
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }//for
    
            if (ee == null)
                ee = new ExecutionException();
            throw ee;
        } finally {
            cancelAll(futures);//取消其他已提交的任务,未提交的任务永不会提交
        }
    }
    

    相关文章

      网友评论

          本文标题:Android线程池学习笔记(一)

          本文链接:https://www.haomeiwen.com/subject/xsoqrftx.html