美文网首页JavaAPIAndroid知识Android开发经验谈
Java多线程:Executor,Executors,Futur

Java多线程:Executor,Executors,Futur

作者: 一只好奇的茂 | 来源:发表于2017-06-27 14:44 被阅读384次

    平时工作中经常碰到个各种多线程,有时候搞不清它们之间到底有什么区别,这次来个总体的总结,主要是以下这些:
    Executor,Executors,ExecutorService, CompletionServie,Future,Callable,Runnable,FutureTask

    一、Runnable(interface)

    public interface Runnable {
        public void run();
    }
    

    run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

    二、Callable (interface)

    public interface Callable<V> {
        V call() throws Exception;
    }
    

    与 Runnable 不同的是call()函数返回的类型就是传递进来的V类型,而且能够抛出异常。一般情况下是配合ExecutorService来使用的

    三、Future( interface)

    Future是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    • cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
    • isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
    • isDone方法表示任务是否已经完成,若任务完成,则返回true;
    • get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
    • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

    也就是说Future提供了三种功能:

    • 判断任务是否完成;
    • 能够中断任务;
    • 能够获取任务执行结果。

    四、FutureTask(Runnable, Future<V>的具体实现)

    public class FutureTask<V> implements RunnableFuture<V> 。。。
    
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    

    可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值,管理任务。
    其中有两个构造方法

       //接受一个 Callable 参数
       public FutureTask(Callable<V> callable) {
           if (callable == null)
               throw new NullPointerException();
           this.callable = callable;
           this.state = NEW;       // ensure visibility of callable
       }
    
       //接受一个 Runnable ,利用 Executors.callable 将Runnable 转换为Callable
       public FutureTask(Runnable runnable, V result) {
           this.callable = Executors.callable(runnable, result);
           this.state = NEW;       // ensure visibility of callable
       }
    

    具体使用可以参考 AsyncTask 中的使用

    五、Executor(interface)

    在Executor框架中,使用执行器(Exectuor)来管理Thread对象,从而简化了并发编程。并发编程的一种编程方式把任务拆分为一系列的小任务,即Runnable,然后将这些任务提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用其内部的线程池来完成操作。

    Executor 接口中之定义了一个方法 execute(Runnable command),该方法接收一个 Runable 实例,它用来执行一个任务,任务即一个实现了 Runnable 接口的类。

    public interface Executor {
        void execute(Runnable command);
    }
    

    为了避免调用 new Thread(new RunnableTask()).start()这样的代码我们可以

    Executor executor = anExecutor;
    executor.execute(new RunnableTask1());
    executor.execute(new RunnableTask2());
    ...
    

    Executor 并不是严格的要求一步执行,我们可以简单的直接在调用者线程执行运行提交的任务

    class DirectExecutor implements Executor {
      public void execute(Runnable r) {
        r.run();    // 在调用者线程执行
    }}
    

    一般来说任务在非调用者的线程中执行,比如产生一个新的线程

    class ThreadPerTaskExecutor implements Executor {
      public void execute(Runnable r) {
        new Thread(r).start();   //新启一个线程,在非调用者线程中执行
    }}
    

    有很多Executor 的实现是为了实现任务的某种调度,比如 AsyncTask 中的串行任务队列

    class SerialExecutor implements Executor {
       final Queue tasks = new ArrayDeque<>();
       final Executor executor;
       Runnable active;
    
       SerialExecutor(Executor executor) {
         this.executor = executor;
       
    
       public synchronized void execute(final Runnable r) {
         tasks.add(new Runnable() {
           public void run() {
             try {
               r.run();
             } finally {
               scheduleNext();
             }
           }
         });
         if (active == null) {
           scheduleNext();
         }
       }
    
       protected synchronized void scheduleNext() {
         if ((active = tasks.poll()) != null) {
           executor.execute(active);
         }
       }
     }}
    

    六、ExecutorService(interface,继承自Executor)

    ExecutorService 接口继承自 Executor 接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService 提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以调用 ExecutorService 的 shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致 ExecutorService 停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭 ExecutorService。因此我们一般用该接口来实现和管理多线程。

    execute(Runnable)  
    submit(Runnable)  
    submit(Callable)  
    invokeAny()  
    invokeAll() 
    

    execute(Runnable)
    方法 execute(Runnable) 接收一个java.lang.Runnable 对象作为参数,并且以异步的方式执行它。

    ExecutorService executorService = Executors.newSingleThreadExecutor();  
    executorService.execute(new Runnable() {  
        public void run() {  
            System.out.println("Asynchronous task");  
        }  
    });        
    executorService.shutdown();  
    

    使用这种方式没有办法获取执行 Runnable 之后的结果,如果你希望获取运行之后的返回值,就必须使用接收 Callable 参数的 execute() 方法。
    submit(Runnable)
    方法 submit(Runnable) 同样接收一个Runnable 的实现作为参数,但是会返回一个Future 对象。这个Future 对象可以用于判断 Runnable 是否结束执行。如下是一个ExecutorService 的 submit() 方法的例子:

    Future future = executorService.submit(new Runnable() {  
        public void run() {  
            System.out.println("Asynchronous task");  
        }  
    });  
    //如果任务结束执行则返回 null  
    System.out.println("future.get()=" + future.get());  
    

    submit(Callable)
    方法 submit(Callable) 和方法 submit(Runnable) 比较类似,但是区别则在于它们接收不同的参数类型。Callable 的实例与 Runnable 的实例很类似,但是 Callable 的 call() 方法可以返回一个结果。方法 Runnable.run() 则不能返回结果。
    Callable 的返回值可以从方法 submit(Callable) 返回的 Future 对象中获取。如下是一个 ExecutorService Callable 的样例:

    Future future = executorService.submit(new Callable(){  
        public Object call() throws Exception {  
            System.out.println("Asynchronous Callable");  
            return "Callable Result";  
        }  
    });  
    System.out.println("future.get() = " + future.get());  
    

    输出结果

    Asynchronous Callable  
    future.get() = Callable Result  
    

    inVokeAny()
    方法 invokeAny() 接收一个包含 Callable 对象的集合作为参数。调用该方法不会返回 Future 对象,而是返回集合中某一个Callable 对象的结果,而且无法保证调用之后返回的结果是哪一个 Callable,只知道它是这些 Callable 中一个执行结束的 Callable 对象。如果一个任务运行完毕或者抛出异常,方法会取消其它的 Callable 的执行。
    以下是一个样例:

    ExecutorService executorService = Executors.newSingleThreadExecutor();  
    Set<Callable<String>> callables = new HashSet<Callable<String>>();  
    
    callables.add(new Callable<String>() {  
        public String call() throws Exception {  
            return "Task 1";  
        }  
    });  
    callables.add(new Callable<String>() {  
        public String call() throws Exception {  
            return "Task 2";  
        }  
    });  
    callables.add(new Callable<String>() {  
        public String call() throws Exception {  
            return "Task 3";  
        }  
    });  
    String result = executorService.invokeAny(callables);  
    System.out.println("result = " + result);  
    executorService.shutdown();  
    

    输出结果:
    以上样例代码会打印出在给定的集合中的某一个Callable 的返回结果。尝试运行后发现每次结果都在改变。有时候返回结果是"Task 1",有时候是"Task 2",等等。
    invokeAll()
    方法 invokeAll() 会调用存在于参数集合中的所有 Callable 对象,并且返回一个包含 Future 对象的集合,你可以通过这个返回的集合来管理每个 Callable 的执行结果。需要注意的是,任务有可能因为异常而导致运行结束,所以它可能并不是真的成功运行了。但是我们没有办法通过 Future 对象来了解到这个差异。
    ExecutorService服务的关闭(shutdown() 或 shutdownNow())
    当使用 ExecutorService 完毕之后,我们应该关闭它,这样才能保证线程不会继续保持运行状态。

    举例来说,如果你的程序通过 main() 方法启动,并且主线程退出了你的程序,
    如果还有一个活动的 ExecutorService 存在于程序中,那么程序将会继续保持运行状态。存在于 ExecutorService 中的活动线程会阻止Java虚拟机关闭。

    为了关闭在 ExecutorService 中的线程,需要调用** shutdown() **方法。但ExecutorService 并不会马上关闭,而是不再接收新的任务,一旦所有的线程结束执行当前任务,ExecutorServie 才会真的关闭。所有在调用 shutdown() 方法之前提交到 ExecutorService 的任务都会执行。

    如果你希望立即关闭 ExecutorService,你可以调用** shutdownNow() **方法。这个方法会尝试马上关闭所有正在执行的任务,并且跳过所有已经提交但是还没有运行的任务。但是对于正在执行的任务,是否能够成功关闭它是无法保证的,有可能他们真的被关闭掉了,也有可能它会一直执行到任务结束。这是一个最好的尝试。

    ExecutorService 接口在 java.util.concurrent 包中有如下实现类:
    ThreadPoolExecutor
    ScheduledThreadPoolExecutor

    七、Executors(class)

    Executors 提供了一系列工厂方法用于创先线程池,返回的线程池都实现了 ExecutorService 接口。

    public static ExecutorService newFixedThreadPool(int nThreads)
    // 创建固定数目线程的线程池。
    
    public static ExecutorService newCachedThreadPool()
    // 创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。
    // 如果现有线程没有可用的,则创建一个新线 程并添加到池中。
    // 终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
    
    public static ExecutorService newSingleThreadExecutor()
    // 创建一个单线程化的Executor。
    
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
    // 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
    

    Executors的使用:

    ExecutorService executorService = Executors.newFixedThreadPool(10);  
    executorService.execute(new Runnable() {  
        public void run() {  
            System.out.println("Asynchronous task");  
        }  
    });  
    executorService.shutdown();  
    

    该示例代码首先使用 newFixedThreadPool() 工厂方法创建一个ExecutorService ,上述代码创建了一个可以容纳10个线程任务的线程池。其次,向 execute() 方法中传递一个异步的 Runnable 接口的实现,这样做会让 ExecutorService 中的某个线程执行这个Runnable 线程。

    八、CompletionServie

    为什么需要CompletionServie
    如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务(Completion service)。

    CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。
    CompletionService与ExecutorService的对比使用

    public class Main {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    //        case1();
    //        case2();
            case3();
        }
    
    
        /**
         * <一>
         * 1. 用List收集任务结果 (List记录每个submit返回的Future)
         * 2. 循环查看结果, Future不一定完成, 如果没有完成, 那么调用get会租塞
         * 3. 如果排在前面的任务没有完成, 那么就会阻塞, 这样后面已经完成的任务就没法获得结果了, 导致了不必要的等待时间.
         *    更为严重的是: 第一个任务如果几个小时或永远完成不了, 而后面的任务几秒钟就完成了, 那么后面的任务的结果都将得不到处理
         *
         * 导致: 已完成的任务可能得不到及时处理
         */
        private static void case1() throws ExecutionException, InterruptedException {
            final Random random = new Random();
            ExecutorService service = Executors.newFixedThreadPool(10);
            List<Future<String>> taskResultHolder = new ArrayList<>();
            for(int i=0; i<50; i++) {
                //搜集任务结果
                taskResultHolder.add(service.submit(new Callable<String>() {
                    public String call() throws Exception {
                        Thread.sleep(random.nextInt(5000));
                        return Thread.currentThread().getName();
                    }
                }));
            }
            // 处理任务结果
            int count = 0;
            System.out.println("handle result begin");
            for(Future<String> future : taskResultHolder) {
                System.out.println(future.get());
                count++;
            }
            System.out.println("handle result end");
            System.out.println(count + " task done !");
    
            //关闭线程池
            service.shutdown();
        }
    
        /**
         * <二> 只对第一种情况进行的改进
         *      1. 查看任务是否完成, 如果完成, 就获取任务的结果, 让后重任务列表中删除任务.
         *      2. 如果任务未完成, 就跳过此任务, 继续查看下一个任务结果.
         *      3. 如果到了任务列表末端, 那么就从新回到任务列表开始, 然后继续从第一步开始执行
         *
         *      这样就可以及时处理已完成任务的结果了
         */
        private static void case2() throws ExecutionException, InterruptedException {
            final Random random = new Random();
            ExecutorService service = Executors.newFixedThreadPool(10);
            List<Future<String>> results = new ArrayList<>();
    
            for(int i=0; i<50; i++) {
                Callable<String> task = new Callable<String>() {
                    public String call() throws Exception {
                        Thread.sleep(random.nextInt(5000)); //模拟耗时操作
                        return Thread.currentThread().getName();
                    }
                };
                Future<String> future = service.submit(task);
                results.add(future); // 搜集任务结果
            }
    
            int count = 0;
            //自旋, 获取结果
            System.out.println("handle result begin");
            for(int i=0; i<results.size(); i++) {
                Future<String> taskHolder = results.get(i);
    
                if(taskHolder.isDone()) { //任务完成
                    String result = taskHolder.get(); //获取结果, 进行某些操作
                    System.out.println("result: " + result);
                    results.remove(taskHolder);
                    i--;
    
                    count++; //完成的任务的计数器
                }
    
                //回到列表开头, 从新获取结果
                if(i == results.size() - 1) i = -1;
            }
            System.out.println("handle result end");
            System.out.println(count + " task done !");
    
            //线程池使用完必须关闭
            service.shutdown();
        }
    
    
        /**
         * <三> 使用ExecutorCompletionService管理异步任务
         * 1. Java中的ExecutorCompletionService<V>本身有管理任务队列的功能
         *    i. ExecutorCompletionService内部维护列一个队列, 用于管理已完成的任务
         *    ii. 内部还维护列一个Executor, 可以执行任务
         *
         * 2. ExecutorCompletionService内部维护了一个BlockingQueue, 只有完成的任务才被加入到队列中
         *
         * 3. 任务一完成就加入到内置管理队列中, 如果队列中的数据为空时, 调用take()就会阻塞 (等待任务完成)
         *    i. 关于完成任务是如何加入到完成队列中的, 请参考ExecutorCompletionService的内部类QueueingFuture的done()方法
         *
         * 4. ExecutorCompletionService的take/poll方法是对BlockingQueue对应的方法的封装, 关于BlockingQueue的take/poll方法:
         *    i. take()方法, 如果队列中有数据, 就返回数据, 否则就一直阻塞;
         *    ii. poll()方法: 如果有值就返回, 否则返回null
         *    iii. poll(long timeout, TimeUnit unit)方法: 如果有值就返回, 否则等待指定的时间; 如果时间到了如果有值, 就返回值, 否则返回null
         *
         * 解决了已完成任务得不到及时处理的问题
         */
        static void case3() throws InterruptedException, ExecutionException {
            Random random = new Random();
    
            ExecutorService service = Executors.newFixedThreadPool(10);
            ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(service);
    
            for(int i=0; i<50; i++) {
                completionService.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        Thread.sleep(random.nextInt(5000));
                        return Thread.currentThread().getName();
                    }
                });
            }
    
            int completionTask = 0;
            while(completionTask < 50) {
                //如果完成队列中没有数据, 则阻塞; 否则返回队列中的数据
                Future<String> resultHolder = completionService.take();
                System.out.println("result: " + resultHolder.get());
                completionTask++;
            }
    
            System.out.println(completionTask + " task done !");
    
            //ExecutorService使用完一定要关闭 (回收资源, 否则系统资源耗尽! .... 呵呵...)
            service.shutdown();
        }
    }
    

    九、参考

    Java并发编程:Callable、Future和FutureTask
    Java中的Runnable、Callable、Future、FutureTask的区别与示例
    Java并发编程 - Executor,Executors,ExecutorService, CompletionServie,Future,Callable

    相关文章

      网友评论

      • 小强大草莓:浅显易懂,之前对这部分一直很模糊,总结的很系统,条理很清晰。感谢。
        一只好奇的茂:@小强大草莓 谢谢喜欢,欢迎加V讨论学习Android。:smiley:

      本文标题:Java多线程:Executor,Executors,Futur

      本文链接:https://www.haomeiwen.com/subject/ilsscxtx.html