美文网首页
DelayQueue

DelayQueue

作者: 程序员札记 | 来源:发表于2022-05-02 08:27 被阅读0次

    DelayQueue是BlockingQueue的一种,所以它是线程安全的,DelayQueue的特点就是插入Queue中的数据可以按照自定义的delay时间进行排序。只有delay时间小于0的元素才能够被取出。

    DelayQueue是一个没有边界BlockingQueue实现,加入其中的元素必需实现Delayed接口。当生产者线程调用put之类的方法加入元素时,会触发Delayed接口中的compareTo方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。

    package com.conrrentcy.juc;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class DelayedQueneTest {
        public static void main(String[] args) throws InterruptedException {
            Item item1 = new Item("item1", 5, TimeUnit.SECONDS);
            Item item2 = new Item("item2", 10, TimeUnit.SECONDS);
            Item item3 = new Item("item3", 15, TimeUnit.SECONDS);
            DelayQueue<Item> queue = new DelayQueue<>();
            queue.put(item1);
            queue.put(item2);
            queue.put(item3);
            System.out.println("begin time:"
                    + LocalDateTime.now().format(
                            DateTimeFormatter.ISO_LOCAL_DATE_TIME));
            for (int i = 0; i < 3; i++) {
                Item take = queue.take();
                System.out
                        .format("name:{%s}, time:{%s}\n", take.name, LocalDateTime
                                .now().format(DateTimeFormatter.ISO_DATE_TIME));
            }
        }
    
    }
    
    class Item implements Delayed {
        /* 触发时间 */
        private long time;
        String name;
    
        public Item(String name, long time, TimeUnit unit) {
            this.name = name;
            this.time = System.currentTimeMillis()
                    + (time > 0 ? unit.toMillis(time) : 0);
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return time - System.currentTimeMillis();
        }
    
        @Override
        public int compareTo(Delayed o) {
            Item item = (Item) o;
            long diff = this.time - item.time;
            if (diff <= 0) {// 改成>=会造成问题
                return -1;
            } else {
                return 1;
            }
        }
    
        @Override
        public String toString() {
            return "Item{" + "time=" + time + ", name='" + name + '\'' + '}';
        }
    }
    

    先看一下DelayQueue的定义:

    public interface Delayed extends Comparable<Delayed> {
     
        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        long getDelay(TimeUnit unit);
    }
    

    由Delayed定义可以得知,队列元素需要实现getDelay(TimeUnit unit)方法和compareTo(Delayed o)方法, getDelay定义了剩余到期时间,compareTo方法定义了元素排序规则,注意,元素的排序规则影响了元素的获取顺序,为什么这样设计呢?

    因为DelayQueue的底层存储是一个PriorityQueue,PriorityQueue是一个可排序的Queue,其中的元素必须实现Comparable方法。而getDelay方法则用来判断排序后的元素是否可以从Queue中取出。
    内部存储结构  
    DelayedQuene的元素存储交由优先级队列存放。

    public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
        private final transient ReentrantLock lock = new ReentrantLock();
        private final PriorityQueue<E> q = new PriorityQueue<E>();//元素存放
    

    DelayedQuene的优先级队列使用的排序方式是队列元素的compareTo方法,优先级队列存放顺序是从小到大的,所以队列元素的compareTo方法影响了队列的出队顺序。
    若compareTo方法定义不当,会造成延时高的元素在队头,延时低的元素无法出队。

    获取队列元素

        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E first = q.peek();
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                else
                    return q.poll();
            } finally {
                lock.unlock();
            }
        }
    
    

    PriorityQueue队列peek()方法。
    public E peek() { return (size == 0) ? null : (E) queue[0];}

    由代码我们可以看出,获取元素时,总是判断PriorityQueue队列的队首元素是否到期,若未到期,返回null,所以compareTo()的方法实现不当的话,会造成队首元素未到期,当队列中有到期元素却获取不到的情况。因此,队列元素的compareTo方法实现需要注意。

    public E take() throws InterruptedException {
          final ReentrantLock lock = this.lock;
          lock.lockInterruptibly();
          try {
              for (;;) {
                  E first = q.peek();
                  if (first == null) //没有元素,让出线程,等待java.lang.Thread.State#WAITING
                      available.await();
                  else {
                      long delay = first.getDelay(NANOSECONDS);
                      if (delay <= 0) // 已到期,元素出队
                          return q.poll();
                      first = null; // don't retain ref while waiting
                      if (leader != null) 
                          available.await();// 其它线程在leader线程TIMED_WAITING期间,会进入等待状态,这样可以只有一个线程去等待到时唤醒,避免大量唤醒操作
                     else { 
                          Thread thisThread = Thread.currentThread(); 
                           leader = thisThread; 
                           try { 
                           available.awaitNanos(delay);// 等待剩余时间后,再尝试获取元素,他在等待期间,由于leader是当前线程,所以其它线程会等待。 
                        } finally { 
                         if (leader == thisThread) leader = null; 
                        } 
                  } 
             }
         } 
      } finally { 
        if (leader == null && q.peek() != null) 
                 available.signal();
               lock.unlock(); 
      } 
    }
    

    使用场景:

    DQueue非常有用的。我们利用DQueue的延时特性,可以讲DQueue应用于以下场景:

    1:缓存的设计。可以利用Dqueue保存缓存元素的有效期。使用一个线程循环的从队列中获取数据。一旦获取到数据,就说明缓存有效期到了。

    2:定时任务调度。可以使用Dqueue保存需要执行的任务和任务执行的时间,一旦从DQueue中获取到了任务,就开始执行任务了。比如TimerQueue就是使用了DelayQueue来实现的。

    相关文章

      网友评论

          本文标题:DelayQueue

          本文链接:https://www.haomeiwen.com/subject/zxmmyrtx.html