美文网首页
java多线程与高并发(八)线程池

java多线程与高并发(八)线程池

作者: 小偷阿辉 | 来源:发表于2021-04-27 09:24 被阅读0次

    1.回顾

    前面一节介绍了并发容器和队列的内容,基于上次介绍关于队列的内容,才能更好的了解线程池的原理
    开始介绍线程池之前,先看一道华为面试题:
    两个线程,第一个线程从1到26,第二个线程从A到Z,交替顺序输出
    LockSupport实现

    package com.learn.thread.seven;
    
    import java.util.concurrent.locks.LockSupport;
    
    /**
     * @author zglx
     *
     * 面试题,交替输出
     */
    public class TestQuestion {
        static Thread t1 = null;
        static Thread t2 = null;
    
        public static void main(String[] args) {
            char[] a1 = "12345678".toCharArray();
            char[] a2 = "abcdefgh".toCharArray();
            t1 = new Thread(() -> {
                for (char item : a1) {
                    System.out.println(item);
                    // 叫醒t2
                    LockSupport.unpark(t2);
                    // t1阻塞
                    LockSupport.park();
                }
            },"t1");
            t2 = new Thread(() -> {
                for (char item : a2) {
                    // t2阻塞
                    LockSupport.park();
                    // 打印值
                    System.out.println(item);
                    // 叫醒t2
                    LockSupport.unpark(t1);
    
                }
            },"t2");
            t1.start();
            t2.start();
        }
    }
    
    
    

    wait 和 nofity的实现

    package com.learn.thread.seven;
    
    import java.util.concurrent.locks.LockSupport;
    
    public class TestQueue2 {
    
        private static final Object object = new Object();
    
        public static void main(String[] args) {
            char[] a1 = "12345678".toCharArray();
            char[] a2 = "abcdefgh".toCharArray();
            new Thread(() -> {
                synchronized (object) {
                    for (char item : a1) {
                        System.out.println(item);
                        try {
                            object.notify();
                            object.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 最终两个线程终归有一个是wait的,阻塞在这里不懂
                    object.notify();
                }
            },"t1").start();
            new Thread(() -> {
                synchronized (object) {
                    for (char item : a2) {
                        System.out.println(item);
                        try {
                            object.notify();
                            object.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 为什么这里要唤醒
                    object.notify();
                }
    
            },"t2").start();
        }
    }
    
    
    

    第二个线程比第一个线程先执行

    package com.learn.thread.seven;
    
    /**
     * @author zglx
     * 这里用cas自旋的方式去实现第二个线程比第一个线程先执行
     */
    public class TestQueue3 {
        private static volatile boolean status = false;
        private static final Object object = new Object();
        public static void main(String[] args) {
            char[] a1 = "12345678".toCharArray();
            char[] a2 = "abcdefgh".toCharArray();
            new Thread(() -> {
                synchronized (object) {
                    // 第一个线程上来就等待
                    while (!status) {
                        try {
                            object.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    for (char item : a1) {
                        System.out.println(item);
                        try {
                            object.notify();
                            object.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    object.notify();
                }
            }).start();
    
            new Thread(() -> {
                synchronized (object) {
                    for (char item : a2) {
                        System.out.println(item);
                        status = true;
                        try {
                            object.notify();
                            object.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    object.notify();
                }
            }).start();
        }
    }
    

    Condition 实现Synchronized

    package com.learn.thread.seven;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class TestQueue5 {
        static char[] a1 = "12345678".toCharArray();
        static char[] a2 = "abcdefgh".toCharArray();
        private static ReentrantLock reentrantLock = new ReentrantLock();
        private static Condition condition = reentrantLock.newCondition();
        public static void main(String[] args) {
            new Thread(() -> {
                try {
                    reentrantLock.lock();
                    for (char item : a1) {
                        System.out.println(item);
                        // signal 相当于nofity
                        condition.signal();
                        // await 相当于 wait
                        condition.await();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    reentrantLock.unlock();
                }
            }).start();
            new Thread(() -> {
                try {
                    reentrantLock.lock();
                    for (char item : a2) {
                        System.out.println(item);
                        // signal 相当于nofity
                        condition.signal();
                        // await 相当于 wait
                        condition.await();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    reentrantLock.unlock();
                }
                condition.signal();
    
            }).start();
        }
    }
    

    但是一个Condition 就是一个等待队列,既然有两个线程,那么就完全可以用生产者消费者的模式去实现,那么就需要两个Condition

    package com.learn.thread.seven;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class TestQueue6 {
        static char[] a1 = "12345678".toCharArray();
        static char[] a2 = "abcdefgh".toCharArray();
        private static ReentrantLock reentrantLock = new ReentrantLock();
        private static Condition condition1 = reentrantLock.newCondition();
        private static Condition condition2 = reentrantLock.newCondition();
        public static void main(String[] args) {
            new Thread(() -> {
                try {
                    reentrantLock.lock();
                    for (char item : a1) {
                        System.out.println(item);
                        // signal 相当于nofity
                        condition2.signal();
                        // await 相当于 wait
                        condition1.await();
                    }
                    condition2.signal();
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    reentrantLock.unlock();
                }
            }).start();
            new Thread(() -> {
                try {
                    reentrantLock.lock();
                    for (char item : a2) {
                        System.out.println(item);
                        // signal 相当于nofity
                        condition1.signal();
                        // await 相当于 wait
                        condition2.await();
                    }
                    condition1.signal();
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    reentrantLock.unlock();
                }
            }).start();
        }
    }
    

    自旋锁实现

    线程取两个值t1,t2,定义一个ReadyToRun的变量
    刚开始是t1,相当于一个信号灯r,某一时刻只能取一个值,不能同时取到两个线程
    程序上来就判断是不是t1 如果不是就占用cpu等待,如果一看是t1就打印值,然后把r的值改成t2,打印完之后r又变成了t1。就这么交替的玩法
    注意信号灯r要为volatile,保证线程之间的可见性,我们把信号灯设置为枚举,防止它取别的值

    package com.learn.thread.seven;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class TestQueue7 {
        enum ReadyToRun{
            T1,
            T2
        }
        // 保证线程可见性,并且保证只能是两个值
        static volatile ReadyToRun readyToRun = ReadyToRun.T1;
        static char[] a1 = "12345678".toCharArray();
        static char[] a2 = "abcdefgh".toCharArray();
        public static void main(String[] args) {
            new Thread(() -> {
                try {
                    for (char item : a1) {
                        // cas 无锁化,如果不是T1,什么都不做
                        while (readyToRun != ReadyToRun.T1) {
    
                        }
                        System.out.println(item);
                        // 信号灯交替给t2
                        readyToRun = ReadyToRun.T2;
    
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                }
            }).start();
            new Thread(() -> {
                try {
                    for (char item : a2) {
                        while (readyToRun != ReadyToRun.T2) {
    
                        }
    
                        System.out.println(item);
                        // 信号灯交换给T1
                        readyToRun = ReadyToRun.T1;
    
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                }
            }).start();
        }
    }
    
    

    BlockingQueue的玩法

    前面说到过Queue 的玩法,可以支持多线程阻塞操作,有put和take操作,put满的时候会被阻塞住,take 的时候,如果没有就会阻塞

    实例两个BlockingQueue,都是ArrayBlockingQueue数组实现的,并且长度都是1
    相当于两个容器,这两个容器里头放两个值,这两个值比如说我第一个线程打印出了1我就往第一个容器里放一个ok,然后另外一个线程就盯着这个容器,有值了就立马打印A,并且同样往另外一个容器放一个ok,这样第一个线程监听的容器就有值,打印出2,如此循环下去。

    package com.learn.thread.seven;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class TestQueue8 {
        // 保证线程可见性,并且保证只能是两个值
        static char[] a1 = "12345678".toCharArray();
        static char[] a2 = "abcdefgh".toCharArray();
        private static BlockingQueue blockingQueue1 = new ArrayBlockingQueue(1);
        private static BlockingQueue blockingQueue2 = new ArrayBlockingQueue(1);
        public static void main(String[] args) {
            new Thread(() -> {
                try {
                    for (char item : a1) {
                        System.out.println(item);
                        // 赋值给第二个容器
                        blockingQueue2.put("ok");
                        // 开始监听
                        blockingQueue1.take();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                }
            }).start();
            new Thread(() -> {
                try {
                    for (char item : a2) {
                        blockingQueue2.take();
                        System.out.println(item);
                        blockingQueue1.put("ok");
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                }
            }).start();
        }
    }
    
    

    TransferQueue 优化

    之前说过TransferQueue 可以跟exchange一样做线程数据交换
    第一个线程上来就take 等待第二个线程给值,拿到值打印,又经过transferQueue传1给第二个线程
    第二个线程上来就经过transfer 把自己变量扔进去,第一个线程立马take到了,拿出来打印。然后等待第一个线程给值。如此循环

    package com.learn.thread.seven;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedTransferQueue;
    import java.util.concurrent.TransferQueue;
    
    public class TestQueue9 {
        // 保证线程可见性,并且保证只能是两个值
        static char[] a1 = "12345678".toCharArray();
        static char[] a2 = "abcdefgh".toCharArray();
        private static TransferQueue<Character> transferQueue = new LinkedTransferQueue<>();
        public static void main(String[] args) {
            new Thread(() -> {
                try {
                    for (char item : a1) {
                        // 取值
                        System.out.println(transferQueue.take());
                        // 传值
                        transferQueue.transfer(item);
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                }
            }).start();
            new Thread(() -> {
                try {
                    for (char item : a2) {
                        transferQueue.transfer(item);
                        System.out.println(transferQueue.take());
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                }
            }).start();
        }
    }
    

    2.线程池

    线程池有几个接口,第一个接口是Executor,第二个是ExecutorService继承自Executor,他们的最终子类才是ThreadPoolExecutor,如下图


    线程池继承关系

    Executor 执行者 有一个方法叫做执行,执行的东西就是Runnable 是一个接口,可以有很多实现。因此可以说这里只是定义了任务的执行是Runnable,而执行交给了实现Executor的类。这里不像我们以前定义一个Thread,new 一个Thread然后去重写它的Run方法,然后start分开执行。

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by FernFlower decompiler)
    //
    
    package java.util.concurrent;
    
    public interface Executor {
        void execute(Runnable var1);
    }
    

    ExecutorService 是Executor的继承,它除了父类execute方法外,还完善了整个任务执行器的一个生命周期。拿线程池举例,一个线程池里面一堆线程就是一堆的工人,执行完一个任务后这个线程该怎么结束,ExecutorService就定义了这么一些个方法

    package com.learn.thread.five;
    
    
    import java.util.List;
    import java.util.Collection;
    import java.util.concurrent.*;
    public interface ExecutorService extends Executor {
    
        // 结束
        void shutdown();
    
        // 马上结束
        List<Runnable> shutdownNow();
    
        // 是否结束了
        boolean isShutdown();
    
        // 是不是整体都执行完了
        boolean isTerminated();
    
        // 等着结束,等多长时间,时间到了还不结束的话他就返回false
        boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException;
    
        // 提交任务,返回结果放在Future中
        <T> Future<T> submit(Callable<T> task);
    
        // 提交任务,返回结果放在Future中
        <T> Future<T> submit(Runnable task, T result);
    
        // 提交任务,返回结果放在Future中
        Future<?> submit(Runnable task);
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                throws InterruptedException;
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
                throws InterruptedException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    主要看任务submit方法,这里涉及了入参Callable和出参Future

    2.1.Callable

    /*
     * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     *
     * This code is free software; you can redistribute it and/or modify it
     * under the terms of the GNU General Public License version 2 only, as
     * published by the Free Software Foundation.  Oracle designates this
     * particular file as subject to the "Classpath" exception as provided
     * by Oracle in the LICENSE file that accompanied this code.
     *
     * This code is distributed in the hope that it will be useful, but WITHOUT
     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     * version 2 for more details (a copy is included in the LICENSE file that
     * accompanied this code).
     *
     * You should have received a copy of the GNU General Public License version
     * 2 along with this work; if not, write to the Free Software Foundation,
     * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     *
     * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     * or visit www.oracle.com if you need additional information or have any
     * questions.
     */
    
    /*
     * This file is available under and governed by the GNU General Public
     * License version 2 only, as published by the Free Software Foundation.
     * However, the following notice accompanied the original version of this
     * file:
     *
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */
    
    package java.util.concurrent;
    
    /**
     * A task that returns a result and may throw an exception.
     * Implementors define a single method with no arguments called
     * {@code call}.
     *
     * <p>The {@code Callable} interface is similar to {@link
     * java.lang.Runnable}, in that both are designed for classes whose
     * instances are potentially executed by another thread.  A
     * {@code Runnable}, however, does not return a result and cannot
     * throw a checked exception.
     *
     * <p>The {@link Executors} class contains utility methods to
     * convert from other common forms to {@code Callable} classes.
     *
     * @see Executor
     * @since 1.5
     * @author Doug Lea
     * @param <V> the result type of method {@code call}
     */
    @FunctionalInterface
    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    
    

    里边有一个call方法,并且带有返回值,跟run方法一样,只不过run方法没有返回值。

    2.2.Future

    Future 代表的就是线程池执行完runnable 后返回的结果,Future表示的就是未来的执行结果,如果任务还没有被执行完就调用get方法,就会造成线程阻塞。
    其实更灵活的用法就是FutureTask即是一个Future 同时又是一个Task,因为Callable只能是一个Task 执行一个任务,但是不能作为一个Future来用FutureTask 实现了RunnbaleFuture ,RunnbaleFuture实现了Runnable 有实现了Future,所以它是一个任务又是一个Future,后面WorkStealingPool,ForkJoinPool 基本都会用到FutureTask。

    package com.learn.thread.seven;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    
    public class TestFuture {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            FutureTask<Integer> future = new FutureTask<Integer>(() -> {
                Thread.sleep(1000);
                return 100;
            });
            new Thread(future).start();
            // 会造成阻塞
            System.out.println(future.get());
        }
    }
    

    小结

    Callable类似于Runnable 但是有返回值
    了解了Future 用于存储执行的将来才会产生的结果
    FutureTask他是Future 加上Runnable 既可以执行也可以存结果
    CompletableFuture 管理多个Future 的结果
    CompletableFuture
    它用来组合各种不同的任务,等这个任务执行完产生一个结果后进行一个组合,基于java8之前将的价格查询器,用小程序再模拟一个。

    定义三个future分别代表淘宝、京东、天猫,用了CompletableFuture的一个方法叫supplyAsync产生了一个异步任务
    每一个任务去不同的商家拉取数据,什么时候拉取完了就放在一个Future中,但是要求三个商家都拉取数据完了才给最后的展示
    你可以使用allOf方法相当于这里面的所有任务完成之后,最后join,你才能继续往下执行。
    除了allOf 方法,还提供了一些lamdba表示的写法

    package com.learn.thread.seven;
    
    import java.io.IOException;
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    
    public class TestCompletableFuture {
        public static void main(String[] args) throws IOException {
            long start = System.currentTimeMillis();
            CompletableFuture<Double> futurem = CompletableFuture.supplyAsync(() -> {
                return priceOfM();
            });
            CompletableFuture<Double> futuret = CompletableFuture.supplyAsync(() -> {
                return priceOfT();
            });
            CompletableFuture<Double> futureb = CompletableFuture.supplyAsync(() -> {
                return priceOfB();
            });
            // 用join
            CompletableFuture.allOf(futureb, futurem, futuret).join();
    
            // 用lamdba
            CompletableFuture.supplyAsync(TestCompletableFuture::priceOfM)
                    .thenApply(String::valueOf)
                    .thenApply(str -> "price" + str)
                    .thenAccept(System.out::println);
            long end = System.currentTimeMillis();
            System.out.println(end - start);
            System.in.read();
        }
        private static double priceOfM() {
            delay();
            System.out.println(1.00);
            return 1.00;
        }
    
        private static double priceOfB() {
            delay();
            System.out.println(2.00);
            return 2.00;
        }
    
        private static double priceOfT() {
            delay();
            System.out.println(3.00);
            return 3.00;
        }
    
        private static void delay() {
            int time = new Random().nextInt(500);
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    3.ThreadPoolExector和ForkJoinPool

    我们来了解线程池,线程池从目前的JDK来说有两种类型,第一种就是普通的线程池ThreadPoolExector,第二种是ForkJoinPool,这两种是不同的类型的线程池,能做的事情也不一样。
    Fork成为分叉,合并称为join,这是ForkJoinPool的一个特性,我们先来讲讲ThreadPoolExector
    ThreadPoolExector 的父类是AbstractExecutorService,而AbstractExecutorService的父类是ExecutorService,ExecutorService的父类是Executor,所以ThreadPoolExector相当于线程池的一个执行器,你可以往这个池子里丢任务。

    阿里开发手册要求线程是自定义的,下面看看如何自定义一个线程池
    线程池有很多构造方法,来看看主要的七个参数
    java多线程开发时,常常用到线程池技术,这篇文章是对创建java线程池时的七个参数的详细解释。

    从源码中可以看出,线程池的构造函数有7个参数,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。下面会对这7个参数一一解释。

    一、corePoolSize 线程池核心线程大小

    线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。

    二、maximumPoolSize 线程池最大线程数量

    一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接将任务交给这个空闲线程来执行,如果没有则会缓存到工作队列(后面会介绍)中,如果工作队列满了,才会创建一个新线程,然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。

    三、keepAliveTime 空闲线程存活时间

    一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定

    四、unit 空闲线程存活时间单位

    keepAliveTime的计量单位

    五、workQueue 工作队列

    新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:

    ①ArrayBlockingQueue

    基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。

    ②LinkedBlockingQuene

    基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。

    ③SynchronousQuene

    一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。

    ④PriorityBlockingQueue

    具有优先级的无界阻塞队列,优先级通过参数Comparator实现。

    六、threadFactory 线程工厂

    创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等

    七、handler 拒绝策略

    当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:

    ①CallerRunsPolicy

    该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。

    ②AbortPolicy

    该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。

    ③DiscardPolicy

    该策略下,直接丢弃任务,什么都不做。

    ④DiscardOldestPolicy

    该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列

    到此,构造线程池时的七个参数,就全部介绍完毕了
    一般来说都是需要保存来的CallerRuns

    package com.learn.thread.seven;
    
    import com.learn.thread.one.T;
    
    import java.io.IOException;
    import java.util.concurrent.*;
    
    public class TestTask {
        static class Task implements Runnable{
            private int i;
            public Task(int i) {
                this.i = i;
            }
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "task" + i);
                try {
                    System.in.read();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,4,60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    
            for (int i = 0; i < 8; i++) {
                poolExecutor.execute(new Task(i));
            }
            System.out.println(poolExecutor.getQueue());
            poolExecutor.execute(new Task(100));
            System.out.println(poolExecutor.getQueue());
            poolExecutor.shutdown();
        }
    }
    

    相关文章

      网友评论

          本文标题:java多线程与高并发(八)线程池

          本文链接:https://www.haomeiwen.com/subject/vxllrltx.html