美文网首页
20.执行器

20.执行器

作者: 0x70e8 | 来源:发表于2018-08-08 10:16 被阅读0次

    任务

    任务通常是一些抽象且离散的逻辑工作单元。当围绕任务执行来构建并发程序时,需要找到任务的边界,使得每个任务尽可能与其他任务独立开来,这样能够独立地在单独的线程中执行,提高并发性。

    线程池

    线程池是指管理一组同构工作线程的资源池,线程池与工作队列是密切相关的,在工作队列中保存了所有等待执行的任务。工作者线程的任务就是从工作队列中取出一个任务执行,执行结束返回线程池等待下一个任务。

    在线程池中的线程不是根据任务临时创建的,而是事先准备好一组线程,等待任务的出现,执行完任务并不随着任务结束而销毁,而是返回线程池等待下次任务的复用。这减少了线程频繁创建和销毁的开销,同时因为线程池的大小限制,也限制住系统中最大的并发量。

    为什么要使用线程池

    1. 构建和销毁一个线程是要与操作系统交互的,这个成本是很高的;
    2. 活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程对于处理器的数量,那么就会闲置一些线程。大量的线程会占用大量的内存空间,也会给垃圾回收带来压力,因为线程往往生命周期很短。大量的线程竞争CPU也会带来性能的问题;
    3. 可创建的线程数量在各个平台上有不同的限制,如果破坏了限制,可能会抛出OOM异常而终止程序。在一定范围内,增加线程可以提高系统的吞吐量,但是物极必反,再创建更多的线程只会降低程序的执行速度,过多的创建线程可能会使系统崩溃。

    任务执行策略

    任务是一组逻辑工作单元,需要依附于运行线程被执行。任务的执行策略有简单粗暴的把所有任务放入单线程中顺序执行、为每个任务创建一个线程执行、使用异步任务执行框架来执行。

    单线程执行简单、但非常不高效且可能会因为一个任务的错误导致整个任务阻塞。每个任务创建线程的缺点上节已经阐述。最佳选择是选择异步执行框架也就是基于线程池的任务执行框架。

    Executor框架

    Executor是一个简单的函数式接口:

    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
    

    它提供一种标准的方法将任务的提交和执行过程解耦,使用Runnalbe作为任务的抽象。

    Executor还提供了对周期性任务的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。

    Executor基于生产者-消费者模式,提交任务相当于生产者,执行任务相当于消费者。

    Executor有许多静态工厂方法用来构建线程池(返回的是ExecutorService):

    方法 描述
    newCachedThreadPool 必要时创建新线程,空闲线程会被保留60秒
    newFixedThreadPool 创建指定数量线程的线程池,且线程一直保留
    newSingleTheadPool 只有一个线程的线程池,改线程顺序执行提交的每一个任务
    newScheduledThreadPool 用于预定执行而构建的线程池,用于替代java.util.Timer
    newSingleThreadScheduledExecutor 用于执行预定任务的单线程池。

    线程池是执行框架的实现的一部分,作为消费者角色,执行器将任务分发给线程池执行。

    当用完一个线程池时,需要将线程池关闭,否则JVM将无法退出。关闭线程池有两种方法:

    • shutdown

    此方法是平缓的关闭方式,不再接受新的任务,等待以已经启动的任务结束,当所有的任务完成,线程池中的线程死亡。

    • shutdownNow

    暴力关闭方式,取消尚未开始的任务并试图中断正在运行的线程。

    ExecutorService的生命周期有三种:运行、关闭、终止。Executor初始创建时处于运行状态,执行shutdown之后进入关闭状态,等所有任务都完成后进入终止状态。可以调用awaitTermination等待ExecutorService到达终止状态,或者使用isTerminated轮询状态。

    使用线程池的一般逻辑:

    1. 调用Executors类的静态方法newCachedThreadPool或者newFixedThreadPoo
    2. 调用submit提交任务(Runnable或Callable对象)
    3. 如果想要取消一个任务,或者提交Callable对象,要保存好返回的Future对象
    4. 当不再提交新任务时,调用shutdown。

    ExecutorService

    public interface ExecutorService extends Executor {
    
        void shutdown();
        
        List<Runnable> shutdownNow();
        
        boolean isShutdown();
        
        boolean isTerminated();
        
        boolean awaitTermination(long timeout, TimeUnit unit)    throws InterruptedException;
        
        <T> Future<T> submit(Callable<T> task);
        
        <T> Future<T> submit(Runnable task, T result);
     
        Future<?> submit(Runnable task);
        /*
        * 提交所有对象到一个Callable对象的集合中,并返回一个Future对象列表,代表所有任务的Future对象。
        * 可以使用ExecutorCompletionService对结果按可获得的顺序排序。
        */
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)         throws InterruptedException;
         
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)  throws InterruptedException;
        /*
        *提交所有对象到一个Callable对象的集合中,并返回某个已经完成的任务的结果;
        * 这个结果无法知道是哪个任务返回的,返回值之后,这个任务组就结束了。
        */
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    Callable和Future

    Executor以Runnable为任务抽象,但是Runnable方法不能抛出异常且没有返回值。Callable是一种更好的任务抽象,它认为主入口点将返回一个值并可能抛出一个异常。

    Future表示一个任务的生命周期,并提供了方法来判断是否已经完成或取消,以及获取任务的结果或者取消任务等。Future需要和Callable合作使用才能获取返回结果。

    CompletionService控制任务组

    如果向Executor提交了一组任务,并且希望在任务执行完成后获得结果,那么可以保留每个任务的Future对象,然后通过调用get方法获取执行结果。CompletionService提供了更好的方法来完成此需求。

    CompletionService将Executor和BlockingQueue的功能融合在一起。可以将Callable任务提交给它执行,然后使用类似队列操作的take和poll方法获取已完成的结果,而且这些结果在完成时会被封装成Future对象。

    ExecutorCompletionService实现了CompletionService,并将计算部分委托给Executor。

    CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);
    completionServie.submit(()->{//...});
    
    Future<ImageInfo> f = completionService.take();
    

    为任务设置时限

    利用Future的限时get方法。

    异构任务的并行问题

    各个任务执行时间可能差距较大,导致并行任务的时间依赖于最久执行时间的任务。

    Fork-join框架

    并行任务执行框架。
    要采用框架可用的一种方式完成递归计算,需要通过一个扩展RecursiveTasK<T>的类或者提供一个扩展RecursiveAction的类,再覆盖compute方法来生成和调用子任务,然后合并结果。

    可完成Future

    处理非阻塞调用的传统方法是使用事件监听器,为任务完成之后要出现的动作注册一个处理器,如果下一个动作也是异步的,在它之后的下一个动作会在一个不同的事件处理器中。这样虽然在功能上不会有什么问题,但是这个流程的代码可能分散到各处。
    Java8的CompletableFuture类提供了一种候选方法,可完成Future可以组合。

        @Test
        public void testCompletableFuture() {
    
            String res = CompletableFuture.supplyAsync(() -> "hello").thenApplyAsync((a) -> a + "world").join();
            System.out.println(res);
        }
    

    相关文章

      网友评论

          本文标题:20.执行器

          本文链接:https://www.haomeiwen.com/subject/nkdtbftx.html