美文网首页程序编码
Android 多线程探索(三)— 线程池

Android 多线程探索(三)— 线程池

作者: Little丶Jerry | 来源:发表于2018-02-27 00:02 被阅读0次

    构建服务器应用程序的有效方法 — 线程池

    为什么使用线程池?

    每次通过 new Thread 创建线程并不是一种好的方式,每次 new Thread 新建和销毁对象性能较差,线程缺乏统一管理,可能无限制新建线程,相互之间竞争、占用过多资源导致死锁,并且缺乏定时执行、定期执行、线程中断等功能。

    Java 提供了 4 种线程池,能够有效地管理、调度线程,避免过多的资源损耗。它的优点如下:

    1. 重用存在的线程,减少对象创建、销毁的开销;

    2. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞;

    3. 提供定时执行、定期执行、单线程、并发数控制等功能。

    线程池原理简单地解释就是:会创建多个线程并且进行管理,提交给线程的任务会被线程池指派给其中的线程进行执行,通过线程池的统一调度、管理使得多线程的使用更简单、高效。

    线程池

    线程池都实现了 ExecutorService 接口,该接口定义了线程池需要实现的接口,如 submit、execute、shutdown 等。

    public interface ExecutorService extends Executor {
    
        void shutdown();
    
        List<Runnable> shutdownNow();
    
        boolean isTerminated();
    
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        <T> Future<T> submit(Callable<T> task);
    
        <T> Future<T> submit(Runnable task, T result);
    
        Future<?> submit(Runnable task);
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    public interface Executor {
    
        void execute(Runnable command);
    }
    
    两种常用的线程池实现:
    • ThreadPoolExecutor,也就是我们用最多的线程池实现;

    • ScheduledThreadPoolExecutor,则用于周期性地执行任务。

    我们一般都不会通过 new 的形式来创建线程池,因为创建参数过程相对复杂,所以,JDK 提供了一个 Executors 工厂类来简化这个过程。下面分别介绍 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的使用。

    1. 启动指定数量的线程 — ThreadPoolExecutor

    ThreadPoolExecutor 是线程池的实现之一,它的功能是启动指定数量的线程以及将任务添加到一个队列中,并且将任务分发给空闲的线程。

    ExecutorService 的生命周期包括 3 种状态:运行、关闭、终止。创建后便进入运行状态,当调用 shutdown() 方法时便进入关闭状态,此时 ExecutorService 不再接受新的任务,但它还在执行已经提交了的任务。当所有已经提交了的任务执行完后,就变成终止状态。

    ThreadPoolExecutor 构造函数如下:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    参数名 作用
    corePoolSize 线程池中所保存的核心线程数。
    maximumPoolSize 线程池允许创建的最大线程数。
    keepAliveTime 当前线程池线程总数大于核心线程数时,终止多余的空闲线程的时间。
    unit keepAliveTime 参数的时间单位,可选值有毫秒、秒、分等。
    workQueue 任务队列,如果当前线程池达到核心线程数 corePoolSize,且当前所有线程都处于活动状态时,则将新加入的任务放到此队列中。
    threadFactory 线程工厂,让用户可以定制线程的创建过程,通常不需设置。
    Handler 拒绝策略,当线程池与 workQueue 队列都满了的情况下,对新加任务采取的处理策略。
    • 线程池启动后默认是空的,只有任务来临才会创建线程以处理请求。prestartAllCoreThreads() 方法可以让线程池启动后立即启动所有核心线程以等待任务。

    • 任务数量小于 corePoolSize,则立即创建新线程来处理任务;

    • 任务数量大于 corePoolSize,但小于 maximumPoolSize,则将任务放进 workQueue,当阻塞队列满时才创建新线程;

    • 如果 corePoolSize 与 maximumPoolSize 相同,则创建了固定大小的线程池。

    workQueue 有下列几个常用实现:

    1. ArrayBlockingQueue:基于数组结构的有界队列,此队列按 FIFO 原则对任务进行排序。如果队列满了还有任务进来,则调用拒绝策略。

    2. LinkedBlockingQueue:基于链表结构的无界队列,此队列按 FIFO 原则对任务进行排序。因为它是无界的,所以不会满,采用此队列后线程池将忽略拒绝策略(handler)参数,同时忽略最大线程数 maximumPoolSize 等参数。

    3. SynchronousQueue:直接将任务提交给线程而不是将它加入到队列,实际上此队列是空的。如果新任务来了线程池没有任何可用线程处理的话,则调用拒绝策略。其实要是把 maximumPoolSize 设置成无界(integer.MAX_VALUE),加上 SynchronousQueue 队列,就等同于 Executors.newCachedThreadPool()。

    4. PriorityBlockingQueue:具有优先级的队列的有界队列,可以自定义优先级,默认是按自然排序,可能很多场合并不适合。

    当线程池与 workQueue 队列都满了的情况下,对新加任务采取的处理策略有如下四个默认实现:

    1. AbortPolicy:拒绝任务,抛出 RejectedExecutionException 异常。线程池的默认策略。

    2. CallerRunsPolicy:用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。

    3. DiscardOldestPolicy:如果线程池尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。这样的结果是最后加入的任务反而有可能被执行,先前加入的都被抛弃了。

    4. DiscardPolicy:加不进的任务都被抛弃了,同时没有异常抛出。


    1.1 newFixedThreadPool(int size)

    对与 Android 平台来说,由于资源有限,最常使用的就是通过 Executors.newFixedThreadPool(int size) 函数来启动固定数量的线程池:

    public class ExecutorDemo {
    
        // 任务数量
        private static final int MAX = 10;
    
    
        public static void fixedThreadPool(int size) throws ExecutionException, InterruptedException {
             // 创建固定数量的线程池
            ExecutorService executorService = Executors.newFixedThreadPool(size);
    
            for (int i = 1; i <= MAX; i++) {
                // 提交任务
                Future<Integer> task = executorService.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        System.out.println("执行线程:" + Thread.currentThread().getName());
                        return fibc(40);
                    }
                });
                System.out.println("第 " + i + " 次计算,结果:" + task.get());
            }
        }
    
        private static int fibc(int n) {
            if (n == 0) {
                return 0;
            }
    
            if (n == 1) {
                return 1;
            }
    
            return fibc(n - 1) + fibc(n - 2);
        }
    }
    

    newFixedThreadPool(int nThreads) 的实现:

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    在该函数中,会调用 ThreadPoolExecutor 的构造函数,设置它的 corePoolSize 和 maximumPoolSize 值都是 nThreads,并且设置 keepAliveTime 参数为 0 毫秒,最后设置无界任务队列。这样该线程池就含有固定个数的线程,并且能容纳无限个任务。

    输出结果如下:

    执行线程:pool-3-thread-1
    第 1 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 2 次计算,结果:102334155
    执行线程:pool-3-thread-3
    第 3 次计算,结果:102334155
    执行线程:pool-3-thread-1
    第 4 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 5 次计算,结果:102334155
    执行线程:pool-3-thread-3
    第 6 次计算,结果:102334155
    执行线程:pool-3-thread-1
    第 7 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 8 次计算,结果:102334155
    执行线程:pool-3-thread-3
    第 9 次计算,结果:102334155
    执行线程:pool-3-thread-1
    第 10 次计算,结果:102334155
    

    1.2 newCachedThreadPool()

    线程越多,并发量越大,然而占用的内存也就越大,指定过大的线程数量并不可取。因此,我们可能需要一种场景,如果来了一个新的任务,并且没有空闲线程可用,此时必须马上创建一个线程来立即执行任务。这时就可以通过 Executors 的 newCachedThreadPool 函数来实现。

        // 创建线程池
        public static void cachedThreadPool() throws ExecutionException, InterruptedException {
    
            ExecutorService executorService = Executors.newCachedThreadPool();
    
            for (int i = 1; i <= MAX; i++) {
                // 提交任务
                Future<Integer> task = executorService.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        System.out.println("执行线程:" + Thread.currentThread().getName());
                        return fibc(40);
                    }
                });
                System.out.println("第 " + i + " 次计算,结果:" + task.get());
            }
        }
    

    newCachedThreadPool() 函数实现:

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    可以看到,newCachedThreadPool 函数不需传入线程的数量。它会调用 ThreadPoolExecutor 的构造函数,设置它的
    maximumPoolSize 值为无界值(Integer.MAX_VALUE),并且设置 keepAliveTime 参数为 60 秒,最后设置 SynchronousQueue
    任务队列。这样就可以适应任意数量的并发任务。线程池为每个任务都创建了 1 个线程,当然这是在没有线程空闲的情况下才会创建新的线程。若一个线程中的任务已经做完了,那么这个线程可以为未被执行的任务提供执行。

    输出结果如下:

    执行线程:pool-3-thread-1
    第 1 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 2 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 3 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 4 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 5 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 6 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 7 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 8 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 9 次计算,结果:102334155
    执行线程:pool-3-thread-2
    第 10 次计算,结果:102334155
    

    2. 定时执行一些任务 — ScheduledThreadPoolExecutor

    当我们需要定时执行一些任务,可以通过 ScheduledThreadPoolExecutor 来实现。通过 Executors 的 newScheduledThreadPool 函数可以很方便地创建定时执行任务的线程池。

    下面是一个例子:

    public class ScheduledThreadPoolDemo {
    
        public static void scheduledThreadPool() {
            
            // 创建定时执行的线程池
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
            
            // 参数 2 为第一次执行任务延迟的时间,
            // 意思就是第一次调度开始时间点=当前时间 + initialDelay 
            // 参数 3 为执行周期
            scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread: " + Thread.currentThread().getName() + ",定时计算1:");
                    System.out.println("结果:" + fibc(30));
                }
            }, 1, 2, TimeUnit.SECONDS);
    
            scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread: " + Thread.currentThread().getName() + ",定时计算2:");
                    System.out.println("结果:" + fibc(40));
                }
            }, 1, 2, TimeUnit.SECONDS);
        }
    
    
        private static int fibc(int n) {
            if (n == 0) {
                return 0;
            }
    
            if (n == 1) {
                return 1;
            }
    
            return fibc(n - 1) + fibc(n - 2);
        }
    }
    
    public class Main {
    
        public static void main(String[] args) {
            // write your code here
            ScheduledThreadPoolDemo.scheduledThreadPool();
    
    }
    

    该线程池有 3 个线程,我们指定了 2 个定时任务,因此,该线程池有两个线程来定时完成任务。

    scheduleAtFixedRate() 函数的实现:

        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
    

    scheduleAtFixedRate() 函数就是设置定时任务的方法,参数 1 是要执行的任务,参数 2 是第一次运行任务时延迟时间(第一次调度开始时间点 = 当前时间 + initialDelay ),参数 3 是定时任务的周期(两次任务调度的间隔时间),参数 4 是时间单元,这里设置为秒。

    部分输出结果:

    Thread: pool-2-thread-1,定时计算1:
    Thread: pool-2-thread-2,定时计算2:
    结果:832040
    结果:102334155
    
    Thread: pool-2-thread-1,定时计算1:
    Thread: pool-2-thread-3,定时计算2:
    结果:832040
    结果:102334155
    

    相关文章

      网友评论

        本文标题:Android 多线程探索(三)— 线程池

        本文链接:https://www.haomeiwen.com/subject/mccdxftx.html