Java Executors.newFixedThreadPoo

Author: xiaofudeng | Source: published 2017-11-19 22:52

    Executor

    The interface has only one method:

    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
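
    The Javadoc above leaves it to the implementation whether a command runs in the calling thread or in another one. A minimal sketch of the two extremes follows; the class ExecutorSketch and the names DIRECT and THREAD_PER_TASK are made up for illustration and are not part of the JDK.

```java
import java.util.concurrent.Executor;

// Hypothetical demo class; the executor names below are illustrative only.
class ExecutorSketch {

    // Runs each command synchronously in the calling thread.
    static final Executor DIRECT = Runnable::run;

    // Starts a fresh thread for every command.
    static final Executor THREAD_PER_TASK = command -> new Thread(command).start();

    // Returns the name of the thread that ran a command handed to DIRECT.
    static String directThreadName() {
        String[] name = new String[1];
        DIRECT.execute(() -> name[0] = Thread.currentThread().getName());
        return name[0];
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("DIRECT ran on: " + directThreadName());
        THREAD_PER_TASK.execute(() ->
                System.out.println("THREAD_PER_TASK ran on: " + Thread.currentThread().getName()));
    }
}
```

    Both lambdas satisfy the same single-method interface, which is why callers of execute cannot assume which thread will run their Runnable.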
    

    ExecutorService

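    ExecutorService extends the Executor interface and adds management methods, such as the shutdown family, together with submit methods that return Future objects for tracking a task's progress.
    Because the interface is long, the comments are removed below and only the method signatures are kept; examples of some of these methods follow later.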

    public interface ExecutorService extends Executor {
    
    
        void shutdown();
        
        List<Runnable> shutdownNow();
    
        boolean isShutdown();
        
        boolean isTerminated();
    
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        <T> Future<T> submit(Callable<T> task);
    
        <T> Future<T> submit(Runnable task, T result);
    
        Future<?> submit(Runnable task);
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
    
        
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
        
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
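
    To show how several of these signatures fit together, here is a minimal sketch that uses invokeAll, shutdown, awaitTermination and shutdownNow. The class LifecycleSketch, the method sumOfSquares and the 5-second timeout are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not taken from the article.
class LifecycleSketch {

    // Squares each input on a 2-thread pool and sums the results.
    static int sumOfSquares(int... inputs) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : inputs) {
                tasks.add(() -> n * n);
            }
            int sum = 0;
            // invokeAll blocks until every task has completed
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                sum += f.get();
            }
            return sum;
        } finally {
            pool.shutdown(); // stop accepting new tasks; queued tasks still run
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt workers that are still busy
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(1, 2, 3));
    }
}
```

    Calling shutdown in a finally block is one common way to make sure the pool's non-daemon worker threads do not keep the JVM alive.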
    

    Future

    public interface Future<V> {
    
        boolean cancel(boolean mayInterruptIfRunning);
    
    
        boolean isCancelled();
    
    
        boolean isDone();
    
    
        V get() throws InterruptedException, ExecutionException;
    
    
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    Executors.newFixedThreadPool

    The actual type returned by this factory method:

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
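
    Since the core and maximum pool sizes are both nThreads and the work queue is an unbounded LinkedBlockingQueue, extra tasks wait in the queue instead of getting new threads. The sketch below tries to make that visible by counting distinct worker threads; FixedPoolSketch and distinctWorkers are hypothetical names and the 5-second wait is arbitrary.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not taken from the article.
class FixedPoolSketch {

    // Submits taskCount short tasks to a pool of nThreads workers and
    // returns how many distinct worker threads actually ran them.
    static int distinctWorkers(int nThreads, int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                             // no new tasks; queued ones still run
        pool.awaitTermination(5, TimeUnit.SECONDS);  // wait for the queue to drain
        return names.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("distinct workers: " + distinctWorkers(2, 6));
    }
}
```

    However many tasks are submitted, the count never exceeds nThreads.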
    

    submit(Runnable task)

        /**
         * Submits a Runnable task for execution and returns a Future
         * representing that task. The Future's {@code get} method will
         * return {@code null} upon <em>successful</em> completion.
         *
         * @param task the task to submit
         * @return a Future representing pending completion of the task
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        Future<?> submit(Runnable task);
    

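    Runnable's run method has no return value, but to keep a uniform shape (returning a Future) this submit method still returns a Future object. Calling get on that Future reports how the task went: a null return value means the task ran to completion successfully, and in every other case get throws the corresponding exception, as the examples later show.
    A simple example:
    In the successful case, future.get() simply returns null.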

        public static void testSubmitRunnable() throws ExecutionException, InterruptedException {
            Runnable task = () -> {
                try {
                    System.out.println("Task is running!");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            Future<?> future = executorService.submit(task);
            // null means success
            System.out.println("future.get()");
            // get() blocks until the runnable finishes
            System.out.println(future.get());
        }
    

    Output:

    future.get()
    Task is running!
    null
    

    submit(Runnable task, T result)

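    Compared with the submit above, this one takes an extra T result parameter: once the task completes successfully, future.get() returns that result (instead of the null seen earlier).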

        /**
         * Submits a Runnable task for execution and returns a Future
         * representing that task. The Future's {@code get} method will
         * return the given result upon successful completion.
         *
         * @param task the task to submit
         * @param result the result to return
         * @param <T> the type of the result
         * @return a Future representing pending completion of the task
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        <T> Future<T> submit(Runnable task, T result);
    

    The example is omitted because it is essentially the same as for the submit above.
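
    For completeness, here is a minimal sketch of one way the result parameter can be used: the Runnable fills a container and the same container is passed as the result. SubmitWithResultSketch and runAndGet are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not taken from the article.
class SubmitWithResultSketch {

    static String runAndGet() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            StringBuilder sb = new StringBuilder();
            // The task has no return value; it only mutates the shared container.
            Runnable task = () -> sb.append("filled by task");
            // get() hands back the very object passed as the second argument.
            Future<StringBuilder> future = pool.submit(task, sb);
            return future.get().toString();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndGet());
    }
}
```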

    Future.get()

        /**
         * Waits if necessary for the computation to complete, and then
         * retrieves its result.
         *
         * @return the computed result
         * @throws CancellationException if the computation was cancelled
         * @throws ExecutionException if the computation threw an
         * exception
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         */
        V get() throws InterruptedException, ExecutionException;
    
        /**
         * Waits if necessary for at most the given time for the computation
         * to complete, and then retrieves its result, if available.
         *
         * @param timeout the maximum time to wait
         * @param unit the time unit of the timeout argument
         * @return the computed result
         * @throws CancellationException if the computation was cancelled
         * @throws ExecutionException if the computation threw an
         * exception
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         * @throws TimeoutException if the wait timed out
         */
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    

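    Both get methods block. The difference is that the second one accepts a timeout and throws TimeoutException once it elapses, to tell the caller that the wait timed out.
    The other exceptions are thrown in the following situations: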

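    • CancellationException
      Thrown by get if the task has already been cancelled through Future.cancel.
    • ExecutionException
      If the Runnable we passed in throws an exception while running, Future.get() throws ExecutionException; the original exception is available through its getCause method.
    • InterruptedException
      get blocks, so the caller may have to wait. If the current thread is interrupted while waiting, this exception is thrown.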

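    So get() reports how the task went mainly through exceptions. If everything went well, it returns null (for submit(Runnable)), the result we passed in (for submit(Runnable, T result)), or, when a Callable is used, the task's return value.
    Example: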

        public static void testSubmitRunnable2() throws InterruptedException, ExecutionException {
    
            // Deliberately throw an exception inside the Runnable
            Runnable task = () -> {
                throw new RuntimeException("This exception is deliberately thrown");
            };
            // Value passed to submit(Runnable, result)
            Boolean flag = true;
            Future<Boolean> future = executorService.submit(task, flag);
            // Call Future.get()
            try {
                System.out.println("future.get()");
                System.out.println(future.get());
            } catch (ExecutionException e) {
                System.out.println("The task threw a exception");
                System.out.print("And the cause is: ");
                System.out.println(e.getCause().getMessage());
            }
    
            // On success, get() returns the value we passed in
            task = () -> {};
            future = executorService.submit(task, flag);
            System.out.println("return value from future.get(): " +  future.get());
    
            // Cancel the task, then call get()
            task = () -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            future = executorService.submit(task, flag);
            // The argument to cancel specifies whether a task that is already running should be interrupted
            future.cancel(true);
            try {
                future.get();
            } catch (CancellationException e){
                System.out.println("The task is cancelled");
            }
            
            // The timeout case
            task = () -> {
                try {
                    Thread.sleep(10000000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
    
            future = executorService.submit(task, flag);
    
            try {
                future.get(1000, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                System.out.println("the task is not finished within the given time");
                // The effect of cancel(true) is visible here:
                // even though the task is already running, it is interrupted
                future.cancel(true);
            }
        }
        
        
    

    Output:

    future.get()
    The task threw a exception
    And the cause is: This exception is deliberately thrown
    return value from future.get(): true
    The task is cancelled
    the task is not finished within the given time
    java.lang.InterruptedException: sleep interrupted
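
    The run above does not exercise the InterruptedException case from the list of exceptions, where the thread waiting in get() is itself interrupted. A minimal sketch of that case follows; the class InterruptedGetSketch, the helper waiterWasInterrupted and the two latches are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not taken from the article.
class InterruptedGetSketch {

    // Returns true if a thread waiting in future.get() saw InterruptedException.
    static boolean waiterWasInterrupted() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch waiterStarted = new CountDownLatch(1);
        try {
            // A task that stays unfinished until the latch is released.
            Future<?> future = pool.submit(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            boolean[] interrupted = new boolean[1];
            Thread waiter = new Thread(() -> {
                waiterStarted.countDown();
                try {
                    future.get(); // cannot complete: the task is still waiting
                } catch (InterruptedException e) {
                    interrupted[0] = true; // the waiting thread was interrupted, not the task
                } catch (ExecutionException e) {
                    // not expected in this sketch
                }
            });
            waiter.start();
            waiterStarted.await(); // make sure the waiter thread is running
            waiter.interrupt();    // interrupt the thread that calls get()
            waiter.join();
            return interrupted[0];
        } finally {
            release.countDown();   // let the pooled task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("waiter interrupted: " + waiterWasInterrupted());
    }
}
```

    Note that the task itself keeps running after the waiter is interrupted; only the call to get() is abandoned.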
    

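    Using Callable

    Callable is essentially the same as Runnable, except that it can return a value. The ExecutorService stores that return value in the Future, and we can retrieve it through the get method.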

    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
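
    A minimal sketch of submitting a Callable: the first method reads the task's return value through get(), and the second shows an exception thrown by call() surfacing as the cause of ExecutionException. CallableSketch, sumTo, failureCauseMessage and the message text are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not taken from the article.
class CallableSketch {

    // Sums 1..n on a pool thread and returns the value obtained via Future.get().
    static int sumTo(int n) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = 1; i <= n; i++) {
                    sum += i;
                }
                return sum;
            };
            Future<Integer> future = pool.submit(task);
            return future.get();
        } finally {
            pool.shutdown();
        }
    }

    // Returns the message of the exception that call() threw.
    static String failureCauseMessage() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<Integer> failing = () -> {
                throw new IOException("disk unavailable");
            };
            pool.submit(failing).get();
            return null; // not reached: get() throws
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumTo(10));
        System.out.println(failureCauseMessage());
    }
}
```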
    

    How AbstractExecutorService implements the three submit signatures:

        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    
        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    

    newTaskFor:

        /**
         * Returns a {@code RunnableFuture} for the given runnable and default
         * value.
         *
         * @param runnable the runnable task being wrapped
         * @param value the default value for the returned future
         * @param <T> the type of the given value
         * @return a {@code RunnableFuture} which, when run, will run the
         * underlying runnable and which, as a {@code Future}, will yield
         * the given value as its result and provide for cancellation of
         * the underlying task
         * @since 1.6
         */
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        /**
         * Returns a {@code RunnableFuture} for the given callable task.
         *
         * @param callable the callable task being wrapped
         * @param <T> the type of the callable's result
         * @return a {@code RunnableFuture} which, when run, will call the
         * underlying callable and which, as a {@code Future}, will yield
         * the callable's result as its result and provide for
         * cancellation of the underlying task
         * @since 1.6
         */
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
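
    Because newTaskFor is protected, a subclass can change what kind of Future the submit methods hand out. The sketch below is one possible use, assuming we only want to count finished tasks: it overrides newTaskFor(Callable) to return a FutureTask whose done() hook increments a counter. CountingPool, finished and finishedAfter are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass, for illustration only.
class CountingPool extends ThreadPoolExecutor {

    final AtomicInteger finished = new AtomicInteger();

    CountingPool(int nThreads) {
        // Same arguments that Executors.newFixedThreadPool passes.
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable) {
            @Override
            protected void done() {
                // Invoked once the task completes, fails, or is cancelled.
                finished.incrementAndGet();
            }
        };
    }

    // Submits taskCount callables and returns the counter after the pool has terminated.
    static int finishedAfter(int taskCount) throws InterruptedException {
        CountingPool pool = new CountingPool(2);
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            pool.submit(() -> "task-" + id); // value-returning lambda, so submit(Callable)
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pool.finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("finished tasks: " + finishedAfter(3));
    }
}
```

    The counter is read only after awaitTermination because done() may still be running at the moment get() returns to a waiting caller.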
    

    FutureTask:

    /**
     * A cancellable asynchronous computation.  This class provides a base
     * implementation of {@link Future}, with methods to start and cancel
     * a computation, query to see if the computation is complete, and
     * retrieve the result of the computation.  The result can only be
     * retrieved when the computation has completed; the {@code get}
     * methods will block if the computation has not yet completed.  Once
     * the computation has completed, the computation cannot be restarted
     * or cancelled (unless the computation is invoked using
     * {@link #runAndReset}).
     *
     * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
     * {@link Runnable} object.  Because {@code FutureTask} implements
     * {@code Runnable}, a {@code FutureTask} can be submitted to an
     * {@link Executor} for execution.
     *
     * <p>In addition to serving as a standalone class, this class provides
     * {@code protected} functionality that may be useful when creating
     * customized task classes.
     *
     * @since 1.5
     * @author Doug Lea
     * @param <V> The result type returned by this FutureTask's {@code get} methods
     */
    public class FutureTask<V> implements RunnableFuture<V>{...}
    

    RunnableFuture:

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
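
    Since FutureTask is both a Runnable and a Future, it can be used without any ExecutorService. A minimal sketch, with FutureTaskSketch and its method names chosen only for illustration:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not taken from the article.
class FutureTaskSketch {

    // Runs the task on a plain Thread and reads the result through the Future side.
    static int computeOnPlainThread() throws Exception {
        Callable<Integer> answer = () -> 6 * 7;
        FutureTask<Integer> task = new FutureTask<>(answer);
        new Thread(task).start(); // accepted because FutureTask is a Runnable
        return task.get();        // blocks until run() has set the result
    }

    // Runs the task in the calling thread; get() then returns immediately.
    static String runInCallingThread() throws Exception {
        Runnable noop = () -> { };
        FutureTask<String> task = new FutureTask<>(noop, "done");
        task.run();
        return task.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(computeOnPlainThread());
        System.out.println(runInCallingThread());
    }
}
```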
    

    Article link: https://www.haomeiwen.com/subject/wsqmvxtx.html