美文网首页
ExecutorService|CompletionServic

ExecutorService|CompletionServic

作者: King斌 | 来源:发表于2023-07-23 10:25 被阅读0次

    这段时间对业务系统做了个性能测试,其中使用了较多线程池的技术,故此做一个技术总结。

    这次总结的内容比较多,主要是四个:

    ExecutorService
    CompletionService
    Runnable
    Callable
    前两个是线程池相关接口,后两个是多线程相关接口。在最后,我会说明什么情况下使用哪个接口,这两类接口如何搭配使用。

    Tips:个人拙见,如有不对,请多多指正。

    一、ExecutorService
    ExecutorService是一个接口,继承自Executor。ExecutorService提供了一些常用操作和方法,但是ExecutorService是一个接口,无法实例化。
    不过,Java提供了一个帮助类Executors,可以快速获取一个ExecutorService对象,并使用ExecutorService接口的一些方法。

    Executors帮助类提供了多个构造线程池的方法,常用的分为两类:

    直接执行的
    newCachedThreadPool
    newFixedThreadPool
    newSingleThreadExecutor
    延迟或定时执行的
    newScheduledThreadPool
    newSingleThreadScheduledExecutor
    Executors为每种方法提供了一个线程工厂重载。

    (一)newCachedThreadPool
    创建一个默认的线程池对象,里面的线程和重用,且在第一次使用的时候才创建。可以理解为线程优先模式,来一个创一个线程,直到线程处理完成后,再处理其他的任务。
    Code:

    package com.macro.boot.javaBuiltThreadPool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    
    public class MyExecutorService {
        public static void main(String[] args) {
            // 1. 使用帮助类
    //        ExecutorService executorService = Executors.newCachedThreadPool();
    
            // 2. 提交任务
    /*        for (int i = 0; i < 20; i++) {
                executorService.submit(new MyRunnable(i));
            }*/
    
            // 3. 重载方法测试
            test();
        }
    
        private static void test() {
            // 1. 使用帮助类
            ExecutorService executorService = Executors.newCachedThreadPool(
                    new ThreadFactory() {
                        int n = 1;
    
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "线程正在执行 --->" + n++);
                        }
                    }
            );
    
            // 2. 提交任务
            for (int i = 0; i < 20; i++) {
                executorService.submit(new MyRunnable(i));
            }
        }
    }
    
    /**
     * 1. 线程类
     */
    class MyRunnable implements Runnable {
        private int id;
    
        public MyRunnable(int id) {
            this.id = id;
        }
    
        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            System.out.println(name + "正在执行..." + "--->" + id);
        }
    }
    

    输出:几乎是一下子就执行了,newCachedThreadPool会创建和任务数同等匹配的线程,直到处理完成任务的线程可以处理新增的任务。

    (二)newFixedThreadPool
    Code:创建一个可重用固定线程数量的线程池

    package com.macro.boot.javaBuiltThreadPool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    
    /**
     * 创建一个可固定重用次数的线程池
     */
    public class MyNewFixedThreadPool {
        public static void main(String[] args) {
    /*        // nThreads:线程数量
            ExecutorService es = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 10; i++) {
                es.submit(new MyRunnable(i));
            }*/
            test();
        }
    
        private static void test() {
            ExecutorService es = Executors.newFixedThreadPool(5, new ThreadFactory() {
                int n = 1;
    
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "线程" + n++);
                }
            });
            // 提交任务
            for (int i = 0; i < 10; i++) {
                es.submit(new MyRunnable(i));
            }
        }
    }
    

    (三)newSingleThreadExecutor
    只有一个线程(线程安全)

    package com.macro.boot.javaBuiltThreadPool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    
    public class MyNewSingleThreadExecutor {
        public static void main(String[] args) throws InterruptedException {
    /*        ExecutorService es = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 10; i++) {
                es.submit(new MyRunnable(i));
            }*/
            test();
        }
    
        private static void test() throws InterruptedException {
            ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
                int n = 1;
    
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "线程" + n++);
                }
            });
            for (int i = 0; i < 10; i++) {
                Thread.sleep(100);
                es.submit(new MyRunnable(i));
            }
        }
    }
    

    (四)newScheduledThreadPool
    怎么理解这个线程池的延迟时间?很简单,第一次执行的开始时间,加上延迟的时间,就是第二次执行的时间。

    package com.macro.boot.ScheduledExecutorService;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class MyScheduledExecutor {
        public static void main(String[] args) {
            ScheduledExecutorService sec = Executors.newScheduledThreadPool(4);
            for (int i = 0; i < 10; i++) {
                sec.schedule(new MyRunnable(i), 1, TimeUnit.SECONDS);
            }
            System.out.println("开始执行。。。");
            sec.shutdown();
        }
    }
    
    class MyRunnable implements Runnable {
        private int id;
    
        @Override
        public String toString() {
            return "MyRunnable{" +
                    "id=" + id +
                    '}';
        }
    
        public MyRunnable(int id) {
            this.id = id;
        }
    
        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            System.out.println(name + "执行了任务" + id);
        }
    }
    

    (五)newSingleThreadScheduledExecutor
    newSingleThreadScheduledExecutor和newScheduledThreadPool的区别是,newSingleThreadScheduledExecutor的第二次执行时间,等于第一次开始执行的时间,加上执行线程所耗费的时间,再加上延迟时间,即等于第二次执行的时间。

    二、CompletionService
    CompletionService是一个接口。
    当我们使用ExecutorService启动多个Callable时,每个Callable返回一个Future,而当我们执行Future的get方法获取结果时,会阻塞线程直到获取结果。
    而CompletionService正是为了解决这个问题,它是Java8的新增接口,它的实现类是ExecutorCompletionService。CompletionService会根据线程池中Task的执行结果按执行完成的先后顺序排序,任务先完成的可优先获取到。
    Code:

    package com.macro.boot.completions;
    
    import java.util.concurrent.*;
    
    public class CompletionBoot {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            // 实例化线程池
            ExecutorService es = Executors.newCachedThreadPool();
            ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(es);
    
            for (int i = 0, j = 3; i < 20; i++) {
                ecs.submit(new CallableExample(i, j));
            }
            for (int i = 0; i < 20; i++) {
                // take:阻塞方法,从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会阻塞,直到有任务完成返回结果。
                Integer integer = ecs.take().get();
                // 从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会返回null,该方法不会阻塞。
                // Integer integer = ecs.poll().get();
                System.out.println(integer);
            }
            // 不要忘记关闭线程池
            es.shutdown();
        }
    }
    class CallableExample implements Callable<Integer> {
        /**
         * 使用构造方法获取变量
         * */
        private int a;
        private int b;
    
        public CallableExample(int a, int b) {
            this.a = a;
            this.b = b;
        }
    
        @Override
        public Integer call() throws Exception {
            return a + b;
        }
    
        @Override
        public String toString() {
            return "CallableExample{" +
                    "a=" + a +
                    ", b=" + b +
                    '}';
        }
    }
    

    三、Runnable
    Runnable和Callable两者都是接口,但是也有区别:

    实现Callable接口的任务线程能返回执行结果;而实现Runnable接口的任务线程不能返回结果;(重点)
    Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;
    Code:

    class MyRunnable02 implements Runnable {
        private int i;
    
        public MyRunnable02(int i) {
            this.i = i;
        }
    
        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            System.out.println(name + "执行了... ---> " + i);
        }
    
        @Override
        public String toString() {
            return "MyRunnable{" +
                    "i=" + i +
                    '}';
        }
    }
    

    四、Callable
    Code:

    class CallableExample implements Callable<Integer> {
        /**
         * 使用构造方法获取变量
         * */
        private int a;
        private int b;
    
        public CallableExample(int a, int b) {
            this.a = a;
            this.b = b;
        }
    
        @Override
        public Integer call() throws Exception {
            return a + b;
        }
    
        @Override
        public String toString() {
            return "CallableExample{" +
                    "a=" + a +
                    ", b=" + b +
                    '}';
        }
    }
    

    五、Example
    本次Demo:使用线程池,循环查询数据库500次。
    在最开始的时候,是使用ExecutorServer + Future.get(因为查询数据库肯定需要获取结果,所以必须要用Callable,并且get到结果集)。但是get的阻塞操作,实在是太影响速度了,虽然考虑了两种手段去解决,但是都不了了之。
    Code:(只贴线程池的代码,线程类和获取连接的类就不放了)

    private void executorServerStart() throws SQLException, ClassNotFoundException, ExecutionException, InterruptedException {
            // get con
            TDConUtils tdConUtils = new TDConUtils();
            Connection con = tdConUtils.getCon();
            Statement statement = con.createStatement();
    
            // SQL
            String sql = "select last_row(value_double) from db1.tb1;";
    
            // ThreadPool
            ExecutorService es = Executors.newCachedThreadPool();
    
            // for each
            int count = 500;
            for (int i = 0; i < count; i++) {
                Future<ResultSet> submit = es.submit(new MyThread(i, con, sql));
                ResultSet resultSet = submit.get();
                // print
                while (resultSet.next()) {
                    System.out.printf("输出:时间:%s,值:%f \n", resultSet.getTimestamp(1)
                            , resultSet.getDouble(2));
                }
            }
            es.shutdown();
    
            // close resources
            tdConUtils.close(con, statement);
        }
    

    运行时间:8000ms +
    改CompletionService:
    Code:

    private void completionServerStart() throws SQLException, ClassNotFoundException, InterruptedException, ExecutionException {
            // get con
            TDConUtils tdConUtils = new TDConUtils();
            Connection con = tdConUtils.getCon();
            Statement statement = con.createStatement();
    
            // SQL
            String sql = "select last_row(value_double) from db1.tb1;";
    
            // ThreadPool
            ExecutorService es = Executors.newCachedThreadPool();
    
            //构建ExecutorCompletionService,与线程池关联
            ExecutorCompletionService<ResultSet> ecs = new ExecutorCompletionService<ResultSet>(es);
            // for each
            int count = 500;
    
            for (int i = 0; i < count; i++) {
                ecs.submit(new MyThread(i, con, sql));
            }
            for (int i = 0; i < count; i++) {
                // 通过take获取Future结果,此方法会阻塞
                ResultSet resultSet = ecs.take().get();
                while (resultSet.next()) {
                    System.out.printf("输出:时间:%s,值:%f \n", resultSet.getTimestamp(1)
                            , resultSet.getDouble(2));
                }
            }
    
            es.shutdown();
            tdConUtils.close(con, statement);
        }
    

    运行时间:300+ms

    六、使用小结
    分情况。
    如果需要获取结果:线程使用Callable;
    如果需要异步获取结果:线程池使用CompletionService。
    如果不需要获取结果:线程使用Runnable;
    如果需要阻塞获取结果:线程池使用ExecutorService。

    相关文章

      网友评论

          本文标题:ExecutorService|CompletionServic

          本文链接:https://www.haomeiwen.com/subject/opbludtx.html