美文网首页
利用等待-通知模式实现简单的ThreadPool

利用等待-通知模式实现简单的ThreadPool

作者: 陆小红_ | 来源:发表于2017-04-03 11:37 被阅读0次

    写在前面

      说实话,我个人是不太喜欢写博客或技术文章的。一来是因为个人水平有限,怕写出来的东西错误百出,误人子弟;二来在学习的过程中,如果想着写博客输出wiki等,会严重影响思路(我个人是这么认为的),所以几次想尝试写写学习笔记啥的都没能走出第一步。前几天,因为某些原因给我一些触动,让我觉得还是要向某人学习,该输出的还是要输出。

      本文主要是我在前段时间学习Java多线程相关知识时,总结的一些个人实操,代码本机测试OK(PC : MacBook Pro - Mac OS 10.11.6 , JDK : JDK 1.8.0_40),如发现BUG之类,欢迎各位指正。除此之外,说明一下,本文部分代码和内容参考自《Java并发编程的艺术》与《Java并发编程实战》,如有雷同,就是引用。

    引言

      相信大多数程序员在学习或工作中都听过或使用过线程池这一简单高效的并发组件,尤其在一些对响应时间要求非常严苛的场景,使用线程池能让我们高效方便的的写出并发代码。但是对于相当一部分人来说,对于线程池的认识也仅仅停留在用过甚至是听过的阶段,并没有真正的去了解它的工作原理和实现方式。借着最近学习《Java并发编程的艺术》契机,我看了JDK8中几种线程池的实现和它们对应的特性,再结合书中的案例,实现了一个简单的线程池。本线程池并没有过多的考虑各种各样的应用场景,只是为了在自己学习的基础上,探究并实践了线程池实现的原理和机制,虽说代码不多,但是其用到的思想还是很值得我们在编写代码中学习的,个人认为这种:**获得锁 → 条件不满足 → 等待 → 阻塞 → 释放锁 → 被通知 → 获得锁 → 阻塞返回并执行 → 释放锁 **的模式能实现很多并发,异步,消费者生产者的问题。所以代码不多,思想很妙。下面是示例代码:

    1、线程池接口

      示例代码:

    /**
     * Created by luxiaohong on 17/3/7.
     */
    public interface IThreadPool<Job extends Runnable> {
        int MAX_THREAD_COUNT = 1024;  //线程池允许最大的活动线程数
        int DEFAULT_THREAD_COUNT = 10;  //线程池默认线程数
        int MIN_THREAD_COUNT = 1;  //线程池最小线程数
        void submit(Job job);  //提交任务至线程池执行
        void shutdown(int count);  //关闭一定数量的线程数
        void shutdownAll();  //关闭所有数量的线程
    } ```
    ###2、线程池实现类
    &emsp;&emsp;示例代码:
    

    /**

    • Created by luxiaohong on 17/3/7.
      */
      public class ThreadPool<Job extends Runnable> implements IThreadPool<Job>{

      private volatile boolean isShutdown = false; //标识线程池是否被关闭

      private int threadCount = DEFAULT_THREAD_COUNT;

      private int threadNum = 0; //标识线程的编号

      private final LinkedList<Job> jobs = new LinkedList<>(); //用LinkedList来组织待处理任务列表

      private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>()); //用List组织工作者线程

      public ThreadPool(){
      this(DEFAULT_THREAD_COUNT);
      }

      public ThreadPool(int initSize){
      if (initSize < MIN_THREAD_COUNT) {
      throw new IllegalArgumentException("initSize is negative");
      }
      if (initSize > MAX_THREAD_COUNT) {
      initSize = MAX_THREAD_COUNT;
      }
      for (int i = 0; i < initSize; i++) { //初始化工作者线程,并启动线程
      Worker worker = new Worker();
      Thread t = new Thread(worker, "thread-"+threadNum++);
      workers.add(worker);
      t.start();

       }
      

      }

    class Worker implements Runnable{
        //使用volatile修饰,保证其他线程修改,本线程立即可见
        private volatile boolean isAlive = true;
    
        @Override
        public void run(){
            while (isAlive) {  //首先判断线程是否存活
                Job job = null;
                synchronized (jobs) {  //对任务集合进行加锁
                    while (isAlive && jobs.isEmpty()) {  //当线程存活且任务队列为空,本工作线程阻塞等待
                        try {
                            jobs.wait();  //阻塞等待
                        } catch (InterruptedException e) {
                            //当抛出InterruptedException时,终端标志位会被重置,需要重新设置
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    //判断线程从阻塞返回的条件,如果jobs不为空,则执行
                    if (!jobs.isEmpty()) {
                        job = jobs.removeFirst();
                    }
                }
                 //此处需判断job是否为空,因为跳出循环可能是由于线程被杀死
                if (job != null) {
                    job.run();
                }
            }
        }
    
        //暂停worker
        public void shutdown(){
            this.isAlive = false;
        }
    }
    
    @Override
    public void submit(Job job) {
        if (job == null) {
            throw new NullPointerException("job is null.");
        }
        if (isShutdown) {
            throw new IllegalStateException("threadpool is shutdown.");
        }
        synchronized (jobs) {  //当提交一个任务后,先将任务队列加锁在将任务加入队尾,最后通知工作线程并释放锁
            jobs.addLast(job);
            jobs.notify();
        }
    }
    
    @Override
    public void shutdownAll() {
        if (isShutdown) {
            throw new IllegalStateException("threadpool is shutdown.");
        }
        for (Worker worker : workers) {
            worker.shutdown();
        }
        this.isShutdown = true;
    
    }
    
    @Override
    public void shutdown(int count) {
        synchronized (workers) {
            if (count > threadCount) {
                throw new IllegalArgumentException("shutdown thread count gt current thread count.");
            }
            if (count == threadCount) {
                //关闭所有线程
                shutdownAll();
                return;
            }
            for (int i = threadCount - 1; i > threadCount - count; i--) {
                workers.get(i).shutdown();
            }
            threadCount -= count;
        }
    }
    

    }

    ###3、测试线程池主程序
    &emsp;&emsp;示例代码:
    

    /**

    • Created by luxiaohong on 17/3/8.
      */
      public class ThreadPoolTest {
      public static void main(String[] args) {
      ThreadPool threadPool = new ThreadPool();
      for (int i =0 ; i < 1000 ; i++) {
      threadPool.submit(() -> {
      try {
      Thread.sleep(500);
      System.out.println(Thread.currentThread().getName()+"执行完毕");
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      });
      }
      //threadPool.shutdownAll();
      }
      }
    ###4、执行结果
    
    ![线程池测试主程序执行效果图  MacBook Pro - Mac OS 10.11.6](https://img.haomeiwen.com/i5343648/d164b1d40a5a583b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    ###5、总结
    &emsp;&emsp;此线程池实际包含两个重要的组成部分。第一个是任务执行者池(Runnable),第二个是待执行任务池(Runnable)。线程池实现思想就是,当待执行任务池为空即没有任务时,任务执行者池中的任务执行者(使用Thread包装的Runable对象)会阻塞在待执行任务池这个条件上,一旦客户端提交了一个任务,客户端线程首先对待执行任务池加锁,然后将任务安全的加入到待执行任务池中,最后通知任务执行者池中的某一个任务执行者,被选中通知的任务执行者将会从待执行任务池中获取这个任务,然后执行下去。至此,一个任务从提交到执行结束的生命周期就这样完成了。我们都知道在Java中要想新建一个线程,有两种方式,一是实现Runnable接口,然后用Thread类包装,start()调用执行;二是继承Thread类,重写run()方法,然后实例化一个对象,start()执行。之前了解到使用实现Runnable接口的方式有很多优点,通过线程池的实现也可以看出,这种方式对于一些复杂任务,更高级的并发组件实现起来,具有很好的解耦性。
    

    相关文章

      网友评论

          本文标题:利用等待-通知模式实现简单的ThreadPool

          本文链接:https://www.haomeiwen.com/subject/crgnottx.html