美文网首页
Java线程池原理以及自定义线程池

Java线程池原理以及自定义线程池

作者: 在下喵星人 | 来源:发表于2019-07-25 23:07 被阅读0次

    当你需要同时限制应用程序中运行的线程数时,线程池非常有用。 启动新线程会产生性能开销,每个线程也会为其堆栈等分配一些内存。

    可以将任务传递给线程池,而不是为每个任务启动并发执行的新线程。 只要线程池有任何空闲线程,任务就会分配给其中一个线程并执行。 在内部,任务被插入到阻塞队列中,池中的线程从该阻塞队列中出队。 当新任务插入队列时,其中一个空闲线程将成功将其出列并执行它。 线程池中的其余空闲线程将被阻塞,等待出列任务。

    从上述所知,一个基本的线程池需要具有

    1. 一个存储线程的容器(容器可以使用队列,链表等数据结构),当有任务时,就从容器中拿出一个线程,来执行任务。
    2. 一个存储任务的阻塞队列。(阻塞队列可以控制任务提交的最大数)
    3. 线程池对外暴露一个execute(Runnable task)方法,用以外界向线程池中提交任务。

    自定义阻塞队列

    import java.util.LinkedList;
    import java.util.List;
    
    public class BlockingQueue<T> {
    
        /**
         *     使用链表实现一个阻塞队列(数据结构定义数据存储和获取方式,所以只要满足这两点,阻塞队列可以用链表,也可以使用数组等来实现)
         */
        private List<T> queue = new LinkedList();
        /**
         * limit用来限制提交任务的最大数,默认10
         */
        private int limit = 10;
    
        public BlockingQueue(int limit) {
            this.limit = limit;
        }
    
        /**
         *
         * @param item
         *
         *  enqueue是一个同步方法,当任务到达上限,便会调用wait方法进行阻塞,否则将任务放入队列中,并唤醒dequeue()任务线程
         */
        public synchronized void enqueue(T item){
            while (this.queue.size() == this.limit) {
                this.wait();
            }
            if (this.queue.size() <= limit) {
                this.notifyAll();
            }
            this.queue.add(item);
        }
    
    
        /**
         *
         * @return
         *
         *     dequeue也是一个同步方法,当队列中没有任务时便会调用wait方法进入阻塞,当任务到达最大容量是唤醒其他dequeue()线程
         *     ,并出列一个任务。
         */
        public synchronized T dequeue() {
            while (this.queue.size() == 0) {
                this.wait();
            }
            if (this.queue.size() == this.limit) {
                this.notifyAll();
            }
    
            return this.queue.remove(0);
        }
    
     public synchronized int size(){
            return queue.size();
        }
    }
    
    
    
    

    新建一个线程池线程类,用来执行提交的任务。结构体中传入任务队列,run()方中发现taskQueue有任务时,获取任务并执行,没有任务就阻塞。

    public class PoolThread extends Thread {
    
    
        private  BlockingQueue taskQueue = null;
    
        private boolean isStopped = false;
    
        public PoolThread(BlockingQueue taskQueue) {
            this.taskQueue = taskQueue;
        }
    
        public void run(){
            while(!isStopped() && !Thread.currentThread().isInterrupted()){
                try{
                    //从任务队列获取任务并执行
                    Runnable runnable = (Runnable) taskQueue.dequeue();
                    runnable.run();
                } catch(Exception e){
                    isStopped = true;
                    break;
                }
            }
        }
    
        public synchronized void doStop(){
            isStopped = true;
            this.interrupt();
        }
    
        public synchronized boolean isStopped(){
            return isStopped;
        }
    }
    

    新建线程池类

    public interface Service {
    
        //关闭线程池
        void shutdown();
    
        //查看线程池是否已经被shutdown
        boolean isShutdown();
    
      //提交任务到线程池
        void execute(Runnable runnable);
    }
    
    import java.util.ArrayDeque;
    import java.util.Queue;
    
    public class ThreadPool  implements Service {
    
        /**
         * 任务队列,用来存储提交的任务
         */
        private BlockingQueue<Runnable> taskQueue = null;
    
        /**
         * 线程池中存储线程的容器。
         */
        private Queue<PoolThread> threads = new ArrayDeque<PoolThread>();
        
        private boolean isShutdown = false;
        
        public ThreadPool(int initSize, int maxNoOfTasks){
            taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks);
    
            //初始化线程池
            for (int i = 0; i < initSize; i++) {
                threads.add(new PoolThread(taskQueue));
            }
            
            //启动线程池线程
            threads.forEach(thread -> thread.start());
        }
        
         @Override
        public synchronized void execute(Runnable task)  {
            if (this.isStopped){
                throw new IllegalStateException("ThreadPool is stopped");
            }
            //任务入列
            taskQueue.enqueue(task);
        }
        
        @Override
        public synchronized void shutdown(){
            this.isShutdown= true;
            threads.forEach(thread -> thread.doStop());
        }
    
        @Override
        public boolean isShutdown() {
            return isShutdown;
        }
    }
    

    至此,一个简单的线程池便完成。新建一个线程池测试类

    import java.util.concurrent.TimeUnit;
    
    public class ThreadPoolTest {
    
        public static void main(String[] args) throws InterruptedException {
    
            final ThreadPool threadPool = new ThreadPool(5 , 20);
    
            //定义20个任务并且提交到线程池
            for (int i = 0; i < 20; i++) {
                threadPool.execute(() ->{
                    try {
                        TimeUnit.SECONDS.sleep(10);
                        System.out.println(Thread.currentThread().getName() + " is running add done");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
    
            while (true){
                System.out.println("---------------------------------");
                TimeUnit.SECONDS.sleep(5);
            }
        }
    }
    

    打印每次输出5条记录,共输出4次

    ---------------------------------
    ---------------------------------
    Thread-3 is running add done
    Thread-1 is running add done
    Thread-0 is running add done
    Thread-4 is running add done
    Thread-2 is running add done
    ---------------------------------
    ---------------------------------
    Thread-2 is running add done
    Thread-4 is running add done
    Thread-0 is running add done
    Thread-1 is running add done
    Thread-3 is running add done
    ---------------------------------
    ---------------------------------
    Thread-0 is running add done
    Thread-1 is running add done
    Thread-3 is running add done
    Thread-2 is running add done
    Thread-4 is running add done
    ---------------------------------
    ---------------------------------
    Thread-2 is running add done
    Thread-1 is running add done
    Thread-3 is running add done
    Thread-0 is running add done
    Thread-4 is running add done
    ---------------------------------
    

    当执行完任务后,使用visualvm工具或jstack命令获取线程快照,可以看到有5个线程池中的线程

    "Thread-4" #16 prio=5 os_prio=0 tid=0x00000000207b0000 nid=0x2b7c in Object.wait() [0x000000002141e000]
       java.lang.Thread.State: WAITING (on object monitor)
            at java.lang.Object.wait(Native Method)
            - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
            at java.lang.Object.wait(Object.java:502)
            at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
            - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
            at com.customthreadpool.PoolThread.run(PoolThread.java:18)
    
       Locked ownable synchronizers:
            - None
    
    "Thread-3" #15 prio=5 os_prio=0 tid=0x00000000207ad000 nid=0x56d0 in Object.wait() [0x000000002131f000]
       java.lang.Thread.State: WAITING (on object monitor)
            at java.lang.Object.wait(Native Method)
            - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
            at java.lang.Object.wait(Object.java:502)
            at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
            - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
            at com.customthreadpool.PoolThread.run(PoolThread.java:18)
    
       Locked ownable synchronizers:
            - None
    
    "Thread-2" #14 prio=5 os_prio=0 tid=0x00000000207ab800 nid=0x4cbc in Object.wait() [0x000000002121f000]
       java.lang.Thread.State: WAITING (on object monitor)
            at java.lang.Object.wait(Native Method)
            - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
            at java.lang.Object.wait(Object.java:502)
            at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
            - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
            at com.customthreadpool.PoolThread.run(PoolThread.java:18)
    
       Locked ownable synchronizers:
            - None
    
    "Thread-1" #13 prio=5 os_prio=0 tid=0x00000000207a9800 nid=0x3670 in Object.wait() [0x000000002111f000]
       java.lang.Thread.State: WAITING (on object monitor)
            at java.lang.Object.wait(Native Method)
            - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
            at java.lang.Object.wait(Object.java:502)
            at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
            - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
            at com.customthreadpool.PoolThread.run(PoolThread.java:18)
    
       Locked ownable synchronizers:
            - None
    
    "Thread-0" #12 prio=5 os_prio=0 tid=0x00000000207a9000 nid=0x4d84 in Object.wait() [0x000000002101f000]
       java.lang.Thread.State: WAITING (on object monitor)
            at java.lang.Object.wait(Native Method)
            - waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
            at java.lang.Object.wait(Object.java:502)
            at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
            - locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
            at com.customthreadpool.PoolThread.run(PoolThread.java:18)
    
    

    从线程快照可以看到,线程池的线程名称使用系统默认名称,但在实际编码中通常都会按我们规范定义系统名称,所以我们使用工厂模式对线程的创建进行重构。

    使用工厂模式有一下好处

    1. 对象的创建如果比较复杂,需要经过一系列的初始化。使用工厂模式,可以屏蔽这过程。
    2. 把同一类事物归于一个框架之下。比如A和B,他们需要自己定义线程池线程创建,但规定他们都要实现工厂接口,便可以把他们控制在同一框架之下。
    3. 解耦。(只要是不直接创建目标对象,基本上都可以叫解耦或者对修改关闭对扩展开放)

    新建线程工厂接口

    @FunctionalInterface
    public interface ThreadFactory {
        Thread createThread(Runnable runnable);
    }
    

    重构后的线程池类如下:

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class ThreadPool  implements Service {
    
        /**
         * 任务队列,用来存储提交的任务
         */
        private BlockingQueue<Runnable> taskQueue = null;
    
        /**
         * 线程池中存储线程的容器。
         */
        private Queue<ThreadTask> threads = new ArrayDeque<ThreadTask>();
    
    
        /**
         * 默认线程工厂
         */
        private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
    
        private boolean isShutdown = false;
    
    
        public ThreadPool(int initSize, int maxNoOfTasks){
            taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks);
    
            //初始化线程池
            for (int i = 0; i < initSize; i++) {
                newThread();
            }
        }
    
        private void newThread(){
            PoolThread poolThread = new PoolThread(taskQueue);
            Thread thread = DEFAULT_THREAD_FACTORY.createThread(poolThread);
            ThreadTask threadTask = new ThreadTask(thread , poolThread);
            threads.add(threadTask);
            thread.start();
        }
        /**
         * 工厂模式屏蔽对象创建的过程
         */
        private static class DefaultThreadFactory implements ThreadFactory{
    
            private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
    
            private static final ThreadGroup group = new ThreadGroup("customThreadPool-" + GROUP_COUNTER.getAndDecrement());
    
            private static final AtomicInteger COUNTER = new AtomicInteger(0);
    
            @Override
            public Thread createThread(Runnable runnable) {
                return new Thread(group , runnable , "thread-pool-" + COUNTER.getAndDecrement());
            }
        }
    
        /**
         * ThreadTask 只是PoolThread和Thread的组合,因为后面关闭线程还需要用到poolThread的doStop方法
         */
        private static class ThreadTask{
    
            Thread thread;
            PoolThread poolThread;
    
            public ThreadTask(Thread thread , PoolThread poolThread){
                this.thread = thread;
                this.poolThread = poolThread;
            }
        }
    
         @Override
        public synchronized void execute(Runnable task) {
            if (this.isShutdown){
                throw new IllegalStateException("ThreadPool is stopped");
            }
            //任务入列
            taskQueue.enqueue(task);
        }
    
      @Override
        public synchronized void shutdown(){
            this.isShutdown = true;
            threads.forEach(threadTask -> threadTask.poolThread.doStop());
        }
    
        @Override
        public boolean isShutdown() {
            return isShutdown;
        }
    }
    

    运行测试类,结果如下图所示


    image.png

    dump文件如下所示


    image.png

    到目前为如果线程任务队列到达上限,便会调用wait方法进行阻塞,我们可以自定义拒接策略,使处理更灵活。

    public interface DenyPolicy<T> {
    
        void reject(T runnable, ThreadPool threadPool);
    
        //该拒接策略会直接将任务丢弃
        class DiscardDenyPolicy implements DenyPolicy<Runnable>{
    
            @Override
            public void reject(Runnable runnable, ThreadPool threadPool) {
                System.out.println(runnable + "do nothing");
            }
        }
    
        //该拒绝策略会向任务提交者抛出异常
        class AbortDenyPolicy implements DenyPolicy<Runnable>{
    
            @Override
            public void reject(Runnable runnable, ThreadPool threadPool) {
                throw new RunnbaleDenyException("The runnbale " + runnable + " will be abort.");
            }
        }
    
        //该拒绝策略会使用任务在提交者所在的线程中执行任务
        class RunnerDenyPolicy implements DenyPolicy<Runnable>{
    
            @Override
            public void reject(Runnable runnable, ThreadPool threadPool) {
                if (!threadPool.isShutdown()){
                    runnable.run();
                }
            }
        }
    }
    
    public class RunnbaleDenyException extends RuntimeException {
    
        public RunnbaleDenyException(String message) {
            super(message);
        }
    }
    
    
    • reject 为拒接方法
    • DiscardDenyPolicy 策略会直接丢弃掉Runnable任务。
    • AbortDenyPolicy 策略会抛出RunnbaleDenyException异常。
    • RunnerDenyPolicy 策略,交给调用者的线程直接运行runnable,而不会被加入到线程池中。

    重构阻塞队列,当队列中的值超出最大容量时使用拒接策略。

    重构后的阻塞队列

    import java.util.LinkedList;
    import java.util.List;
    
    public class BlockingQueue<T> {
    
        /**
         *     使用链表实现一个阻塞队列(数据结构定义数据存储和获取方式,所以只要满足这两点,阻塞队列可以用链表,也可以使用数组等来实现)
         */
        private List<T> queue = new LinkedList();
        /**
         * limit用来限制提交任务的最大数,默认10
         */
        private int limit = 10;
    
    
        /**
         * 拒接策略
         */
        private DenyPolicy denyPolicy;
    
        private ThreadPool threadPool;
    
    
        public BlockingQueue(int limit , DenyPolicy denyPolicy , ThreadPool threadPool) {
            this.limit = limit;
            this.denyPolicy = denyPolicy;
            this.threadPool = threadPool;
        }
    
        /**
         *
         * @param item
         *  enqueue是一个同步方法,当任务到达上限,便会调用wait方法进行阻塞,否则将任务放入队列中,并唤醒dequeue()任务线程
         */
        public synchronized void enqueue(T item) {
            //若果队列到达最大容量,调用拒接策略
            if (this.queue.size() >= this.limit) {
                denyPolicy.reject(item , threadPool);
            }
            if (this.queue.size() <= limit) {
                this.notifyAll();
            }
            this.queue.add(item);
        }
    
    
        /**
         *
         * @return
         *
         *     dequeue也是一个同步方法,当队列中没有任务时便会调用wait方法进入阻塞,当任务到达最大容量是唤醒其他dequeue()线程
         *     ,并出列一个任务。
         */
        public synchronized T dequeue(){
            while (this.queue.size() == 0) {
                this.wait();
            }
            if (this.queue.size() == this.limit) {
                this.notifyAll();
            }
            return this.queue.remove(0);
        }
    
     public synchronized int size(){
            return queue.size();
        }
    }
    
    

    线程池类修改如下两点ThreadPool.class

    ...
    public class ThreadPool implements Service{
     /**
         * 默认使用丢弃策略
         */
        private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
    
        public ThreadPool(int noOfThreads , int maxNoOfTasks){
            taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);
    
            //初始化线程池
            for (int i = 0; i < noOfThreads; i++) {
                newThread();
            }
        }
    }
    ...
    

    运行测试类测试类,可以看到当任务到达最大容量时,就会有任务被抛弃


    image.png

    目前初始化线程池时,只指定了初始线程数init,并不能很好的管理线程池线程数量。继续对线程池进行扩展。

    • 新增两个控制线程池线程数量的参数。线程池自动扩充时最大的线程池数量max,线程池空闲时需要释放线程但是也要维护一定数量的活跃线程数量或者核心数量core。有了这init , max , core三个参数就能很好的控制线程池中线程数量,三者之间的关系init <= core <= max。
    • 新增参数Keepedalive时间,该时间主要决定线程各个重要参数自动维护的时间间隔。

    重构后的线程池类

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class ThreadPool implements Service{
    
        /**
         * 初始化线程数量
         */
        private final int initSize;
    
        /**
         *   线程池最大线程数量
         */
        private final int maxSzie;
    
        /**
         *     线程池核心线程数量
         */
        private final int coreSize;
    
        /**
         *   当前活跃的线程数量
         */
        private int activeCount;
    
        private final long keepAliveTime;
    
        private final TimeUnit timeUnit;
    
        private  InternalTask internalTask;
    
        /**
         *     创建线程所需的工厂
         */
        private final ThreadFactory threadFactory;
    
    
        /**
         * 任务队列,用来存储提交的任务
         */
        private BlockingQueue<Runnable> taskQueue = null;
    
        /**
         * 线程池中存储线程的容器。
         */
        private Queue<ThreadTask> threads = new ArrayDeque<ThreadTask>();
    
    
        /**
         * 默认线程工厂
         */
        private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
    
        private boolean isShutdown = false;
    
        /**
         * 默认使用丢弃策略
         */
        private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
    
    
        public ThreadPool(int initSize , int maxSize , int coreSize , int maxNoOfTasks){
            this(initSize , maxSize , coreSize , DEFAULT_THREAD_FACTORY , maxNoOfTasks , DEFAULT_DENY_POLICY , 10 , TimeUnit.SECONDS);
        }
    
        public ThreadPool(int initSize , int maxSize , int coreSize , ThreadFactory threadFactory , int maxNoOfTasks
                          , DenyPolicy<Runnable> denyPolicy , long keepAliveTime , TimeUnit timeUnit){
    
            this.initSize = initSize;
            this.maxSzie = maxSize;
            this.coreSize = coreSize;
            this.threadFactory = threadFactory;
            this.taskQueue = new  BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);
            this.keepAliveTime = keepAliveTime;
            this.timeUnit = timeUnit;
    
            init();
    
        }
    
    
    
        private void init(){
            //初始化线程池
            for (int i = 0; i < initSize; i++) {
                newThread();
            }
    
            //启动内部维护线程
            internalTask =  new InternalTask();
            internalTask.start();
        }
    
        private void newThread(){
            PoolThread poolThread = new PoolThread(taskQueue);
            Thread thread = DEFAULT_THREAD_FACTORY.createThread(poolThread);
            ThreadTask threadTask = new ThreadTask(thread , poolThread);
            activeCount++;
            threads.add(threadTask);
            thread.start();
        }
    
        private void removeThread(){
            //从线程池中移除某个线程
            ThreadTask threadTask = threads.remove();
            threadTask.poolThread.stop();
            this.activeCount--;
        }
        /**
         * 工厂模式屏蔽对象创建的过程
         */
        private static class DefaultThreadFactory implements ThreadFactory{
    
            private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
    
            private static final ThreadGroup group = new ThreadGroup("customThreadPool-" + GROUP_COUNTER.getAndDecrement());
    
            private static final AtomicInteger COUNTER = new AtomicInteger(0);
    
            @Override
            public Thread createThread(Runnable runnable) {
                return new Thread(group , runnable , "thread-pool-" + COUNTER.getAndDecrement());
            }
        }
    
        /**
         * ThreadTask 只是PoolThread和Thread的组合,因为后面关闭线程还需要用到poolThread的doStop方法
         */
        private static class ThreadTask{
    
            Thread thread;
            PoolThread poolThread;
    
            public ThreadTask(Thread thread , PoolThread poolThread){
                this.thread = thread;
                this.poolThread = poolThread;
            }
        }
    
    
        @Override
        public synchronized void execute(Runnable task)  {
            if (this.isShutdown){
                throw new IllegalStateException("ThreadPool is stopped");
            }
            //任务入列
            taskQueue.enqueue(task);
        }
    
        @Override
        public int getInitSize() {
            if (isShutdown){
                throw new IllegalStateException("The thread pool is destory");
            }
            return this.initSize;
        }
    
        @Override
        public int getMaxSize() {
            if (isShutdown){
                throw new IllegalStateException("The thread pool is destory");
            }
            return this.maxSzie;
        }
    
        @Override
        public int getCoreSize() {
            if (isShutdown){
                throw new IllegalStateException("The thread pool is destory");
            }
            return this.coreSize;
        }
    
        @Override
        public int getQueueSize() {
            if (isShutdown){
                throw new IllegalStateException("The thread pool is destory");
            }
            return taskQueue.size();
        }
    
        @Override
        public int getActiveCount() {
            synchronized (this){
                return this.activeCount;
            }
        }
    
        @Override
        public synchronized void shutdown(){
            this.isShutdown = true;
            threads.forEach(threadTask -> threadTask.poolThread.doStop());
            internalTask.interrupt();
        }
    
        @Override
        public boolean isShutdown() {
            return isShutdown;
        }
    
    
    
        class InternalTask extends Thread{
            @Override
            public void run() {
                //run方法继承自Thread,主要用于维护线程数量,比如扩容,回收等工作
                while (!isShutdown&&!isInterrupted()){
                    try {
                        timeUnit.sleep(keepAliveTime);
                    } catch (InterruptedException e) {
                        isShutdown = true;
                        break;
                    }
                    synchronized (ThreadPool.this){
                        if (isShutdown){
                            break;
                        }
                        //当前队列中任务尚未处理,并且activeCount< coreSize则继续扩容
                        if (taskQueue.size() > 0 && activeCount <coreSize){
                            for (int i = initSize; i < coreSize ; i++){
                                newThread();
                            }
                            //continue的目的在于不想让线程的扩容直接打到maxsize
                            continue;
                        }
    
                        //当前的队列中有任务尚未处理,并且activeCount < maxSize则继续扩容
                        if (taskQueue.size() > 0 && activeCount < maxSzie){
                            for (int i = coreSize; i < maxSzie ; i++){
                                newThread();
                            }
                        }
    
                        //如果任务队列中没有任务,则需要回收,回收至coreSize即可
                        if (taskQueue.size() == 0 && activeCount > coreSize ){
                            for (int i = coreSize ; i < activeCount ; i++){
                                removeThread();
                            }
                        }
                    }
                }
            }
        }
    }
    

    线程池类中主要新增了如下参数

      /**
         * 初始化线程数量
         */
        private final int initSize;
    
        /**
         *   线程池最大线程数量
         */
        private final int maxSzie;
    
        /**
         *     线程池核心线程数量
         */
        private final int coreSize;
    
        /**
         *   当前活跃的线程数量
         */
        private int activeCount;
    
        private final long keepAliveTime;
    
        private final TimeUnit timeUnit;
    
        /**
         *     创建线程所需的工厂
         */
        private final ThreadFactory threadFactory;
    
       private  InternalTask internalTask;
    
    

    重写了两个构造函数

     public ThreadPool(int initSize , int maxSize , int coreSize , int maxNoOfTasks){
            this(initSize , maxSize , coreSize , DEFAULT_THREAD_FACTORY , maxNoOfTasks , DEFAULT_DENY_POLICY , 10 , TimeUnit.SECONDS);
        }
    
        public ThreadPool(int initSize , int maxSize , int coreSize , ThreadFactory threadFactory , int maxNoOfTasks
                          , DenyPolicy<Runnable> denyPolicy , long keepAliveTime , TimeUnit timeUnit){
    
            this.initSize = initSize;
            this.maxSzie = maxSize;
            this.coreSize = coreSize;
            this.threadFactory = threadFactory;
            this.taskQueue = new  BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);
            this.keepAliveTime = keepAliveTime;
            this.timeUnit = timeUnit;
    
            init();
           
        }
    

    新增一个线程类,用于维护内部状态

     class InternalTask extends Thread{
            @Override
            public void run() {
                //run方法继承自Thread,主要用于维护线程数量,比如扩容,回收等工作
                while (!isShutdown&&!isInterrupted()){
                    try {
                        timeUnit.sleep(keepAliveTime);
                    } catch (InterruptedException e) {
                        isShutdown = true;
                        break;
                    }
                    synchronized (ThreadPool.this){
                        if (isShutdown){
                            break;
                        }
                        //当前队列中任务尚未处理,并且activeCount< coreSize则继续扩容
                        if (taskQueue.size() > 0 && activeCount <coreSize){
                            for (int i = initSize; i < coreSize ; i++){
                                newThread();
                            }
                            //continue的目的在于不想让线程的扩容直接打到maxsize
                            continue;
                        }
    
                        //当前的队列中有任务尚未处理,并且activeCount < maxSize则继续扩容
                        if (taskQueue.size() > 0 && activeCount < maxSzie){
                            for (int i = coreSize; i < maxSzie ; i++){
                                newThread();
                            }
                        }
    
                        //如果任务队列中没有任务,则需要回收,回收至coreSize即可
                        if (taskQueue.size() == 0 && activeCount > coreSize ){
                            for (int i = coreSize ; i < activeCount ; i++){
                                removeThread();
                            }
                        }
                    }
                }
            }
        }
    

    以及一系列辅助方法

    public interface Service {
    .....
      //获取线程池的初始化大小
        int getInitSize();
    
        //获取线程池最大的线程数
        int getMaxSize();
    
        //获取线程池核心线程梳理
        int getCoreSize();
    
        //获取线程池中活跃线程的数量大小
        int getQueueSize();
    
        //获取线程池中用于缓存任务队列的大小
        int getActiveCount();
    .....
    }
    
    @Override
        public int getInitSize() {
            if (isShutdown){
                throw new IllegalStateException("The thread pool is destory");
            }
            return this.initSize;
        }
    
        @Override
        public int getMaxSize() {
            if (isShutdown){
                throw new IllegalStateException("The thread pool is destory");
            }
            return this.maxSzie;
        }
    
        @Override
        public int getCoreSize() {
            if (isShutdown){
                throw new IllegalStateException("The thread pool is destory");
            }
            return this.coreSize;
        }
    
        @Override
        public int getQueueSize() {
            if (isShutdown){
                throw new IllegalStateException("The thread pool is destory");
            }
            return taskQueue.size();
        }
    
        @Override
        public int getActiveCount() {
            synchronized (this){
                return this.activeCount;
            }
        }
    

    执行测试类

    import java.util.concurrent.TimeUnit;
    
    public class ThreadPoolTest {
    
        public static void main(String[] args) throws InterruptedException {
    
            final ThreadPool threadPool = new ThreadPool(2 , 6 , 4 , 1000);
    
            //定义20个任务并且提交到线程池
            for (int i = 0; i < 20; i++) {
                threadPool.execute(() ->{
                    try {
                        TimeUnit.SECONDS.sleep(10);
                        System.out.println(Thread.currentThread().getName() + " is running add done");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
    
            while (true){
                System.out.println("getActiveCount: " + threadPool.getActiveCount());
                System.out.println("getQueueSize: " + threadPool.getQueueSize());
                System.out.println("getCoreSize: " + threadPool.getCoreSize());
                System.out.println("getMaxSize: "+ threadPool.getMaxSize());
                System.out.println("======================================");
                TimeUnit.SECONDS.sleep(5);
            }
        }
    }
    

    会有如下输出,activeCount数量会增长到与maxSize一直,最后会保持与coreSize相等

    getActiveCount: 2
    getQueueSize: 18
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    getActiveCount: 2
    getQueueSize: 18
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    thread-pool--1 is running add done
    thread-pool-0 is running add done
    getActiveCount: 4
    getQueueSize: 14
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    getActiveCount: 4
    getQueueSize: 14
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    thread-pool--2 is running add done
    thread-pool--3 is running add done
    thread-pool--1 is running add done
    thread-pool-0 is running add done
    getActiveCount: 6
    getQueueSize: 8
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    getActiveCount: 6
    getQueueSize: 8
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    thread-pool--2 is running add done
    thread-pool--4 is running add done
    thread-pool--3 is running add done
    thread-pool--5 is running add done
    thread-pool--1 is running add done
    thread-pool-0 is running add done
    getActiveCount: 6
    getQueueSize: 2
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    getActiveCount: 6
    getQueueSize: 2
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    thread-pool--2 is running add done
    thread-pool--3 is running add done
    thread-pool--4 is running add done
    thread-pool--5 is running add done
    thread-pool-0 is running add done
    thread-pool--1 is running add done
    getActiveCount: 6
    getQueueSize: 0
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    getActiveCount: 6
    getQueueSize: 0
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    thread-pool--2 is running add done
    thread-pool--3 is running add done
    getActiveCount: 5
    getQueueSize: 0
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    getActiveCount: 5
    getQueueSize: 0
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    getActiveCount: 4
    getQueueSize: 0
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    getActiveCount: 4
    getQueueSize: 0
    getCoreSize: 4
    getMaxSize: 6
    ======================================
    

    到这里,一个功能比较完善的线程池就已经完成了
    代码地址: github

    参考

    相关文章

      网友评论

          本文标题:Java线程池原理以及自定义线程池

          本文链接:https://www.haomeiwen.com/subject/thbilctx.html