美文网首页
Java并发编程 -- 结构化并发应用程序

Java并发编程 -- 结构化并发应用程序

作者: TomyZhang | 来源:发表于2019-07-21 21:02 被阅读0次

    一、任务执行

    1.Executor框架

    Executor接口:

    public interface Executor {
        void execute(Runnable command);
    }
    

    Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。如果要在程序中实现一个生产者-消费者的设计,那么最简单的方式通常就是使用Executor。

    每当看到下面这种形式的代码时:

    new Thread(runnable).start();
    

    并且你希望获得一种更灵活的执行策略时,请考虑使用Executor来代替Thread。

    2.线程池

    线程池是指管理一组同构工作线程的资源池。线程池是与工作队列密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

    "在线程池中执行任务"比"为每个任务分配一个线程"优势更多:

    • 通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。
    • 当请求到达时,工作线程已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。
    • 通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。

    类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法之一来创建一个线程池:

    • newFixedThreadPool。
      newFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)。
    • newCachedThreadPool。
      newCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
    • newSingleThreadExecutor。
      newSingleThreadExecutor是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。newSingleThreadExecutor能确保依照任务在队列中的顺序来串行执行(例如FIFO、LIFO、优先级)。
    • newScheduledThreadPool。
      newScheduledThreadPool创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。

    Executor的生命周期:
    ExecutorService中的生命周期管理方法:

    public interface ExecutorService extends Executor {
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
        ...
    }
    

    ExecutorService的生命周期有3种状态:运行、关闭和已终止。ExecutorService在初始创建时处于运行状态。shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

    在ExecutorService关闭后提交的任务将由"拒绝执行处理器"来处理,它会抛弃任务,或者使得execute方法抛出一个未检查的RejectedExecutionException。等所有任务都完成后,ExecuteService将转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。通常在调用awaitTermination之后会立即调用shutdown,从而产生同步地关闭ExecutorService的效果。

    //ExecutorTest
    public class ExecutorTest {
        private final ExecutorService executorService = Executors.newFixedThreadPool(2);
    
        public void doSomething() {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " start");
                    try {
                        Thread.sleep(6000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " end");
                }
            });
        }
    
        public void stop() {
            Log.d("ExecutorTest", "zwm, stop Executor");
            executorService.shutdown();
            Log.d("ExecutorTest", "zwm, isShutdown: " + executorService.isShutdown());
            Log.d("ExecutorTest", "zwm, isTerminated: " + executorService.isTerminated());
            while (!executorService.isTerminated()) {
    
            }
            Log.d("ExecutorTest", "zwm, isTerminated: " + executorService.isTerminated());
        }
    
        public void stopNow() {
            Log.d("ExecutorTest", "zwm, stop Executor now");
            executorService.shutdownNow();
            Log.d("ExecutorTest", "zwm, isShutdown: " + executorService.isShutdown());
            Log.d("ExecutorTest", "zwm, isTerminated: " + executorService.isTerminated());
            while (!executorService.isTerminated()) {
    
            }
            Log.d("ExecutorTest", "zwm, isTerminated: " + executorService.isTerminated());
        }
    }
    
    //测试代码
    private void testMethod() {
        final ExecutorTest executorTest = new ExecutorTest();
        executorTest.doSomething();
        executorTest.doSomething();
        executorTest.doSomething();
        executorTest.doSomething();
        new Handler().postDelayed(new Runnable() {
            @Override
            public void run() {
                executorTest.stop();
                //executorTest.stopNow();
            }
        }, 3000);
    }
    
    //输出log
    2019-07-18 10:11:34.007 zwm, thread: pool-2-thread-1 start
    2019-07-18 10:11:34.007 zwm, thread: pool-2-thread-2 start
    2019-07-18 10:11:37.011 zwm, stop Executor
    2019-07-18 10:11:37.013 zwm, isShutdown: true
    2019-07-18 10:11:37.013 zwm, isTerminated: false
    2019-07-18 10:11:40.008 zwm, thread: pool-2-thread-1 end
    2019-07-18 10:11:40.008 zwm, thread: pool-2-thread-2 end
    2019-07-18 10:11:40.008 zwm, thread: pool-2-thread-1 start
    2019-07-18 10:11:40.008 zwm, thread: pool-2-thread-2 start
    2019-07-18 10:11:46.008 zwm, thread: pool-2-thread-1 end
    2019-07-18 10:11:46.008 zwm, thread: pool-2-thread-2 end
    2019-07-18 10:11:46.009 zwm, isTerminated: true
    

    延迟任务与周期任务:
    Timer类负责管理延迟任务("在100ms后执行该任务")以及周期任务("每10ms执行一次该任务")。然而,Timer存在一些缺陷,在Java 5.0或更高的JDK中,将很少使用Timer。因此应该考虑使用ScheduledThreadPoolExecutor来代替它。可以通过ScheduledThreadPoolExecutor的构造函数或newScheduledThreadPool工厂方法来创建该类的对象。

    Timer类的缺陷:

    • Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时精确性。
    • 如果TimerTask抛出了一个未检查的异常,那么Timer将表现出糟糕的行为:Timer线程并不捕获异常,将终止定时线程。

    携带结果的任务Callable与Future:
    Executor框架使用Runnable作为其基本的任务表示形式,但是Runnable不能返回一个值或抛出一个受检查的异常。

    Runnable和Callable描述的都是抽象的计算任务。这些任务通常是有范围的,即都有一个明确的起始点,并且最终会结束。Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。由于有些任务可能要执行很长的时间,因此通常希望能够取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。

    Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。在Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在"完成"状态上。
    get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了异常,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。

    public interface Callable<V> {
        V call() throws Exception;
    }
    
    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException,
                       CancellationException;
        V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
                                                  CancellationException, TimeoutException;
    }
    

    ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用来获得任务的执行结果或者取消任务。还可以显示地为某个指定的Runnable或Callable实例化一个FutureTask。(由于FutureTask实现了Runnable,因此可以将它提交给Executor来执行,或者直接调用它的run方法。)

    在将Runnable或Callable提交到Executor的过程中,包含了一个安全发布过程,即将Runnable或Callable从提交线程发布到最终执行任务的线程。类似地,在设置Future结果的过程中也包含了一个安全发布,即将这个结果从计算它的线程发布到任何通过get获得它的线程。

    Callable + Future:

    //ExecutorTest
    public class ExecutorTest {
        private final ExecutorService executorService = Executors.newFixedThreadPool(2);
    
        public void doSomething() {
            Future<String> future = executorService.submit(new Callable<String>() {
                @Override
                public String call() throws InterruptedException {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " start");
                    Thread.sleep(3000);
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " end");
                    return "tomorrow";
                }
            });
    
            try {
                String result = future.get();
                Log.d("ExecutorTest", "zwm, result: " + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    //测试代码
    private void testMethod() {
       final ExecutorTest executorTest = new ExecutorTest();
       executorTest.doSomething();
    }
    
    //输出log
    2019-07-18 11:34:34.179 zwm, thread: pool-3-thread-1 start
    2019-07-18 11:34:37.179 zwm, thread: pool-3-thread-1 end
    2019-07-18 11:34:37.179 zwm, result: tomorrow
    

    Callable + FutureTask:

    //ExecutorTest
    public class ExecutorTest {
        private final ExecutorService executorService = Executors.newFixedThreadPool(2);
    
        public void doSomething() {
            FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
                @Override
                public String call() throws InterruptedException {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " start");
                    Thread.sleep(3000);
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " end");
                    return "tomorrow";
                }
            });
             executorService.submit(futureTask);
            try {
                String result = futureTask.get();
                Log.d("ExecutorTest", "zwm, result: " + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    //测试代码
    private void testMethod() {
       final ExecutorTest executorTest = new ExecutorTest();
       executorTest.doSomething();
    }
    
    //输出log
    2019-07-18 11:42:38.580 zwm, thread: pool-2-thread-1 start
    2019-07-18 11:42:41.580 zwm, thread: pool-2-thread-1 end
    2019-07-18 11:42:41.581 zwm, result: tomorrow
    

    CompletionService:Executor与BlockingQueue:
    如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务(CompletionService)。

    CompletionService将Executor和BlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时将被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。

    //ExecutorTest
    public class ExecutorTest {
        private final ExecutorService executorService = Executors.newFixedThreadPool(3);
        private final CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
    
        public void doSomething() {
            completionService.submit(new Callable<String>() {
                @Override
                public String call() throws InterruptedException {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " start");
                    Thread.sleep(3000);
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " end");
                    return Thread.currentThread().getName();
                }
            });
        }
    
        public void getResult() {
            Log.d("ExecutorTest", "zwm, getResult");
            try {
                for(int i=0; i<3; i++) {
                    String result = completionService.take().get();
                    Log.d("ExecutorTest", "zwm, result is: " + result);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    //测试代码
    private void testMethod() {
        final ExecutorTest executorTest = new ExecutorTest();
        executorTest.doSomething();
        executorTest.doSomething();
        executorTest.doSomething();
        executorTest.getResult();
    }
    
    //输出log
    2019-07-18 14:06:14.688 zwm, thread: pool-2-thread-1 start
    2019-07-18 14:06:14.688 zwm, getResult
    2019-07-18 14:06:14.692 zwm, thread: pool-2-thread-2 start
    2019-07-18 14:06:14.693 zwm, thread: pool-2-thread-3 start
    2019-07-18 14:06:17.690 zwm, thread: pool-2-thread-1 end
    2019-07-18 14:06:17.692 zwm, result is: pool-2-thread-1
    2019-07-18 14:06:17.694 zwm, thread: pool-2-thread-2 end
    2019-07-18 14:06:17.694 zwm, thread: pool-2-thread-3 end
    2019-07-18 14:06:17.695 zwm, result is: pool-2-thread-3
    2019-07-18 14:06:17.696 zwm, result is: pool-2-thread-2
    

    二、取消与关闭

    1.任务取消

    在Java中没有一种安全的抢占式方法来停止线程,因此也就没有安全的抢占式方法来停止任务。只有一些协作式的机制,使请求取消的任务和代码都遵循一种协商好的协议。

    使用volatile类型的域来保存取消状态:

    //ExecutorTest
    public class ExecutorTest {
        private final ExecutorService executorService = Executors.newFixedThreadPool(3);
        private static final Task task = new Task();
    
        public void doSomething() {
            executorService.execute(task);
        }
    
        public void cancel() {
            task.cancel();
        }
    
        public static class Task implements Runnable {
            private volatile boolean cancelled;
    
            @Override
            public void run() {
                Log.d("Task", "zwm, thread: " + Thread.currentThread().getName() + " start");
                while(!cancelled) {
    
                }
                Log.d("Task", "zwm, thread: " + Thread.currentThread().getName() + " end");
            }
    
            public void cancel() {
                Log.d("Task", "zwm, thread: " + Thread.currentThread().getName() + " cancel");
                cancelled = true;
            }
        }
    }
    
    //测试代码
    private void testMethod() {
        final ExecutorTest executorTest = new ExecutorTest();
        executorTest.doSomething();
        executorTest.doSomething();
        executorTest.doSomething();
    
        new Handler().postDelayed(new Runnable() {
            @Override
            public void run() {
                executorTest.cancel();
            }
        }, 3000);
    }
    
    //输出log
    2019-07-18 16:54:48.461 zwm, thread: pool-2-thread-1 start
    2019-07-18 16:54:48.463 zwm, thread: pool-2-thread-3 start
    2019-07-18 16:54:48.463 zwm, thread: pool-2-thread-2 start
    2019-07-18 16:54:51.463 zwm, thread: main cancel
    2019-07-18 16:54:51.463 zwm, thread: pool-2-thread-2 end
    2019-07-18 16:54:51.463 zwm, thread: pool-2-thread-3 end
    2019-07-18 16:54:51.464 zwm, thread: pool-2-thread-1 end
    

    中断:
    线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。

    每个线程都有一个boolean类型的中断状态。当中断线程时,这个线程的中断状态将被设置为true。在Thread中包含了中断线程以及查询线程中断状态的方法:

    public class Thread {
        public void interrupt() {...} //中断目标线程
        public boolean isInterrupted() {...} //返回目标线程的中断状态
        public static boolean interrupted() {...} //清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法
    }
    

    调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息,然后由线程在下一个合适的时刻中断自己。这些时刻也被称为取消点。有些方法,例如wait、sleep和join等,将严格地处理这种请求,当它们收到中断请求或者在开始执行时发现某个已被设置好的中断状态时,将抛出一个异常。设计良好的方法可以完全忽略这种请求,只要它们能使调用代码对中断请求进行某种处理。设计糟糕的方法可能会屏蔽中断请求,从而导致调用栈中的其他代码无法对中断请求作出响应。在使用静态的interrupted时应该小心,因为它会清除当前线程的中断状态。如果在调用interrupted时返回了true,那么除非你想屏蔽这个中断,否则必须对它进行处理——可以抛出InterruptedException,或者通过再次调用interrupt来恢复中断状态。

    通常,中断是实现取消的最合理方式。

    //ExecutorTest
    public class ExecutorTest {
        private final ExecutorService executorService = Executors.newFixedThreadPool(3);
        private static final Task task = new Task();
    
        public void doSomething() {
            executorService.execute(task);
        }
    
        public void cancel() {
            executorService.shutdownNow();
        }
    
        public static class Task implements Runnable {
            @Override
            public void run() {
                Log.d("Task", "zwm, thread: " + Thread.currentThread().getName() + " start");
                while(!Thread.currentThread().isInterrupted()) { //线程中断条件
    
                }
                Log.d("Task", "zwm, thread: " + Thread.currentThread().getName() + " end");
            }
        }
    }
    
    //测试代码
    private void testMethod() {
        final ExecutorTest executorTest = new ExecutorTest();
        executorTest.doSomething();
        executorTest.doSomething();
        executorTest.doSomething();
    
        new Handler().postDelayed(new Runnable() {
            @Override
            public void run() {
                executorTest.cancel();
            }
        }, 3000);
    }
    
    //输出log
    2019-07-18 19:25:53.552 zwm, thread: pool-2-thread-1 start
    2019-07-18 19:25:53.554 zwm, thread: pool-2-thread-2 start
    2019-07-18 19:25:53.556 zwm, thread: pool-2-thread-3 start
    2019-07-18 19:25:56.559 zwm, thread: pool-2-thread-3 end
    2019-07-18 19:25:56.559 zwm, thread: pool-2-thread-2 end
    2019-07-18 19:25:56.560 zwm, thread: pool-2-thread-1 end
    

    中断策略:
    当检查到中断请求时,任务并不需要放弃所有的操作——它可以推迟处理中断请求,并直到某个更合适的时刻。因此需要记住中断请求,并在完成当前任务后抛出InterruptedException或者表示已收到中断请求。这项技术能够确保在更新过程中发生中断时,数据结构不会被破坏。

    由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

    响应中断:
    在调用可中断的阻塞函数时,例如Thread.sleep或BlockingQueue.put等,有两种使用策略可用于处理InterruptedException:

    • 传递异常(可能在执行某个特定任务的清除操作之后),从而使你的方法也成为可中断的阻塞方法。
    • 恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。

    如果不想或无法传递InterruptedException(或许通过Runnable来定义任务),那么需要寻找另一种方式来保存中断请求。一种标准的方法就是通过再次调用interrupt来恢复中断状态。只有实现了线程中断策略的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。

    通过Future来实现取消:
    当Future.get抛出InterruptedException或TimeoutException时,如果你知道不再需要结果,那么就可以调用Future.cancel来取消任务。

    //ExecutorTest
    public class ExecutorTest {
        private final ExecutorService executorService = Executors.newFixedThreadPool(3);
    
        public void doSomething() {
            Future<String> future = executorService.submit(new Callable<String>() {
    
                @Override
                public String call() {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " run");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " InterruptedException");
                    }
                    return Thread.currentThread().getName();
                }
            });
    
            try {
                String result = future.get(3000, TimeUnit.MILLISECONDS);
                Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + ", result: " + result);
            } catch (InterruptedException e) {
                Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " InterruptedException");
            } catch (TimeoutException e) {
                Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " TimeoutException");
            } catch (ExecutionException e) {
                Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " ExecutionException");
            } finally {
                future.cancel(true); //如果任务已经结束,那么执行取消操作也不会带来任何影响
                                     //如果任务正在运行,那么将被中断
            }
        }
    }
    
    //测试代码
    private void testMethod() {
        final ExecutorTest executorTest = new ExecutorTest();
        executorTest.doSomething();
    }
    
    //输出log
    2019-07-18 20:25:37.411 zwm, thread: pool-2-thread-1 run
    2019-07-18 20:25:40.409 zwm, thread: main TimeoutException
    2019-07-18 20:25:40.410 zwm, thread: pool-2-thread-1 InterruptedException
    

    处理不可中断的阻塞:
    在Java库中,许多可阻塞的方法都是通过提前返回或者抛出InterruptedException来响应中断请求的,从而使开发人员更容易构建出能响应取消请求的任务。然而,并非所有的可阻塞方法或者阻塞机制都能响应中断,具体如下:

    • java.io包中的同步Socket I/O。
      在服务器应用程序中,最常见的阻塞I/O形式就是对套接字进行读取和写入。虽然InputStream和OutputStream中的read和write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException。
    • java.io包中的同步I/O。
      当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程都抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel。
    • Selector的异步I/O。
      如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close或wakeup方法会使线程抛出ClosedSelectorException并提前返回。
    • 获得某个锁。
      如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,因为线程认为它肯定会获得锁,所以将不会理会中断请求。但是,在Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。

    2.停止基于线程的服务

    线程有一个相应的所有者,即创建该线程的类。因此线程池是其工作者线程的所有者,如果要中断这些线程,那么应该使用线程池。

    关闭ExecutorService:
    在ExecutorService中提供了shutdown正常关闭和shutdownNow强行关闭。在进行强行关闭时,shutdownNow首先关闭当前正在执行的任务,然后返回所有尚未启动的任务清单。这两种关闭方式的差别在于各自的安全性和响应性:强行关闭的速度更快,但风险也更大,因为任务很可能在执行到一半时被结束;而正常关闭虽然速度慢,但却更安全,因为ExecutorService会一直等到队列中的所有任务都执行完成后才关闭。

    //ExecutorTest
    public class ExecutorTest {
        private final ExecutorService executorService = Executors.newFixedThreadPool(3);
    
        public void doSomething() {
             executorService.submit(new Callable<String>() {
    
                @Override
                public String call() {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " run");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " InterruptedException");
                    }
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " end");
                    return Thread.currentThread().getName();
                }
            });
        }
    
        public void stop() {
            try {
                Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " stop");
                executorService.shutdown();
                Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + ", executorService.isTerminated 1: " + executorService.isTerminated());
                executorService.awaitTermination(2000, TimeUnit.MILLISECONDS);
                Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + ", executorService.isTerminated 2: " + executorService.isTerminated());
            } catch (InterruptedException e) {
                Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " InterruptedException");
            }
        }
    }
    
    //测试代码
    private void testMethod() {
        final ExecutorTest executorTest = new ExecutorTest();
        executorTest.doSomething();
        new Handler().postDelayed(new Runnable() {
            @Override
            public void run() {
                executorTest.stop();
            }
        }, 2000);
    }
    
    //输出log
    2019-07-19 09:59:17.118 zwm, thread: pool-2-thread-1 run
    2019-07-19 09:59:19.119 zwm, thread: main stop
    2019-07-19 09:59:19.120 zwm, thread: main, executorService.isTerminated 1: false
    2019-07-19 09:59:21.120 zwm, thread: main, executorService.isTerminated 2: false
    2019-07-19 09:59:22.119 zwm, thread: pool-2-thread-1 end
    

    "毒丸"对象:
    另一种关闭生产者-消费者服务的方式就是使用"毒丸"对象。"毒丸"对象是指一个放在队列上的对象,当得到这个对象时,立即停止。在FIFO队列中,"毒丸"对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交"毒丸"对象之前提交的所有工作都会被处理,而生产者在提交了"毒丸"对象后,将不会再提交任何工作。

    //ExecutorTest
    public class ExecutorTest {
        private final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
    
        public void produce() {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " run");
                    try {
                        for (int i = 0; i < 5; i++) {
                            blockingQueue.put(String.valueOf(i));
                        }
                        blockingQueue.put("task-end");
                    } catch (InterruptedException e) {
    
                    }
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " end");
                }
            });
            thread.start();
        }
    
        public void consume() {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " run");
                    while(!Thread.currentThread().isInterrupted()) {
                        String result = null;
                        try {
                            result = blockingQueue.take();
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " InterruptedException");
                            Thread.currentThread().interrupt();
                        }
    
                        Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " result: " + result);
                        if(TextUtils.equals(result, "task-end")) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " end");
                }
            });
            thread.start();
        }
    }
    
    //测试代码
    private void testMethod() {
        final ExecutorTest executorTest = new ExecutorTest();
        executorTest.produce();
        executorTest.consume();
    }
    
    //输出log
    2019-07-19 10:35:56.321 zwm, thread: Thread-10 run
    2019-07-19 10:35:56.323 zwm, thread: Thread-10 end
    2019-07-19 10:35:56.324 zwm, thread: Thread-11 run
    2019-07-19 10:35:58.325 zwm, thread: Thread-11 result: 0
    2019-07-19 10:36:00.326 zwm, thread: Thread-11 result: 1
    2019-07-19 10:36:02.327 zwm, thread: Thread-11 result: 2
    2019-07-19 10:36:04.327 zwm, thread: Thread-11 result: 3
    2019-07-19 10:36:06.329 zwm, thread: Thread-11 result: 4
    2019-07-19 10:36:08.331 zwm, thread: Thread-11 result: task-end
    2019-07-19 10:36:08.331 zwm, thread: Thread-11 end
    

    只有在生产者和消费者的数量都已知的情况下,才可以使用"毒丸"对象。然而,当生产者和消费者的数量较大时,这种方法将变得难以使用。只有在无界队列中,"毒丸"对象才能可靠地工作。

    3.处理非正常的线程终止

    导致线程提前死亡的最主要原因就是RuntimeException。由于这些异常表示出现了某种编程错误或者其他不可修复的错误,因此它们通常不会被捕获。它们不会在调用栈中逐层传递,而是默认地在控制台中输出栈追踪信息,并终止线程。任何代码都可能抛出一个RuntimeException。每当调用另一个方法时,都要对它的行为保持怀疑,不要盲目地认为它一定会正常返回,或者一定会抛出在方法原型中声明的某个已检查异常。对调用的代码越不熟悉,就越应该对其代码行为保持怀疑。

    在Thread API中提供了UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况。

    //ExecutorTest
    public class ExecutorTest {
        private final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
    
        public void doSomething() {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " run");
                    int i = 0;
                    int result = 100 / i;
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " result: " + result);
                }
            });
            thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
            thread.start();
        }
    
        private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                Log.d("ExecutorTest", "zwm, Thread: " + t.getName() + ", Throwable: " + e.getMessage());
            }
        };
    }
    
    //测试代码
    private void testMethod() {
        final ExecutorTest executorTest = new ExecutorTest();
        executorTest.doSomething();
    }
    
    //输出log
    2019-07-19 11:10:36.103 27509-27646/com.tomorrow.testnetworkcache D/ExecutorTest: zwm, thread: Thread-10 run
    2019-07-19 11:10:36.106 27509-27646/com.tomorrow.testnetworkcache E/AndroidRuntime: FATAL EXCEPTION: Thread-10
        Process: com.tomorrow.testnetworkcache, PID: 27509
        java.lang.ArithmeticException: divide by zero
            at com.tomorrow.testnetworkcache.ExecutorTest$1.run(ExecutorTest.java:18)
            at java.lang.Thread.run(Thread.java:764)
    2019-07-19 11:10:36.106 27509-27646/com.tomorrow.testnetworkcache D/ExecutorTest: zwm, Thread: Thread-10, Throwable: divide by zero
    

    4.JVM关闭

    JVM既可以正常关闭,也可以强行关闭。正常关闭的触发方式有多种,包括:当最后一个"正常(非守护)"线程结束时,或者当调用了System.exit时,或者通过其他特定于平台的方法关闭时(例如发送了SIGINT信号或键入Ctrl-C)。虽然可以通过这些标准方法来正常关闭JVM,但也可以通过调用Runtime.halt或者在操作系统中"杀死"JVM进程(例如发送SIGKILL)来强行关闭JVM。

    关闭钩子:
    在正常关闭中,JVM首先调用所有已注册的关闭钩子(Shutdown Hook)。关闭钩子是指通过Runtime.addShutdownHook注册的但尚未开始的线程。JVM并不能保证关闭钩子的调用顺序。在关闭应用程序线程时,如果有(守护和非守护)线程仍然在运行,那么这些线程接下来将与关闭进程并发执行。当所有的关闭钩子都执行结束时,如果runFinalizersOnExit为true,那么JVM将运行终结器,然后再停止。JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。如果关闭钩子或终结器没有执行完成,那么正常关闭进程"挂起"并且JVM必须被强行关闭。当被强行关闭时,只是关闭JVM,而不会运行关闭钩子。

    关闭钩子应该是线程安全的:它们在访问共享数据时必须使用同步机制,并且小心地避免发生死锁,这与其他并发代码的要求相同。而且,关闭钩子不应该对应用程序的状态(例如,其他服务是否已经关闭,或者所有的正常线程是否已经执行完成)或者JVM的关闭原因做出任何假设,因此在编写关闭钩子的代码时必须考虑周全。最后,关闭钩子必须尽快退出,因为它们会延迟JVM的结束时间,而用户可能希望JVM能尽快终止。

    关闭钩子可以用于实现服务或应用程序的清理工作,例如删除临时文件,或者清除无法由操作系统自动清除的资源。

    //ExecutorTest
    public class ExecutorTest {
        private final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
    
        public void doSomething() {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " run");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " end");
                }
            });
            Runtime.getRuntime().addShutdownHook(thread);
        }
    }
    
    //测试代码
    private void testMethod() {
        Log.d(TAG, "zwm, testMetod");
        final ExecutorTest executorTest = new ExecutorTest();
        executorTest.doSomething();
        new Handler().postDelayed(new Runnable() {
            @Override
            public void run() {
                Log.d(TAG, "zwm, System exit");
                System.exit(0);
            }
        }, 3000);
    }
    
    //输出log
    2019-07-19 13:12:05.022 zwm, testMetod
    2019-07-19 13:12:08.023 zwm, System exit
    2019-07-19 13:12:08.028 zwm, thread: Thread-10 run
    2019-07-19 13:12:11.028 zwm, thread: Thread-10 end
    

    守护线程:
    有时候,你希望创建一个线程来执行一些辅助工作,但又不希望这个线程阻碍JVM的关闭。在这种情况下就需要使用守护线程(Deamon Thread)。

    线程可分为两种:普通线程和守护线程。在JVM启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程(例如垃圾回收器以及其他执行辅助工作的线程)。当创建一个新线程时,新线程将继承创建它的线程的守护状态,因此在默认情况下,主线程创建的所有线程都是普通线程。普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃——既不会执行finally代码块,也不会执行回卷栈,而JVM只是直接退出。

    我们应尽可能少地使用守护线程。守护线程通常不能用来替代应用程序管理程序中各个服务的生命周期。

    终结器:
    终结器可以在某个由JVM管理的线程中运行,因此终结器访问的任何状态都可能被多个线程访问,这样就必须对其访问操作进行同步。终结器并不能保证它们将在何时运行甚至是否会运行,并且复杂的终结器通常还会在对象上产生巨大的性能开销。

    避免使用终结器。

    三、线程池的使用

    1.在任务与执行策略之间的隐形耦合

    只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成"拥塞"。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁。

    线程饥饿死锁:
    只要线程池中的任务需要无限期地等待一些必须由池中其他任务才能提供的资源或条件,例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。

    每当提交了一个有依赖性的Executor任务时,要清楚地知道可能会出现线程"饥饿"死锁,因此需要在代码或配置Executor的配置文件中记录线程池的大小限制或配置规则。

    运行时间较长的任务:
    如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也会变得糟糕。

    在平台类库的大多数可阻塞方法中,都同时定义了限时版本和无限时版本,例如Thread.join、BlockingQueue.put、CountdownLatch.await以及Selector.select等。如果等待超时,那么可以把任务标识为失败,然后中止任务或者将任务重新放回队列以便随后执行。

    2.设置线程池的大小

    线程池的理想大小取决于被提交任务的类型以及所部属系统的特性。在代码中通常不会固定线程池的大小,而应该通过某种配置机制来提供,或者根据Runtime.availableProcessors来动态计算。

    3.配置ThreadPoolExecutor

    ThreadPoolExecutor为一些Executor提供了基本的实现,这些Executor是由Executors中的newCachedThreadPool、newFixedThreadPool和newScheduledThreadExecutor等工厂方法返回的。ThreadPoolExecutor是一个灵活的、稳定的线程池,允许进行各种定制。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectExecutionHandler handler) {...}
    

    线程的创建与销毁:
    线程池的基本大小(Core Pool Size)、最大大小(Maximum Pool Size)以及存活时间等因素共同负责线程的创建和销毁。基本大小就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占有的资源,从而使得这些资源可以用于执行其他工作。

    newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。newCachedThreadPool工厂方法将线程池的最大大小设置为Integer.MAX_VALUE,而将基本大小设置为零,并将超时设置为1分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。其他形式的线程池可以通过显示的ThreadPoolExecutor构造函数来构造。

    管理队列任务:
    ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方法有3种:无界队列、有界队列和同步移交。队列的选择与其他的配置参数有关,例如线程池的大小等。

    newFixedThreadPool和newSingleThreadExecutor在默认情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加。一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue。对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue才有实际价值。在newCachedThreadPool工厂方法中就使用了SynchronousQueue。

    当使用像LinkedBlockingQueue或ArrayBlockingQueue这样的FIFO(先进先出)队列时,任务的执行顺序与它们的到达顺序相同。如果想进一步控制任务执行顺序,还可以使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。

    对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择,它能提供比固定大小的线程池更好的排队性能。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池,就像在接受网络客户请求的服务器应用程序中,如果不进行限制,那么很容易发生过载问题。只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致"饥饿"死锁问题。此时应该使用无界的线程池,例如newCachedThreadPool。

    饱和策略:
    当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHandler来修改。(如果某个任务被提交到一个已被关闭的Executor时,也会用到饱和策略。)JDK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含有不同的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldersPolicy。

    • AbortPolicy
      默认的饱和策略,该策略将抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。
    • CallerRunsPolicy
      该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。
    • DiscardPolicy
      当新提交的任务无法保存到队列中等待执行时,会抛弃该任务。
    • DiscardOldestPolicy
      当新提交的任务无法保存到队列中等待执行时,会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。如果工作队列是一个优先队列,那么将会抛弃优先级最高的任务。

    创建一个固定大小的线程池,并采用有界队列以及"调用者运行"饱和策略:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(N_THREADS, N_THREADS, 
                                        0L, TimeUnit.MILLISECONDS, 
                                        new LinkedBlockingQueue<Runnable>(CAPACITY));
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    

    线程工厂:
    每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。在ThreadFactory中只定义了一方法newThread,每当线程池需要创建一个新线程时都会调用这个方法。

    public interface ThreadFactory {
        Thread newThread(Runnable r);
    }
    

    在许多情况下都需要使用定制的线程工厂方法。例如,你希望为线程池中的线程指定一个UncaughtExceptionHandler,或者实例化一个定制的Thread类用于执行调试信息的记录。你还可能希望修改线程的优先级(不推荐这么做)或者守护状态(不推荐这么做)。或许你只是希望给线程取一个更有意义的名称,用来解释线程的转储信息和错误日志。

    //ExecutorTest
    public class ExecutorTest {
        private final ExecutorService executorService = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new MyThreadFactory("TomyThreadPool"));
    
        public void doSomething() {
            Log.d("ExecutorTest", "zwm, doSomething");
            executorService.execute(new Runnable() {
    
                @Override
                public void run() {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " run");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " InterruptedException");
                    }
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " end");
                }
            });
        }
    }
    
    //MyThreadFactory
    public class MyThreadFactory implements ThreadFactory {
        private final String poolName;
    
        public MyThreadFactory(String poolName) {
            this.poolName = poolName;
        }
    
        @Override
        public Thread newThread(Runnable r) {
            return new MyAppThread(r, poolName);
        }
    }
    
    //MyAppThread
    public class MyAppThread extends Thread {
        public static final String DEFAULT_NAME = "MyAppThread";
        private static volatile boolean debugLifecycle = false;
        private static final AtomicInteger created = new AtomicInteger();
        private static final AtomicInteger alive = new AtomicInteger();
    
        public MyAppThread(Runnable r) {
            this(r, DEFAULT_NAME);
        }
    
        public MyAppThread(Runnable r,String name) {
            super(r, name + "-" + created.incrementAndGet());
            setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){
                public void uncaughtException(Thread t,Throwable e){
                    Log.d(DEFAULT_NAME, "zwm, UNCAUGHT in thread " + t.getName(), e);
                }
            });
        }
    
        @Override
        public void run(){
            boolean debug = debugLifecycle;
            if(debug){
                Log.d(DEFAULT_NAME, "zwm, Created " + getName());
            }
            try {
                alive.incrementAndGet();
                super.run();
                Log.d(DEFAULT_NAME, "zwm, super run end");
            } finally {
                alive.decrementAndGet();
                if(debug){
                    Log.d(DEFAULT_NAME, "zwm, Exiting " + getName());
                }
            }
        }
    
        public static int getThreadCreated(){
            return created.get();
        }
    
        public static int getThreadAlive(){
            return alive.get();
        }
    
        public static boolean getDebug(){
            return debugLifecycle;
    
        }
    
        public static void setDebug(boolean b){
            debugLifecycle = b;
        }
    }
    
    //测试代码
    private void testMethod() {
        MyAppThread.setDebug(true);
        Log.d(TAG, "zwm, MyAppThread getDebug: " + MyAppThread.getDebug());
        ExecutorTest executorTest = new ExecutorTest();
        executorTest.doSomething();
        executorTest.doSomething();
        new Handler().postDelayed(new Runnable() {
            @Override
            public void run() {
                Log.d(TAG, "zwm, MyAppThread getThreadCreated: " + MyAppThread.getThreadCreated());
                Log.d(TAG, "zwm, MyAppThread getThreadAlive: " + MyAppThread.getThreadAlive());
            }
        }, 3000);
    }
    
    //输出log
    07-20 16:50:50.034 zwm, MyAppThread getDebug: true
    07-20 16:50:50.035 zwm, doSomething
    07-20 16:50:50.037 zwm, doSomething
    07-20 16:50:50.046 zwm, Created TomyThreadPool-2
    07-20 16:50:50.046 zwm, Created TomyThreadPool-1
    07-20 16:50:50.046 zwm, thread: TomyThreadPool-1 run
    07-20 16:50:50.050 zwm, thread: TomyThreadPool-2 run
    07-20 16:50:51.046 zwm, thread: TomyThreadPool-1 end
    07-20 16:50:51.051 zwm, thread: TomyThreadPool-2 end
    07-20 16:50:53.039 zwm, MyAppThread getThreadCreated: 2
    07-20 16:50:53.039 zwm, MyAppThread getThreadAlive: 2
    

    如果在应用程序中需要利用安全策略来控制对某些特殊代码库的访问权限,那么可以通过Executors中的privilegedThreadFactory工厂来定制自己的线程工厂。通过这种方式创建出来的线程,将与创建privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext和contextClassLoader。如果不使用privilegedThreadFactory,线程池创建的线程将从在需要新线程时调用execute或submit的客户程序中继承访问权限,从而导致令人困惑的安全性异常。

    在调用构造函数后再定制ThreadPoolExecutor:
    在调用完ThreadPoolExecutor的构造函数后,仍然可以通过设置函数(Setter)来修改大多数传递给它的构造函数的参数(例如线程池的基本大小、最大大小、存活时间、线程工厂以及拒绝执行处理器(Rejected Execution Handler))。如果Executor是通过Executors中的某个(newSingleThreadExecutor除外)工厂方法创建的,那么可以将结果的类型转换为ThreadPoolExecutor以访问设置器。

    ExecutorService exec = Executors.newCachedThreadPool();
    if(exec instanceof ThreadPoolExecutor) {
        ((ThreadPoolExecutor)exec).setCorePoolSize(10);
    } else {
        throw new AssertionError("Oops, bad assumption");
    }
    

    在Executors中包含一个unconfigurableExecutorService工厂方法,该方法对一个现有的ExecutorService进行包装,使其只暴露出ExecutorService的方法,因此不能对它进行配置。newSingleThreadExecutor返回按这种方式封装的ExecutorService,而不是最初的ThreadPoolExecutor。虽然单线程的Executor实际上被实现为一个只包含唯一线程的线程池,但它同样确保了不会并发地执行任务。如果在代码中增加单线程Executor的线程池大小,那么将破坏它的执行语义。可以在自己的Executor中使用这项技术以防止执行策略被修改。

    4.扩展ThreadPoolExecutor

    ThreadPoolExecutor是可扩展的,它提供了几个可以在子类化中改写的方法:beforeExecute、afterExecute和terminated,这些方法可以用于扩展ThreadPoolExecutor的行为。

    在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。(如果任务在完成后带有一个Error,那么就不会调用afterExecute。)如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。在线程池完成关闭操作时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后。terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者收集finalize统计信息等操作。

    //ExecutorTest
    public class ExecutorTest {
        public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    
            public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
                super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
            }
    
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                Log.d("MyThreadPoolExecutor", "zwm, beforeExecute, thread: " + t.getName());
                super.beforeExecute(t, r);
            }
    
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                Log.d("MyThreadPoolExecutor", "zwm, afterExecute");
                super.afterExecute(r, t);
            }
    
            @Override
            protected void terminated() {
                Log.d("MyThreadPoolExecutor", "zwm, terminated");
                super.terminated();
            }
        }
    
        private final ExecutorService executorService = new MyThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new MyThreadFactory("TomyThreadPool"));
    
        public void doSomething() {
            Log.d("ExecutorTest", "zwm, doSomething");
            executorService.execute(new Runnable() {
    
                @Override
                public void run() {
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " run");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " InterruptedException");
                    }
                    Log.d("ExecutorTest", "zwm, thread: " + Thread.currentThread().getName() + " end");
                }
            });
        }
    
        public void stop() {
            Log.d("ExecutorTest", "zwm, stop executor!");
            executorService.shutdown();
        }
    }
    
    //MyThreadFactory
    public class MyThreadFactory implements ThreadFactory {
        private final String poolName;
    
        public MyThreadFactory(String poolName) {
            this.poolName = poolName;
        }
    
        @Override
        public Thread newThread(Runnable r) {
            return new MyAppThread(r, poolName);
        }
    }
    
    //MyAppThread
    public class MyAppThread extends Thread {
        public static final String DEFAULT_NAME = "MyAppThread";
        private static volatile boolean debugLifecycle = false;
        private static final AtomicInteger created = new AtomicInteger();
        private static final AtomicInteger alive = new AtomicInteger();
    
        public MyAppThread(Runnable r) {
            this(r, DEFAULT_NAME);
        }
    
        public MyAppThread(Runnable r,String name) {
            super(r, name + "-" + created.incrementAndGet());
            setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){
                public void uncaughtException(Thread t,Throwable e){
                    Log.d(DEFAULT_NAME, "zwm, UNCAUGHT in thread " + t.getName(), e);
                }
            });
        }
    
        @Override
        public void run(){
            boolean debug = debugLifecycle;
            if(debug){
                Log.d(DEFAULT_NAME, "zwm, Created " + getName());
            }
            try {
                alive.incrementAndGet();
                super.run();
                Log.d(DEFAULT_NAME, "zwm, super run end");
            } finally {
                alive.decrementAndGet();
                if(debug){
                    Log.d(DEFAULT_NAME, "zwm, Exiting " + getName());
                }
            }
        }
    
        public static int getThreadCreated(){
            return created.get();
        }
    
        public static int getThreadAlive(){
            return alive.get();
        }
    
        public static boolean getDebug(){
            return debugLifecycle;
    
        }
    
        public static void setDebug(boolean b){
            debugLifecycle = b;
        }
    }
    
    //测试代码
    private void testMethod() {
        MyAppThread.setDebug(true);
        Log.d(TAG, "zwm, MyAppThread getDebug: " + MyAppThread.getDebug());
        final ExecutorTest executorTest = new ExecutorTest();
        executorTest.doSomething();
        executorTest.doSomething();
        new Handler().postDelayed(new Runnable() {
            @Override
            public void run() {
                Log.d(TAG, "zwm, MyAppThread getThreadCreated: " + MyAppThread.getThreadCreated());
                Log.d(TAG, "zwm, MyAppThread getThreadAlive: " + MyAppThread.getThreadAlive());
                executorTest.stop();
            }
        }, 3000);
    }
    
    //输出log
    07-20 17:39:36.175 D/MainActivity: zwm, MyAppThread getDebug: true
    07-20 17:39:36.176 D/ExecutorTest: zwm, doSomething
    07-20 17:39:36.177 D/ExecutorTest: zwm, doSomething
    07-20 17:39:36.178 D/MyAppThread: zwm, Created TomyThreadPool-1
    07-20 17:39:36.178 D/MyThreadPoolExecutor: zwm, beforeExecute, thread: TomyThreadPool-1
    07-20 17:39:36.178 D/ExecutorTest: zwm, thread: TomyThreadPool-1 run
    07-20 17:39:36.182 D/MyAppThread: zwm, Created TomyThreadPool-2
    07-20 17:39:36.182 D/MyThreadPoolExecutor: zwm, beforeExecute, thread: TomyThreadPool-2
    07-20 17:39:36.182 D/ExecutorTest: zwm, thread: TomyThreadPool-2 run
    07-20 17:39:37.179 D/ExecutorTest: zwm, thread: TomyThreadPool-1 end
    07-20 17:39:37.179 D/MyThreadPoolExecutor: zwm, afterExecute
    07-20 17:39:37.182 D/ExecutorTest: zwm, thread: TomyThreadPool-2 end
    07-20 17:39:37.182 D/MyThreadPoolExecutor: zwm, afterExecute
    07-20 17:39:39.179 D/MainActivity: zwm, MyAppThread getThreadCreated: 2
    07-20 17:39:39.180 D/MainActivity: zwm, MyAppThread getThreadAlive: 2
    07-20 17:39:39.180 D/ExecutorTest: zwm, stop executor!
    07-20 17:39:39.181 D/MyAppThread: zwm, super run end
    07-20 17:39:39.181 D/MyAppThread: zwm, Exiting TomyThreadPool-1
    07-20 17:39:39.182 D/MyThreadPoolExecutor: zwm, terminated
    07-20 17:39:39.182 D/MyAppThread: zwm, super run end
    07-20 17:39:39.182 D/MyAppThread: zwm, Exiting TomyThreadPool-2
    

    相关文章

      网友评论

          本文标题:Java并发编程 -- 结构化并发应用程序

          本文链接:https://www.haomeiwen.com/subject/jgwvlctx.html