美文网首页
Java基础—阻塞队列与线程池

Java基础—阻塞队列与线程池

作者: 锋Plus | 来源:发表于2018-08-04 13:02 被阅读87次

    Executor框架用于把任务的提交和执行解耦,任务的提交交给Runnable或者Callable,而Executor框架用来处理任务。Executor框架中最核心的成员就是ThreadPoolExecutor,它是线程池的核心实现类。

    在介绍ThreadPoolExecutor前,先要了解下BlockingQueue(阻塞队列)

    阻塞队列

    两种常见的阻塞场景:

    • 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
    • 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

    BlockingQueue核心方法

    放入数据

    • offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程。

    • offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

    • put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。

    获取数据

    • poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。

    • take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;

    • drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

    常见的BlockingQueue

    1. ArrayBlockingQueue:基于数组的阻塞队列实现。生产者放入数据和消费者获取数据,共用同一个锁对象。默认采用非公平锁。
    2. LinkedBlockingQueue:基于链表的阻塞队列。生产者端和消费者端分别采用了独立的锁来控制数据同步,并发性能较好。需要注意的是,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE)。
    3. PriorityBlockingQueue: 基于优先级的阻塞无界队列(优先级的判断通过构造函数传入的Compator对象来决定,但不保证同优先级元素顺序)。不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。内部控制线程同步的锁采用的是公平锁。
    4. SynchronousQueue:无缓冲的等待队列。每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都要等待另一个线程的插入操作。由于队列没有容量,所以不能调用peek操作(返回队列头元素)。
    5. DelayQueue:支持延时获取元素的无界阻塞队列。队列中每个元素必须实现Delayed接口。插入数据的操作(生产者)永远不会被阻塞,只有获取数据的操作(消费者)才会被阻塞。

    参考资料:https://www.cnblogs.com/dolphin0520/p/3932906.html


    线程池

    ThreadPoolExecutor构造方法

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
                ······
                                  
        }
    
    • corePoolSize: 核心线程数。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,来预创建线程,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
    • maximumPoolSize:线程池最大线程数。当任务队列满了并且线程数小于maximumPoolSize时,线程池会创建新的线程来处理任务。
    • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime参数也会作用于核心线程,直到线程池中的线程数为0。
    • unit:参数keepAliveTime的时间单位。
    • workQueue:任务队列(阻塞队列)。上文已经介绍过了。ArrayBlockingQueuePriorityBlockingQueue使用较少,一般使用LinkedBlockingQueueSynchronous。线程池的排队策略与BlockingQueue有关。
    • threadFactory:线程工厂。可以用线程工厂给每个线程设置名字。通常无需设置该参数。
    • handler:饱和策略。当任务队列和线程池都满了时采取的应对策略。默认值为AbortPolicy,有以下四种取值:
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 
    
    线程池的处理流程

    常见的线程池

    • FixedThreadPool

    可重用固定线程的线程池。只有核心线程并且采用无界阻塞队列。

     public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    • CachedThreadPool

    无界线程池,可以进行自动线程回收。无核心线程,采用SynchronousQueue不储存元素的阻塞队列。每次提交任务都会立即有线程去处理,所以适于大量的需要立即处理并且耗时较少的任务。

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    • SingleThreadExecutor

    单线程。能确保所有任务在一个线程中按照顺序逐一执行。

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
    • ScheduledThreadPool

    定时线程池,该线程池可用于周期性地去执行任务,通常用于周期性的同步数据。固定核心线程数,采用DelayedWorkQueue无界队列。

        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    

    end

    参考资料:

    《Android进阶之光》
    https://www.cnblogs.com/superfj/p/7544971.html


    相关文章

      网友评论

          本文标题:Java基础—阻塞队列与线程池

          本文链接:https://www.haomeiwen.com/subject/eknuvftx.html