美文网首页
Executor框架

Executor框架

作者: zlcook | 来源:发表于2017-08-20 21:50 被阅读33次
  • Executor接口
  • ExecutorService规定了Executor的生命周期 (待写)
  • ThreadPoolExecutor是ExecutorService的实现类
image.png
  • 线程池不建议使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

  • Executors 返回的线程池对象的弊端如下:
    1)FixedThreadPool 和 SingleThreadPool:
    允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
    2)CachedThreadPool 和 ScheduledThreadPool:
    允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

  • 对于延迟任务、周期任务使用ScheduledThreadPoolExecutor来代替Timer。

    • Timer使用绝对时间而不是相对时间,在执行定时任务时只会创建一个线程。如果某个任务执行时间过长,会破坏其它TimerTask的定时精确性。
    • Timer中的任何一个TimerTask抛出一个未检查异常,就会取消整个Timer,尚未被调度的TimerTask将不会被执行。
    • ScheduledThreadPoolExecutor配合DelayQueue来代替Timer。

ThreadPoolExecutor构造函数

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

参数解释

  • corePoolSize:线程池基本大小,即没有任务执行时的大小,但是在创建ThreadPoolExecutor初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用prestartAllCoreThreads

  • maximumPoolSize:线程池最大大小。表示可同时活动的线程数量的上限。

  • keepAliveTime:线程存活时间,如果某个线程的空闲时间超出了存活时间,那么被标记为可回收,但是,只有线程池的当前大小超过了基本大小这个线程才会被终止。

  • unit:时间单位,和keepAliveTime组成具体时间,比如keepAliveTime为10,unit= TimeUnit.SECONDS,就表示存活时间为10秒。

  • workQueue:工作队列,用于存放提交的任务,用于被线程执行。

    • BlockingQueue的其中4个实现类:SynchronousQueue(同步队列),ArrayBlockingQueue(数组阻塞队列FIFO),LinkedBlockingQueue(链表阻塞队列FIFO),PriorityBlockingQueue(优先级阻塞队列)。后2个可以是有界也可以是无界,第二个是有界队列,第一个是好比容量只有一的阻塞队列。
    • SynchronousQueue不是一个真正队列,而是一种在线程之间进行移交的机制,要将一个元素放入SynchronousQueue中(put方法),必须有另一个线程正在等待接受这个元素(take方法),如果没有线程在等待,那么就会阻塞。在线程池中如果当前线程池中的线程数量没有达到最大值,就创建新的线程,否则就根据饱和策略(handler)来执行。该队列适合消费者多的情况,针对线程池就是适合无界的线程池(maximumPoolSize为int最大值)
  • threadFactory:构造新线程的工厂,具体看:处理非正常的线程终止中使用ThreadFactory来定制Thread。

  • handler:饱和策略,当有界工作队列被填满后,并且没有可用的线程(线程池也是有界的),饱和策略发挥作用。如果某个任务被提交到一个已被关闭的Executor时(调用了shutdown方法),也会用到饱和策略。

    • 四种策略。中止(Abort):该策略抛出未检查异常RejectedExecutionException,调用者可以捕获这个异常,然后根据需求处理;抛弃(Discard):抛弃任务;抛弃最旧的(Discard-Oldest):抛弃下一个将被执行的任务,然后重提提交新的任务,如果工作队列是优先队列(优先级)那么将抛弃优先级最高的任务;调用者运行(Caller-Runs):将任务转交给由调用execute的线程执行该任务,即主线程。
  • 注:如果线程数量等于线程池基本大小值corePoolSize,那么只有当工作队列被填满后,ThreadPoolExecutor才会创建新线程来执行队列中的任务

    • corePoolSize为0、maximumPoolSize为3,并且工作队列为有界队列,并且大小为5,那么只有每当5个任务都填满队列后,才会创建一个线程执行,最多只有3个线程同时作业。Executors.newCachedThreadPool()返回的ThreadPoolExecutor的corePoolSize为0,maximumPoolSize为MAX_VALUE,workQueue为SynchronousQueue。
  • image.png

    下实现一种平缓的性能下降。

案例

  • 1.创建固定大小线程池,有界队列,调用者运行饱和策略
  • image.png
    1. 没有定义饱和策略,但是通过使用Semaphore(信号量)来控制任务的提交速率,信号量大小设置为线程池大小加上可排队列任务的数量,从而控制正在执行的和等待执行的任务数量。
package net.jcip.examples;
import java.util.concurrent.*;
import net.jcip.annotations.*;

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
        }
    }
}

扩展ThreadPoolExecutor

  • ThreadPoolExecutor提供了beforeExecute、afterExecute和terminated,这些方法可以用于扩展ThreadPoolExecutor的行为。

    • 其中beforeExecute、afterExecute在每个任务线程中执行如下,runWorker会在在任务线程的run方法中执行。
public runWorker(Worker w){
......
 try {
                   beforeExecute(wt, task);
                   Throwable thrown = null;
                   try {
                       task.run();
                   } catch (RuntimeException x) {
                       thrown = x; throw x;
                   } catch (Error x) {
                       thrown = x; throw x;
                   } catch (Throwable x) {
                       thrown = x; throw new Error(x);
                   } finally {
                       afterExecute(task, thrown);
                   }
               } finally {
                   task = null;
                   w.completedTasks++;
                   w.unlock();
               }
....
}
  • afterExecute在beforeExecute抛出RuntimeException,或者任务完成后带有一个Error都不会被调用。
  • terminated在线程池关闭时调用,可以用来释放Executor分配的资源,或者执行发送通知,记录日志或手机finalize统计信息等操作。

案例

  • 给线程池添加统计信息
  • 测量任务的运行时间,记录已处理任务数和总的处理时间,并通过terminated来输入包含平均任务时间的日志消息。
public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAppThread(Runnable r) {
        this(r, DEFAULT_NAME);
    }

    public MyAppThread(Runnable runnable, String name) {
        super(runnable, name + "-" + created.incrementAndGet());
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t,
                                          Throwable e) {
                log.log(Level.SEVERE,
                        "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }

    public void run() {
        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;
        if (debug) log.log(Level.FINE, "Created " + getName());
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }

    public static int getThreadsCreated() {
        return created.get();
    }

    public static int getThreadsAlive() {
        return alive.get();
    }

    public static boolean getDebug() {
        return debugLifecycle;
    }

    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}

递归算法的并行化

  • 如果在循环体中包含一些密集型计算,或者需要执行可能阻塞的I/O操作,那么只要每次迭代是独立的,都可以对其进行并行化。(计算密集型和I/O操作对Cpu利用率不同,所以线程池大小也不一样)
  • 下面给出来了2个串行转并行的操作。
public abstract class TransformingSequential {

  //1.将串行转并行
    void processSequentially(List<Element> elements) {
        for (Element e : elements)
            process(e);
    }
    void processInParallel(Executor exec, List<Element> elements) {
        for (final Element e : elements)
            exec.execute(new Runnable() {
                public void run() {
                    process(e);
                }
            });
    }

    public abstract void process(Element e);

 //2.将串行递归转化为并行递归:树的深度优先遍历
    public <T> void sequentialRecursive(List<Node<T>> nodes,
                                        Collection<T> results) {
        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }

    public <T> void parallelRecursive(final Executor exec,
                                      List<Node<T>> nodes,
                                      final Collection<T> results) {
        for (final Node<T> n : nodes) {
            exec.execute(new Runnable() {
                public void run() {
                    results.add(n.compute());
                }
            });
            parallelRecursive(exec, n.getChildren(), results);
        }
    }

  //等待通过并行方式的计算结果
    public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }

    interface Element {
    }

    interface Node <T> {
        T compute();

        List<Node<T>> getChildren();
    }
}

相关文章

网友评论

      本文标题:Executor框架

      本文链接:https://www.haomeiwen.com/subject/crifdxtx.html