美文网首页爱编程,爱生活
Java Consurrency 《Thread Pool》

Java Consurrency 《Thread Pool》

作者: 熬夜的猫头鹰 | 来源:发表于2018-06-16 21:48 被阅读10次

线程池优点

Java是支持多线程的,而线程在应用当中是稀缺资源,所以在编写程序的时候需要特别注意合理的利用线程。

  • 降低资源的消耗
  • 提高响应速度
  • 线程可管理 可复用

Java的线程管理是在java.util.concurrent下,
接口Executor提供了执行已提交的 Runnable 任务的对象的方法。此接口解耦了任务提交和每个任务将如何运行的机制(包括线程使用的细节、调度等)。通常使用 Executor 而不是显式通过 new Thread(new(RunnableTask())).start()创建线程。

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

不过, Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务。如:

 class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
 }
 

更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程

  class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
 }
 

Executor已知的子接口以及实现类架构图:

[图片上传失败...(image-7ee135-1529156779808)]

下面这件介绍一下子接口、子类的用途:

ExecutorService

ExecutorService 提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。

可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。shutdown() 方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService 以允许回收其资源。

通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法 submit 扩展了基本方法 Executor.execute(Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService 类来编写这些方法的自定义变体)

  • 方法列表
ExecutorService#shutdown
ExecutorService#shutdownNow
ExecutorService#isShutdown
ExecutorService#isTerminated
ExecutorService#awaitTermination
ExecutorService#submit(Callable<T>)
ExecutorService#submit(Runnable, T)
ExecutorService#submit(Runnable)
ExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>)
ExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
ExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>)
ExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>, long, TimeUnit)

*** submit方法返回一个Future对象,对这对象可以设置超时时间 ***

  • 实现一个线程支持执行的DEMO
public class Main {

    static ExecutorService executorService = Executors.newFixedThreadPool(5);

    public static void main(String[] args) {
        executorService.execute(new OpenTV());
    }

    static class OpenTV implements Runnable{

        @Override
        public void run() {
            System.err.println("i have open the tv");
        }
    }

}

Doc上列举的实例:

class NetworkService implements Runnable {
    private final ServerSocket serverSocket;
    private final ExecutorService pool;

    public NetworkService(int port, int poolSize)
        throws IOException {
      serverSocket = new ServerSocket(port);
      pool = Executors.newFixedThreadPool(poolSize);
    }
 
    public void run() { // run the service
      try {
        for (;;) {
          pool.execute(new Handler(serverSocket.accept()));
        }
      } catch (IOException ex) {
        pool.shutdown();
      }
    }
  }

  class Handler implements Runnable {
    private final Socket socket;
    Handler(Socket socket) { this.socket = socket; }
    public void run() {
      // read and service request on socket
    }
 }
 

下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后调用 shutdownNow(如有必要)取消所有遗留的任务:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }
 

整体的测试类

public class ExecutorServiceMain {

    //创建一个线程池
    static ExecutorService pool = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//        pool.execute(new OpenTV());
        Future<String> openTvFuture = pool.submit(new OpenDoor());
        System.err.println(openTvFuture.get(6L,TimeUnit.SECONDS));
        shutDown(pool);
    }

    static class OpenTV implements Runnable{

        @Override
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.err.println("I have open the tv");
        }
    }

    static class OpenDoor implements Callable<String>{

        @Override
        public String call() throws Exception {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "I love java";
        }
    }

    private static void shutDown(ExecutorService pool){
        pool.shutdown();
        try {
            if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                pool.shutdownNow();
            }
            if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                System.err.println("something is wrong");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            pool.shutdownNow();
            Thread.interrupted();

        }

    }


}

ScheduledExecutorService

一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令

schedule 方法使用各种延迟创建任务,并返回一个可用于取消或检查执行的任务对象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建并执行某些在取消前一直定期运行的任务。

用 Executor.execute(Runnable) 和 ExecutorService 的 submit 方法所提交的命令,通过所请求的 0 延迟进行安排。schedule 方法中允许出现 0 和负数延迟(但不是周期),并将这些视为一种立即执行的请求。

所有的 schedule 方法都接受相对 延迟和周期作为参数,而不是绝对的时间或日期。将以 Date 所表示的绝对时间转换成要求的形式很容易。例如,要安排在某个以后的 Date 运行,可以使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。但是要注意,由于网络时间同步协议、时钟漂移或其他因素的存在,因此相对延迟的期满日期不必与启用任务的当前 Date 相符。 Executors 类为此包中所提供的 ScheduledExecutorService 实现提供了便捷的工厂方法。

  • 方法列表
ScheduledExecutorService#schedule(Runnable, long, TimeUnit)
ScheduledExecutorService#schedule(Callable<V>, long, TimeUnit)
ScheduledExecutorService#scheduleAtFixedRate
ScheduledExecutorService#scheduleWithFixedDelay

ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。如果任务的任何一个执行遇到异常,则后续执行都会被取消。否则,只能通过执行程序的取消或终止方法来终止该任务。如果此任务的任何一个执行要花费比其周期更长的时间,则将推迟后续执行,但不会同时执行。

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。否则,只能通过执行程序的取消或终止方法来终止该任务。

  • Demo

public class SchduledExecutorMain {

    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledFuture<String> scheduled = scheduler.schedule(new RollTheBell(), 5, TimeUnit.SECONDS);
        System.err.println(scheduled.get());

        Job job = new Job();
        ScheduledFuture<?> jobScheduled = scheduler.scheduleAtFixedRate(job, 10, 10, TimeUnit.SECONDS);
        System.err.println(jobScheduled.get());
//        ExecutorServiceMain.shutDown(scheduler);
    }


    static class RollTheBell implements Callable<String> {

        @Override
        public String call() throws Exception {
            System.out.println("我是 5秒之后执行的");
            return "I love java";
        }
    }

    static class Job implements Runnable {

        public void run() {
            System.out.println("十秒之后每十秒执行一次");
        }

    }
}


AbstractExecutorService

提供 ExecutorService 执行方法的默认实现。此类使用 newTaskFor 返回的 RunnableFuture 实现 submit、invokeAny 和 invokeAll 方法,默认情况下,RunnableFuture 是此包中提供的 FutureTask 类。例如,submit(Runnable) 的实现创建了一个关联 RunnableFuture 类,该类将被执行并返回。子类可以重写 newTaskFor 方法,以返回 FutureTask 之外的 RunnableFuture 实现。

  • 提供的方法列表

AbstractExecutorService#newTaskFor(Runnable, T)
AbstractExecutorService#newTaskFor(Callable<T>)
AbstractExecutorService#submit(Runnable)
AbstractExecutorService#submit(Runnable, T)
AbstractExecutorService#submit(Callable<T>)
AbstractExecutorService#doInvokeAny
AbstractExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>)
AbstractExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
AbstractExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>)
AbstractExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>, long, TimeUnit)


ThreadPoolExecutor

ThreadPoolExecutor继承了AbstractExecutorService,总共提供了四个构造函数


ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, RejectedExecutionHandler)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory, RejectedExecutionHandler)

构造函数主要提供以下几个参数:

  • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

  • BlockingQueue

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
  • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
    • AbortPolicy:直接抛出异常。
    • CallerRunsPolicy:只用调用者所在线程来运行任务。
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • DiscardPolicy:不处理,丢弃掉。
      当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

线程池提交任务

  • 一种是无返回值

pool.execute(new Runnable() {
            @Override
            public void run() {
                // do something
            }
        });

  • 一种是有返回值

我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

Future<Object> future = executor.submit(harReturnValuetask);
try {
     Object s = future.get();
} catch (InterruptedException e) {
    // 处理中断异常
} catch (ExecutionException e) {
    // 处理无法执行任务异常
} finally {
    // 关闭线程池
    executor.shutdown();
}

线程池的关闭

public static void shutDown(ExecutorService pool){
        pool.shutdown();
        try {
            if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                pool.shutdownNow();
            }
            if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                System.err.println("something is wrong");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            pool.shutdownNow();
            Thread.interrupted();

        }

    }

线程池的工作原理分析

[图片上传失败...(image-4dd8a2-1529156779808)]

从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

  • 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
  • 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
  • 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。

合理的配置线程池

要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

  • 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
  • 任务的优先级:高,中和低。
  • 任务的执行时间:长,中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能小的线程,如配置Ncpu+1个线程的线程池。IO密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如2Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()* 方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。

线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

  • taskCount:线程池需要执行的任务数量。
  • completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
  • largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
  • getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不加
  • getActiveCount:获取活动的线程数。

通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:

protected void beforeExecute(Thread t, Runnable r) { }


参考:

  • Java并发编程实战
  • JDK1.6源码
  • infoq

相关文章

网友评论

    本文标题:Java Consurrency 《Thread Pool》

    本文链接:https://www.haomeiwen.com/subject/ovtieftx.html