线程池

作者: arkliu | 来源:发表于2022-12-16 09:08 被阅读0次

    ThreadPoolExecutor参数解释

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    
    • corePoolSize 核心线程数
    • maximumPoolSize 最大线程数
    • keepAliveTime 最大空闲时间
    • unit 时间单位
    • workQueue 阻塞队列
    • threadFactory 创建线程的工厂类
    • handler 饱和处理机制

    线程池的执行流程

    image.png

    ThreadPoolExecutor demo

    package com.test.pool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ExePollTest {
    
        public static void main(String[] args) {
            // 自定义线程池
            // 最大线程该如何定义:
            // 1. cpu密集型:cpu几核就是几,可以保持cpu效率最高
            System.out.println(Runtime.getRuntime().availableProcessors());
            // 2. io密集型:判断程序中,十分消耗io的线程的两倍
            ExecutorService service = new ThreadPoolExecutor(
                        2,
                        5,
                        3,
                        TimeUnit.SECONDS,
                        new LinkedBlockingDeque<>(3),
                        Executors.defaultThreadFactory(),
                        new ThreadPoolExecutor.DiscardOldestPolicy()
                    );
            try {
                // 最大承载线程数:队列大小+最大线程数量
                // AbortPolicy策略:超过RejectedExecutionException
                // CallerRunsPolicy策略: 超过最大承载线程数的部分,由调用该线程的线程执行
                // DiscardOldestPolicy策略: 超过最大承载线程数,不会抛出异常,丢掉任务
                // DiscardOldestPolicy策略: 超过最大承载线程数,尝试和最早的线程竞争,不会抛出异常
                for (int i = 1; i <= 9 ; i++) {
                    // 使用了线程池之后,使用线程池来创建线程
                    service.execute(new Runnable() {
                        
                        @Override
                        public void run() {
                            System.out.println(Thread.currentThread().getName()+"  ok");
                        }
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 关闭线程池
                service.shutdown();
            }
        }
    }
    
    

    java内置线程池

    执行ExecutorService

    execute(Runnable)

    ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
    fixedExecutor.execute(new MyRunnable(1));
    fixedExecutor.shutdown();
    

    submit(Runnable)

    ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
    Future future = fixedExecutor.submit(new MyRunnable(1));
    try {
        //如果任务执行完成,future.get()方法会返回一个null,future.get()可能阻塞
        System.out.println(future.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    fixedExecutor.shutdown();
    

    submit(Callable)

    ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
    Future<String> future = fixedExecutor.submit(new Callable<String>() {
    
        @Override
        public String call() throws Exception {
            return "hello world..";
        }
    });
    try {
        //如果任务执行完成,future.get()方法会返回一个null,future.get()可能阻塞
        System.out.println(future.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    fixedExecutor.shutdown();
    
    image.png

    invokeAny

    invokeAny会返回所有Callable任务中第一个得到执行完的Callable的结果作为返回值

    // 1. 创建3个固定线程的线程池
    ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
    // 2. 创建listCallable 并初始化三个Callable
    List<Callable<String>>listCallable = new ArrayList<Callable<String>>();
    listCallable.add(new Callable<String>() {
    
        @Override
        public String call() throws Exception {
            System.out.println("first runs..");
            return "first";
        }
    });
    
    listCallable.add(new Callable<String>() {
    
        @Override
        public String call() throws Exception {
            System.out.println("second runs..");
            return "second";
        }
    });
    
    listCallable.add(new Callable<String>() {
    
        @Override
        public String call() throws Exception {
            System.out.println("third runs..");
            return "third";
        }
    });
    String future = null;
    try {
        //3. 执行invokeAny,会返回上面listCallable的任意一个Callable的返回值
        future = fixedExecutor.invokeAny(listCallable);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    System.out.println(future);
    fixedExecutor.shutdown();
    
    image.png

    invokeAll

    invokeAll会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象

    // 1. 创建3个固定线程的线程池
    ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
    // 2. 创建listCallable 并初始化三个Callable
    List<Callable<String>>listCallable = new ArrayList<Callable<String>>();
    listCallable.add(new Callable<String>() {
    
        @Override
        public String call() throws Exception {
            System.out.println("first runs..");
            return "first";
        }
    });
    
    listCallable.add(new Callable<String>() {
    
        @Override
        public String call() throws Exception {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("second runs..");
            return "second";
        }
    });
    
    listCallable.add(new Callable<String>() {
    
        @Override
        public String call() throws Exception {
            System.out.println("third runs..");
            return "third";
        }
    });
    List<Future<String>> futures = null;
    try {
        futures = fixedExecutor.invokeAll(listCallable);
        
        for (Future<String> future : futures) {
            System.out.println("result:"+future.get());
        }
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    fixedExecutor.shutdown();
    
    image.png

    newCachedThreadPool

    newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。看下面栗子:

    新建一个MyRunnable

    class MyRunnable implements Runnable {
        private int id;
        
        public MyRunnable(int id) {
            this.id = id;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+" runs..."+id);
        }
        
        @Override
        public String toString() {
            return ""+id;
        }
    }
    

    测试

    public static void testCachedThreadPool01() {
            //1. 使用工程类获取线程池对象
            ExecutorService cacheExecutor = Executors.newCachedThreadPool();
            //2. 提交任务
            for (int i = 1; i <= 10; i++) {
                cacheExecutor.execute(new MyRunnable(i));
            }
        }
    
    image.png
    // 使用工厂类获取线程池对象
        public static void testCachedThreadPool02() {
            ExecutorService cacheExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
                int n = 1;
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "线程"+(n++));
                }
            });
            //2. 提交任务
            for (int i = 1; i <= 10; i++) {
                cacheExecutor.execute(new MyRunnable(i));
            }
        }
    
    image.png

    newFixedThreadPool

    最多n个线程将处于活动状态。如果提交了n个以上的线程,那么它们将保持在队列中,直到线程可用

    public static void testnewFixedThreadPool01() {
            // 最多创建3个线程
            ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
            //2. 提交任务
            for (int i = 1; i <= 10; i++) {
                fixedExecutor.submit(new MyRunnable(i));
            }
        }
    
    image.png

    工厂类获取线程池对象

    // 使用工厂类获取线程池对象
        public static void testnewFixedThreadPool02() {
            ExecutorService cacheExecutor = Executors.newFixedThreadPool(3, new ThreadFactory() {
                int n = 1;
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "线程"+(n++));
                }
            });
            //2. 提交任务
            for (int i = 1; i <= 10; i++) {
                cacheExecutor.execute(new MyRunnable(i));
            }
        }
    
    image.png

    newSingleThreadExecutor

    创建一个核心线程,并且最大线程也是1个,同时它具有一个无边界的阻塞队列LinkedBlockingQueue

    public static void testnewSingleThreadExecutor01() {
            ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
            //2. 提交任务
            for (int i = 1; i <= 10; i++) {
                fixedExecutor.submit(new MyRunnable(i));
            }
        }
    
    image.png
    // 使用工厂类获取线程池对象
        public static void testnewSingleThreadExecutor02() {
            ExecutorService cacheExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
                int n = 1;
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "线程"+(n++));
                }
            });
            //2. 提交任务
            for (int i = 1; i <= 10; i++) {
                cacheExecutor.execute(new MyRunnable(i));
            }
        }
    
    image.png

    shutdown方法

    shutdown将线程池的状态设置为SHUTWDOWN状态,正在执行的任务会继续执行下去,没有被执行的则中断,并抛出RejectedExecutionException异常

    ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
    singleExecutor.execute(()->{
        System.out.println("thread1 runs..");
    });
    singleExecutor.shutdown();
    // 上面已经shutdown了,后续加入进来执行的线程,将抛出RejectedExecutionException
    singleExecutor.execute(()->{
        System.out.println("thread2 runs..");
    });
    
    image.png

    shutdownNow

    shutdownNow则是将线程池的状态设置为STOP,正在执行的任务则被停止,没被执行任务的则返回

    ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
    for (int i = 1; i <= 10; i++) {
        singleExecutor.submit(new MyRunnable(i));
    }
    List<Runnable>list = singleExecutor.shutdownNow();
    for (Runnable runnable : list) {
        System.out.println("线程:"+runnable+" 被取消了");
    }
    
    image.png

    延迟或定期执行任务

    延迟两秒执行任务

    ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3);
            scheduledExecutor.schedule(()->{
                System.out.println("scheduledExecutor runs..");
            }, 2, TimeUnit.SECONDS);
    

    间隔执行

    ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3, new ThreadFactory() {
        int n = 1;
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "线程:"+(n++));
        }
    });
    scheduledExecutor.scheduleAtFixedRate(new MyRunnable(1), 2, 2, TimeUnit.SECONDS);
    
    2.gif

    相关文章

      网友评论

          本文标题:线程池

          本文链接:https://www.haomeiwen.com/subject/dzesxdtx.html