美文网首页
实现一个自己简单线程池

实现一个自己简单线程池

作者: _少年不知愁 | 来源:发表于2020-12-05 19:00 被阅读0次

1.线程池的作用

降低资源消耗的,复用线程,减少对象创建及销毁的资源;
提高线程的管理,统一分配,调优和监控;
提高响应速度,任务到达可以复用原先的线程,不需要等到线程创建才能执行;
功能的扩展,提供定时执行,单线程及并发数的控制;

2.初入jdk线程池核心类

image.png

MyExecutor

/**
 * @author summit
 * @since 2020/12/5 10:44
 *
 * @see java.util.concurrent.Executor
 * @see ExecutorService
 *
 * @see java.util.concurrent.AbstractExecutorService
 */
public interface MyExecutor {

    /**
     *  接收执行任务
     *
     * @param command 任务
     */
    void execute(Runnable command);

}

MyExecutorService

public interface MyExecutorService extends MyExecutor {

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    void shutdown();
    List<Runnable> shutdownNow();
}

MyAbstractExecutorService

public abstract class MyAbstractExecutorService implements MyExecutorService {

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        FutureTask<T> futureTask = new FutureTask<>(task);
        execute(futureTask);
        return futureTask;
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        FutureTask<T> futureTask = new FutureTask<>(task, null);
        execute(futureTask);
        return futureTask;
    }

    @Override
    public void shutdown() {

    }

    @Override
    public List<Runnable> shutdownNow() {

        return null;
    }

}

MytThreadPoolExecutor

/**
 * @author summit
 * @see java.util.concurrent.ThreadPoolExecutor
 * @since 2020/12/5 10:45
 */
public class MytThreadPoolExecutor extends MyAbstractExecutorService {

    private volatile int corePoolSize;

    private volatile int maxNumPoolSize;

    private final AtomicInteger ctl = new AtomicInteger(0);

    private LinkedBlockingQueue<Runnable> workQueue;

    private volatile long keepAliveTime;

    private volatile boolean allowCoreThreadTimeOut;

    public MytThreadPoolExecutor(int corePoolSize, int maxNumPoolSize,
        LinkedBlockingQueue<Runnable> blockingQueue) {
        this.corePoolSize = corePoolSize;
        this.maxNumPoolSize = maxNumPoolSize;
        this.workQueue = blockingQueue;
    }

    public MytThreadPoolExecutor(int corePoolSize, int maxNumPoolSize,
        LinkedBlockingQueue<Runnable> workQueue, long keepAliveTime,
        boolean allowCoreThreadTimeOut) {
        this.corePoolSize = corePoolSize;
        this.maxNumPoolSize = maxNumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = keepAliveTime;
        if (keepAliveTime > 0) {
            allowCoreThreadTimeOut = true;
        }
        this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
    }

    @Override
    public void execute(Runnable task) {

        if (task == null) {
            throw new NullPointerException();
        }
        int c = ctl.get();
        if (c < corePoolSize) {
            addWorker(task, true);
        } else if (workQueue.offer(task)) {
            //放入等待队列
            addWorker(null, false);
        } else {
            // 拒绝策略
            reject(task);
        }
    }

    static class RejectExecutedHandler {

        public void reject(Runnable runnable) {
            throw new RejectedExecutionException("任务处理不了:" + runnable);
        }
    }

    private void reject(Runnable task) {
        new RejectExecutedHandler().reject(task);
    }

    private void addWorker(Runnable task, Boolean coreFlag) {
        if (coreFlag) {
            ctl.incrementAndGet();
        }
        Worker worker = new Worker(task);
        worker.getThread().start();
    }

    @EqualsAndHashCode(callSuper = true)
    @Data
    class Worker extends ReentrantLock implements Runnable {

        private Runnable firstTask;

        private Thread thread;

        public Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            thread = new Thread(this);
        }


        @Override
        public void run() {
            runWorker(this);
        }

        private void runWorker(Worker w) {
            try {
                w.lock();
                Runnable task = w.firstTask;
                if (task != null || ((task = getTask()) != null)) {
                    task.run();
                }
            } finally {
                processWorkerExit(w);
                w.unlock();
            }
        }

        private void processWorkerExit(Worker w) {
            addWorker(null, false);
        }

        private Runnable getTask() {
            try {
                if (workQueue.isEmpty()) {
                    return null;
                }
                return allowCoreThreadTimeOut ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
                    : workQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
}

测试

public class MainTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        myThreadPool();
    }

    public static void myThreadPool() throws ExecutionException, InterruptedException {
        MytThreadPoolExecutor executor = new MytThreadPoolExecutor(0,1,
            new LinkedBlockingQueue<>(100));
        // for (int i = 0; i < 10 ; i++) {
        //     executor.execute(new Runnable() {
        //         @Override
        //         public void run() {
        //             System.out.println("myThreadPool start=======");
        //         }
        //     });
        // }

        for (int i = 0; i < 10 ; i++) {
            Future<Object> f = executor.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    System.out.println("callable start========");
                    return "callable";
                }
            });
            System.out.println("main : ="+f.get());
        }

    }



        public static void jdkThreadPool() {

        //    ThreadPoolExecutor

        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("start work by thread pool");
            }
        });

    }
}

输出结果

callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable

相关文章

网友评论

      本文标题:实现一个自己简单线程池

      本文链接:https://www.haomeiwen.com/subject/oeggwktx.html