美文网首页Java多线程系列
多线程并发框架使用四

多线程并发框架使用四

作者: 丹青水 | 来源:发表于2018-03-23 10:08 被阅读0次
    Executor

    不同于 Thread 类将任务和执行耦合在一起, executor 将任务本身和执行任务分离,接口只有一个方法 void execute(Runnable command);

    ExecutorService

    ExecutorService 接口 对 Executor 接口进行了扩展,提供了返回 Future 对象,终止,关闭线程池等方法。当调用 shutDown 方法时,线程池会停止接受新的任务,但会完成正在 pending 中的任务。
    Future 对象提供了异步执行,这意味着无需等待任务执行的完成,只要提交需要执行的任务,然后在需要时检查 Future 是否已经有了结果,如果任务已经执行完成,就可以通过 Future.get() 方法获得执行结果。Future.get() 方法是一个阻塞式的方法,如果调用时任务还没有完成,会等待直到任务执行结束。通过 ExecutorService.submit() 方法返回的 Future 对象,还可以取消任务的执行。Future 提供了 cancel() 方法用来取消执行 pending 中的任务。相关的方法如下图。


    Executors

    Executors 是一个工具类,类似于 Collections,是线程池的工厂类。
    1.newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

    2.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    3.newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

    4.newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    自定义线程池

    ThreadPoolExecuto

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
    例子1(异步执行获取结果)
    public class ExecutorTest {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            List<Future<String>> resultList = new ArrayList<Future<String>>();
            // 创建10个任务并执行
            for (int i = 0; i < 100; i++) {
                // 使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
                Future<String> future = executorService.submit(new TaskWithResult(i));
                // 将任务执行结果存储到List中
                resultList.add(future);
            }
            executorService.shutdown();
            // 遍历任务的结果
            for (Future<String> fs : resultList) {
                try {
                    System.out.println(fs.get()); // 打印各个线程(任务)执行的结果
                } catch (InterruptedException e) {
    
                } catch (ExecutionException e) {
                }
            }
            System.out.println("所有执行完了");
        }
    }
    class TaskWithResult implements Callable<String> {
        private int id;
        public TaskWithResult(int id) {
            this.id = id;
        }
        @Override
        public String call() throws Exception {
            System.out.println("call()方法被自动调用,干活!!!             " + Thread.currentThread().getName());
            Thread.sleep(1000);
            return "call()方法被自动调用,任务的结果是:" + id + "    " + Thread.currentThread().getName();
        }
    }
    
    例子2(定时任务)
    public class ScheduledThreadPoolDemo {
        public static  void main (String[] args)throws Exception{
            ScheduledExecutorService executor= Executors.newScheduledThreadPool(1);
            executor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run()  {
                    System.out.println("over");
                }
            }, 0, 1, TimeUnit.SECONDS);
        }
    }
    
    例子三(获取结果无阻塞)

    当向Executor提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask,你可以循环获取task,并用future.get()去获取结果,但是如果这个task没有完成,你就得阻塞在这里,这个实效性不高,其实在很多场合,其实你拿第一个任务结果时,此时结果并没有生成并阻塞,其实在阻塞在第一个任务时,第二个task的任务已经早就完成了,显然这种情况用future task不合适的,效率也不高的。

    public static void main(String[] args) throws ExecutionException {
        /**
             *CompletionService与ExecutorService最主要的区别在于 
             *前者submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装, 
             *内部维护一个保存Future对象的BlockingQueue。 
             *只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。 
             *它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。 
             *所以,先完成的必定先被取出。这样就减少了不必要的等待时间。 
        **/
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
        // 创建10个任务并执行
        for (int i = 0; i < 100; i++) {
            // 使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
             completionService.submit(new TaskWithResult(i));
        }
        // 遍历任务的结果
        for (int i = 0; i < 100; i++) {
            try {
                Future<String> future = completionService.take();
                System.out.println(future.get()); // 打印各个线程(任务)执行的结果
            } catch (InterruptedException e) {
    
            }
        }
        executorService.shutdown();
        System.out.println("所有执行完了");
    }
    }
    class TaskWithResult implements Callable<String> {
        private int id;
        public TaskWithResult(int id) {
            this.id = id;
        }
        @Override
        public String call() throws Exception {
            System.out.println("call()方法被自动调用,干活!!!             " + Thread.currentThread().getName());
            Thread.sleep(1000);
            return "call()方法被自动调用,任务的结果是:" + id + "    " + Thread.currentThread().getName();
        }
    }
    

    相关文章

      网友评论

        本文标题:多线程并发框架使用四

        本文链接:https://www.haomeiwen.com/subject/oqzvqftx.html