美文网首页
Java线程池 ThreadPool

Java线程池 ThreadPool

作者: maven_hz | 来源:发表于2017-08-16 17:05 被阅读0次

线程池中线程都可复用,大大提高了效率。线程的创建和销毁是很耗费资源的

  • 线程池的ExecutorService 服务 有execute和submit方法 。submit除了可以传入Runable。还可以传入Callable。故可以有返回值
  • Callable 类似Runnable 但是Callable可以有返回值。实现Callable的线程结束后可接收
  • Executors为Executor,ExecutorService,ScheduledExecutorService,ThreadFactory和Callable类提供了一些工具方法,类似于集合中的Collections类的功能。
  • 固定数量的线程池 ExecutorService service = Executors.newFixedThreadPool(4):
        ExecutorService service = Executors.newFixedThreadPool(4);
        MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
        MyTask t2 = new MyTask(80001, 130000);
        MyTask t3 = new MyTask(130001, 170000);
        MyTask t4 = new MyTask(170001, 200000);
        
        Future<List<Integer>> f1 = service.submit(t1);
        Future<List<Integer>> f2 = service.submit(t2);
        Future<List<Integer>> f3 = service.submit(t3);
        Future<List<Integer>> f4 = service.submit(t4);
  • 不固定数量线程池 ExecutorService service = Executors.newCachedThreadPool(); 线程结束后默认60秒之后在线程池中销毁
  • 单线程的线程池。ExecutorService service = Executors.newSingleThreadExecutor(); 可保证任务执行顺序
  • 定时任务线程池。ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        /**
         *  scheduleAtFixedRate方法四个参数
         *  Runnable command,
         *  long initialDelay,任务开始时间
         *  long period,每隔多少时间执行
         *  TimeUnit unit,时间单位
         */
        service.scheduleAtFixedRate(()->{
            try {
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }, 0, 500, TimeUnit.MILLISECONDS);
  • 可执行其他线程任务WorkStealingPool :ExecutorService service = Executors.newWorkStealingPool(); 该线程池中每个线程都维护自己的任务队列。当自己的任务队列执行完成时,会帮助其他线程执行其中的任务。主动找活干。PS:它底层是ForkJoinPool的实现。用守护线程daemon的方式来执行。JDK1.8之后新加
  • ForkJoinPool 分任务进行计算再合并结果。计算任务需要基础RecursiveTask: class AddTask extends RecursiveTask<Long> 并重写compute()方法 方法内进行递归调用。 RecursiveTask(递归任务) JDK1.7之后新加。 代码示例:
    class AddTask extends RecursiveTask<Long> {

        int start, end;

        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {

            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            }

            int middle = start + (end-start)/2;

            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);//递归
            subTask1.fork(); //启动新线程
            subTask2.fork();

            return subTask1.join() + subTask2.join(); //返回计算结果
        }

    }
  • 所有线程池类内部实现都是ThreadPoolExecutor 除了ForkJoinPool。ThreadPoolExecutor 线程池的通用类 jdk源码中的ThreadPoolExecutor:
public ThreadPoolExecutor(int corePoolSize, //线程数量
                              int maximumPoolSize,//最多线程数
                              long keepAliveTime,//线程空闲销毁的时间
                              TimeUnit unit,//线程空闲销毁的时间单位
                              BlockingQueue<Runnable> workQueue) {//任务容器。阻塞式容器
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
  • 从各类pool的源码可以看见。使用的都是ThreadPoolExecutor
    /**
    *  FixedThreadPool 固定数量
    */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    /**
    *  CachedThreadPool 不固定数量,空闲60秒后销毁
    */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    /**
    *  SingleThreadExecutor  单个线程,保证了顺序执行。
    */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • parallel Stream API : nums.parallelStream().forEach(...);多线程执行任务.效率很高。底层为ForkJoinPool的实现 。JDK1.8之后的特性 示例.质数的计算

public class ParallelStreamAPI {
    public static void main(String[] args) {
        List<Integer> nums = new ArrayList<>();
        Random r = new Random();
        for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000));
        
        start = System.currentTimeMillis();
        nums.parallelStream().forEach(ClassName::isPrime);
        end = System.currentTimeMillis();
        System.out.println(end - start);
        }
        
        static boolean isPrime(int num) {
        for(int i=2; i<=num/2; i++) {
            if(num % i == 0) return false;
        }
        return true;
        }
}

相关文章

网友评论

      本文标题:Java线程池 ThreadPool

      本文链接:https://www.haomeiwen.com/subject/vpairxtx.html