美文网首页
如何例用Executor创建后台线程池

如何例用Executor创建后台线程池

作者: 朱兰婷 | 来源:发表于2020-08-18 14:25 被阅读0次

    Executor

    执行已提交的Runnable任务的对象接口。这个接口提供了一种将任务提交和任务运行机制(包括线程使用、调度等的详细信息)分离的方法。

    public interface Executor {
    
        /**
         * 在将来的某些时候执行给定任务。根据Executor的实现判定是在一个新线程还是线程池还是调用线 
         * 程中执行。
         */
        void execute(Runnable command);
    }
    

    通常用Executor来代替显示的创建Thread。如要在单独的线程执行某项任务时,使用

        public void test() {
            Executor executor = new ThreadPerTaskExecutor();
            executor.execute(new RunnableTask1());
        }
    
        class ThreadPerTaskExecutor implements Executor {
            public void execute(Runnable r) {
                new Thread(r).start();
            }
        }
    

    而不是

    new Thread(new RunnableTask()).start()
    

    很多Executor的实现都对任务的执行方式和时间强加的限制,如下面定义了一个顺序执行任务的Executor:

    class SerialExecutor implements Executor {
            final Queue<Runnable> tasks = new ArrayDeque<>();
            final Executor executor;
            Runnable active;
    
            SerialExecutor(Executor executor) {
                this.executor = executor;
            }
    
            public synchronized void execute(final Runnable r) {
                tasks.add(() -> {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                });
                if (active == null) {
                    scheduleNext();
                }
            }
    
            protected synchronized void scheduleNext() {
                if ((active = tasks.poll()) != null) {
                    executor.execute(active);
                }
            }
        }
    

    java.util.concurrent包中提供的Executor的实现ExecutorService是一个更广泛的被使用的接口。ThreadPoolExecutor提供一个可扩展的线程池实现。Executors类为这些Executor提供了方便使用的工厂方法。

    下面为java.util.concurrent包中Executor框架的主要类关系图: Executor框架.png

    ExecutorService

    1. 一个Executor;
    2. 提供shutdown()方法终止Executor的执行;
    3. 提供submit()方法返回一个Future来追踪异步任务的执行进度;
    4. 提供invokeAll()方法返回一个Future列表来追踪多个异步任务的执行进度。

    如果我们现在创建了一个异步执行的网络任务:

        class NetworkService implements Runnable {
            private final ServerSocket serverSocket;
            private final ExecutorService pool;
    
            public NetworkService(int port, int poolSize)
                    throws IOException {
                serverSocket = new ServerSocket(port);
                pool = Executors.newFixedThreadPool(poolSize);
            }
    
            public void run() { // run the service
                try {
                    for (; ; ) {
                        pool.execute(new Handler(serverSocket.accept()));
                    }
                } catch (IOException ex) {
                    pool.shutdown();
                }
            }
        }
    
        class Handler implements Runnable {
            private final Socket socket;
    
            Handler(Socket socket) {
                this.socket = socket;
            }
    
            public void run() {
                // read and service request on socket
            }
        }
    

    可以调用shutdown()方法终止它的执行:

        void shutdownAndAwaitTermination(ExecutorService pool) {
            pool.shutdown(); // Disable new tasks from being submitted
            try {
                // Wait a while for existing tasks to terminate
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    pool.shutdownNow(); // Cancel currently executing tasks
                    // Wait a while for tasks to respond to being cancelled
                    if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                        System.err.println("Pool did not terminate");
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted
                pool.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    
    ScheduledExecutorService
    1. 一个ExecutorService;
    2. 其scheduleXXX()方法创建并执行ScheduledFuture;
    3. 该ScheduledFuture可以在给定的延迟后变为启动状态,并开始执行任务;
    4. 该ScheduledFuture可以定期执行任务;
    5. 通过这个ScheduledFuture可以get任务执行结果,可以cancel这个任务。

    如需要创建一个在一小时内每隔10s就响一次的报警器:

        class BeeperControl {
            private final ScheduledExecutorService scheduler =
                    Executors.newScheduledThreadPool(1);
    
            public void beepForAnHour() {
                final Runnable beeper = () -> System.out.println("beep");
    
                final ScheduledFuture<?> beeperHandle =
                        scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
    
                scheduler.schedule(() -> {
                    beeperHandle.cancel(true);
                }, 60 * 60, SECONDS);
            }
        }
    
    AbstractExecutorService
    1. 一个ExecutorService;
    2. 其submit、invokeAny、invokeAll方法在执行任务前,把任务封装成RunnableFuture,方便跟踪执行进度。
        /**
         * Returns a {@code RunnableFuture} for the given runnable and default
         * value.
         *
         * @param runnable the runnable task being wrapped
         * @param value the default value for the returned future
         * @param <T> the type of the given value
         * @return a {@code RunnableFuture} which, when run, will run the
         * underlying runnable and which, as a {@code Future}, will yield
         * the given value as its result and provide for cancellation of
         * the underlying task
         * @since 1.6
         */
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    

    ExecutorService的实现

    ThreadPoolExecutor

    1. 一个AbstractExecutorService;
    2. 使用线程池中的某个线程执行提交的任务,该线程池通常由Executors中的工厂方法创建;
    3. FIFO;
    4. 其execute的设计思想为:
    if(threadCount < 池中需要在空闲状态也要保持的线程数){
          new thread;
          把当前任务做为新建线程的第一个任务执行;
    } else if(任务放入等待队列){
         在队列中等待在运行的线程将它取出并执行;
    } else if(threadCount < 池中最多可拥有的线程数){ //此时意味着等待队列已满,放入失败
         new thread;
         把当前任务做为新建线程的第一个任务执行;
    } else {
         拒绝该任务的执行,如抛出异常;
    }
    
    1. 线程池中维持corePoolSize数量的线程数,如果有线程在线程池shutdown之前因为执行失败而中断,则在执行后续任务时会新建一个替代它的线程。

      具体原理如下: ThreadPoolExecutor.png
    其中定义了线程池的control state(ctl): ThreadPoolExecutor线程池状态.png
    各个ctl之间的关系如下: ThreadPoolExecutor runState.png

    所以shutdown()代表停止接收新任务但仍需执行现有任务,shutdownNow()代表停止接收新任务也不再执行现有任务。

    ThreadPoolExecutor在实现时用到了BlockQueue和AbstracQueuedSynchronizer。

    BlockQueue比较常见的实现为LinkedBlockingQueue。其原理如下: LinkedBlockingQueue.png AbstractQueuedSynchronizer.png

    可以看到Lock和BlockQueue是非常实用的。
    Lock替代了synchronized,功能更强大使用也更简便,如下:

        class BoundedBuffer {
            final Lock lock = new ReentrantLock();
            final Condition notFull = lock.newCondition();
            final Condition notEmpty = lock.newCondition();
    
            final Object[] items = new Object[100];
            int putptr, takeptr, count;
    
            public void put(Object x) throws InterruptedException {
                lock.lock();
                try {
                    while (count == items.length)
                        notFull.await();
                    items[putptr] = x;
                    if (++putptr == items.length) putptr = 0;
                    ++count;
                    notEmpty.signal();
                } finally {
                    lock.unlock();
                }
            }
    
            public Object take() throws InterruptedException {
                lock.lock();
                try {
                    while (count == 0)
                        notEmpty.await();
                    Object x = items[takeptr];
                    if (++takeptr == items.length) takeptr = 0;
                    --count;
                    notFull.signal();
                    return x;
                } finally {
                    lock.unlock();
                }
            }
        }
    

    BlockingQueue非常适用于生产-消费者模型,如下:

    class Producer implements Runnable {
        private final BlockingQueue queue;
        Producer(BlockingQueue q) { queue = q; }
        public void run() {
            try {
                while (true) { queue.put(produce()); }
            } catch (InterruptedException ex) { ... handle ...}
        }
        Object produce() { ... }
    }
    
    class Consumer implements Runnable {
        private final BlockingQueue queue;
    
        Consumer(BlockingQueue q) {
            queue = q;
        }
    
        public void run() {
            try {
                while (true) {
                    consume(queue.take());
                }
            } catch (InterruptedException ex) { ...handle ...}
        }
    
        void consume(Object x) { ...}
    }
    
    class BlockingQueueTest {
        void main() {
            BlockingQueue q = new SomeQueueImplementation();
            LockTest.Producer p = new LockTest.Producer(q);
            Consumer c1 = new Consumer(q);
            Consumer c2 = new Consumer(q);
            new Thread(p).start();
            new Thread(c1).start();
            new Thread(c2).start();
        }
    }
    

    ScheduledThreadPoolExecutor

    1. 一个可以执行延迟任务和周期性任务的ThreadPoolExecutor;
    2. 如果提交的任务在运行前被取消,则该任务会在延迟结束后被取消;
    3. 如果多个任务有相同的执行时间,采用FIFO;

    因为功能不同,其实现和ThreadPoolExecutor也不一样:

    1. maximumPoolSize == corePoolSize;且等待任务队列大小没有限制为Integer.MAX_VALUE;
    2. 设置了shutdown策略来决定哪些任务在线程池进入SHUTDOWN状态后不需要再执行;
    3. 采用DelayedWorkQueue而非LinkedBlockingQueue,加快取消速度(时间复杂度从O(n)降至O(log n));
    4. 在插入队列时将所有任务封装成ScheduledFutureTask给DelayedWorkQueue使用;
    具体实现原理如下: ScheduledThreadPoolExecutor.png

    ForkJoinPool

    1. 继承AbstractExecutorService;
    2. 将所有提交的或执行的任务封装成ForkJoinTask执行;
    3. 需要将任务分解成足够小的子任务执行,思路如下:
    解决(问题):
        if 问题足够小:
            直接解决问题 (顺序算法)
        else:
            for 部份 in 细分(问题)
                fork 子任务来解决(部份)
            join 在前面的循环中生成的所有子任务
            return 合并的结果
    
    fork-join model.png
    1. 每个Worker都有其自己的本地任务队列;
    2. 每个Worker都可以去其它Worker的任务队列中窃取任务执行;


      work stealing实现方式.png
    3. workQueues保存了WorkQueue列表,其中奇数索引代表内部任务(unshared queues,本地任务列表),偶数索引代表外部任务(shared queues,窃取的任务列表),因此workQueues的大小始终为2的幂次方。
    ForkJoinPool workQueues.png

    其具体实现原理如下:


    ForkJoinPool.png

    Executors

    提供java.util.concurrent包中的Executor、ExecutorService、ScheduledExecutorService、ThreadFactory的工厂方法。

    newFixedThreadPool()

    创建一个指定corePoolSize和maximumPoolSize都为nThreads的ThreadPoolExecutor。

    适用于负载较重的并行运算。

    newWorkStealingPool()

    创建ForkJoinPool,指定处理还没joined的forked任务的规则为FIFO(上面介绍中的ForkJoinPool默认采用LIFO)。

    适用于“分而治之”递归运算计算密集的运算。

    newScheduledThreadPool()

    创建一个指定corePoolSize的ScheduledThreadPoolExecutor。

    适用于需要定期或周期性执行的运算。

    newSingleThreadExecutor()

    创建一个仅仅只能拥有一个执行线程的ThreadPoolExecutor。

    适用于串行运算。

    newCachedThreadPool()

    创建一个线程数量不限的ThreadPoolExecutor(corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE)。

    适用于负载较轻的数量繁多的短期运算。

    newSingleThreadScheduledExecutor()

    创建一个仅仅只能拥有一个执行线程的ScheduledThreadPoolExecutor。

    适用于串行的需要定期或周期性执行的运算。

    MoreExecutors

    提供java.util.concurrent包中的Executor、ExecutorService、ThreadFactory的工厂方法。

    newSequentialExecutor(Executor delegate)

    delegate:实际执行任务的底层Executor

    创建一个实际由delegate执行任务的SequentialExecutor。

    SequentialExecutor:由delegate委拖任务执行程序实际执行任务的采用FIFO规则串行执行任务的Executor。

    适用于需要按FIFO规则执行的串行运算。

    SequentialExecutor.png

    listeningDecorator(ExecutorService delegate)

    delegate:实际执行任务的底层Executor

    创建一个实际由delegate执行任务的ListeningExecutorService。

    ListeningExecutorService:可以在任务完成后再执行给定任务的ExecutorService。

    适用于需要将运算链接在一起的场景。

    ListeningExecutorService.png

    应用

    创建一个在非UI线程执行并行任务的Executor

        @Provides
        @Annotations.NonUiParallel
        @Singleton
        static ExecutorService provideNonUiThreadPool() {
            return Executors.newFixedThreadPool(
                    5,
                    runnable -> {
                        Log.i("DialerExecutorModule.newThread", "creating low priority thread");
                        Thread thread = new Thread(runnable, "DialerExecutors-LowPriority");
                        // Java thread priority 4 corresponds to Process.THREAD_PRIORITY_BACKGROUND (10)
                        thread.setPriority(4);
                        return thread;
                    });
        }
    

    创建一个在非UI线程执行串行任务的Executor

        static ScheduledExecutorService provideNonUiSerialExecutorService() {
            return Executors.newSingleThreadScheduledExecutor(
                    runnable -> {
                        Log.i("NonUiTaskBuilder.newThread", "creating serial thread");
                        Thread thread = new Thread(runnable, "DialerExecutors-LowPriority-Serial");
                        // Java thread priority 4 corresponds to Process.THREAD_PRIORITY_BACKGROUND (10)
                        thread.setPriority(4);
                        return thread;
                    });
        }
    

    创建一个在UI线程执行延迟或周期性任务的Executor

        static ScheduledExecutorService provideUiSerialExecutorService() {
            return Executors.newSingleThreadScheduledExecutor(
                    runnable -> {
                        Log.i("DialerExecutorModule.newThread", "creating serial thread");
                        Thread thread = new Thread(runnable, "DialerExecutors-HighPriority-Serial");
                        // Java thread priority 5 corresponds to Process.THREAD_PRIORITY_DEFAULT (0)
                        thread.setPriority(5);
                        return thread;
                    });
        }
    

    创建一个在UI线程执行的可以注册listener的轻量级Executor

        @Provides
        @Annotations.UiParallel
        @Singleton
        static ExecutorService provideUiThreadPool() {
            return (ExecutorService) AsyncTask.THREAD_POOL_EXECUTOR;
        }
    
        static ListeningExecutorService provideLightweightExecutor(@Annotations.UiParallel ExecutorService delegate) {
            return MoreExecutors.listeningDecorator(delegate);
        }
    

    创建一个在非UI线程异步执行运算的可以注册listener的Executore

        static ListeningExecutorService provideBackgroundExecutor(
                @Annotations.NonUiParallel ExecutorService delegate) {
            return MoreExecutors.listeningDecorator(delegate);
        }
    

    总结

    Executor框架把任务的提交和执行解耦,可以简化并发编程。线程能过线程的复用,可以降低资源消耗,提高响应速度等。

    相关文章

      网友评论

          本文标题:如何例用Executor创建后台线程池

          本文链接:https://www.haomeiwen.com/subject/dqanrktx.html