美文网首页
java线程池原理

java线程池原理

作者: tengwind | 来源:发表于2018-05-31 17:40 被阅读0次

        线程池的其实就是为了利用减少线程创建和销毁,因为创建线程需要分配1M的左右的空间,销毁线程都是需要消耗系统资源的,而且不控制线程的数量,一直创建,对于CPU不停的调度,切换上线文也是很耗时的,所以就有了线程池。
        线程池会创建指定数量的线程,这些线程会不停的运行提交给线程池的任务,重复利用这些线程的资源。Java中的线程池实现类是ThreadPoolExecutor类。举个简单的例子。

      static AtomicInteger threadId = new AtomicInteger(1);
        public static void main(String[] args) throws InterruptedException {
            ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(5, new ThreadFactory() {
                //自定义线程池创建工厂,为了实现线程编号的查看
                public Thread newThread(Runnable r) {
                    return new Thread(r, "pool-thread-id-" + threadId.getAndIncrement());
                }
            });
            for (int i = 0; i < 10; i++) {
                threadPool.submit(new Runnable() {
                    public void run() {
                        //打印出执行这个任务的线程名称
                        System.out.println("执行线程名称" + Thread.currentThread().getName());
                    }
                });
            }
            //关闭线程池
            threadPool.shutdown();
        }
    

       利用建立线程的工具类Executors,创建固定大小为5的线程池对象,后续会讲解这个方法。newFixedThreadPool中,还传入了一个ThreadFactory对象,这个对象是为了创建线程池内的工作线程的工厂类,一般不传入,会使用默认Executors中的静态内部类DefaultThreadFactory,这里为了演示,特意实现了一下。
       我们向线程池中提交了10个任务,每个任务是打印执行当前代码的线程名称,执行的结果如下:


    执行结果

    可以看出,线程池内的线程被重复使用了。这就是线程池为了重复利用线程资源,完成相应的任务。
    上面使用代码介绍了线程池的使用,如果直接看jdk里面的线程池实现,由于实现源码还是比较复杂的,可以先来简单实现一个简单的线程池,掌握一下基本原理。
      在实现线程池之前,可能有几个问题会想到。
    1.如果保证工作线程一直运行
    2.如果提交的任务超过了线程池设置的大小,存储到哪里了

    public class MyThreadPool {
        //任务添加到阻塞队列中
        private BlockedQueue<Runnable> blockedQueue;
        //核心线程大小
        private int corePoolSize;
        //当前核心线程个数
        private AtomicInteger currentCorePoolSize = new AtomicInteger(0);
        private volatile boolean poolRunning;
        private HashSet<Worker> workers = new HashSet<Worker>();
        private ReentrantLock lock = new ReentrantLock();
        public MyThreadPool(int corePoolSize) {
            this.corePoolSize = corePoolSize;
            blockedQueue = new BlockedQueue<Runnable>();
            poolRunning = true;
        }
        //提交任务的方法
        public void submit(Runnable task) throws InterruptedException {
            if (!poolRunning) {
                throw new IllegalStateException("线程池非运行状态");
            }
            //1. 如果当前工作线程个数没有达到核心线程,增加工作线程
            if (currentCorePoolSize.get() < corePoolSize) {
                if(addWorker(task)){
                    currentCorePoolSize.incrementAndGet();
                }
            } else if (blockedQueue.add(task)) {//2. 将任务放到阻塞队列
                System.out.println("任务添加到队列");
            }
        }
        private Runnable getTask() throws InterruptedException {
            return blockedQueue.remove();
        }
        private boolean addWorker(Runnable task) {
            //创建工作线程,并运行提交的任务。
            Worker worker = new Worker(task);
            lock.lock();
            try {
                //添加到worker集合中
                workers.add(worker);
                //运行当前worker线程
                worker.thread.start();
                return true;
            } finally {
                lock.unlock();
            }
        }
        public void shutdown() {
            for (Iterator<Worker> workerIterator = workers.iterator(); workerIterator.hasNext(); ) {
                Worker worker = workerIterator.next();
                //关闭运行的工作线程
                worker.setRunFlag(false);
                if(!worker.thread.isInterrupted()){
                    //将运行的线程进行中断处理
                    worker.thread.interrupt();
                }
            }
            poolRunning = false;
        }
        /**
         * 真正执行任务的线程
         */
        private class Worker implements Runnable {
            //需要执行的任务
            private Runnable task;
            //线程运行标志
            private boolean runFlag = true;
            private Thread thread;
            public void setRunFlag(boolean runFlag) {
                this.runFlag = runFlag;
            }
            public Worker(Runnable task) {
                this.task = task;
                thread = new Thread(this);
            }
            public void run() {
                try {
                    while (runFlag && (task != null || (task = getTask()) != null)) {
                        task.run();
                        //执行完成,让gc回收
                        task = null;
                    }
                } catch (InterruptedException e) {
                    System.out.println("线程被中断");
                }
            }
        }
    }
    

    现在来回答上面的几个问题

    1. 如何保证工作线程一直执行
      利用了一个阻塞队列完成,任务不断的从阻塞队列中取出,然后执行,如果阻塞队列没有任务则当前线程进入等待队列。这要是阻塞队列的原理,其实就是消费者和生产者的模型。
      2.任务存储在哪里
      由上面可以知道,存储在阻塞队列中。
      上面使用的阻塞队列是自己简单实现的,为了方便理解,代码如下:
    public class BlockedQueue<T> {
        public Object[] objects;
        //当前数组元素个数
        private int count;
        //添加元素的下标位置
        private int addIndex;
        //删除元素的下标位置
        private int removeIndex;
        private Lock lock = new ReentrantLock();
        //队列未满信号
        private Condition notFull = lock.newCondition();
        //队列不为空信号
        private Condition notEmpty = lock.newCondition();
        private final static int DEFAULT_SIZE = 1000;
        public BlockedQueue() {
            //初始化数组,大小为默认大小
            objects = new Object[DEFAULT_SIZE];
        }
        public BlockedQueue(int size) {
            //初始化数组
            objects = new Object[size];
        }
        public boolean add(T t) throws InterruptedException {
            lock.lock();
            try {
                while (count == objects.length) {
                    //队列满了,阻塞线程,等待notFull信号
                    System.out.println("begin wait for notFull signal");
                    notFull.await();
                }
                //说明队列有空位了
                objects[addIndex++] = t;
                //等于数组的大小
                if (addIndex == objects.length) {
                    //循环添加队列元素
                    addIndex = 0;
                }
                count++;
                //发送notEmpty信号
                System.out.println("send notEmpty signal");
                notEmpty.signal();
                return true;
            } finally {
                lock.unlock();
            }
        }
        public T remove() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    //空的队列等待数据加入
                    System.out.println("begin wait notEmpty signal");
                    notEmpty.await();
                }
                T t = (T) objects[removeIndex];
                objects[removeIndex++] = null;
                if (removeIndex == objects.length) {
                    removeIndex = 0;
                }
                --count;
                System.out.println("send notFull signal");
                notFull.signal();
                return t;
            } finally {
                lock.unlock();
            }
        }
    }
    

    可以看到,这个简单的阻塞队列利用了ReentrantLock和Condition信号量完成。

    参考《java并发编程艺术》

    相关文章

      网友评论

          本文标题:java线程池原理

          本文链接:https://www.haomeiwen.com/subject/ancxsftx.html