问题
(1)DelayQueue是阻塞队列吗?
(2)DelayQueue的实现方式?
(3)DelayQueue主要用于什么场景?
简介
DelayQueue是java并发包下的延时阻塞队列,常用于实现定时任务。
继承体系
从继承体系可以看到,DelayQueue实现了BlockingQueue,所以它是一个阻塞队列。
另外,DelayQueue还组合了一个叫做Delayed的接口,DelayQueue中存储的所有元素必须实现Delayed接口。
那么,Delayed是什么呢?
publicinterfaceDelayedextendsComparable{longgetDelay(TimeUnit unit);}
Delayed是一个继承自Comparable的接口,并且定义了一个getDelay()方法,用于表示还有多少时间到期,到期了应返回小于等于0的数值。
源码分析
主要属性
// 用于控制并发的锁privatefinaltransientReentrantLock lock =newReentrantLock();// 优先级队列privatefinalPriorityQueue q =newPriorityQueue();// 用于标记当前是否有线程在排队(仅用于取元素时)privateThread leader =null;// 条件,用于表示现在是否有可取的元素privatefinalCondition available = lock.newCondition();
从属性我们可以知道,延时队列主要使用优先级队列来实现,并辅以重入锁和条件来控制并发安全。
因为优先级队列是无界的,所以这里只需要一个条件就可以了。
还记得优先级队列吗?点击链接直达【死磕 java集合之PriorityQueue源码分析】
主要构造方法
publicDelayQueue(){}publicDelayQueue(Collection<? extends E> c){this.addAll(c);}
构造方法比较简单,一个默认构造方法,一个初始化添加集合c中所有元素的构造方法。
入队
因为DelayQueue是阻塞队列,且优先级队列是无界的,所以入队不会阻塞不会超时,因此它的四个入队方法是一样的。
publicbooleanadd(E e){returnoffer(e);}publicvoidput(E e){ offer(e);}publicbooleanoffer(E e,longtimeout, TimeUnit unit){returnoffer(e);}publicbooleanoffer(E e){finalReentrantLock lock =this.lock; lock.lock();try{ q.offer(e);if(q.peek() == e) { leader =null; available.signal(); }returntrue; }finally{ lock.unlock(); }}
入队方法比较简单:
(1)加锁;
(2)添加元素到优先级队列中;
(3)如果添加的元素是堆顶元素,就把leader置为空,并唤醒等待在条件available上的线程;
(4)解锁;
出队
因为DelayQueue是阻塞队列,所以它的出队有四个不同的方法,有抛出异常的,有阻塞的,有不阻塞的,有超时的。
我们这里主要分析两个,poll()和take()方法。
publicEpoll(){finalReentrantLock lock =this.lock; lock.lock();try{ E first = q.peek();if(first ==null|| first.getDelay(NANOSECONDS) >0)returnnull;elsereturnq.poll(); }finally{ lock.unlock(); }}
poll()方法比较简单:
(1)加锁;
(2)检查第一个元素,如果为空或者还没到期,就返回null;
(3)如果第一个元素到期了就调用优先级队列的poll()弹出第一个元素;
(4)解锁。
publicEtake()throwsInterruptedException{finalReentrantLock lock =this.lock; lock.lockInterruptibly();try{for(;;) {// 堆顶元素E first = q.peek();// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待if(first ==null) available.await();else{// 堆顶元素的到期时间longdelay = first.getDelay(NANOSECONDS);// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素if(delay <=0)returnq.poll();// 如果delay大于0 ,则下面要阻塞了// 将first置为空方便gc,因为有可能其它元素弹出了这个元素// 这里还持有着引用不会被清理first =null;// don't retain ref while waiting// 如果前面有其它线程在等待,直接进入等待if(leader !=null) available.await();else{// 如果leader为null,把当前线程赋值给它Thread thisThread = Thread.currentThread(); leader = thisThread;try{// 等待delay时间后自动醒过来// 醒过来后把leader置空并重新进入循环判断堆顶元素是否到期// 这里即使醒过来后也不一定能获取到元素// 因为有可能其它线程先一步获取了锁并弹出了堆顶元素// 条件锁的唤醒分成两步,先从Condition的队列里出队// 再入队到AQS的队列中,当其它线程调用LockSupport.unpark(t)的时候才会真正唤醒// 关于AQS我们后面会讲的^^available.awaitNanos(delay); }finally{// 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素if(leader == thisThread) leader =null; } } } } }finally{// 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程if(leader ==null&& q.peek() !=null)// signal()只是把等待的线程放到AQS的队列里面,并不是真正的唤醒available.signal();// 解锁,这才是真正的唤醒lock.unlock(); }}
take()方法稍微要复杂一些:
(1)加锁;
(2)判断堆顶元素是否为空,为空的话直接阻塞等待;
(3)判断堆顶元素是否到期,到期了直接调用优先级队列的poll()弹出元素;
(4)没到期,再判断前面是否有其它线程在等待,有则直接等待;
(5)前面没有其它线程在等待,则把自己当作第一个线程等待delay时间后唤醒,再尝试获取元素;
(6)获取到元素之后再唤醒下一个等待的线程;
(7)解锁;
使用方法
说了那么多,是不是还是不知道怎么用呢?那怎么能行,请看下面的案例:
publicclassDelayQueueTest{publicstaticvoidmain(String[] args){ DelayQueue queue =newDelayQueue<>();longnow = System.currentTimeMillis();// 启动一个线程从队列中取元素newThread(()->{while(true) {try{// 将依次打印1000,2000,5000,7000,8000System.out.println(queue.take().deadline - now); }catch(InterruptedException e) { e.printStackTrace(); } } }).start();// 添加5个元素到队列中queue.add(newMessage(now +5000)); queue.add(newMessage(now +8000)); queue.add(newMessage(now +2000)); queue.add(newMessage(now +1000)); queue.add(newMessage(now +7000)); }}classMessageimplementsDelayed{longdeadline;publicMessage(longdeadline){this.deadline = deadline; }@OverridepubliclonggetDelay(TimeUnit unit){returndeadline - System.currentTimeMillis(); }@OverridepublicintcompareTo(Delayed o){return(int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); }@OverridepublicStringtoString(){returnString.valueOf(deadline); }}
是不是很简单,越早到期的元素越先出队。
总结
(1)DelayQueue是阻塞队列;
(2)DelayQueue内部存储结构使用优先级队列;
(3)DelayQueue使用重入锁和条件来控制并发安全;
(4)DelayQueue常用于定时任务;
写在最后:
码字不易看到最后了,那就点个关注呗,只收藏不点关注的都是在耍流氓!关注并私信我“架构”,免费送一些Java架构资料,先到先得!
网友评论