美文网首页
CloseableExecutorService

CloseableExecutorService

作者: 大道至简_Andy | 来源:发表于2016-09-21 00:18 被阅读92次

    对已有的 ExecutorService 进行装饰,得到一个可关闭的新 ExecutorService:它会跟踪通过自身提交的 Future/Runnable,并在关闭时取消其中仍在运行的任务。

    import com.google.common.annotations.VisibleForTesting;
    import com.google.common.base.Preconditions;
    import com.google.common.collect.Maps;
    import com.google.common.collect.Sets;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.io.Closeable;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    /**
     * Decoration on an ExecutorService that tracks created futures and provides
     * a method to close futures created via this class.
     *
     * <p>Every future handed out by {@link #submit(Callable)} / {@link #submit(Runnable)}
     * is remembered until it completes. {@link #close()} cancels whatever is still
     * pending and, when requested at construction time, also shuts down the
     * decorated executor.</p>
     */
    public class CloseableExecutorService implements Closeable
    {
        private static final Logger log = LoggerFactory.getLogger(CloseableExecutorService.class);
        private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
        private final ExecutorService executorService;
        private final boolean shutdownOnClose;
        protected final AtomicBoolean isOpen = new AtomicBoolean(true);

        /**
         * Adapter that registers an already scheduled future with this service so
         * that {@link CloseableExecutorService#close()} can cancel it.
         */
        protected class InternalScheduledFutureTask implements Future<Void>
        {
            private final ScheduledFuture<?> scheduledFuture;

            /**
             * @param scheduledFuture the scheduled future to track; it is added to the
             *                        set of tracked futures immediately
             */
            public InternalScheduledFutureTask(ScheduledFuture<?> scheduledFuture)
            {
                this.scheduledFuture = scheduledFuture;
                futures.add(scheduledFuture);
            }

            /**
             * Stops tracking the scheduled future and cancels it.
             *
             * @param mayInterruptIfRunning passed through to the scheduled future
             * @return the result of cancelling the scheduled future
             */
            @Override
            public boolean cancel(boolean mayInterruptIfRunning)
            {
                futures.remove(scheduledFuture);
                return scheduledFuture.cancel(mayInterruptIfRunning);
            }

            @Override
            public boolean isCancelled()
            {
                return scheduledFuture.isCancelled();
            }

            @Override
            public boolean isDone()
            {
                return scheduledFuture.isDone();
            }

            /**
             * Returns {@code null} immediately; this method does not wait for the
             * scheduled future.
             *
             * @return always {@code null}
             */
            @Override
            public Void get() throws InterruptedException, ExecutionException
            {
                return null;
            }

            /**
             * Returns {@code null} immediately; the timeout is not used and this
             * method does not wait for the scheduled future.
             *
             * @return always {@code null}
             */
            @Override
            public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
            {
                return null;
            }
        }

        /**
         * Future handed back to callers for a submitted task. It runs the wrapped
         * task, completes with that task's value or failure, and keeps itself
         * registered with this service until it is done.
         */
        protected class InternalFutureTask<T> extends FutureTask<T>
        {
            /**
             * @param task the task to run; its outcome becomes the outcome of this future
             */
            InternalFutureTask(RunnableFuture<T> task)
            {
                super(runAndGet(task));
                // Track the wrapper itself: cancelling it on close() is visible to
                // the caller holding this future and wakes up anyone blocked in get().
                futures.add(this);
            }

            /**
             * Drops this future from the tracked set once it has completed or been cancelled.
             */
            @Override
            protected void done()
            {
                futures.remove(this);
            }
        }

        /**
         * @param executorService the service to decorate
         */
        public CloseableExecutorService(ExecutorService executorService)
        {
           this(executorService, false);
        }

        /**
         * @param executorService the service to decorate
         * @param shutdownOnClose if true, shutdown the executor service when this is closed
         */
        public CloseableExecutorService(ExecutorService executorService, boolean shutdownOnClose)
        {
            this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
            this.shutdownOnClose = shutdownOnClose;
        }

        /**
         * Returns {@code true} if this executor has been shut down.
         *
         * @return {@code true} if this executor has been shut down
         */
        public boolean isShutdown()
        {
            return !isOpen.get();
        }

        /**
         * @return the number of futures currently being tracked
         */
        @VisibleForTesting
        int size()
        {
            return futures.size();
        }

        /**
         * Closes any tasks currently in progress: marks this service as closed,
         * cancels (with interruption) every tracked future that is not yet done and,
         * if configured to do so, shuts down the decorated executor.
         */
        @Override
        public void close()
        {
            isOpen.set(false);
            Iterator<Future<?>> iterator = futures.iterator();
            while ( iterator.hasNext() )
            {
                Future<?> future = iterator.next();
                iterator.remove();
                if ( !future.isDone() && !future.isCancelled() && !future.cancel(true) )
                {
                    log.warn("Could not cancel {}", future);
                }
            }
            if (shutdownOnClose) {
                executorService.shutdownNow();
            }
        }

        /**
         * Submits a value-returning task for execution and returns a Future
         * representing the pending results of the task.  Upon completion,
         * this task may be taken or polled.
         *
         * @param task the task to submit
         * @return a future to watch the task
         * @throws IllegalStateException if this service has been closed
         * @throws RejectedExecutionException if the decorated executor refuses the task
         */
        public<V> Future<V> submit(Callable<V> task)
        {
            Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");

            return execute(new InternalFutureTask<V>(new FutureTask<V>(task)));
        }

        /**
         * Submits a Runnable task for execution and returns a Future
         * representing that task.  Upon completion, this task may be
         * taken or polled.
         *
         * @param task the task to submit
         * @return a future to watch the task
         * @throws IllegalStateException if this service has been closed
         * @throws RejectedExecutionException if the decorated executor refuses the task
         */
        public Future<?> submit(Runnable task)
        {
            Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");

            return execute(new InternalFutureTask<Void>(new FutureTask<Void>(task, null)));
        }

        /**
         * Hands a tracked future to the decorated executor. If the executor rejects
         * it, the future is cancelled so that it does not stay in the tracked set.
         */
        private <V> Future<V> execute(InternalFutureTask<V> futureTask)
        {
            try
            {
                executorService.execute(futureTask);
            }
            catch ( RejectedExecutionException e )
            {
                // cancel() triggers done(), which removes the future from tracking
                futureTask.cancel(false);
                throw e;
            }
            return futureTask;
        }

        /**
         * Builds a callable that runs the given task on the calling thread and then
         * reports its outcome: the task's value on success, or the task's own
         * failure cause (unwrapped from ExecutionException) on error.
         */
        private static <V> Callable<V> runAndGet(final RunnableFuture<V> task)
        {
            return new Callable<V>()
            {
                @Override
                public V call() throws Exception
                {
                    task.run();
                    try
                    {
                        return task.get();
                    }
                    catch ( ExecutionException e )
                    {
                        // Rethrow the original cause so callers do not see a doubly wrapped ExecutionException
                        Throwable cause = e.getCause();
                        if ( cause instanceof Exception )
                        {
                            throw (Exception)cause;
                        }
                        if ( cause instanceof Error )
                        {
                            throw (Error)cause;
                        }
                        throw e;
                    }
                }
            };
        }
    }
    

    相关文章

      网友评论

          本文标题:CloseableExecutorService

          本文链接:https://www.haomeiwen.com/subject/esxwettx.html