美文网首页
java线程池——自己实现简单线程池

java线程池——自己实现简单线程池

作者: small瓜瓜 | 来源:发表于2019-07-09 17:49 被阅读0次

    java中我们常用到各种池,比如线程池、数据库连接池等,各种池其目的之一就是为了提高资源的利用率。很多时候初学者都是直接使用java提供的api,这样很方便。为了更好地使用提供的api,还是需要更深入的了解它的原理。下面代码清单是一个简单的自己实现的线程池

    public interface ThreadPool {
        void execute(Runnable runnable);
        void shutdown();
        void addWorkers(int num);
        void removeWorker(int num);
        int getJobSize();
    }
    
    import java.util.LinkedList;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class MyPoolThread implements ThreadPool {
    
        private static final int MAX_WORKER_NUMBERS = 30;
        private static final int MIN_WORKER_NUMBERS = 1;
        private static final int DEFAULT_WORKER_NUMBERS = 10;
    
        private final LinkedList<Worker> workers = new LinkedList<>();
        private final LinkedList<Runnable> jobs = new LinkedList<>();
    
        private AtomicLong nextId = new AtomicLong(0);
        private int workerNum;
    
        public MyPoolThread(int num) {
            int workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : 
    num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
            initializeWorkers(workerNum);
        }
    
        public MyPoolThread() {
            this(DEFAULT_WORKER_NUMBERS);
        }
    
        @Override
        public void execute(Runnable runnable) {
            synchronized (jobs) {
                jobs.add(runnable);
                jobs.notify();
            }
        }
    
        @Override
        public void shutdown() {
            synchronized (workers) {
                removeWorker(workerNum);
            }
        }
    
        @Override
        public void addWorkers(int num) {
            synchronized (workers) {
                if (num > 0 && num + workers.size() <= MAX_WORKER_NUMBERS) {
                    initializeWorkers(num);
                } else if (num <= 0) {
                    throw new IllegalArgumentException("num = " + num + "不能小于 1");
                } else {
                    throw new IllegalArgumentException("工作者数量已到达最大值");
                }
            }
        }
    
        private void initializeWorkers(int num) {
            for (int i = 0; i < num; i++) {
                Worker worker = new Worker();
                workers.add(worker);
                Thread thread = new Thread(worker, "ThreadPool-Worker-" + nextId.getAndAdd(1));
                thread.start();
            }
            workerNum += num;
        }
    
        @Override
        public void removeWorker(int num) {
            synchronized (workers) {
                checkNum(num, 1, workerNum);
                for (int i = 0; i < num; i++) {
                    Worker worker = workers.removeLast();
                    worker.cancel();
                    worker.thread.interrupt();
                    worker = null;
                }
                workerNum -= num;
            }
        }
    
        private void checkNum(int num, int min, int max) {
            if (num > max || num < min) {
                throw new IllegalArgumentException("num = " + 
    num + "应该大于等于" + min + "并且小于等于" + max);
            }
        }
    
    
        @Override
        public int getJobSize() {
            return jobs.size();
        }
    
        class Worker implements Runnable {
    
            private boolean running = true;
            private Thread thread;
    
            boolean cancel() {
                if (!running) {
                    throw new RuntimeException("worker is cancel");
                }
                running = false;
                return true;
            }
    
            @Override
            public void run() {
                thread = Thread.currentThread();
                while (running) {
                    Runnable job;
                    synchronized (jobs) {
                        if (jobs.size() == 0) {
                            try {
                                jobs.wait();
                            } catch (InterruptedException e) {
                                setEx(thread, e);
                            }
                        }
                        job = jobs.remove();
                    }
                    try {
                        executeJob(job);
                    } catch (Throwable e) {
                        setEx(thread, e);
                    }
                }
            }
    
            private void executeJob(Runnable job) {
                job.run();
            }
        }
    
        private void setEx(Thread thread, Throwable e) {
    //  异常处理
        }
    }
    
    下面是主方法
    public static void main(String[] args) {
            MyPoolThread myPoolThread = new MyPoolThread();
            for (int i = 0; i < 50; i++) {
                int j = i;
                myPoolThread.execute(() -> {
                    System.out.println(j+":我是消息");
                });
            }
            myPoolThread.shutdown();
        }
    

    可以看到这里线程池还是有许多的问题了,不过作为了解其基本原理,已经够了。这个线程池在初始化的时候就初始化了一定数目的线程。并且启动了它们,通过synchronized锁的notify/wait机制实现一旦有任务进来,就会有工作线程去执行。

    相关文章

      网友评论

          本文标题:java线程池——自己实现简单线程池

          本文链接:https://www.haomeiwen.com/subject/gyktkctx.html