Java中的Future

作者: 薛定谔的猫Plus | 来源:发表于2018-09-28 14:24 被阅读0次

    1 概述

    Future代表异步计算返回的结果,提供了检查是否结束、等待结束以及获取计算结果的方法。Executor框架使用Runnable作为其任务表示形式,但它不能返回一个值。许多任务实际上都是存在延迟计算的:执行数据库查询,从网络上获取资源,或者某个复杂耗时的计算。对于这种任务,Callable是一个更好的抽象,他能返回一个值,并可能抛出一个异常。

    /**
     * A task that returns a result and may throw an exception.
     * Implementors define a single method with no arguments called
     * <tt>call</tt>.
     *
     * <p>The <tt>Callable</tt> interface is similar to {@link
     * java.lang.Runnable}, in that both are designed for classes whose
     * instances are potentially executed by another thread.  A
     * <tt>Runnable</tt>, however, does not return a result and cannot
     * throw a checked exception.
     *
     * <p> The {@link Executors} class contains utility methods to
     * convert from other common forms to <tt>Callable</tt> classes.
     *
     * @see Executor
     * @since 1.5
     * @author Doug Lea
     * @param <V> the result type of method <tt>call</tt>
     */
    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    
    public interface Future<V> {
    
        /**
         * Attempts to cancel execution of this task.  This attempt will
         * fail if the task has already completed, has already been cancelled,
         * or could not be cancelled for some other reason. If successful,
         * and this task has not started when <tt>cancel</tt> is called,
         * this task should never run.  If the task has already started,
         * then the <tt>mayInterruptIfRunning</tt> parameter determines
         * whether the thread executing this task should be interrupted in
         * an attempt to stop the task.
         *
         * <p>After this method returns, subsequent calls to {@link #isDone} will
         * always return <tt>true</tt>.  Subsequent calls to {@link #isCancelled}
         * will always return <tt>true</tt> if this method returned <tt>true</tt>.
         *
         * @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
         * task should be interrupted; otherwise, in-progress tasks are allowed
         * to complete
         * @return <tt>false</tt> if the task could not be cancelled,
         * typically because it has already completed normally;
         * <tt>true</tt> otherwise
         */
        boolean cancel(boolean mayInterruptIfRunning);
    
        /**
         * Returns <tt>true</tt> if this task was cancelled before it completed
         * normally.
         *
         * @return <tt>true</tt> if this task was cancelled before it completed
         */
        boolean isCancelled();
    
        /**
         * Returns <tt>true</tt> if this task completed.
         *
         * Completion may be due to normal termination, an exception, or
         * cancellation -- in all of these cases, this method will return
         * <tt>true</tt>.
         *
         * @return <tt>true</tt> if this task completed
         */
        boolean isDone();
    
        /**
         * Waits if necessary for the computation to complete, and then
         * retrieves its result.
         *
         * @return the computed result
         * @throws CancellationException if the computation was cancelled
         * @throws ExecutionException if the computation threw an
         * exception
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         */
        V get() throws InterruptedException, ExecutionException;
    
        /**
         * Waits if necessary for at most the given time for the computation
         * to complete, and then retrieves its result, if available.
         *
         * @param timeout the maximum time to wait
         * @param unit the time unit of the timeout argument
         * @return the computed result
         * @throws CancellationException if the computation was cancelled
         * @throws ExecutionException if the computation threw an
         * exception
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         * @throws TimeoutException if the wait timed out
         */
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    可以通过多种方法来创建一个Future来描述任务。ExecutorService中的submit方法接受一个Runnable或者Callable,然后返回一个Future来获得任务的执行结果或者取消任务。

    public interface ExecutorService extends Executor {
    
       /**
        * Submits a value-returning task for execution and returns a
        * Future representing the pending results of the task. The
        * Future's <tt>get</tt> method will return the task's result upon
        * successful completion.
        *
        * <p>
        * If you would like to immediately block waiting
        * for a task, you can use constructions of the form
        * <tt>result = exec.submit(aCallable).get();</tt>
        *
        * <p> Note: The {@link Executors} class includes a set of methods
        * that can convert some other common closure-like objects,
        * for example, {@link java.security.PrivilegedAction} to
        * {@link Callable} form so they can be submitted.
        *
        * @param task the task to submit
        * @return a Future representing pending completion of the task
        * @throws RejectedExecutionException if the task cannot be
        *         scheduled for execution
        * @throws NullPointerException if the task is null
        */
       <T> Future<T> submit(Callable<T> task);
    
       /**
        * Submits a Runnable task for execution and returns a Future
        * representing that task. The Future's <tt>get</tt> method will
        * return the given result upon successful completion.
        *
        * @param task the task to submit
        * @param result the result to return
        * @return a Future representing pending completion of the task
        * @throws RejectedExecutionException if the task cannot be
        *         scheduled for execution
        * @throws NullPointerException if the task is null
        */
       <T> Future<T> submit(Runnable task, T result);
    
       /**
        * Submits a Runnable task for execution and returns a Future
        * representing that task. The Future's <tt>get</tt> method will
        * return <tt>null</tt> upon <em>successful</em> completion.
        *
        * @param task the task to submit
        * @return a Future representing pending completion of the task
        * @throws RejectedExecutionException if the task cannot be
        *         scheduled for execution
        * @throws NullPointerException if the task is null
        */
       Future<?> submit(Runnable task);
    
       /**
        * Executes the given tasks, returning a list of Futures holding
        * their status and results when all complete.
        * {@link Future#isDone} is <tt>true</tt> for each
        * element of the returned list.
        * Note that a <em>completed</em> task could have
        * terminated either normally or by throwing an exception.
        * The results of this method are undefined if the given
        * collection is modified while this operation is in progress.
        *
        * @param tasks the collection of tasks
        * @return A list of Futures representing the tasks, in the same
        *         sequential order as produced by the iterator for the
        *         given task list, each of which has completed.
        * @throws InterruptedException if interrupted while waiting, in
        *         which case unfinished tasks are cancelled.
        * @throws NullPointerException if tasks or any of its elements are <tt>null</tt>
        * @throws RejectedExecutionException if any task cannot be
        *         scheduled for execution
        */
    
       <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
           throws InterruptedException;
    
       /**
        * Executes the given tasks, returning a list of Futures holding
        * their status and results
        * when all complete or the timeout expires, whichever happens first.
        * {@link Future#isDone} is <tt>true</tt> for each
        * element of the returned list.
        * Upon return, tasks that have not completed are cancelled.
        * Note that a <em>completed</em> task could have
        * terminated either normally or by throwing an exception.
        * The results of this method are undefined if the given
        * collection is modified while this operation is in progress.
        *
        * @param tasks the collection of tasks
        * @param timeout the maximum time to wait
        * @param unit the time unit of the timeout argument
        * @return a list of Futures representing the tasks, in the same
        *         sequential order as produced by the iterator for the
        *         given task list. If the operation did not time out,
        *         each task will have completed. If it did time out, some
        *         of these tasks will not have completed.
        * @throws InterruptedException if interrupted while waiting, in
        *         which case unfinished tasks are cancelled
        * @throws NullPointerException if tasks, any of its elements, or
        *         unit are <tt>null</tt>
        * @throws RejectedExecutionException if any task cannot be scheduled
        *         for execution
        */
       <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
           throws InterruptedException;
    
       /**
        * Executes the given tasks, returning the result
        * of one that has completed successfully (i.e., without throwing
        * an exception), if any do. Upon normal or exceptional return,
        * tasks that have not completed are cancelled.
        * The results of this method are undefined if the given
        * collection is modified while this operation is in progress.
        *
        * @param tasks the collection of tasks
        * @return the result returned by one of the tasks
        * @throws InterruptedException if interrupted while waiting
        * @throws NullPointerException if tasks or any element task
        *         subject to execution is <tt>null</tt>
        * @throws IllegalArgumentException if tasks is empty
        * @throws ExecutionException if no task successfully completes
        * @throws RejectedExecutionException if tasks cannot be scheduled
        *         for execution
        */
       <T> T invokeAny(Collection<? extends Callable<T>> tasks)
           throws InterruptedException, ExecutionException;
    
       /**
        * Executes the given tasks, returning the result
        * of one that has completed successfully (i.e., without throwing
        * an exception), if any do before the given timeout elapses.
        * Upon normal or exceptional return, tasks that have not
        * completed are cancelled.
        * The results of this method are undefined if the given
        * collection is modified while this operation is in progress.
        *
        * @param tasks the collection of tasks
        * @param timeout the maximum time to wait
        * @param unit the time unit of the timeout argument
        * @return the result returned by one of the tasks.
        * @throws InterruptedException if interrupted while waiting
        * @throws NullPointerException if tasks, or unit, or any element
        *         task subject to execution is <tt>null</tt>
        * @throws TimeoutException if the given timeout elapses before
        *         any task successfully completes
        * @throws ExecutionException if no task successfully completes
        * @throws RejectedExecutionException if tasks cannot be scheduled
        *         for execution
        */
       <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
           throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    假设我们通过一个方法从远程获取一些计算结果,假设方法是List getDataFromRemote(),如果采用同步的方法,代码大概是List data = getDataFromRemote(),我们将一直等待getDataFromRemote返回,然后才能继续后面的工作,这个函数是从远程获取计算结果的,如果需要很长时间,后面的代码又和这个数据没有什么关系的话,阻塞在那里就会浪费很多时间。我们有什么办法可以改进呢???

    能够想到的办法是调用函数后,立即返回,然后继续执行,等需要用数据的时候,再取或者等待这个数据。具体实现有两种方式,一个是用Future,另一个是回调。

    Future<List> future = getDataFromRemoteByFuture();
            //do something....
    List data = future.get();
    

    可以看到我们返回的是一个Future对象,然后接着自己的处理后面通过future.get()来获得我们想要的值。也就是说在执行getDataFromRemoteByFuture的时候,就已经启动了对远程计算结果的获取,同时自己的线程还继续执行不阻塞。知道获取时候再拿数据就可以。看一下getDataFromRemoteByFuture的实现:

    private Future<List> getDataFromRemoteByFuture() {
    
            return threadPool.submit(new Callable<List>() {
                @Override
                public List call() throws Exception {
                    return getDataFromRemote();
                }
            });
        }
    

    我们在这个方法中调用getDataFromRemote方法,并且用到了线程池。把任务加入线程池之后,理解返回Future对象。Future的get方法,还可以传入一个超时参数,用来设置等待时间,不会一直等下去。
    也可以利用FutureTask来获取结果:

    FutureTask<List> futureTask = new FutureTask<List>(new Callable<List>() {
                @Override
                public List call() throws Exception {
                    return getDataFromRemote();
                }
            });
    
            threadPool.submit(futureTask);
    
    
            futureTask.get();
    

    FutureTask是一个具体的实现类,ThreadPoolExecutor的submit方法返回的就是一个Future的实现,这个实现就是FutureTask的一个具体实例,FutureTask帮助实现了具体的任务执行,以及和Future接口中的get方法的关联。FutureTask除了帮助ThreadPool很好的实现了对加入线程池任务的Future支持外,也为我们提供了很大的便利,使得我们自己也可以实现支持Future的任务调度。

    Java本身没有提供Promise方式实现,在Scala以及Netty中都有Promise模式的实现,可以让用户控制future的完成状态(成功、失败等)以及future的返回值,参考文档中是一个自定义Promise的简单实现。

    Reference

    相关文章

      网友评论

        本文标题:Java中的Future

        本文链接:https://www.haomeiwen.com/subject/xhhpoftx.html