线程池

作者: 主音King | 来源:发表于2020-08-10 09:59 被阅读0次

    使用线程池原因

    Android开发,用传统new Thread创建线程,造成一些问题:

    • 任务众多时,每个任务创建一个线程,任务结束后销毁,造成线程频繁创建和销毁
    • 多个线程频繁创建占用大量资源,资源竞争容易出错,缺乏统一管理,造成界面卡顿。
    • 多线程频繁销毁,会频繁调用GC机制,会使性能降低,非常耗时

    线程池好处:
    1、多线程统一管理,避免资源竞争中出现问题
    2、对线程复用,线程执行完后不会立即销毁,等待另外任务,不会频繁创建、销毁线程和调用GC
    3、提供了一套完整的ExecutorService线程池创建api,创建功能不一的线程池,使用方便。

    几种常见的线程池

    ThreadPoolExecutor创建基本线程池

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory)
    

    corePoolSize:线程池核心线程数量
    注意:线程池存在核心线程和非核心线程;核心线程一旦创建会一直执行任务或等待任务到来;非核心线程只会任务队列塞满任务时去执行多出的任务,非核心线程一段时间后会被回收,这个时间作为参数可调配。
    maximumPoolSize:最大线程数量。
    keepAliveTime:非核心线程要等待下一个任务到来的时间;任务很多、执行时间很短可调大有助提高线程利用率。
    unit:时间单位
    workQueue:任务队列
    threadFactory:线程工厂。设置线程名字等。

        fun test1() {
            // 1、核心线程数量;最大线程数;线程空闲时等待下一个任务到来时间;时间单位;任务队列长度
            // 2、任务队列为100,不会开启额外的 5-3=2 个非核心线程。如果队列设置25,则前三个任务核心线程执行,剩下的30-3=27进入队列会满
            //    此时会开启2个非核心线程来执行剩下的2个任务
            // 3、疑问:为何每个for循环里都有一个sleep(200) ,为何每2s打印三个任务;因为开始时候,只声明runnable对象,并重写run方法,并没有运行,
            //    而后execute(runnable)才会sleep,又因为核心线程为3,所以会开启三个核心线程,各执行run方法。
            // 4、如果队列给25,for给31个,则会报任务拒绝异常
            val threadPoolExecutor = ThreadPoolExecutor(
                3, 5, 1,
                TimeUnit.SECONDS, LinkedBlockingQueue<Runnable>(25)
            )
            // 30个runnable
            for (i in 0..29) {
                Log.d(TAG, "index:$i")
                val runnable = Runnable {
                    run {
                        try {
                            Thread.sleep(2000)
                            Log.d(TAG, "index:$i ")
                            Log.d(TAG, "当前线程:${Thread.currentThread().name}")
                        } catch (e: Exception) {
                            Log.e(TAG, "e:$e")
                        }
                    }
                }
                // 1、execute 一个线程之后,如果未达到核心线程数,立马启用一个核心线程执行。
                // 2、execute 一个线程之后,如果已达到核心线程数,且workQueue未满,则将新线程放入workQueue中等待执行
                // 3、execute 一个线程之后,如果已达到超过非核心线程数,则拒绝执行该任务,采取饱和策略,抛出RejectedExecutionException异常
                try {
                    // 如果任务溢出,任务拒绝异常
                    threadPoolExecutor.execute(runnable)
                }catch (e:Exception){
                    Log.e(TAG, "开启线程异常-e:$e")
                }
            }
        }
    

    FixedThreadPool(可重用固定线程数)

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    特点:参数核心线程数。无非核心线程数,阻塞队列无界

           val fixedThreadPool = Executors.newFixedThreadPool(5)
            for (i in 0..29){
                val runnable = Runnable {
                    run {
                        try {
                            Thread.sleep(2000)
                            Log.d(TAG, "index:$i")
                            Log.d(TAG, "当前线程:${Thread.currentThread().name}")
                        }catch (e:Exception){
                            Log.d(TAG, "e:$e")
                        }
                    }
                }
                fixedThreadPool.execute(runnable)
            }
    

    每2s打印5次任务

    CachedThreadPool(按需创建)

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    特点:没有核心线程。只有非核心线程,并且每个非核心线程空闲等待为60s采用SynchronousQueue队列。

        fun test1() {
            val cachedThreadPool = Executors.newCachedThreadPool();
            for (i in 0..29) {
                val runnable = Runnable {
                    run {
                        try {
                            Thread.sleep(2000)
                            Log.d(TAG, "index:$i")
                            Log.d(TAG, "name:${Thread.currentThread().name}")
                        } catch (e: Exception) {
                            Log.d(TAG, "e:$e")
                        }
                    }
                }
                cachedThreadPool.execute(runnable)
            }
        }
    

    过2s后直接打印30个任务
    分析:

    • 因为没有核心线程,全为非核心线程,SynchronousQueue是不存储元素的,每次插入操作伴随一个移除操作,一个移除操作伴随一个插入操作。
    • 当一个任务执行时,先用SynchronousQueue的offer提交任务,如果线程池中又空闲线程,则调用SynchronousQueue的poll方法来移除任务并交给线程处理;如果没有线程空闲,则开启新非核心线程处理任务
    • 由于maximumPoolSize是无界的,所以如果线程处理任务速度小于提交任务速度,则会不断创建新线程,注意过度创建,应调整双方速度,不然创建过多影响性能。
    • CachedThreadPool使用于大量需要立即执行的耗时任务的情况。

    SingleThreadPool(单个核心的fixed)

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
        fun test1() {
            val singlePoolExecutor = Executors.newSingleThreadExecutor();
            for (i in 0..29){
                val runnable = Runnable {
                    run {
                        try {
                            Thread.sleep(2000)
                            Log.d(TAG, "index:$i")
                            Log.d(TAG, "name:${Thread.currentThread().name}")
                        }catch (e:Exception){
                            Log.d(TAG, "e:$e")
                        }
                    }
                }
                singlePoolExecutor.execute(runnable)
            }
        }
    

    每2s打印一个任务;只有一个核心线程,无非核心线程,当被占用时,其的任务需要进入队列等待。

    ScheduledThreadPool(定时延时执行)

        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
        fun test1() {
           val scheduleExecutorService = Executors.newScheduledThreadPool(3)
            val runnable = Runnable {
                run {
                    Log.d(TAG, "延迟执行-name:${Thread.currentThread().name}")
                }
            }
            scheduleExecutorService.schedule(runnable,5,TimeUnit.SECONDS)
        }
    

    自定义PriorityThreadPool(队列中优先级比较的线程池)

    自定义Runnable, 继承Comparable接口

    public abstract class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
        private int priority;
    
        public PriorityRunnable(int priority) {
            if (priority < 0) {
                throw new IllegalArgumentException();
            }
            this.priority = priority;
        }
    
        public int getPriority() {
            return priority;
        }
    
        @Override
        public int compareTo(PriorityRunnable o) {
            int me = this.priority;
            int anOtherPri = o.getPriority();
            return me == anOtherPri ? 0 : me < anOtherPri ? 1 : -1;
        }
    
        @Override
        public void run() {
            doSomeThing();
        }
        protected abstract void doSomeThing();
    }
    

    利用抽象类Comparable接口重写compareTo方法来比较优先级

        fun test1() {
            val priorityThreadPool = ThreadPoolExecutor(
                3, 3, 0, TimeUnit.SECONDS,
                PriorityBlockingQueue<Runnable>()
            )
            for (i in 0..29) {
                priorityThreadPool.execute(object : PriorityRunnable(i) {
                    override fun doSomeThing() {
                        Log.d(TAG, "优先级-pri:$i 的任务被执行 name:${Thread.currentThread().name}")
                        try {
                            Thread.sleep(2000)
                        } catch (e: Exception) {
                            Log.d(TAG, "e:$e")
                        }
                    }
                })
            }
        }
    

    前三个任务被创建的三个核心线程执行,之后27个任务进入队列,并调用compareTo方法进行排序,从大到小顺序打印。

    Java中的阻塞队列

    生产者--消费者。生产者向队列放元素,消费者取,队列中无元素,消费者取则阻塞,如果队列元素满,则生产者阻塞。
    常见阻塞队列7种:

    ArrayBlockingQueue:数组结果组成有界阻塞队列
    LinkedBlockingQueue:链表结果组成有界阻塞队列
    PriorityBlockingQueue:优先级排序无界队列
    DelayQueue:优先级队列实现无界队列
    SynchronousQueue:不存储元素阻塞队列
    LinkedTransferQueue:链表结构无界阻塞队列
    LinkedBlockingDeque:链表结构双向阻塞队列

    线程池使用场景

    • newCachedThreadPool:
      底层:核心线程为0,最大线程为Integer.MAX_VALUE,等待60s,SynchronousQueue同步队列。
      当有新任务来,则插入队列中,寻找可用线程来执行,若有则执行,若无则创建来执行;若线程空闲时间超过指定大小,则线程会被销毁。
      适合:执行很多短期异步的小程序或者负载较轻的服务器

    • newFixedThreadPool:
      适合:执行长期任务,性能好

    *newScheduledThreadPool
    周期执行任务场景

    线程池常用方法

    shutDown()// 关闭线程池,不影响已经提交的任务
    shotDownNow()// 关闭线程池,并尝试去终止正在执行的线程
    allowCoreThreadTimeOut(boolean)// 允许核心线程闲置超时时被回收
    submit // 一般用execute提交任务。submit有返回值
    beforeExecute()// 任务执行前执行的方法
    afterExecute()// 任务执行结束后执行的方法
    terminated() // 线程池关闭后执行的方法 
    

    相关文章

      网友评论

          本文标题:线程池

          本文链接:https://www.haomeiwen.com/subject/znezrktx.html