美文网首页
线程池2

线程池2

作者: 永远的太阳0123 | 来源:发表于2018-09-19 11:43 被阅读0次
    1. 线程池
      所有的线程池中都有一个任务队列,使用的是BlockingQueue<Runnable>作为任务的载体。
      active threads - 正在执行的任务
      queued tasks - 任务队列
      completed tasks - 结束的任务

    2. FixedThreadPool
      容量固定的线程池。创建时需要传入线程池的核心容量(也是最大容量)。
      使用场景:大多数情况下使用的线程池都是FixedThreadPool。OS系统和硬件有线程支持的上限,不能随意地、无限制地提供线程池。线程池默认的容量上限是Integer.MAX_VALUE。
      常见的线程池容量:PC - 200。服务器:1000~10000。

    示例:

    public class Test3_FixedThreadPool {
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 6; i++) {
                executorService.execute(new Runnable() {
                    public void run() {
                        try {
                            TimeUnit.MILLISECONDS.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " - test executor");
                    }
                });
            }
            // 打印线程池信息
            System.out.println(executorService);
            // 优雅关闭。不是强行关闭,而是不再处理新的任务,将已接收的任务处理完毕后关闭
            executorService.shutdown();
            // 判读是否已经结束, 相当于回收了资源。
            System.out.println(executorService.isTerminated());
            // 判断是否已经关闭, 即是否调用过shutdown方法
            System.out.println(executorService.isShutdown());
            // 打印线程池信息
            System.out.println(executorService);
    
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println(executorService.isTerminated());
            System.out.println(executorService.isShutdown());
            System.out.println(executorService);
        }
    
    }
    

    结果:

    java.util.concurrent.ThreadPoolExecutor@42a57993[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    false
    true
    java.util.concurrent.ThreadPoolExecutor@42a57993[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    pool-1-thread-3 - test executor
    pool-1-thread-1 - test executor
    pool-1-thread-5 - test executor
    pool-1-thread-2 - test executor
    pool-1-thread-4 - test executor
    pool-1-thread-3 - test executor
    true
    true
    java.util.concurrent.ThreadPoolExecutor@42a57993[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
    
    1. CachedThreadPool
      没有容量限制的线程池。核心容量为0,最大容量为Integer.MAX_VALUE,生命周期为60秒。
      容量管理策略:如果线程池中的线程空闲达到60秒,线程自动释放。新任务优先使用空闲的线程,如果没有空闲的线程,则新建线程。例如,当执行第二个任务时第一个任务已经执行完毕,会复用执行第一个任务的线程,而不是新建线程。
      应用场景: 内部应用或测试应用。 内部应用,有条件的内部数据瞬间处理时应用,如:电信平台夜间执行数据整理(有把握在短时间内处理完所有工作,且对硬件和软件有足够的信心)。 测试应用,在测试的时候,尝试得到硬件或软件的最高负载量,用于提供FixedThreadPool容量的指导。

    示例一:

    public class Test {
    
        public static void main(String[] args) {
            ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                cachedThreadPool.execute(new Runnable() {
                    public void run() {
                        System.out.println(Thread.currentThread().getName());
                    }
                });
                cachedThreadPool.execute(new Runnable() {
                    public void run() {
                        System.out.println(Thread.currentThread().getName());
                    }
                });
            }
            cachedThreadPool.shutdown();
        }
    
    }
    

    结果:

    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-2
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-2
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-2
    pool-1-thread-1
    

    示例二:

    public class Test4_CachedThreadPool {
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            System.out.println(executorService);
    
            for (int i = 0; i < 5; i++) {
                executorService.execute(new Runnable() {
                    public void run() {
                        try {
                            TimeUnit.MILLISECONDS.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " - test executor");
                    }
                });
            }
    
            System.out.println(executorService);
    
            try {
                TimeUnit.SECONDS.sleep(65);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println(executorService);
        }
    
    }
    

    结果:

    java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
    pool-1-thread-1 - test executor
    pool-1-thread-2 - test executor
    pool-1-thread-4 - test executor
    pool-1-thread-3 - test executor
    pool-1-thread-5 - test executor
    java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 5]
    
    1. ScheduledThreadPool
      计划任务线程池,即可以自动执行计划任务的线程池。
      创建时需要传入线程池的核心容量,最大容量是Integer.MAX_VALUE,生命周期永久。
      scheduleAtFixedRate(runnable, start_limit, limit, timeunit)方法:
      (1)runnable - 要执行的任务。
      (2)start_limit - 程序开始执行后,时隔多久第一次执行任务。
      (3)limit - 多次执行任务的时间间隔。可以理解为隔了limit,下一个任务开始启动。
      (4)timeunit - 间隔时间的时间单位。
      使用场景: 执行计划任务时,ScheduledThreadPool和DelaydQueue有选择地使用。例如:电信行业中的数据整理,每分钟整理,每小时整理,每天整理等。

    示例:

    public class Test5_ScheduledThreadPool {
    
        public static void main(String[] args) {
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
            System.out.println(scheduledExecutorService);
            // 定时完成任务。 scheduleAtFixedRate(runnable, start_limit, limit, timeunit)方法
            scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName());
                }
            }, 0, 300, TimeUnit.MILLISECONDS);
        }
    
    }
    

    结果:

    java.util.concurrent.ScheduledThreadPoolExecutor@70dea4e[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-2
    pool-1-thread-3
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-3
    pool-1-thread-3
    pool-1-thread-3
    pool-1-thread-3
    ······
    
    1. SingleThreadExceutor
      核心容量和最大容量都为1的线程池,生命周期永久。
      使用场景:保证任务顺序时使用。例如: 游戏大厅中的公共频道聊天,秒杀。

    示例:

    public class Test6_SingleThreadExecutor {
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            System.out.println(executorService);
            for (int i = 0; i < 5; i++) {
                executorService.execute(new Runnable() {
                    public void run() {
                        try {
                            TimeUnit.MILLISECONDS.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " - test executor");
                    }
                });
            }
        }
    
    }
    

    结果:

    java.util.concurrent.Executors$FinalizableDelegatedExecutorService@70dea4e
    pool-1-thread-1 - test executor
    pool-1-thread-1 - test executor
    pool-1-thread-1 - test executor
    pool-1-thread-1 - test executor
    pool-1-thread-1 - test executor
    
    1. ForkJoinPool
      ForkJoinPool - 分支合并线程池(mapduce类似的设计思想)。 可以使用递归完成复杂任务。要求可分支合并的任务必须是ForkJoinTask类型的子类型。ForkJoinTask类型提供了两个抽象子类型,RecursiveTask有返回结果的分支合并任务,RecursiveAction无返回结果的分支合并任务。
      compute方法:任务的执行逻辑。
      ForkJoinPool没有所谓的容量。默认都是1个线程。根据任务自动的分支新的子线程。当子线程任务结束后,自动合并。所说的自动分支和自动合并是根据fork和join两个方法实现的。
      使用场景: 主要是做科学计算或天文计算的。数据分析的。

    示例:

    public class Test7_ForkJoinPool {
    
        final static int[] numbers = new int[1000000];
        final static int MAX_SIZE = 50000;
        final static Random r = new Random();
    
        static {
            for (int i = 0; i < numbers.length; i++) {
                numbers[i] = r.nextInt(1000);
            }
        }
    
        public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
            // 普通方法
            long result = 0L;
            for (int i = 0; i < numbers.length; i++) {
                result += numbers[i];
            }
            System.out.println(result);
            // 使用ForkJoinPool
            ForkJoinPool pool = new ForkJoinPool();
            AddTask task = new AddTask(0, numbers.length);
            Future<Long> future = pool.submit(task);
            System.out.println(future.get());
        }
    
    }
    
    class AddTask extends RecursiveTask<Long> {
        int begin;
        int end;
    
        public AddTask(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }
        
        protected Long compute() {
            if ((end - begin) < Test7_ForkJoinPool.MAX_SIZE) {
                long sum = 0L;
                for (int i = begin; i < end; i++) {
                    sum += Test7_ForkJoinPool.numbers[i];
                }
                // System.out.println("form " + begin + " to " + end + " sum is : " + sum);
                return sum;
            } else {
                int middle = begin + (end - begin) / 2;
                AddTask task1 = new AddTask(begin, middle);
                AddTask task2 = new AddTask(middle, end);
                task1.fork();// 分支新的子线程。
                task2.fork();
                return task1.join() + task2.join();// 合并。join是一个阻塞方法,一定会获取到任务的结果
            }
        }
    }
    
    1. ThreadPoolExecutor
      除了ForkJoinPool,四种常用的线程池底层都是使用ThreadPoolExecutor实现的。
      其中一个构造方法:public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)
      (1)int corePoolSize - 核心容量。线程池中至少保持多少线程,核心线程不会被回收。
      (2)int maximumPoolSize - 最大容量。线程池中最多含有多少线程。
      (3)long keepAliveTime - 生命周期,即当线程空闲多久之后自动回收。0为永久。
      (4)TimeUnit unit - 声明周期的单位。
      (5)BlockingQueue<Runnable> workQueue - 任务队列,阻塞队列,它的泛型必须是Runnable。
      使用场景: 默认提供的线程池不满足条件时使用。例如:初始线程数4,最大线程数200,线程空闲周期30秒。

    示例:

    public class Test8_ThreadPoolExecutor {
    
        public static void main(String[] args) {
            // 模拟fixedThreadPool。 核心线程5个,最大容量5个,线程的生命周期永久。
            ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
    
            for (int i = 0; i < 6; i++) {
                service.execute(new Runnable() {
                    public void run() {
                        try {
                            TimeUnit.MILLISECONDS.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " - test executor");
                    }
                });
            }
    
            System.out.println(service);
            service.shutdown();
            System.out.println(service.isTerminated());
            System.out.println(service.isShutdown());
            System.out.println(service);
    
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println(service.isTerminated());
            System.out.println(service.isShutdown());
            System.out.println(service);
    
        }
    
    }
    

    结果:

    java.util.concurrent.ThreadPoolExecutor@42a57993[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    false
    true
    java.util.concurrent.ThreadPoolExecutor@42a57993[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    pool-1-thread-1 - test executor
    pool-1-thread-4 - test executor
    pool-1-thread-5 - test executor
    pool-1-thread-2 - test executor
    pool-1-thread-3 - test executor
    pool-1-thread-1 - test executor
    true
    true
    java.util.concurrent.ThreadPoolExecutor@42a57993[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
    

    相关文章

      网友评论

          本文标题:线程池2

          本文链接:https://www.haomeiwen.com/subject/xzzbnftx.html