美文网首页
JAVA高并发实战——线程复用:线程池

JAVA高并发实战——线程复用:线程池

作者: XHHP | 来源:发表于2019-08-05 23:28 被阅读0次

    (一)、什么是线程池

    • 在线程池中,总有那么几个活跃的线程。当你需要使用线程时,可以从池子中随便拿一个空间线程,当完成工作时,并不着急关闭线程,而是将这个线程退回到线程池中,方便其他人使用。
    • 创建线程变成了从线程池获得空闲线程,关闭线程变成了向线程池归还线程。


      在这里插入图片描述

    (二)、不要重复发明轮子:JDK对线程池的支持

    • 下面是Executors框架结构图


      在这里插入图片描述
    • ThreadPoolExecutor表示一个线程池。Executors类则扮演着线程工厂的角色,通过Executors可以取得一个拥有特定功能的线程池。从UML图中也可知,ThreadPoolExecutor实现了Executor接口,任何Runnable对象都可以被ThreadPoolExecutor线程池调度。
    • Executor框架提供了各种类型的线程池,主要有以下工厂方法。


      在这里插入图片描述
    • newFixedThreadPool()方法: 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,即立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理任务队列中的任务
    • newSingleThreadExecutor()方法: 该方法返回一个只有一个线程的线程池,若多于一个任务被提交到线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
    • newCachedThreadPool()方法: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以使用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
    • newSingledThreadScheduledExecutor()方法: 该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上拓展了在给定时间执行某任务的功能,如在固定的延时之后执行,或者周期性执行某个任务
    • newScheduledThreadPool()方法: 该方法也返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量。

    1、固定大小的线程池

    public class ThreadPoolDemo {
        public static class MyTask implements Runnable {
    
            @Override
            public void run() {
                System.out.println("Thread ID:" + Thread.currentThread().getId());
                try {
                    Thread.sleep(1000);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            MyTask myTask = new MyTask();
            ExecutorService es = Executors.newFixedThreadPool(5);
            for(int i = 0;i < 10; i++) {
                es.submit(myTask);
            }
        }
    }
    
    

    2、计划任务

    • newScheduledThreadPool()方法返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。方法如下:
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay, TimeUnit unit);
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit);
    
    • schedule()方法:在给定时间,对任务进行一次调度
    • sceduleAtFixedRate()方法:创建一个周期性任务。任务开始于给定的初始延时。后续的任务按照给定的周期进行:后续的第一个任务将在initialDelay+period时执行,后续第二个任务将在initialDelay+2*period时执行。
    • scheduledWithFixedDelay:创建并执行一个周期性任务,任务开始于初始延时时间,后续任务将会按照给定的时延进行:即上一个任务的结束时间到下一个任务的开始时间的时间差。
    public class ScheduleThreadServiceDemo {
        public static void main(String[] args) {
            ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
            ses.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println(System.currentTimeMillis()/1000);
                    }catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },0,2, TimeUnit.SECONDS);
        }
    }
    
    

    (三)、刨根究底:核心线程池的内部实现

    • 线程池的实现方法(都只是对ThreadPoolExecutor类的封装):
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    • ThreadPoolExecutor最重要的构造函数:
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
    
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    • newFixedThreadPool()方法使用了LinkedBlokingQueue任务队列的线程池。对于线程数固定不变的线程池而言,coolPoolSize和maximumPoolSize可以相等。但使用的是无界队列存放无法立即完成的任务,当任务提交特别频繁时,队列会迅速耗尽系统资源
    • newSingleThreadExecutor()方法直接返回单线程线程池
    • newCachedThreadPool()方法采用了SynchronousQueue队列,是一种直接提交队列,总会迫使线程增加新的线程执行任务。如果同时有大量任务被提交,系统会开启等量的线程,很快会耗尽系统资源。
    • ==需要根据实际情况选择合适的线程池==


      在这里插入图片描述

    (四)、拒绝策略

    • 当任务数量超过系统实际承载能力时,就要用到拒绝策略了。拒绝策略可以说是系统超负荷运行时的补救策略,通常由于压力太大而引起的,也就是线程池中的线程用完了,无法继续为新任务服务。

    • JDK内置的四种拒绝策略


      在这里插入图片描述
    • AbortPolicy策略:直接抛出异常,组织系统正常工作

    • CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。但是,任务提交线程的性能会极具下降

    • DiscardOldestPolicy策略:丢弃最老的一个请求,也就是即将被执行的任务

    • DiscardPolicy策略:默默丢弃无法处理的任务,不予处理。

    • 以上内置策略均实现了RejectedExecutionHandler接口,我们可以进行自定义。

    public class RejectThreadPoolDemo {
        public static class MyTask implements Runnable{
    
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis()+":Thread ID:" + Thread.currentThread().getId());
                try {
                    Thread.sleep(1000);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            MyTask task = new MyTask();
            ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS
                    , new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    System.out.println(r.toString() + " is discard");       //重现拒绝策略
                }
            });
    
            for(int i = 0; i < Integer.MAX_VALUE; i++) {
                es.submit(task);
                Thread.sleep(10);
            }
        }
    }
    
    
    • maxPoolSize是5,表示允许最大线程数是5。并且这里使用的是BlocklingQueue有界队列,而且队列的长度时10.因此会有许多线程被丢弃。

    (五)、自定义线程创建:ThreadFactory

    • ThreadFactory是一个接口,它只有一个用来创建线程的方法
     Thread newThread(Runnable r);
    
    public class ThreadFactoryDemo {
        public static class MyTask implements Runnable {
    
            @Override
            public void run() {
                System.out.println("Thread ID:" + Thread.currentThread().getId());
                try {
                    Thread.sleep(1000);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            MyTask myTask = new MyTask();
            ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
                    new SynchronousQueue<>(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    System.out.println("create" + t);
                    return t;
                }
            });
    
            for(int i = 0; i < 5; i++) {
                executorService.submit(myTask);
            }
            Thread.sleep(2000);
        }
    }
    
    

    (六)、拓展线程池

    • 如果需要对线程池进行拓展,它提供了beforeExecute()、afterExecute()、terminated()三个接口来对线程池进行拓展。
    public class ExtThreadPool {
        public static class MyTask implements Runnable {
    
            public String name;
    
            public MyTask(String name) {
                this.name = name;
            }
    
            @Override
            public void run() {
                System.out.println("正在执行" + Thread.currentThread().getId() + " Task" + name);
                try {
                    Thread.sleep(100);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingDeque<>()) {
    
                @Override
                public void execute(Runnable command) {
                    super.execute(command);
                }
    
                @Override
                protected void beforeExecute(Thread t, Runnable r) {
                    System.out.println("准备执行" + ((MyTask)r).name);
                }
    
                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    System.out.println("执行完成" + ((MyTask)r).name);
                }
    
                @Override
                protected void terminated() {
                    System.out.println("线程池退出");
                }
            };
            for (int i = 0; i < 5 ; i++) {
                MyTask myTask = new MyTask("Task"+i);
                executorService.execute(myTask);
                Thread.sleep(1000);
            }
            executorService.shutdown();
        }
    
    }
    
    

    (七)、分而治之:Fork/Join框架

    • 具体实现原理如图:


      在这里插入图片描述
    • 由于线程池的优化,提交的任务和线程数量并不是一对一的关系。在绝大多数情况下,每个线程必然需要拥有一个任务队列。线程A已经把自己的任务执行完了,而线程B还有一堆任务等着处理。这时线程A就会“帮助”线程B,从线程B的任务队列中拿一个任务过来处理。==需要注意的是,当一个线程试图“帮助另一个线程时,总是从任务队列的底部开始获取数据,线程执行自己的任务则是从顶部开始获取数据。”==


      在这里插入图片描述
    • ForkJoinPool线程池的一个重要的接口

    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
    
    • 你可以向ForkJoinPool线程池提交一个ForkJoinTask任务。所谓ForkJoinTask任务支持fork()方法分解及join()方法等待任务。
    • ForkJoinTask任务有两个重要的子类,RecursiveAction和RecursiveTask。它们分别表示没有返回值的任务和携带返回值的任务。


      在这里插入图片描述
    public class CountTask extends RecursiveTask<Long> {
    
        private static final int THRESHOLD = 10000;
        private long start;
        private long end;
    
        public CountTask(long start, long end) {
            this.start = start;
            this.end = end;
        }
    
    
        @Override
        protected Long compute() {
            long sum = 0;
            boolean canCompute = (end - start) < THRESHOLD;
            if (canCompute) {              //如果数目比较小,就直接运算
                for (long i = start; i <= end; i++) {
                    sum += i;
                }
            } else {
                long step = (start + end) / 100;        //将任务划分成100个小任务
                ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
                long pos = start;               //初始化pos
                for (int i = 0; i < 100; i++) {
                    long lastOne = pos + step;  //lastOne标志每次末端的值
                    if (lastOne > end) {         //如果lastOne大于末端,就以end作为末端
                        lastOne = end;
                    }
                    CountTask subTask = new CountTask(pos, lastOne);    //创建子任务
                    pos += step + 1;        //pos指向下一次的初始位置
                    subTasks.add(subTask);  //添加子任务
                    subTask.fork();         //执行子任务
                }
                for (CountTask t : subTasks) {
                    sum += t.join();        //合并子任务
                }
            }
            return sum;
        }
    
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool();     //创建ForkJoin线程池
            CountTask task = new CountTask(0, 200000L);         //创建任务
            ForkJoinTask<Long> result = forkJoinPool.submit(task);  //将任务提交到线程池
            try {
                long res = result.get();                //主线程等待返回结果
                System.out.println("sum = " + res);      //输出结果
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    (八)、Guava中对线程池的拓展

    1. 特殊的DirectExecutor线程池
    • DirectExecutor线程池很简单,它并没有真的创建或者使用额外线程, 它总是将任务在当前线程中直接执行
    • 有时候异步执行是没必要的,便可使用这个线程池
    public static void main(String[] args) {
            Executor  executor = MoreExecutors.directExecutor();
            executor.execute(() ->{
                System.out.println("I'am runnint in" + Thread.currentThread().getName());
            });
        }
    
    1. Daemon线程池
    • 在很多场合,我们并不希望后台线程池组织程序的退出,当系统执行完成后,即便有线程池存在,依然希望进程结束执行
    • 这里就可以使用MoreExecutors.getExitingExecutorService()方法
    public static void main(String[] args) {
            ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(2);
            MoreExecutors.getExitExecutorService(executor);
            executor.execute(() ->{
                System.out.println(Thread.currentThread().getName());
            });
        }
    

    相关文章

      网友评论

          本文标题:JAVA高并发实战——线程复用:线程池

          本文链接:https://www.haomeiwen.com/subject/ldhbdctx.html