美文网首页Springboot
java 实现自定义线程池

java 实现自定义线程池

作者: 真老根儿 | 来源:发表于2018-07-23 20:30 被阅读18次

    java 实现自定义线程池

    定义线程池接口

    public interface ThreadPool<Job extends Runnable> {
    
        // 执行一个Job
        void execute(Job job);
    
        // 关闭线程池
        void shutdown();
    
        // 添加工作者线程
        void addWorkers(int num);
    
        // 减少工作者线程
        void removeWorker(int num);
    
        // 得到正在等待执行的任务数量
        int getJobSize();
    }
    
    

    线程池接口的默认实现

    public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    
        // 线程最大限制
        private static int MAX_WORKER_NUMBERS = 10;
    
        //线程默认数量
        private static int DEFAULT_WORKER_NUMBERS = 5;
    
        // 线程最小数量
        private static int MIN_WORKER_NUMBERS = 1;
    
        // 工作列表
        private final LinkedList<Job> jobs = new LinkedList<>();
    
        // 工作者列表
        private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
    
        // 当前工作者的线程数量
        private int workerNum = DEFAULT_WORKER_NUMBERS;
    
        private AtomicLong threadNum = new AtomicLong();
    
        public DefaultThreadPool() {
            initializeWorkers(DEFAULT_WORKER_NUMBERS);
        }
    
        public DefaultThreadPool(int num) {
            workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
            initializeWorkers(workerNum);
        }
    
    
        // 初始化线程工作者
        private void initializeWorkers(int num) {
            for (int i = 0; i < num; i++) {
                Worker worker = new Worker();
                workers.add(worker);
                Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
                thread.start();
            }
        }
    
        @Override
        public void execute(Job job) {
            if (job != null) {
                synchronized (jobs) {
                    //添加一个工作  然后通知
                    jobs.addLast(job);
                    jobs.notify();
                }
            }
        }
    
        @Override
        public void shutdown() {
            workers.forEach(Worker::shutdown);
        }
    
        @Override
        public void addWorkers(int num) {
            synchronized (jobs) {
                // 限制新增的worker数量不能超过最大值
                if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                    num = MAX_WORKER_NUMBERS - this.workerNum;
                }
                initializeWorkers(num);
                this.workerNum += num;
            }
        }
    
        @Override
        public void removeWorker(int num) {
    
            synchronized (jobs) {
                if (num >= this.workerNum) {
                    throw new IllegalArgumentException("beyond workNum");
                }
                //按照给定的数量停止worker
                int count = 0;
                while (count < num) {
                    Worker worker = workers.get(count);
                    if (workers.remove(worker)) {
                        worker.shutdown();
                        count++;
                    }
                }
                this.workerNum -= count;
            }
        }
    
        @Override
        public int getJobSize() {
            return jobs.size();
        }
    
        // 工作者 负责消费任务
        class Worker implements Runnable {
    
            // 是否工作
            private volatile boolean running = true;
    
            @Override
            public void run() {
                while (running) {
                    Job job;
                    synchronized (jobs) {
                        while (jobs.isEmpty()) {
                            try {
                                jobs.wait();
                            } catch (InterruptedException e) {
                                // 感知到外部对WorkerThread的中斷操作,返回
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                        job = jobs.removeFirst();
                    }
                    if (job != null) {
                        try {
                            job.run();
                        } catch (Exception ex) {
                            //忽略job执行中的异常
                        }
                    }
    
                }
    
            }
    
            public void shutdown() {
    
                running = false;
    
            }
        }
    }
    

    示例摘抄于《Java并发变成的艺术》4.4.3线程池技术及其示例

    相关文章

      网友评论

        本文标题:java 实现自定义线程池

        本文链接:https://www.haomeiwen.com/subject/vkqsmftx.html