美文网首页
实现一个自己简单线程池

实现一个自己简单线程池

作者: _少年不知愁 | 来源:发表于2020-12-05 19:00 被阅读0次

    1.线程池的作用

    降低资源消耗:复用线程,减少线程创建及销毁带来的开销;
    提高线程的可管理性:统一分配、调优和监控;
    提高响应速度:任务到达时可以复用已有的线程,不需要等到线程创建完成才能执行;
    功能扩展:提供定时执行、单线程执行及并发数的控制;

    2.初入jdk线程池核心类

    image.png

    MyExecutor

    /**
     * Smallest building block of the hand-written pool: a type that accepts
     * {@link Runnable} tasks.
     *
     * @author summit
     * @since 2020/12/5 10:44
     *
     * @see java.util.concurrent.Executor
     * @see ExecutorService
     * @see java.util.concurrent.AbstractExecutorService
     */
    public interface MyExecutor {

        /**
         * Accepts a task for execution.
         *
         * @param command the task to run
         */
        void execute(Runnable command);

    }
    

    MyExecutorService

    /**
     * Extends {@link MyExecutor} with result-bearing submission and shutdown
     * operations.
     */
    public interface MyExecutorService extends MyExecutor {

        /**
         * Submits a value-returning task.
         *
         * @param task the task to run
         * @param <T>  type of the value produced by the task
         * @return a future representing the pending result of {@code task}
         */
        <T> Future<T> submit(Callable<T> task);

        /**
         * Submits a {@link Runnable} together with a caller-supplied result.
         *
         * @param task   the task to run
         * @param result the value associated with the returned future
         * @param <T>    type of {@code result}
         * @return a future representing the pending completion of {@code task}
         */
        <T> Future<T> submit(Runnable task, T result);

        /** Requests shutdown; the exact semantics are defined by the implementation. */
        void shutdown();

        /**
         * Requests immediate shutdown; the exact semantics are defined by the
         * implementation.
         *
         * @return a list of tasks, as defined by the implementation
         */
        List<Runnable> shutdownNow();
    }
    

    MyAbstractExecutorService

    /**
     * Skeleton implementation of {@link MyExecutorService}.
     *
     * <p>Both {@code submit} variants wrap the task in a {@link FutureTask} and
     * hand it to {@link #execute(Runnable)}, which concrete subclasses provide.
     * The shutdown methods are no-op defaults.
     */
    public abstract class MyAbstractExecutorService implements MyExecutorService {

        /**
         * Wraps {@code task} in a {@link FutureTask} and passes it to
         * {@link #execute(Runnable)}.
         *
         * @param task the task to run
         * @param <T>  type of the value produced by the task
         * @return the future through which the task's result can be retrieved
         * @throws NullPointerException if {@code task} is null
         */
        @Override
        public <T> Future<T> submit(Callable<T> task) {
            FutureTask<T> futureTask = new FutureTask<>(task);
            execute(futureTask);
            return futureTask;
        }

        /**
         * Wraps {@code task} in a {@link FutureTask} that yields {@code result}
         * on successful completion, and passes it to {@link #execute(Runnable)}.
         *
         * @param task   the task to run
         * @param result the value the returned future yields once the task completes
         * @param <T>    type of {@code result}
         * @return the future through which {@code result} can be retrieved
         * @throws NullPointerException if {@code task} is null
         */
        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            // Forward the caller's result; passing null here would make get() always return null.
            FutureTask<T> futureTask = new FutureTask<>(task, result);
            execute(futureTask);
            return futureTask;
        }

        /** Does nothing by default. */
        @Override
        public void shutdown() {
            // Intentionally empty: this base class keeps no state to shut down.
        }

        /**
         * Does nothing by default.
         *
         * @return an empty list, never {@code null}
         */
        @Override
        public List<Runnable> shutdownNow() {
            // Fully qualified so that no additional import is required.
            return java.util.Collections.emptyList();
        }

    }
    

    MytThreadPoolExecutor

    /**
     * A deliberately small thread pool modelled on the JDK
     * {@code ThreadPoolExecutor}.
     *
     * <p>Submission policy of {@link #execute(Runnable)}:
     * <ol>
     *   <li>while fewer than {@code corePoolSize} workers are alive, start a new
     *       worker that runs the task directly;</li>
     *   <li>otherwise offer the task to the work queue, starting one worker if
     *       none is alive to drain it;</li>
     *   <li>if the queue refuses the task, start an extra worker up to the
     *       maximum pool size;</li>
     *   <li>if that also fails, reject the task.</li>
     * </ol>
     *
     * <p>A worker keeps taking tasks from the queue until none is available. It
     * waits at most {@code keepAliveTime} nanoseconds when core time-out is
     * enabled or when more than {@code corePoolSize} workers are alive, and
     * waits indefinitely otherwise. This class does not override the shutdown
     * methods, so workers waiting indefinitely are never stopped by it.
     *
     * @author summit
     * @see java.util.concurrent.ThreadPoolExecutor
     * @since 2020/12/5 10:45
     */
    public class MytThreadPoolExecutor extends MyAbstractExecutorService {

        /** Number of workers started eagerly before tasks are queued. */
        private volatile int corePoolSize;

        /** Requested upper bound on live workers; see {@link #maxWorkers()}. */
        private volatile int maxNumPoolSize;

        /** Number of live worker threads. */
        private final AtomicInteger ctl = new AtomicInteger(0);

        /** Tasks waiting for a free worker. */
        private final LinkedBlockingQueue<Runnable> workQueue;

        /** Idle time, in nanoseconds, after which a timed worker exits. */
        private volatile long keepAliveTime;

        /** Whether workers within the core size also exit after the keep-alive time. */
        private volatile boolean allowCoreThreadTimeOut;

        /**
         * Creates a pool with a keep-alive of zero and no core time-out.
         *
         * @param corePoolSize   number of workers started before tasks are queued
         * @param maxNumPoolSize upper bound on live workers
         * @param blockingQueue  queue holding tasks that wait for a worker
         * @throws NullPointerException if {@code blockingQueue} is null
         */
        public MytThreadPoolExecutor(int corePoolSize, int maxNumPoolSize,
            LinkedBlockingQueue<Runnable> blockingQueue) {
            this(corePoolSize, maxNumPoolSize, blockingQueue, 0L, false);
        }

        /**
         * Creates a pool.
         *
         * @param corePoolSize           number of workers started before tasks are queued
         * @param maxNumPoolSize         upper bound on live workers
         * @param workQueue              queue holding tasks that wait for a worker
         * @param keepAliveTime          idle time in nanoseconds before a timed worker exits
         * @param allowCoreThreadTimeOut whether core workers may time out; forced to
         *                               {@code true} when {@code keepAliveTime} is positive
         * @throws NullPointerException if {@code workQueue} is null
         */
        public MytThreadPoolExecutor(int corePoolSize, int maxNumPoolSize,
            LinkedBlockingQueue<Runnable> workQueue, long keepAliveTime,
            boolean allowCoreThreadTimeOut) {
            if (workQueue == null) {
                throw new NullPointerException("workQueue");
            }
            this.corePoolSize = corePoolSize;
            this.maxNumPoolSize = maxNumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = keepAliveTime;
            // A positive keep-alive always switches core time-out on.
            this.allowCoreThreadTimeOut = allowCoreThreadTimeOut || keepAliveTime > 0;
        }

        /**
         * Runs {@code task} on a pool thread, queues it, or rejects it, following
         * the policy described on the class.
         *
         * @param task the task to run
         * @throws NullPointerException       if {@code task} is null
         * @throws RejectedExecutionException if the queue is full and no further
         *                                    worker may be started
         */
        @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException();
            }
            if (ctl.get() < corePoolSize && addWorker(task, true)) {
                return;
            }
            if (workQueue.offer(task)) {
                // A queued task needs at least one live worker to pick it up. A worker
                // that is exiting concurrently re-checks the queue after decrementing
                // the count, so the task cannot be stranded.
                if (ctl.get() == 0) {
                    addWorker(null, false);
                }
                return;
            }
            if (!addWorker(task, false)) {
                reject(task);
            }
        }

        /** Rejection policy: always throws. */
        static class RejectExecutedHandler {

            /**
             * @param runnable the task that could not be accepted
             * @throws RejectedExecutionException always
             */
            public void reject(Runnable runnable) {
                throw new RejectedExecutionException("任务处理不了:" + runnable);
            }
        }

        /** Applies the rejection policy to {@code task}. */
        private void reject(Runnable task) {
            new RejectExecutedHandler().reject(task);
        }

        /**
         * Effective upper bound on live workers. The configured maximum is raised
         * to at least one and at least {@code corePoolSize}, so a queued task can
         * always be served even if the configured value is too small.
         */
        private int maxWorkers() {
            return Math.max(1, Math.max(corePoolSize, maxNumPoolSize));
        }

        /**
         * Reserves a slot in the worker count and starts a new worker thread.
         *
         * @param task     first task for the worker, or {@code null} to start by
         *                 taking from the queue
         * @param coreFlag {@code true} to bound by {@code corePoolSize},
         *                 {@code false} to bound by {@link #maxWorkers()}
         * @return {@code true} if a worker was started, {@code false} if the bound
         *         was already reached
         */
        private boolean addWorker(Runnable task, boolean coreFlag) {
            for (;;) {
                int current = ctl.get();
                int limit = coreFlag ? corePoolSize : maxWorkers();
                if (current >= limit) {
                    return false;
                }
                // CAS so that concurrent callers cannot overshoot the bound.
                if (ctl.compareAndSet(current, current + 1)) {
                    break;
                }
            }
            boolean started = false;
            try {
                new Worker(task).getThread().start();
                started = true;
            } finally {
                if (!started) {
                    // Give the reserved slot back if the thread could not be started.
                    ctl.decrementAndGet();
                }
            }
            return true;
        }

        /**
         * A pool thread together with its optional first task. The worker holds
         * its own lock while a task is running.
         */
        class Worker extends ReentrantLock implements Runnable {

            private Runnable firstTask;

            private final Thread thread;

            /**
             * @param firstTask task to run before taking from the queue, may be {@code null}
             */
            public Worker(Runnable firstTask) {
                this.firstTask = firstTask;
                thread = new Thread(this);
            }

            /** @return the thread that runs this worker */
            public Thread getThread() {
                return thread;
            }

            @Override
            public void run() {
                runWorker(this);
            }

            /**
             * Runs the first task, if any, and then tasks from the queue until
             * {@link #getTask()} returns {@code null}.
             */
            private void runWorker(Worker w) {
                Runnable task = w.firstTask;
                w.firstTask = null;
                try {
                    while (task != null || (task = getTask()) != null) {
                        w.lock();
                        try {
                            task.run();
                        } finally {
                            w.unlock();
                        }
                        task = null;
                    }
                } finally {
                    processWorkerExit();
                }
            }

            /**
             * Releases this worker's slot. A replacement is started only when no
             * worker is left and tasks are still queued; starting one
             * unconditionally would spawn threads forever on an empty queue.
             */
            private void processWorkerExit() {
                int remaining = ctl.decrementAndGet();
                if (remaining == 0 && !workQueue.isEmpty()) {
                    addWorker(null, false);
                }
            }

            /**
             * Fetches the next task from the queue.
             *
             * @return the next task, or {@code null} if the wait timed out or the
             *         thread was interrupted, in which case the worker exits
             */
            private Runnable getTask() {
                boolean timed = allowCoreThreadTimeOut || ctl.get() > corePoolSize;
                try {
                    return timed
                        ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
                        : workQueue.take();
                } catch (InterruptedException e) {
                    // Preserve the interrupt status and let the worker exit.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
    }
    

    测试

    /**
     * Small demo that drives the hand-written pool and, for comparison, a JDK
     * pool.
     */
    public class MainTest {

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            myThreadPool();
        }

        /**
         * Submits ten callables to a {@code MytThreadPoolExecutor}, one at a time,
         * and prints each result as soon as it is available.
         *
         * @throws ExecutionException   if a submitted callable throws
         * @throws InterruptedException if the calling thread is interrupted while
         *                              waiting for a result
         */
        public static void myThreadPool() throws ExecutionException, InterruptedException {
            MytThreadPoolExecutor executor = new MytThreadPoolExecutor(0, 1,
                new LinkedBlockingQueue<>(100));

            Callable<Object> job = () -> {
                System.out.println("callable start========");
                return "callable";
            };

            for (int i = 0; i < 10; i++) {
                Future<Object> f = executor.submit(job);
                // get() blocks until the task has finished, so submissions are sequential.
                System.out.println("main : =" + f.get());
            }
        }

        /**
         * Runs a single task on a JDK cached thread pool and then shuts the pool
         * down so its idle thread does not keep the JVM alive.
         */
        public static void jdkThreadPool() {
            ExecutorService executor = Executors.newCachedThreadPool();
            try {
                executor.execute(() -> System.out.println("start work by thread pool"));
            } finally {
                // shutdown() lets the already submitted task finish.
                executor.shutdown();
            }
        }
    }
    

    输出结果

    callable start========
    main : =callable
    callable start========
    main : =callable
    callable start========
    main : =callable
    callable start========
    main : =callable
    callable start========
    main : =callable
    callable start========
    main : =callable
    callable start========
    main : =callable
    callable start========
    main : =callable
    callable start========
    main : =callable
    callable start========
    main : =callable

    相关文章

      网友评论

          本文标题:实现一个自己简单线程池

          本文链接:https://www.haomeiwen.com/subject/oeggwktx.html