美文网首页
十三、多线程执行器

十三、多线程执行器

作者: blank_white | 来源:发表于2020-07-19 20:58 被阅读0次

    十三、多线程执行器

    FutureTask 和 Callable

    可以用来启动一个需要很长时间的计算任务

    
            // Callable 与 Runnable 类似,但是有返回值
            Callable<Integer> callable=()->{
                int sum=0;
                for (int i = 0; i < 1000000000; i++) {
                    sum+=i;
                }
                return sum;
            };
    
    
            // FutureTask 可以将 Callable 包装成实现了 Runnable 和 Future 接口的对象
            FutureTask<Integer> futureTask = new FutureTask<>(callable);
    
            // 开启线程运行任务
            Thread thread=new Thread(futureTask);
            thread.start();
    
            
            // 获取任务的执行结果,未计算完会阻塞
            // 如果计算线程被中断,抛出InterruptedException
            futureTask.get();
            //
            try {
                // 设置最长阻塞时间,超时还没得出结果会抛出异常
                futureTask.get(10L, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
    
            // 如果计算还没开始,取消该任务。如果计算处于运行中,且参数为 true ,则计算会被中断
            futureTask.cancel(true);
    
            futureTask.isDone();
            futureTask.isCancelled();
    
            System.out.println(futureTask.get());
    
    线程池

    线程创建是有消耗的,可以提前创建一些线程,需要的时候直接使用。维护这些线程的池子,也就是线程池

    执行器

    通过 Executors 的工厂方法获得线程池的执行器 ExecutorService

    用 ExecutorService 来执行线程并得到一个 Future

    用 Future 监视线程执行状况

    
            // 执行器 
            ExecutorService executorService = Executors.newCachedThreadPool();
            // 必要时创建新线程,空闲线程会被保留 60秒
            Executors.newCachedThreadPool();
            // 包含固定数量的线程,空闲线程会一直被保留,参数为线程数
            Executors.newFixedThreadPool(32);
            // 只有一个线程的“池”,会按顺序执行提交的任务
            Executors.newSingleThreadExecutor();
            // 用于预定执行一些任务而构建的固定线程池,支持定时及周期性的执行任务,参数为线程数
            Executors.newScheduledThreadPool(32);
            // ScheduledThreadPool 的单线程“池”版
            Executors.newSingleThreadScheduledExecutor();
    
    
            Runnable runnable=()->{};
            Callable callable=()->1;
            Future future;
            // 提交一个 Runnable 或者 Callable 对象给 Executor 并返回一个 future 来监控执行
            // 调用 future.get() 会返回 null ,runnable 没返回值
            future=executorService.submit(runnable);
            // 调用 future.get() 会在任务完成后,把第二个参数传入的对象返回
            future=executorService.submit(runnable,new Object());
    
            future=executorService.submit(callable);
    
    
            // 当不在需要提交任何任务的时候调用,关闭执行器,不再接受新任务,当所有任务完成后,线程池的线程死亡
            executorService.shutdown();
            // 会取消尚未开始的任务,并且试图中断正在运行的线程
            executorService.shutdownNow();
    
    
    预定和周期性的执行任务
    
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(12);
            
            Runnable runnable=()->{};
            Callable callable=()->1;
            // 在指定延迟后执行任务 , 在 10s 后执行任务
            executorService.schedule(runnable,10, TimeUnit.SECONDS);
            executorService.schedule(callable,10, TimeUnit.SECONDS);
            // 在指定延迟后周期性的执行任务, 10s 后开始,每天都在这个时间执行任务
            executorService.scheduleAtFixedRate(runnable,10,60*60*24,TimeUnit.SECONDS);
            // 在指定延迟后,固定间隔的执行任务, 10s 后开始,执行完任务隔 1 小时后,再次执行任务
            executorService.scheduleWithFixedDelay(runnable,10,60*60*1, TimeUnit.SECONDS);
    
    任务组

    可以控制一组任务的执行

    invokeAny : 任一任务得出结果,即返回结果

    ExecutorCompletionService:可以先取出有结果的任务

    
            ExecutorService executorService = Executors.newFixedThreadPool(20);
            Callable<Integer> callable1=()->1;
            Callable<Integer> callable2=()->2;
            //Callable<Integer> callable2=()->{while (true);};
    
            List<Callable<Integer>> callables=new ArrayList();
            callables.add(callable1);
            callables.add(callable2);
    
            // 执行集合中的所有任务,且任意一个执行完成得出结果,即将结果返回,但是其他未执行完的线程仍会继续执行
            // 可以用在比如说多线程分段查找,任何一个线程找到了就可以返回结果了
            Integer result = executorService.invokeAny(callables);
            // 可以设置超时时间,超时未得出结果会抛出 TimeOutException
            executorService.invokeAny(callables,1,TimeUnit.SECONDS);
    
    
            // 2,1,1,2,2,1,1,1,1,1,  且每次程序运行结果不尽相同
            for (int i = 0; i < 10; i++) {System.out.print(executorService.invokeAny(callables)+",");}
    
    
    
            // 执行集合中所有任务,并返回 Future 的集合 ,此方法会等所有任务都有结果了才返回
            List<Future<Integer>> futures = executorService.invokeAll(callables);
            // 与 invokeAny 一样也有带超时参数的版本
            executorService.invokeAll(callables,1,TimeUnit.SECONDS);
    
    
    
            // 构建一个能收集完成服务执行器,内部会维护一个阻塞队列 BlockingQueue,包含已完成执行的任务的future
            ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
            for (Callable<Integer> callable : callables) {
                executorCompletionService.submit(callable);
            }
            System.out.println("******");
            // take 方法取出一个已经执行完的任务的 future ,如果没有完成的任务,则会阻塞(底层的 BlockingQueue.take() 阻塞了)
            executorCompletionService.take().get();
            executorCompletionService.take().get();
            // 只有两个任务结果已经全部取出,要是再取就阻塞了
            //executorCompletionService.take().get();
    
            // 取出一个完成的任务结果,如果没有 返回 null
            executorCompletionService.poll();
            // 没有结果的时候会等待给定时间 (就是BlockingQueue.poll)
            executorCompletionService.poll(2,TimeUnit.SECONDS);
    
            executorService.shutdown();
    
    Fork-Join 框架

    Jave SE 7 提供了 fork-join 框架,可以将任务分段并行执行

    一个计算1-1000(不含1000)的和 ,用法如下,

    
    
    public class MyForkJoin {
    
        public static void main(String[] args) {
    
            int start=1;
            int end=1000;
            // 创建线程池
            ForkJoinPool forkJoinPool=new ForkJoinPool();
            // 创建任务,
            MyTask myTask = new MyTask(start, end);
            // 执行任务
            forkJoinPool.invoke(myTask);
            // 获得任务执行结果
            System.out.println(myTask.join());
    
            // 一般算法对比看计算结果是否准确
            System.out.println(sum(start,end));
        }
    
    
        static  int sum(int start,int end){
            int sum=0;
            for (int i = start; i < end; i++) {
                sum+=i;
            }
            return sum;
        }
    }
    // 如果需要返回计算结果 则继承 RecursiveTask<T>
    // 如果不需要生成结果,则继承 RecursiveAction
    class MyTask extends RecursiveTask<Integer> {
        // 设置分组每组长度的界限
        static int threshold = 10;
        // 分组起始
        int start;
        // 分组结束
        int end;
    
        public MyTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected Integer compute() {
            // 小于每组长度就不分了 直接执行计算
            if (end - start <= threshold) {
                int sum = 0;
                for (int i = start; i < end; i++) {
                    sum += i;
                }
                return sum;
            } else {
                // 继续分组
                int mid = (start + end) / 2;
                MyTask first = new MyTask(start, mid);
                MyTask second = new MyTask(mid, end);
                // 执行两个子任务 , 这明明是个任务类,那么线程池是从哪里来的呢, ForkJoinWorkerThread 内有一个 pool ,通过 Thread.currentThread() 拿到线程再拿到 pool
                invokeAll(first, second);
                // 因为这个任务的逻辑是求总和,将两个子任务求和,作为本任务的结果返回
                return first.join() + second.join();
            }
        }
    }
    
    CompletableFuture

    可以在任务完成后,按顺序执行一些工作的 Future

    并没有明白具体怎么用,见 执行器-可完成的Future

    
            // public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
            CompletableFuture completableFuture = new CompletableFuture();
    
            // 每个方法都返回 CompletableFuture ,在 Future 完成后执行后面的操作,且这些方法是非阻塞的
            completableFuture.thenApply((t)->1);
            completableFuture.thenCompose((t)->1);
            //  completableFuture.thenXxxxx 还有很多方法
    
            // 可以实现一个任务完成后按顺序执行一些工作
            completableFuture.thenApply((t)->1).thenCompose((t)->1).thenAccept((t)->{});
    

    相关文章

      网友评论

          本文标题:十三、多线程执行器

          本文链接:https://www.haomeiwen.com/subject/lirqkktx.html