美文网首页程序猿阵线联盟-汇总各类技术干货
为了更好的理解ExecutorService,实现一个简单线程池

为了更好的理解ExecutorService,实现一个简单线程池

作者: TTRR | 来源:发表于2019-05-24 21:40 被阅读0次

    一个简单的线程池,应该具备以下能力:
    1.能够有效的管理工作线程数量。(可以通过4个参数来管理,初始化线程数,最大线程数,核心线程数,维护工作线程的时间间隔)
    2.能够管理提交的任务。(有一个队列来管理,已提交的任务,且当缓存的任务数量达到定义的上限时,应该设定一些拒绝策略告知调用者)

    所以定义以下接口:
    线程池接口定义

    public interface ThreadPool {
    
        /**
         * 提交任务到线程池
         * @param runnable
         */
        void execute(Runnable runnable);
    
        /**
         * 关闭线程池
         */
        void shutDown();
    
        /**
         * 线程池是否被关闭
         * @return
         */
        boolean isShutDown();
    
        /**
         * 得到初始化线程池数量
         * @return
         */
        int getInitSize();
    
        /**
         * 得到最大线程池数量
         * @return
         */
        int getMaxSize();
    
        /**
         * 得到核心线程池数量
         * @return
         */
        int getCoreSize();
    
        /**
         * 得到活跃线程池数量
         * @return
         */
        int getActiveSize();
    
        /**
         * 获取任务缓冲队列大小
         * @return
         */
        int getQueueListSize();
    }
    

    任务队列定义

    package threadPool;
    
    /**
     * 任务队列,应该有限制大小的参数
     *
     * 提供offer方法,用于提交runnable任务
     *
     * 提供take 方法,用于取runnable任务
     *
     */
    public interface RunnableQueue {
    
        /**
         * 提交任务,提交的任务首先会进入缓冲队列
         * @param runnable
         */
        void offer(Runnable runnable);
    
        /**
         * 取任务,从队列中取
         * @return
         */
        Runnable take() throws InterruptedException;
    
        /**
         * 获取缓冲队列大小
         * @return
         */
        int getQueueSize();
    }
    
    

    创建线程的工厂定义:

    package threadPool;
    
    public interface ThreadFactory {
        Thread createThread(Runnable runnable);
    }
    
    

    拒绝策略接口定义

    package threadPool;
    
    public interface DenyPolicy {
        void reject(Runnable runnable, ThreadPool threadPool);
    
        //该拒绝策略直接将任务抛弃,就不管
        class DiscarDenyPolicy implements DenyPolicy{
            @Override
            public void reject(Runnable runnable, ThreadPool threadPool) {
                // nothing doing something
            }
        }
    
        //该拒绝策略,当任务满时,抛出异常
        class AbortDenyPoliy implements DenyPolicy{
            @Override
            public void reject(Runnable runnable, ThreadPool threadPool) {
                throw new RuntimeException("this runnable "+runnable+"will be abort");
            }
        }
    
        //该拒绝策略,被抛弃的任务就在当前线程中运行
        class RunnerDenyPoliy implements DenyPolicy{
            @Override
            public void reject(Runnable runnable, ThreadPool threadPool) {
                if (!threadPool.isShutDown()){
                    runnable.run();
                }
            }
        }
    }
    
    

    以下是接口的简单实现
    任务队列的实现

    package threadPool;
    
    import java.util.LinkedList;
    
    public class LinkedRunnableQueue implements RunnableQueue{
    
        private int limitQueueSize;
    
        private DenyPolicy denyPolicy;
    
        private ThreadPool threadPool;
    
        private LinkedList<Runnable> taskList = new LinkedList<Runnable>();
    
        public LinkedRunnableQueue(int limitQueueSize,DenyPolicy denyPolicy,ThreadPool threadPool){
            this.denyPolicy = denyPolicy;
            this.limitQueueSize = limitQueueSize;
            this.threadPool = threadPool;
        }
    
        @Override
        public void offer(Runnable runnable) {
            synchronized (taskList){
                int size = taskList.size();
                if (size >= limitQueueSize){
                    //当队列满了,就拒绝
                    denyPolicy.reject(runnable,threadPool);
                }else {
                    //加入队尾
                    taskList.addLast(runnable);
                    //唤醒等待的线程
                    taskList.notifyAll();
                }
            }
        }
    
        @Override
        public Runnable take() throws InterruptedException{
            synchronized (taskList){
                //若队列为空,则让线程等待
                while (taskList.isEmpty()){
                    try {
                        taskList.wait();
                    } catch (InterruptedException e) {
                        throw e;
                    }
                }
                return taskList.removeFirst();
            }
        }
    
        @Override
        public int getQueueSize() {
            return taskList.size();
        }
    }
    
    

    线程工厂的简单实现

    package threadPool;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class DefaultThreadFactory implements ThreadFactory{
    
        private static AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
    
        private static ThreadGroup threadGroup = new ThreadGroup("MyThread-Pool-"+GROUP_COUNTER.getAndDecrement());
    
        private static AtomicInteger COUNTER = new AtomicInteger(0);
    
        @Override
        public Thread createThread(Runnable runnable) {
            return new Thread(threadGroup,runnable,threadGroup.getName()+"-thread-"+COUNTER.getAndDecrement());
        }
    }
    
    

    注入到线程池中的线程需要实现的逻辑单元,即Runnable的实现

    package threadPool;
    
    public class InternalTask implements Runnable{
    
        private RunnableQueue runnableQueue;
    
        private volatile boolean running = true;
    
        public InternalTask(RunnableQueue runnableQueue){
            this.runnableQueue = runnableQueue;
        }
    
        //当前任务为running 且没有被中断,则不断的从队列中取runnable 并执行
        @Override
        public void run() {
            while (running && !Thread.currentThread().isInterrupted()){
                try {
                    Runnable take = runnableQueue.take();
                    take.run();
                } catch (Exception e) {
                    running = false;
                    e.printStackTrace();
                }
            }
        }
    
        //停止当前任务
        public void stop(){
            this.running = false;
        }
    }
    
    

    线程池的实现:

    package threadPool;
    
    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.TimeUnit;
    
    public class BasicThreadPool implements ThreadPool{
    
        //初始化线程数
        private int initSize;
    
        //最大线程数
        private int maxSize;
    
        //核心线程数
        private int coreSize;
    
        //当前活跃的线程数量
        private int currentActiveSize;
    
        //维护线程数量的时间间隔,秒级
        private long keepAliveTime;
    
        private TimeUnit timeUnit;
    
        private ThreadFactory threadFactory;
    
        private Thread thread;
    
        //当前线程池的状态,是否被关闭
        private volatile boolean isShutDown = false;
    
        //任务队列
        private RunnableQueue runnableQueue;
    
        //工作线程队列
        private Queue<ThreadTask> threadTasks = new ArrayDeque<ThreadTask>();
    
        public BasicThreadPool(int initSize,int maxSize,int coreSize,int taskQueueLimit){
            this(initSize,maxSize,coreSize,10,TimeUnit.SECONDS,new DefaultThreadFactory(),
                    new DenyPolicy.DiscarDenyPolicy(),taskQueueLimit);
        }
    
        public BasicThreadPool (int initSize,int maxSize,int coreSizeSize,
                                int keepAliveTime,TimeUnit timeUnit,
                                ThreadFactory threadFactory,DenyPolicy denyPolicy,
                                int taskQueueLimit){
            this.initSize = initSize;
            this.maxSize = maxSize;
            this.coreSize = coreSizeSize;
            this.keepAliveTime = keepAliveTime;
            this.timeUnit = timeUnit;
            this.threadFactory = threadFactory;
            if (this.threadFactory == null)
                this.threadFactory = new DefaultThreadFactory();
    
            this.runnableQueue = new LinkedRunnableQueue(taskQueueLimit,denyPolicy,this);
    
            //调用初始化方法
            this.init();
        }
    
        private void init(){
            //TODO 初始化线程池
            //启动线程维护
            this.thread = new Thread(()->{
                //在线程池没有被中断,且未被打断的情况下维护
                while (!isShutDown && !Thread.currentThread().isInterrupted()){
                    try {
                        timeUnit.sleep(keepAliveTime);
                    } catch (InterruptedException e) {
                        isShutDown = true; //线程被中断
                        break;
                    }
                    synchronized (this){
                        if (isShutDown)
                            break;
    
                        System.out.println("维护线程数量");
                        //当前任务队列还有需要处理的任务,且 活跃的线程数 小于 核心线程数,则继续扩容
                        if (runnableQueue.getQueueSize() > 0 && currentActiveSize < coreSize){
                            for (int i=initSize;i<coreSize;i++){
                                newThread();
                            }
                        }
    
    
                        if (runnableQueue.getQueueSize() > 0 && currentActiveSize < maxSize){
                            for (int i=coreSize;i<maxSize;i++){
                                newThread();
                            }
                        }
    
                        //当前没有在等待的任务,且活跃的线程数,大于核心线程数,则释放任务线程
                        if (runnableQueue.getQueueSize() == 0 && currentActiveSize > coreSize){
                            for (int i=coreSize;i<currentActiveSize;i++){
                                removeThread();
                            }
                        }
                    }
                }
            });
            this.thread.start();
    
            //根据初始化线程数量,进入初始化
            for (int i=0;i<initSize;i++){
                newThread();
            }
        }
    
        @Override
        public void execute(Runnable runnable) {
            if (this.isShutDown)
                throw new IllegalStateException("thread pool is destroyed");
            this.runnableQueue.offer(runnable);
        }
    
        @Override
        public void shutDown() {
            synchronized (this){
                if (isShutDown)
                    return;
    
                isShutDown = true;
                threadTasks.forEach(threadTask -> {
                    threadTask.internalTask.stop();
                    threadTask.thread.interrupt();
                    this.currentActiveSize--;
                });
    
                thread.interrupt();
            }
        }
    
        @Override
        public boolean isShutDown() {
            return isShutDown;
        }
    
        @Override
        public int getInitSize() {
            if (isShutDown)
                throw new IllegalStateException("thread pool is destroyed");
            return this.initSize;
        }
    
        @Override
        public int getMaxSize() {
            if (isShutDown)
                throw new IllegalStateException("thread pool is destroyed");
            return this.maxSize;
        }
    
        @Override
        public int getCoreSize() {
            if (isShutDown)
                throw new IllegalStateException("thread pool is destroyed");
            return this.coreSize;
        }
    
        @Override
        public int getActiveSize() {
            synchronized (this){
                return currentActiveSize;
            }
        }
    
        @Override
        public int getQueueListSize() {
            if (isShutDown)
                throw new IllegalStateException("thread pool is destroyed");
            return this.runnableQueue.getQueueSize();
        }
    
        private void newThread(){
            InternalTask internalTask = new InternalTask(this.runnableQueue);
            Thread thread = this.threadFactory.createThread(internalTask);
            ThreadTask threadTask = new ThreadTask(internalTask,thread);
            threadTasks.offer(threadTask);
            this.currentActiveSize ++;
            thread.start();
        }
    
        private void removeThread(){
            //从线程池中移除某一个线程
            ThreadTask threadTask = threadTasks.remove();
            threadTask.internalTask.stop();
            this.currentActiveSize --;
        }
    
        //为thread与InternalTask的一个组合
        private static class ThreadTask{
            InternalTask internalTask;
            Thread thread;
    
            public ThreadTask(InternalTask task,Thread thread){
                this.internalTask = task;
                this.thread = thread;
            }
        }
    }
    
    

    一个简单的线程池,就这么实现啦。。。。。

    相关文章

      网友评论

        本文标题:为了更好的理解ExecutorService,实现一个简单线程池

        本文链接:https://www.haomeiwen.com/subject/jstozqtx.html