美文网首页
Java线程池使用与分析

Java线程池使用与分析

作者: Dotry | 来源:发表于2019-12-25 18:26 被阅读0次

    1.前言

    作为一名Android开发,工作中少不了使用线程。最近因为在优化广告请求结构,重新查看一遍线程相关的知识,在此做一个总结。

    2.线程使用

    在Java中开启子线程,执行异步任务最简单的做法是:

    new Thread(new Runnable() {
        @Override
        public void run() {
            // TODO 相关业务处理
        }
    }).start();
    

    每当需要一个线程的时候就new 一个线程出来操作相关业务,但这样做的话会存在一些问题:

    1. 每次new Thread新建对象性能差。
    2. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
    3. 缺乏更多功能,如定时执行、定期执行、线程中断。

    作为一个有想法的开发,是应该拒绝该方式使用线程的,此时线程池的概念就应运而来。

    3. 线程池 Executors

    线程池的定义:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
    在Java中系统会提供一下自带的几种线程池:

    • newFixedThreadPool:一个可以无限扩大的线程池
    • newCachedThreadPool:固定大小的线程池;corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;keepAliveTime为0,
    • newScheduledThreadPool:一个主要接受定时任务的线程池,有两种提交任务的方式:scheduledAtFixedRate和scheduledWithFixedDelaySchduled
    • newSingleThreadExecutor:一个只会创建一条工作线程处理任务线程池

    查看Executors 源码可以看到有如下几种方法:

    Executors.png
    分别对应上述几种线程池。
    关于线程池的设计思想其根本是生产消费者,线程池会维护两个队列
    1. 各个线程的集合
    2. 各个任务的集合

    前者负责生产维护线程,后者则消费线程。
    通过源码跟踪,上述几种创建线程最终都会调用ThreadPoolExecutor的构造方法

     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
                                 }
    
    • int corePoolSize:核心线程池数量,线程池创建后保持的数量
    • int maximumPoolSize:线程池最大线程数量,大于这个数则只需Rejected策略
    • long keepAliveTime:空闲线程存活的最大时间,超过这个时间则回收
    • TimeUnit unit: 空闲线程存活的最大时间单位设置
    • BlockingQueue<Runnable> workQueue:core线程数满后存放任务需要的阻塞队列
    • ThreadFactory threadFactory: 线程创建工厂,可以指定自定义的线程池名称等
    • RejectedExecutionHandler handler:自定义拒绝策略,如果任务数量大于maximumPoolSize则执行此策略。
      corePoolSize 和maximumPoolSize 以及workQueue 的长度会有以下几种关系:
    • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
    • 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
    • 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池执行拒绝策略

    3.1 newFixedThreadPool

    newFixedThreadPool.png
    • 无界任务队列,有界的线程队列
      查看源码可以看到该线程的策略
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    只需要设置参数nThreads ,参数workQueue 是一个空间无届的队列,基本可以认为keepAliveTimemaximumPoolSize 之类的参数无效,因为此时核心线程是满负荷运行,没有线程会处于空闲状态。使用方式如下:

    public class NewFixedThreadPoolExample {
        public static void main(String[] args) {
            int coreSize = Runtime.getRuntime().availableProcessors();
            System.out.println("coreSize is :"+coreSize);
            ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
            for (int i =0;i<10;i++){
                int  index =i;
                executorService.execute(new Runnable() {
                  @Override
                  public void run() {
                      System.out.println("currentTime is :"+System.currentTimeMillis()+"---index is :"+index);
                      try {
                          Thread.sleep(2000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              });
            }
    
        }
    
    }
    

    3.2 newCachedThreadPool

    newCacheThreadPool.png

    查看源码可以看到该线程的策略

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    

    corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大,采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来, 就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

    • 无界任务队列,无界线程队列。
      使用方式如下:
    public class NewCachedThreadPoolExample {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i =0;i<10;i++){
                int index = i;
                try {
                    Thread.sleep(1000*index);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("currentTime is :"+System.currentTimeMillis()+"---index is :"+index);
                    }
                });
            }
        }
    }
    

    3.3 newScheduledThreadPool

    newScheduledThreadPool.png

    其源码策略如下:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    
    • 无界任务队列
      该类型的线程池存在两种调用:
      scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit);
      scheduleWithFixedDelay(Runnable command,long initialDelay, long delay, TimeUnit unit);
      调用如下:
    public class NewScheduledThreadPoolExample {
        public static void main(String[] args) {
            int coreSize = Runtime.getRuntime().availableProcessors();
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
            for (int i= 0 ;i<10;i++){
                int index = i;
                executorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("currentTime is :"+System.currentTimeMillis()+"---index is :"+index);
                     }
                },1,3, TimeUnit.SECONDS);
    //            executorService.scheduleWithFixedDelay()
            }
        }
    }
    

    3.4 newSingleThreadExecutor

    newSingleThreadExecutor.png
    • 无界任务队列,有界线程队列
    • corePoolSize :1 线程队列只有一个,和单例模式相差无几。它只会创建一条工作线程处理任务;采用的阻塞队列为LinkedBlockingQueue。
      使用方式如下:
    public class NewSingleThreadExecutorExample {
        public static void main(String[] args) {
            ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 10; i++) {
                final int index = i;
                singleThreadExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("currentTime is :" + System.currentTimeMillis() + "---index is :" + index);
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }
    

    以上是系统自带的线程池创建方式,其根本设计思想生产者--消费者,其中任务队列和线程队列的长度如果采用无界的话,就会存在OOM的风险。阿里巴巴Java开发手册中强制不允许使用Executors创建线程池。

    Executors-OOM.png
    鉴于上述原因,我们在某些场景下需要自定义线程池。

    4.自定义线程池

    1. ThreadPoolExecutor 自定义线程池
    public class CustomThreadPool {
        //以下相关参数可以考虑根据不同设备不同环境计算得到
        private static final int CORE_POOL_SIZE = 1;
        private static final int MAX_POOL_SIZE = 1;
        private static final int KEEP_ALIVE_TIME = 30;
        private static final int CAPACITY = 2;
        public static void main(String[] args) {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<Runnable>(CAPACITY), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    //线程创建的次数同核心线程数,最大线程数,任务数有关系
                    Thread thread = new Thread(r);
                    System.out.println("newThread==="+thread.getName());
                    return thread;
                }
    
            }, new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    //线程拒绝之后的策略,可以延时将该任务再次加入到队列中
                    System.out.println("rejectedExecution===="+((MyRunnable)r).Name);
    
                }
            });
            MyRunnable t1 = new MyRunnable("T1");
            MyRunnable t2 = new MyRunnable("T2");
            MyRunnable t3 = new MyRunnable("T3");
            MyRunnable t4 = new MyRunnable("T4");
            MyRunnable t5 = new MyRunnable("T5");
            MyRunnable t6 = new MyRunnable("T6");
            MyRunnable t7 = new MyRunnable("T7");
            // 将线程放入池中进行执行
            threadPoolExecutor.execute(t1);
            threadPoolExecutor.execute(t2);
            threadPoolExecutor.execute(t3);
            threadPoolExecutor.execute(t4);
            threadPoolExecutor.execute(t5);
            threadPoolExecutor.execute(t6);
            threadPoolExecutor.execute(t7);
        }
        static class  MyRunnable implements Runnable {
            public MyRunnable(String name) {
                Name = name;
            }
    
            public String Name ;
            @Override
            public void run() {
                System.out.println(Name + "正在执行。。。");
            }
        }
    }
    
    
    custom-thread-pool.png
    拒绝了3个任务,因为run方法耗时不同,核心线程执行效率不同所致,运行结果回不一致。

    4.2 自定义ThreadPoolExecutor

    1.自定义ThreadPoolExecutor ,可以自定义核心线程数等相关参数。

    public class MyExecutorPools extends ThreadPoolExecutor {
        public MyExecutorPools(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                               BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        /**
         * 线程执行后进行调用
         */
        protected void afterExecute(Runnable r, Throwable t) {
            MyRunnable myr = (MyRunnable) r;
            System.out.println(myr.getMyname() + "..执行完毕");
    
            }
    
        /**
         * 重写执行方法,线程池执行前进行调用
         */
        protected void beforeExecute(Thread t, Runnable r) {
            MyRunnable myr = (MyRunnable) r;
            System.out.println(myr.getMyname() + "..准备执行");
        }
    }
    
    1. ThreadFactory
    public class MyThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread td = new Thread(r);
            td.setName("myfactory-" + td.getId());
            td.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    //自定义异常捕获机制..
                    System.out.println("thread execute error");
                }
            });
            return td;
        }
    }
    
    1. Runnable
    public class MyRunnable implements Runnable {
    
        public MyRunnable(String name) {
            this.myname = name;
        }
    
        private String myname;
    
        public String getMyname() {
            return myname;
        }
    
        public void setMyname(String myname) {
            this.myname = myname;
        }
    
        @Override
        public void run() {
            System.out.println("myname is :" + myname);
            try {
                Thread.sleep(5 * 100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    1. 拒绝策略 实现
    /**
     * 拒绝策略
     */
    public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            MyRunnable my = (MyRunnable)r;
            System.out.println("拒绝执行...:"+my.getMyname());
        }
    }
    
    1. 测试
    public class MyExecuterPoolsTest {
        public static void main(String[] args) {
            MyExecutorPools ex = new MyExecutorPools(3, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                    new MyThreadFactory(), new RejectedExecutionHandlerImpl());
            for (int i = 0; i < 20; i++) {
                ex.execute(new MyRunnable("n" + i));
            }
        }
    }
    

    以上是两种自定义线程池的方式,根据不同场景选择使用,但是究其根本还是维护两个队列

    • 线程队列
    • 任务对列

    5 拒绝策略

    上述是自定义一个拒绝策略,当该任务被拒绝之后,多久再次加入到任务队列中。其中Java 自带几种策略:

    • AbortPolicy 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
    • CallerRunsPolicy 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
    • DiscardOldestPolicy 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
    • DiscardPolicy 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
      关于不同策略的使用可以自行搜索。
      上面是Java相关的线程池的使用和分析,后面结合项目中广告任务相关业务,采用自定义线程池的方式进行改造。

    相关文章

      网友评论

          本文标题:Java线程池使用与分析

          本文链接:https://www.haomeiwen.com/subject/odscoctx.html