美文网首页
Java高并发系列——检视阅读(五)

Java高并发系列——检视阅读(五)

作者: 卡斯特梅的雨伞 | 来源:发表于2020-09-17 22:53 被阅读0次

    Java高并发系列——线程池

    JAVA线程池

    线程池实现原理

    类似于一个工厂的运作

    当向线程池提交一个任务之后,线程池的处理流程如下:

    1. 判断是否达到核心线程数,若未达到,则直接创建新的线程处理当前传入的任务,否则进入下个流程
    2. 线程池中的工作队列是否已满,若未满,则将任务丢入工作队列中先存着等待处理,否则进入下个流程
    3. 是否达到最大线程数,若未达到,则创建新的线程处理当前传入的任务,否则交给线程池中的饱和策略进行处理。
    image.png

    java中的线程池

    jdk中提供了线程池的具体实现,实现类是:java.util.concurrent.ThreadPoolExecutor,主要构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    

    corePoolSize核心线程大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其他空闲线程可以处理任务也会创新线程,等到工作的线程数大于核心线程数时就不会在创建了。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前把核心线程都创造好,并启动。(prestartCoreThread:启动一个核心线程或 prestartAllCoreThreads:启动全部核心线程 )

    maximumPoolSize线程池允许创建的最大线程数。如果队列满了,并且以创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果我们使用了无界队列(或者大小是Integer.MAX_VALUE,可能还没达到就OOM了),那么所有的任务会加入队列,这个参数就没有什么效果了。

    keepAliveTime线程池的工作线程空闲后,保持存活的时间。如果没有任务处理了,有些线程会空闲,空闲的时间超过了这个值,会被回收掉。如果任务很多,并且每个任务的执行时间比较短,避免线程重复创建和回收,可以调大这个时间,提高线程的利用率

    unitkeepAliveTIme的时间单位,可以选择的单位有天、小时、分钟、毫秒、微妙、千分之一毫秒和纳秒。类型是一个枚举java.util.concurrent.TimeUnit,这个枚举也经常使用,有兴趣的可以看一下其源码

    workQueue工作队列,用于缓存待处理任务的阻塞队列,常见的有4种(ArrayBlockingQueueLinkedBlockingQueueSynchronousQueuePriorityBlockingQueue

    threadFactory线程池中创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字

    handler饱和策略,当线程池无法处理新来的任务了,那么需要提供一种策略处理提交的新任务,默认有4种策略(AbortPolicyCallerRunsPolicyDiscardOldestPolicyDiscardPolicy

    调用线程池的execute方法处理任务,执行execute方法的过程:

    1. 判断线程池中运行的线程数是否小于corepoolsize,是:则创建新的线程来处理任务,否:执行下一步
    2. 试图将任务添加到workQueue指定的队列中,如果无法添加到队列,进入下一步
    3. 判断线程池中运行的线程数是否小于maximumPoolSize,是:则新增线程处理当前传入的任务,否:将任务传递给handler对象rejectedExecution方法处理

    线程池的使用步骤:

    1. 调用构造方法创建线程池
    2. 调用线程池的方法处理任务
    3. 关闭线程池

    线程池中常见5种工作队列

    任务太多的时候,工作队列用于暂时缓存待处理的任务,jdk中常见的4种阻塞队列:

    ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序

    LinkedBlockingQueue:是一个基于链表结构的阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool使用了这个队列。

    SynchronousQueue一个不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用这个队列。

    PriorityBlockingQueue优先级队列,进入队列的元素按照优先级会进行排序。

    SynchronousQueue队列的线程池

    使用Executors.newCachedThreadPool()创建线程池,看一下的源码:

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    newCachedThreadPool()使用了SynchronousQueue同步队列,这种队列比较特殊,放入元素必须要有另外一个线程去获取这个元素,否则放入元素会失败或者一直阻塞在那里直到有线程取走,示例中任务处理休眠了指定的时间,导致已创建的工作线程都忙于处理任务,所以新来任务之后,将任务丢入同步队列会失败,丢入队列失败之后,会尝试新建线程处理任务。使用上面的方式创建线程池需要注意,如果需要处理的任务比较耗时,会导致新来的任务都会创建新的线程进行处理,可能会导致创建非常多的线程,最终耗尽系统资源,触发OOM

    //SynchronousQueue队列默认是false,采用先进后出的栈处理,也可以是公平队列先进先出。
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
    
    PriorityBlockingQueue优先级队列的线程池

    输出中,除了第一个任务,其他任务按照优先级高低按顺序处理。原因在于:创建线程池的时候使用了优先级队列,进入队列中的任务会进行排序,任务的先后顺序由Task中的i变量决定。向PriorityBlockingQueue加入元素的时候,内部会调用代码中Task的compareTo方法决定元素的先后顺序。

    示例:

    public class ThreadPoolExecutorPriorityTest {
        /**
         * 优先级队列执行的任务要实现Comparable比较
         */
        static class Task implements Runnable, Comparable<Task> {
            private int i;
            private String name;
            public Task(int i, String name) {
                this.i = i;
                this.name = name;
            }
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "处理" + this.name);
            }
            @Override
            public int compareTo(Task o) {
                return Integer.compare(o.i, this.i);
            }
        }
    
        //自定义线程工厂,优先级队列
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60,
                TimeUnit.SECONDS, new PriorityBlockingQueue<>(), new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) {
            for (int i = 1; i <= 10; i++) {
                int j = i;
                String taskName = "task" + j;
                executor.execute(new Task(j,taskName));
            }
    
            for (int i = 90; i <= 100; i++) {
                int j = i;
                String taskName = "task" + j;
                executor.execute(new Task(j,taskName));
            }
            executor.shutdown();
        }
    }
    

    输出:

    From DemoThreadFactory's 订单创建组-Worker-1处理task1
    From DemoThreadFactory's 订单创建组-Worker-1处理task100
    From DemoThreadFactory's 订单创建组-Worker-1处理task99
    From DemoThreadFactory's 订单创建组-Worker-1处理task98
    From DemoThreadFactory's 订单创建组-Worker-1处理task97
    From DemoThreadFactory's 订单创建组-Worker-1处理task96
    From DemoThreadFactory's 订单创建组-Worker-1处理task95
    From DemoThreadFactory's 订单创建组-Worker-1处理task94
    From DemoThreadFactory's 订单创建组-Worker-1处理task93
    From DemoThreadFactory's 订单创建组-Worker-1处理task92
    From DemoThreadFactory's 订单创建组-Worker-1处理task91
    From DemoThreadFactory's 订单创建组-Worker-1处理task90
    From DemoThreadFactory's 订单创建组-Worker-1处理task10
    From DemoThreadFactory's 订单创建组-Worker-1处理task9
    From DemoThreadFactory's 订单创建组-Worker-1处理task8
    From DemoThreadFactory's 订单创建组-Worker-1处理task7
    From DemoThreadFactory's 订单创建组-Worker-1处理task6
    From DemoThreadFactory's 订单创建组-Worker-1处理task5
    From DemoThreadFactory's 订单创建组-Worker-1处理task4
    From DemoThreadFactory's 订单创建组-Worker-1处理task3
    From DemoThreadFactory's 订单创建组-Worker-1处理task2
    

    自定义创建线程的工厂

    给线程池中线程起一个有意义的名字,在系统出现问题的时候,通过线程堆栈信息可以更容易发现系统中问题所在。通过jstack查看线程的堆栈信息,也可以看到我们自定义的名称 。

    自定义创建工厂需要实现java.util.concurrent.ThreadFactory接口中的Thread newThread(Runnable r)方法,参数为传入的任务,需要返回一个工作线程。

    示例:

    public class ThreadPoolExecutorTest {
    
        //默认线程创建
        /*  private static ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60,
                  TimeUnit.SECONDS, new LinkedBlockingQueue<>(15), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());*/
    
        //自定义线程工厂1
    /*    private static final AtomicInteger nextId = new AtomicInteger(1);
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(12), (r -> {
            Thread t = new Thread(r);
            t.setName("示范线程" + nextId.getAndIncrement());
            return t;
        }), new ThreadPoolExecutor.AbortPolicy());*/
    
        //自定义线程工厂2 ,推荐
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(15), new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) {
            //提前启动所有核心线程
            executor.prestartAllCoreThreads();
            //提前启动一个核心线程
            executor.prestartCoreThread();
            for (int i = 1; i <= 20; i++) {
                int j = i;
                String taskName = "task" + j;
                executor.execute(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(j);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "线程执行" + taskName + "完毕!");
                });
            }
            executor.shutdown();
        }
    }
    

    输出:

    From DemoThreadFactory's 订单创建组-Worker-1线程执行task1完毕!
    From DemoThreadFactory's 订单创建组-Worker-3线程执行task2完毕!
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task3完毕!
    From DemoThreadFactory's 订单创建组-Worker-4线程执行task4完毕!
    From DemoThreadFactory's 订单创建组-Worker-5线程执行task5完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task6完毕!
    From DemoThreadFactory's 订单创建组-Worker-3线程执行task7完毕!
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task8完毕!
    From DemoThreadFactory's 订单创建组-Worker-4线程执行task9完毕!
    From DemoThreadFactory's 订单创建组-Worker-5线程执行task10完毕!
    From DemoThreadFactory's 订单创建组-Worker-6线程执行task17完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task11完毕!
    From DemoThreadFactory's 订单创建组-Worker-7线程执行task20完毕!
    From DemoThreadFactory's 订单创建组-Worker-3线程执行task12完毕!
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task13完毕!
    From DemoThreadFactory's 订单创建组-Worker-4线程执行task14完毕!
    From DemoThreadFactory's 订单创建组-Worker-5线程执行task15完毕!
    

    四种常见饱和策略

    当线程池中队列已满,并且线程池已达到最大线程数,线程池会将任务传递给饱和策略进行处理。这些策略都实现了RejectedExecutionHandler接口。接口中有个方法:

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
    

    参数说明:

    r:需要执行的任务

    executor:当前线程池对象

    JDK中提供了4种常见的饱和策略:

    AbortPolicy:直接抛出异常。

    CallerRunsPolicy:在当前调用者的线程中运行任务,即谁丢来的任务,由他自己去处理。

    DiscardOldestPolicy:丢弃队列中最老的一个任务,即丢弃队列头部的一个任务,然后执行当前传入的任务。

    DiscardPolicy:不处理,直接丢弃掉,方法内部为空。

    解释:

    //自定义线程工厂
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 60,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
            new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.CallerRunsPolicy());
            
    AbortPolicy:直接抛出异常。
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
        " rejected from " +
        e.toString());
    }
    输出:到饱和策略时抛出异常记录,丢弃掉任务11个。
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.self.current.ThreadPoolExecutorTest$$Lambda$1/1915503092@50134894 rejected from java.util.concurrent.ThreadPoolExecutor@2957fcb0[Running, pool size = 5, active threads = 4, queued tasks = 5, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at com.self.current.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:47)
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task1完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task2完毕!
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task6完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task3完毕!
    From DemoThreadFactory's 订单创建组-Worker-3线程执行task7完毕!
    From DemoThreadFactory's 订单创建组-Worker-4线程执行task8完毕!
    From DemoThreadFactory's 订单创建组-Worker-5线程执行task9完毕!
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task4完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task5完毕!
    
            
    CallerRunsPolicy:在当前调用者的线程中运行任务,即随丢来的任务,由他自己去处理。如main方法调用的线程池,则如果走到饱和策略处理时,由main方法处理这个任务。不会丢弃任何一个任务,但执行会变得很慢。
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }    
    输出:
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task1完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task2完毕!
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task6完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task3完毕!
    From DemoThreadFactory's 订单创建组-Worker-3线程执行task8完毕!
    From DemoThreadFactory's 订单创建组-Worker-4线程执行task9完毕!
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task4完毕!
    From DemoThreadFactory's 订单创建组-Worker-5线程执行task10完毕!
    main线程执行task11完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task5完毕!
    From DemoThreadFactory's 订单创建组-Worker-3线程执行task7完毕!
    From DemoThreadFactory's 订单创建组-Worker-4线程执行task12完毕!
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task13完毕!
    From DemoThreadFactory's 订单创建组-Worker-5线程执行task14完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task15完毕!
    main线程执行task17完毕!
    From DemoThreadFactory's 订单创建组-Worker-3线程执行task16完毕!
    
    DiscardOldestPolicy:丢弃队列中最老的一个任务,即丢弃队列头部的一个任务,然后执行当前传入的任务。这时候线程池会在执行到饱和策略时丢弃掉头部最老的认为,没有任何记录,任务就丢掉了。
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
        }
    }
    
    输出:20个任务被无声无息地丢掉了11个
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task6完毕!
    From DemoThreadFactory's 订单创建组-Worker-3线程执行task7完毕!
    From DemoThreadFactory's 订单创建组-Worker-4线程执行task8完毕!
    From DemoThreadFactory's 订单创建组-Worker-5线程执行task9完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task16完毕!
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task17完毕!
    From DemoThreadFactory's 订单创建组-Worker-3线程执行task18完毕!
    From DemoThreadFactory's 订单创建组-Worker-4线程执行task19完毕!
    From DemoThreadFactory's 订单创建组-Worker-5线程执行task20完毕!
    
    DiscardPolicy:不处理,直接丢弃掉,方法内部为空。没处理Runnable r就表示丢弃了。
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
    输出:20个任务被无声无息地丢掉了10个
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task1完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task2完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task3完毕!
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task7完毕!
    From DemoThreadFactory's 订单创建组-Worker-3线程执行task8完毕!
    From DemoThreadFactory's 订单创建组-Worker-4线程执行task9完毕!
    From DemoThreadFactory's 订单创建组-Worker-5线程执行task10完毕!
    From DemoThreadFactory's 订单创建组-Worker-1线程执行task4完毕!
    From DemoThreadFactory's 订单创建组-Worker-2线程执行task5完毕!
    From DemoThreadFactory's 订单创建组-Worker-3线程执行task6完毕!
    

    自定义饱和策略

    需要实现RejectedExecutionHandler接口。任务无法处理的时候,我们想记录一下日志,我们需要自定义一个饱和策略。记录了任务的日志,对于无法处理多任务,我们最好能够记录一下,让开发人员能够知道。 任务进入了饱和策略,说明线程池的配置可能不是太合理,或者机器的性能有限,需要做一些优化调整。

    实例:

    public class ThreadPoolExecutorRejectHandlerTest {
        static class Task implements Runnable {
            String name;
    
            public Task(String name) {
                this.name = name;
            }
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "处理" + this.name);
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public String toString() {
                return "Task{" +
                        "name='" + name + '\'' +
                        '}';
            }
        }
        //自定义包含策略:可以直接用函数式方法定义,也可以实现RejectedExecutionHandler自定义
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 60,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
                new DemoThreadFactory("订单创建组"), (r,executor)->{
            //自定义饱和策略
            //记录一下无法处理的任务
            System.out.println("无法处理的任务:" + r.toString());
        });
    
        public static void main(String[] args) {
            //提前启动所有核心线程
            executor.prestartAllCoreThreads();
            //提前启动一个核心线程
            executor.prestartCoreThread();
            for (int i = 1; i <= 20; i++) {
                int j = i;
                String taskName = "task" + j;
                executor.execute(new Task(taskName));
            }
            executor.shutdown();
        }
    }
    

    输出:

    无法处理的任务:Task{name='task10'}
    无法处理的任务:Task{name='task11'}
    无法处理的任务:Task{name='task12'}
    无法处理的任务:Task{name='task13'}
    无法处理的任务:Task{name='task14'}
    无法处理的任务:Task{name='task15'}
    无法处理的任务:Task{name='task16'}
    无法处理的任务:Task{name='task17'}
    无法处理的任务:Task{name='task18'}
    无法处理的任务:Task{name='task19'}
    无法处理的任务:Task{name='task20'}
    From DemoThreadFactory's 订单创建组-Worker-1处理task1
    From DemoThreadFactory's 订单创建组-Worker-2处理task6
    From DemoThreadFactory's 订单创建组-Worker-3处理task7
    From DemoThreadFactory's 订单创建组-Worker-4处理task8
    From DemoThreadFactory's 订单创建组-Worker-5处理task9
    From DemoThreadFactory's 订单创建组-Worker-2处理task2
    From DemoThreadFactory's 订单创建组-Worker-1处理task3
    From DemoThreadFactory's 订单创建组-Worker-4处理task5
    From DemoThreadFactory's 订单创建组-Worker-3处理task4
    

    线程池中的2个关闭方法

    线程池提供了2个关闭方法:shutdownshutdownNow,当调用者两个方法之后,线程池会遍历内部的工作线程,然后调用每个工作线程的interrrupt方法给线程发送中断信号,内部如果无法响应中断信号的可能永远无法终止,所以如果内部有无线循环的,最好在循环内部检测一下线程的中断信号,合理的退出。调用者两个方法中任意一个,线程池的isShutdown方法(是否执行了关闭线程池命令)就会返回true,当所有的任务线程都关闭之后,才表示线程池关闭成功,这时调用isTerminaed方法(是否关闭成功)会返回true。

    调用shutdown方法之后,线程池将不再接受新任务,内部会将所有已提交的任务处理完毕,处理完毕之后,工作线程自动退出。

    而调用shutdownNow方法后,线程池会将还未处理的(在队里等待处理的任务)任务移除,将正在处理中的处理完毕之后,工作线程自动退出。

    至于调用哪个方法来关闭线程,应该由提交到线程池的任务特性决定,多数情况下调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

    扩展线程池

    ThreadPoolExecutor内部提供了几个方法beforeExecuteafterExecuteterminated,可以由开发人员自己去重写实现这些方法。

    看一下线程池内部的源码:

    try {
        beforeExecute(wt, task);//任务执行之前调用的方法
        Throwable thrown = null;
        try {
            task.run();
        } catch (RuntimeException x) {
            thrown = x;
            throw x;
        } catch (Error x) {
            thrown = x;
            throw x;
        } catch (Throwable x) {
            thrown = x;
            throw new Error(x);
        } finally {
            afterExecute(task, thrown);//任务执行完毕之后调用的方法
        }
    } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
    }
    

    beforeExecute:任务执行之前调用的方法,有2个参数,第1个参数是执行任务的线程,第2个参数是任务

    protected void beforeExecute(Thread t, Runnable r) { }
    

    afterExecute:任务执行完成之后调用的方法,2个参数,第1个参数表示任务,第2个参数表示任务执行时的异常信息,如果无异常,第二个参数为null

    protected void afterExecute(Runnable r, Throwable t) { }
    

    terminated:线程池最终关闭之后调用的方法。所有的工作线程都退出了,最终线程池会退出,退出时调用该方法

    实例:

    public class ThreadPoolExecutorExtensionTest {
        static class Task implements Runnable {
            String name;
    
            public Task(String name) {
                this.name = name;
            }
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "处理" + this.name);
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public String toString() {
                return "Task{" +
                        "name='" + name + '\'' +
                        '}';
            }
        }
        //扩展线程池,可以继承也可以直接重写
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(15),
                new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy()){
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println(t.getName() + ",开始执行任务:" + r.toString());
            }
    
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println(Thread.currentThread().getName() + ",任务:" + r.toString() + ",执行完毕!");
            }
    
            @Override
            protected void terminated() {
                System.out.println(Thread.currentThread().getName() + ",关闭线程池!");
            }
        };
    
        public static void main(String[] args) {
            for (int i = 1; i <= 3; i++) {
                int j = i;
                String taskName = "task" + j;
                executor.execute(new Task(taskName));
            }
            executor.shutdown();
        }
    }
    

    输出:

    From DemoThreadFactory's 订单创建组-Worker-1,开始执行任务:Task{name='task1'}
    From DemoThreadFactory's 订单创建组-Worker-1处理task1
    From DemoThreadFactory's 订单创建组-Worker-2,开始执行任务:Task{name='task2'}
    From DemoThreadFactory's 订单创建组-Worker-2处理task2
    From DemoThreadFactory's 订单创建组-Worker-3,开始执行任务:Task{name='task3'}
    From DemoThreadFactory's 订单创建组-Worker-3处理task3
    From DemoThreadFactory's 订单创建组-Worker-1,任务:Task{name='task1'},执行完毕!
    From DemoThreadFactory's 订单创建组-Worker-2,任务:Task{name='task2'},执行完毕!
    From DemoThreadFactory's 订单创建组-Worker-3,任务:Task{name='task3'},执行完毕!
    From DemoThreadFactory's 订单创建组-Worker-3,关闭线程池!
    

    合理地配置线程池

    要想合理的配置线程池,需要先分析任务的特性,可以冲一下四个角度分析:

    • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务
    • 任务的优先级:高、中、低
    • 任务的执行时间:长、中、短
    • 任务的依赖性:是否依赖其他的系统资源,如数据库连接。

    性质不同任务可以用不同规模的线程池分开处理。CPU密集型任务应该尽可能小的线程,如配置cpu数量+1个线程的线程池。由于IO密集型任务并不是一直在执行任务,不能让cpu闲着,则应配置尽可能多的线程,如:cup数量2混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务*,只要这2个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。可以通过Runtime.getRuntime().availableProcessors()方法获取cpu数量。优先级不同任务可以对线程池采用优先级队列来处理,让优先级高的先执行。

    使用队列的时候建议使用有界队列,有界队列增加了系统的稳定性,如果采用无界队列,任务太多的时候可能导致系统OOM,直接让系统宕机。

    线程池中线程数量的配置

    线程池中总线程大小对系统的性能有一定的影响,我们的目标是希望系统能够发挥最好的性能,过多或者过小的线程数量无法有效的使用机器的性能。在Java Concurrency in Practice书中给出了估算线程池大小的公式:

    Ncpu = CUP的数量
    Ucpu = 目标CPU的使用率,0<=Ucpu<=1
    W/C = 等待时间与计算时间的比例
    为保存处理器达到期望的使用率,最优的线程池的大小等于:
    Nthreads = Ncpu × Ucpu × (1+W/C)
    线程池数量 = CUP的数量 * 目标CPU的使用率 * 等待时间与计算时间的比例
    

    使用建议

    在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

    线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

    说明:Executors返回的线程池对象的弊端如下:

    1) FixedThreadPool和SingleThreadPool:

    允许的请求队列:LinkedBlockingQueue长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

    2) CachedThreadPool:

    允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

    疑问:

    Q:LinkedBlockingQueue吞吐量通常要高于ArrayBlockingQueue,为什么?

    LinkedBlockingQueue底层是链表,增删效率比较高,而ArrayBlockingQueue底层是数组,增删效率比较低。

    最主要的是ArrayBlockingQueue数据的插入与取出共用同一个锁,因此ArrayBlockingQueue并不能实现生产、消费同时进行;
    LinkedBlockingQueue中用于阻塞生产者、消费者的锁是两个(锁分离:注意这里是JDK8之前的设计,JDK8之后是用一个锁实现),因此生产与消费是可以同时进行的

    JUC中的Executor框架

    Excecutor框架主要包含3部分的内容:

    1. 任务相关的:包含被执行的任务要实现的接口:Runnable接口或Callable接口
    2. 任务的执行相关的:包含任务执行机制的核心接口Executor,以及继承自ExecutorExecutorService接口。Executor框架中有两个关键的类实现了ExecutorService接口(ThreadPoolExecutorScheduleThreadPoolExecutor
    3. 异步计算结果相关的:包含接口Future实现Future接口的FutureTask类

    Executors框架包括:

    • Executor
    • ExecutorService
    • ThreadPoolExecutor
    • Executors
    • Future
    • Callable
    • FutureTask
    • CompletableFuture
    • CompletionService
    • ExecutorCompletionService

    Executor接口

    Executor接口中定义了方法execute(Runable able)接口,该方法接受一个Runable实例,他来执行一个任务,任务即实现一个Runable接口的类。

    ExecutorService接口

    ExecutorService继承于Executor接口,他提供了更为丰富的线程实现方法,比如ExecutorService提供关闭自己的方法,以及为跟踪一个或多个异步任务执行状况而生成Future的方法。

    ExecutorService有三种状态:运行、关闭、终止。创建后便进入运行状态,当调用了shutdown()方法时,便进入了关闭状态,此时意味着ExecutorService不再接受新的任务,但是他还是会执行已经提交的任务,当所有已经提交了的任务执行完后,便达到终止状态。如果不调用shutdown方法,ExecutorService方法会一直运行下去,系统一般不会主动关闭。

    ThreadPoolExecutor类

    线程池类,实现了ExecutorService接口中所有方法,参考线程池的使用。

    ScheduleThreadPoolExecutor定时器

    ScheduleThreadPoolExecutor继承自ThreadPoolExecutor(实现了线程池的核心功能),实现了ScheduledExecutorService(实现了定时器调度功能),他主要用来延迟执行任务,或者定时执行任务。功能和Timer类似,但是ScheduleThreadPoolExecutor更强大、更灵活一些。Timer后台是单个线程,而ScheduleThreadPoolExecutor可以在创建的时候指定多个线程。

    public class ScheduledThreadPoolExecutor
            extends ThreadPoolExecutor
            implements ScheduledExecutorService {
                public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay, TimeUnit unit);
            }
    
    schedule:延迟执行任务1次

    使用ScheduleThreadPoolExecutor的schedule方法,看一下这个方法的声明:

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
    

    3个参数:

    command:需要执行的任务

    delay:需要延迟的时间

    unit:参数2的时间单位,是个枚举,可以是天、小时、分钟、秒、毫秒、纳秒等

    实例:

    //只延迟调度一次
    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3,
                new DemoThreadFactory("延迟调度线程池"));
        scheduledThreadPool.schedule(()->{
            System.out.println(System.currentTimeMillis()+"开始执行调度!");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(System.currentTimeMillis()+"执行调度结束!");
        },3,TimeUnit.SECONDS);
    }
    

    输出:

    1598509985652开始执行调度!
    1598509990653执行调度结束!
    
    scheduleAtFixedRate:固定的频率执行任务

    使用ScheduleThreadPoolExecutor的scheduleAtFixedRate方法,该方法设置了执行周期,下一次执行时间相当于是上一次的执行时间加上period,任务每次执行完毕之后才会计算下次的执行时间。

    看一下这个方法的声明:

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
    

    4个参数:

    command:表示要执行的任务

    initialDelay:表示延迟多久执行第一次

    period:连续执行之间的时间间隔

    unit:参数2和参数3的时间单位,是个枚举,可以是天、小时、分钟、秒、毫秒、纳秒等

    假设系统调用scheduleAtFixedRate的时间是T1,那么执行时间如下:

    第1次:T1+initialDelay

    第2次:T1+initialDelay+period(这时候如果第一次执行完后时间大于固定频率的时间,就会被马上调度起来)

    第3次:T1+initialDelay+2*period

    第n次:T1+initialDelay+(n-1)*period

    实例:

    //scheduleAtFixedRate()表示每次方法的执行周期是多久关注的是执行周期,如果已经到了执行周期,就会立即开启调度任务,时间间隔是调度任务开始时间加周期
    public static void main2(String[] args) throws ExecutionException, InterruptedException {
        //任务执行计数器
        AtomicInteger count = new AtomicInteger(1);
        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(3,
                new DemoThreadFactory("延迟调度线程池"),new ThreadPoolExecutor.AbortPolicy());
        ScheduledFuture<?> schedule = scheduledThreadPool.scheduleAtFixedRate(() -> {
            int currCount = count.getAndIncrement();
            System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "开始执行");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "结束执行");
        }, 1,3, TimeUnit.SECONDS);
    
    }
    

    输出:

    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:36:17 CST 2020 第1次开始执行
    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:36:22 CST 2020 第1次结束执行
    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:36:22 CST 2020 第2次开始执行
    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:36:27 CST 2020 第2次结束执行
    From DemoThreadFactory's 延迟调度线程池-Worker-2:Thu Aug 27 14:36:27 CST 2020 第3次开始执行
    任务当前执行完毕之后会计算下次执行时间,下次执行时间为上次执行的开始时间+period,这个时间小于第一次结束的时间了,说明小于系统当前时间了,会立即执行。
    
    scheduleWithFixedDelay:固定的间隔执行任务

    使用ScheduleThreadPoolExecutor的scheduleWithFixedDelay方法,该方法设置了执行周期,与scheduleAtFixedRate方法不同的是,下一次执行时间是上一次任务执行完的系统时间加上period,因而具体执行时间不是固定的,但周期是固定的,是采用相对固定的延迟来执行任务。看一下这个方法的声明:

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit);
    

    4个参数:

    command:表示要执行的任务

    initialDelay:表示延迟多久执行第一次

    period:表示下次执行时间和上次执行结束时间之间的间隔时间

    unit:参数2和参数3的时间单位,是个枚举,可以是天、小时、分钟、秒、毫秒、纳秒等

    假设系统调用scheduleAtFixedRate的时间是T1,那么执行时间如下:

    第1次:T1+initialDelay,执行结束时间:E1(执行结束时间是不固定的)

    第2次:E1+period,执行结束时间:E2

    第3次:E2+period,执行结束时间:E3

    第4次:E3+period,执行结束时间:E4

    第n次:上次执行结束时间+period

    实例:

    //scheduleWithFixedDelay()表示每次方法执行完后延迟多久执行,关注的是延迟时间,时间间隔是调度任务结束时间加延迟时间
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //任务执行计数器
        AtomicInteger count = new AtomicInteger(1);
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4,
                new DemoThreadFactory("延迟调度线程池"));
        ScheduledFuture<?> schedule = scheduledThreadPool.scheduleWithFixedDelay(() -> {
            int currCount = count.getAndIncrement();
            System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "开始执行");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "结束执行");
        }, 1,3, TimeUnit.SECONDS);
    
    }
    

    输出:

    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:39:16 CST 2020 第1次开始执行
    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:39:22 CST 2020 第1次结束执行
    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:39:25 CST 2020 第2次开始执行
    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:39:30 CST 2020 第2次结束执行
    From DemoThreadFactory's 延迟调度线程池-Worker-2:Thu Aug 27 14:39:33 CST 2020 第3次开始执行
    延迟1秒之后执行第1次,后面每次的执行时间和上次执行结束时间间隔3秒。
    

    定时任务有异常——没有对异常处理则定时任务会结束

    先说补充点知识:schedule、scheduleAtFixedRate、scheduleWithFixedDelay这几个方法有个返回值ScheduledFuture,通过ScheduledFuture可以对执行的任务做一些操作,如判断任务是否被取消、是否执行完成。

    再回到上面代码,任务中有个10/0的操作,会触发异常,发生异常之后没有任何现象,被ScheduledExecutorService内部给吞掉了,然后这个任务再也不会执行了,scheduledFuture.isDone()输出true,表示这个任务已经结束了,再也不会被执行了。所以如果程序有异常,开发者自己注意try-catch处理一下,不然跑着跑着发现任务怎么不跑了,也没有异常输出。

    实例:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //任务执行计数器
        AtomicInteger count = new AtomicInteger(1);
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4,
                new DemoThreadFactory("延迟调度线程池"));
        ScheduledFuture<?> schedule = scheduledThreadPool.scheduleWithFixedDelay(() -> {
            int currCount = count.getAndIncrement();
            System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "开始执行");
        /*    try {
                System.out.println(10/0);
            } catch (Exception e) {
                e.printStackTrace();
            }*/
            System.out.println(10/0);
            System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "结束执行");
        }, 1,3, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(3);
        System.out.println(schedule.isCancelled());
        System.out.println(schedule.isDone());
    }
    

    输出:

    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:45:09 CST 2020 第1次开始执行
    false
    true
    

    取消定时任务的执行——调用ScheduledFuturecancel方法

    可能任务执行一会,想取消执行,可以调用ScheduledFuturecancel方法,参数表示是否给任务发送中断信号。

    示例:

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //任务执行计数器
            AtomicInteger count = new AtomicInteger(1);
            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4,
                    new DemoThreadFactory("延迟调度线程池"));
            ScheduledFuture<?> schedule = scheduledThreadPool.scheduleWithFixedDelay(() -> {
                int currCount = count.getAndIncrement();
                System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "开始执行");
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "结束执行");
            }, 1,3, TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(5);
            schedule.cancel(false);
            TimeUnit.SECONDS.sleep(1);
            System.out.println("任务是否被取消:"+schedule.isCancelled());
            System.out.println("任务是否已完成:"+schedule.isDone());
        }
    }
    

    输出:

    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:53:12 CST 2020 第1次开始执行
    任务是否被取消:true
    任务是否已完成:true
    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:53:17 CST 2020 第1次结束执行
    

    Executors类——线程池工具类

    Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。常用的方法有:

    newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor()
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
    

    创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。内部使用了无限容量的LinkedBlockingQueue阻塞队列来缓存任务,任务如果比较多,单线程如果处理不过来,会导致队列堆满,引发OOM

    newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads)
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
    
    

    创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。内部使用了无限容量的LinkedBlockingQueue阻塞队列来缓存任务,任务如果比较多,如果处理不过来,会导致队列堆满,引发OOM

    newCachedThreadPool

    public static ExecutorService newCachedThreadPool()
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
    
    

    创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,

    那么就会回收部分空闲(60秒处于等待任务到来)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池的最大值是Integer的最大值(2^31-1)。内部使用了SynchronousQueue同步队列来缓存任务,此队列的特性是放入任务时必须要有对应的线程获取任务,任务才可以放入成功。如果处理的任务比较耗时,任务来的速度也比较快,会创建太多的线程引发OOM

    newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
    

    创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

    在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

    Future、Callable接口

    Future、Callable接口需要结合ExecutorService来使用,需要有线程池的支持。

    Future接口定义了操作异步异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

    Callable接口中定义了需要有返回的任务需要实现的方法。——相当于有返回值的Runnable

    @FunctionalInterface
    public interface Callable<V> {
        V call() throws Exception;
    }
    
    

    比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。

    Future其他方法介绍一下

    cancel:取消在执行的任务,参数表示是否对执行的任务发送中断信号,方法声明如下:

    boolean cancel(boolean mayInterruptIfRunning);
    

    isCancelled:用来判断任务是否被取消

    isDone:判断任务是否执行完毕。

    调用线程池的submit方法执行任务,submit参数为Callable接口:表示需要执行的任务有返回值,submit方法返回一个Future对象,Future相当于一个凭证,可以在任意时间拿着这个凭证去获取对应任务的执行结果(调用其get方法),代码中调用了result.get()方法之后,此方法会阻塞当前线程直到任务执行结束。

    实例:

        public static void main(String[] args) throws ExecutionException, InterruptedException {
                String taskName = "task";
            Future<String> future = executor.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
               // System.out.println(Thread.currentThread().getName() + "线程执行" + taskName + "完毕!");
                return "finished";
            });
            TimeUnit.SECONDS.sleep(1);
            //取消正在执行的任务,mayInterruptIfRunning:是否发送中断信息
            future.cancel(false);
            System.out.println(future.isCancelled());
            System.out.println(future.isDone());
            //System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",结果:" + future.get());
            try {
                //超时获取异步任务执行结果
                System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",结果:" + future.get(10,TimeUnit.SECONDS));
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            executor.shutdown();
        }
    }
    

    输出:

    Exception in thread "main" java.util.concurrent.CancellationException
        at java.util.concurrent.FutureTask.report(FutureTask.java:121)
        at java.util.concurrent.FutureTask.get(FutureTask.java:206)
        at com.self.current.FutureTest.main(FutureTest.java:46)
    true
    true
    

    FutureTask类

    FutureTask除了实现Future接口,还实现了Runnable接口,因此FutureTask可以交给Executor执行,也可以交给线程执行执行(Thread有个Runnable的构造方法),FutureTask表示带返回值结果的任务。线程池的submit方法返回的Future实际类型正是FutureTask对象

    疑问:

    Q:线程池执行submit()方法是如何调用Callable任务的?

    A:Callable通过线程池执行的过程,封装为Runnable。线程池执行submit()方法会把Callable包装成FutrueTask对象,此对象实现了Runnable接口,当调用FutrueTask的run方法时,会把其属性中的Callable拿出来执行call()方法。示例代码如下:

      public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
        
            public void run() {
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            }
        }
    

    Q:多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。是因为ScheduledExecutorService是多线程么?

    A:是因为Timer只有一个线程在运行,while(true)循环不断地从队列中获取任务执行,而当线程被被杀死或者中断时,就相当于关闭了Timer.

    Q: ScheduleThreadPoolExecutor定时器并不关心线程数多少,他不是并发的执行多任务,只关心调度一个定时任务,线程数的多少只是影响多个任务再调度时需要多个线程,这样理解对么?

    A:我认为这样理解是对的,而这样也可以解释上面Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行的原因,是因为Timer只有一个线程在运行,while(true)循环不断地从队列中获取任务执行,而当线程被被杀死或者中断时,就相当于关闭了Timer.下面是多个任务调度时会创建多个线程去执行。

    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:22:22 CST 2020 第1次开始执行
    From DemoThreadFactory's 延迟调度线程池-Worker-2:Thu Aug 27 14:22:22 CST 2020 第2次开始执行
    From DemoThreadFactory's 延迟调度线程池-Worker-3:Thu Aug 27 14:22:22 CST 2020 第3次开始执行
    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:22:27 CST 2020 第1次结束执行
    From DemoThreadFactory's 延迟调度线程池-Worker-2:Thu Aug 27 14:22:27 CST 2020 第2次结束执行
    From DemoThreadFactory's 延迟调度线程池-Worker-3:Thu Aug 27 14:22:27 CST 2020 第3次结束执行
    From DemoThreadFactory's 延迟调度线程池-Worker-1:Thu Aug 27 14:22:30 CST 2020 第4次开始执行
    From DemoThreadFactory's 延迟调度线程池-Worker-4:Thu Aug 27 14:22:30 CST 2020 第5次开始执行
    From DemoThreadFactory's 延迟调度线程池-Worker-2:Thu Aug 27 14:22:30 CST 2020 第6次开始执行
    

    CompletionService接口——获取线程池中已经完成的任务

    CompletionService相当于一个执行任务的服务,通过submit丢任务给这个服务,服务内部去执行任务,可以通过服务提供的一些方法获取服务中已经完成的任务。

    接口内的几个方法:

    Future<V> submit(Callable<V> task);
    

    用于向服务中提交有返回结果的任务,并返回Future对象

    Future<V> submit(Runnable task, V result);
    

    用户向服务中提交有返回值的任务去执行,并返回Future对象。Runnable会被包装成有返回值的Callable,返回值为传入的result。

    Future<V> take() throws InterruptedException;
    

    从服务中返回并移除一个已经完成的任务,如果获取不到,会一致阻塞到有返回值为止。此方法会响应线程中断。

    Future<V> poll();
    

    从服务中返回并移除一个已经完成的任务,如果内部没有已经完成的任务,则返回空,此方法会立即响应。

    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
    

    尝试在指定的时间内从服务中返回并移除一个已经完成的任务,等待的时间超时还是没有获取到已完成的任务,则返回空。此方法会响应线程中断

    通过submit向内部提交任意多个任务,通过take方法可以获取已经执行完成的任务,如果获取不到将等待。

    ExecutorCompletionService

    ExecutorCompletionService类是CompletionService接口的具体实现

    说一下其内部原理,ExecutorCompletionService创建的时候会传入一个线程池,调用submit方法传入需要执行的任务,任务由内部的线程池来处理;ExecutorCompletionService内部有个阻塞队列,任意一个任务完成之后,会将任务的执行结果(Future类型)放入阻塞队列中,然后其他线程可以调用它take、poll方法从这个阻塞队列中获取一个已经完成的任务,获取任务返回结果的顺序和任务执行完成的先后顺序一致,所以最先完成的任务会先返回。

    看一下构造方法:

    public ExecutorCompletionService(Executor executor) {
            if (executor == null)
                throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = new LinkedBlockingQueue<Future<V>>();
        }
    

    构造方法需要传入一个Executor对象,这个对象表示任务执行器,所有传入的任务会被这个执行器执行。

    completionQueue是用来存储任务结果的阻塞队列,默认用采用的是LinkedBlockingQueue也支持开发自己设置。通过submit传入需要执行的任务,任务执行完成之后,会放入completionQueue中。

    任务完成入队操作原理:

    还是通过线程池execute()方法执行一个FutureTask包装的Callable任务,FutureTask里的run方法会调用Callable任务call()方法执行具体的行为,并在执行结算后执行set(result);设置返回值操作,而设置返回值操作中的finishCompletion()方法会调用钩子方法done(),ExecutorCompletionService里定义的QueueingFuture继承了FutureTask,重写了钩子方法,把完成的方法入队保存起来了。

    场景:买新房了,然后在网上下单买冰箱、洗衣机,电器商家不同,所以送货耗时不一样,然后等他们送货,快递只愿送到楼下,然后我们自己将其搬到楼上的家中。 这时候我们需要根据异步先完成的快递,拿个先到对其获取做处理——搬上楼。

    示例:

    public class ExecutorCompletionServiceTest {
    
        static class GoodsModel {
            //商品名称
            String name;
            //购物开始时间
            long startime;
            //送到的时间
            long endtime;
    
            public GoodsModel(String name, long startime, long endtime) {
                this.name = name;
                this.startime = startime;
                this.endtime = endtime;
            }
    
            @Override
            public String toString() {
                return name + ",下单时间[" + this.startime + "," + endtime + "],耗时:" + (this.endtime - this.startime);
            }
        }
        /**
         * 将商品搬上楼
         *
         * @param goodsModel
         * @throws InterruptedException
         */
        static void moveUp(GoodsModel goodsModel) throws InterruptedException {
            //休眠5秒,模拟搬上楼耗时
            TimeUnit.SECONDS.sleep(5);
            System.out.println("将商品搬上楼,商品信息:" + goodsModel);
        }
    
        /**
         * 模拟下单
         *
         * @param name     商品名称
         * @param costTime 耗时
         * @return
         */
        static Callable<GoodsModel> buyGoods(String name, long costTime) {
            return () -> {
                long startTime = System.currentTimeMillis();
                System.out.println(startTime + "购买" + name + "下单!");
                //模拟送货耗时
                try {
                    TimeUnit.SECONDS.sleep(costTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                long endTime = System.currentTimeMillis();
                System.out.println(endTime + name + "送到了!");
                return new GoodsModel(name, startTime, endTime);
            };
        }
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            long st = System.currentTimeMillis();
            System.out.println(st + "开始购物!");
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),
                    new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
            ExecutorCompletionService<GoodsModel> completionService = new ExecutorCompletionService<>(executor);
            //异步下单购买
            completionService.submit(buyGoods("电视机", 3));
            completionService.submit(buyGoods("洗碗机", 5));
            executor.shutdown();
            for (int i = 0; i < 2; i++) {
                //可以获取到最先到的商品
                GoodsModel goodsModel = completionService.take().get();
                //将最先到的商品送上楼
                moveUp(goodsModel);
            }
            long et = System.currentTimeMillis();
            System.out.println(et + "货物已送到家里咯,哈哈哈!");
            System.out.println("总耗时:" + (et - st));
        }
    }
    
    1598583792616开始购物!
    1598583792707购买电视机下单!
    1598583792708购买洗碗机下单!
    1598583795708电视机送到了!
    1598583797709洗碗机送到了!
    将商品搬上楼,商品信息:电视机,下单时间[1598583792707,1598583795708],耗时:3001
    将商品搬上楼,商品信息:洗碗机,下单时间[1598583792708,1598583797709],耗时:5001
    1598583805710货物已送到家里咯,哈哈哈!
    总耗时:13094
    

    异步执行一批任务,有一个完成立即返回,其他取消——线程池invokeAny ()方法

    如果是要返回所有的任务结果,则调用 invokeAll(Collection<? extends Callable<T>> tasks)方法,invokeAny ()和invokeAll()都有超时调用方法。如果超时时间到了,调用结束后还没有全部完成,会对所有工作线程发送中断信号中断操作。

    方法声明如下:

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    

    示例:

        public static void main(String[] args) throws InterruptedException, ExecutionException {
            long st = System.currentTimeMillis();
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),
                    new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
            List<Callable<Integer>> list = new ArrayList<>();
            int taskCount = 5;
            for (int i = taskCount; i > 0; i--) {
                int j = i * 2;
                String taskName = "任务"+i;
                list.add(() -> {
                    TimeUnit.SECONDS.sleep(j);
                    System.out.println(taskName+"执行完毕!");
                    return j;
                });
            }
            //Integer integer = invokeAny(executor, list);
            //ExecutorService提供异步执行一批任务,有一个完成立即返回,其他取消
            Integer integer = executor.invokeAny(list);
            System.out.println("耗时:" + (System.currentTimeMillis() - st) + ",执行结果:" + integer);
            executor.shutdown();
        }
    
        private static <T> T invokeAny(ThreadPoolExecutor executor, List<Callable<T>> list) throws InterruptedException, ExecutionException {
            ExecutorCompletionService<T> completionService = new ExecutorCompletionService(executor);
            List<Future<T>> futureList = new ArrayList<>();
            for (Callable<T> s : list) {
                futureList.add(completionService.submit(s));
            }
            int n = list.size();
            try {
                for (int i = 0; i < n; ++i) {
                    T r = completionService.take().get();
                    if (r != null) {
                        return r;
                    }
                }
            } finally {
                for (Future<T> future : futureList) {
                    future.cancel(true);
                }
            }
            return null;
        }
    }
    

    输出:

    任务1执行完毕!
    耗时:2053,执行结果:2
    

    CompletableFuture——当异步任务完成或者发生异常时,自动调用回调对象的回调方法,主线程无需等待获取结果,异步是以守护线程执行的,如果是用线程池作为执行器则不是守护线程

    使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待

    从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

    我们以获取股票价格为例,看看如何使用CompletableFuture

    CompletableFuture优点是:

    • 异步任务结束时,会自动回调某个对象的方法;
    • 异步任务出错时,会自动回调某个对象的方法;
    • 主线程设置好回调后,不再关心异步任务的执行。

    如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,多个CompletableFuture还可以并行执行。

    除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。

    最后我们注意CompletableFuture的命名规则:

    • xxx():表示该方法将继续在已有的线程中执行;
    • xxxAsync():表示将异步在线程池中执行。

    示例:

    public class CompletableFutureTest {
    
        public static void main(String[] args) throws Exception {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(CompletableFutureTest::fetchPrice);
            // 如果执行成功:
            cf.thenAccept((result) -> {
                System.out.println("price: " + result);
            });
            // 如果执行异常:
            cf.exceptionally((e) -> {
                e.printStackTrace();
                return null;
            });
            // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
            Thread.sleep(200);
        }
    
        static Double fetchPrice() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            if (Math.random() < 0.3) {
                throw new RuntimeException("fetch price failed!");
            }
            return 5 + Math.random() * 20;
        }
    
    }
    

    定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:

    public class CompletableFutureSerialTest {
    
        public static void main(String[] args) throws InterruptedException {
            //先获取股票代码
            CompletableFuture<String> tesla = CompletableFuture.supplyAsync(() -> {
                return CompletableFutureSerialTest.queryCode("tesla");
            });
            //再获取股票代码对应的股价
            CompletableFuture<Double> priceFuture = tesla.thenApplyAsync((code) -> {
                return CompletableFutureSerialTest.fetchPrice(code);
            });
            //打印结果
            priceFuture.thenAccept((price)->{
                System.out.println("price: " + price);
            });
            // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
            Thread.sleep(2000);
        }
    
        static String queryCode(String name) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            return "601857";
        }
    
        static Double fetchPrice(String code) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            return 5 + Math.random() * 20;
        }
    }
    

    输出:

    price: 23.116752498711122
    

    示例:同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作。

    public class CompletableFutureParallelTest {
    
        public static void main(String[] args) throws InterruptedException {
            // 两个CompletableFuture执行异步查询:
            CompletableFuture<String> teslaSina = CompletableFuture.supplyAsync(() -> {
                return CompletableFutureParallelTest.queryCode("tesla","https://finance.sina.com.cn/code/");
            });
    
            CompletableFuture<String> tesla163 = CompletableFuture.supplyAsync(() -> {
                return CompletableFutureParallelTest.queryCode("tesla","https://money.163.com/code/");
            });
            // 用anyOf合并为一个新的CompletableFuture:
            CompletableFuture<Object> stockFuture = CompletableFuture.anyOf(tesla163, teslaSina);
    
            //再获取股票代码对应的股价
            // 两个CompletableFuture执行异步查询:
            CompletableFuture<Double> priceSina = stockFuture.thenApplyAsync((code) -> {
                return CompletableFutureParallelTest.fetchPrice(String.valueOf(code),"https://money.163.com/code/");
            });
            CompletableFuture<Double> price163 = stockFuture.thenApplyAsync((code) -> {
                return CompletableFutureParallelTest.fetchPrice(String.valueOf(code),"https://money.163.com/code/");
            });
            // 用anyOf合并为一个新的CompletableFuture:
            CompletableFuture<Object> priceFuture = CompletableFuture.anyOf(priceSina, price163);
    
            //打印结果
            priceFuture.thenAccept((price)->{
                System.out.println("price: " + price);
            });
            // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
            Thread.sleep(2000);
        }
    
        static String queryCode(String name, String url) {
            System.out.println("query code from " + url + "...");
            try {
                Thread.sleep((long) (Math.random() * 100));
            } catch (InterruptedException e) {
            }
            return "601857";
        }
    
        static Double fetchPrice(String code, String url) {
            System.out.println("query price from " + url + "...");
            try {
                Thread.sleep((long) (Math.random() * 100));
            } catch (InterruptedException e) {
            }
            return 5 + Math.random() * 20;
        }
    }
    
    query code from https://finance.sina.com.cn/code/...
    query code from https://money.163.com/code/...
    query price from https://money.163.com/code/...
    query price from https://money.163.com/code/...
    price: 17.34369661842006
    

    相关文章

      网友评论

          本文标题:Java高并发系列——检视阅读(五)

          本文链接:https://www.haomeiwen.com/subject/unzryktx.html