美文网首页
【BlockingQueue】ArrayBlockingQueu

【BlockingQueue】ArrayBlockingQueu

作者: 有章 | 来源:发表于2018-08-12 11:06 被阅读0次

ArrayBlockingQueue
基于数组实现的有界队列,put()和take()方法为阻塞方法,内部使用ReentryLock方法实现
常用方法:
add():
内部调用了offer()方法,如果队列满,则Queue full异常
offer():
如果队列满则返回false,不继续添加
put():
内部用Condition实现,如果队列满,则notFullCondition.await()等待唤醒
take():
队列中为空则阻塞,删除队列头部元素并返回
poll():
非阻塞,队列为空则返回null,否则删除头部队列并返回元素
peek():
返回队列头部元素,并不删除


LinkedBlockingQueue
1.内部有两把锁,putLock+takeLock,提高的吞吐率
基于链表实现,默认capacity=Integer.MAX_VAlUE;
常用方法:
offer():
1.如果队列size>=capacity,则直接返回false
2.队列不满则直接将对象封装成node入队,然后再判断队列是否满,不满的唤醒添加线程,notFull.singal()
3.当c(=count. getAndIncrement())==0时,说明有消费线程等待,需要唤醒notEmtpy.singal(),需要对takeLock进行加锁

public boolean offer(E e) {
     //添加元素为null直接抛出异常
     if (e == null) throw new NullPointerException();
      //获取队列的个数
      final AtomicInteger count = this.count;
      //判断队列是否已满
      if (count.get() == capacity)
          return false;
      int c = -1;
      //构建节点
      Node<E> node = new Node<E>(e);
      final ReentrantLock putLock = this.putLock;
      putLock.lock();
      try {
          //再次判断队列是否已满,考虑并发情况
          if (count.get() < capacity) {
              enqueue(node);//添加元素
              c = count.getAndIncrement();//拿到当前未添加新元素时的队列长度
              //如果容量还没满
              if (c + 1 < capacity)
                  notFull.signal();//唤醒下一个添加线程,执行添加操作
          }
      } finally {
          putLock.unlock();
      }
      // 由于存在添加锁和消费锁,而消费锁和添加锁都会持续唤醒等到线程,因此count肯定会变化。
      //这里的if条件表示如果队列中还有1条数据
      if (c == 0) 
        signalNotEmpty();//如果还存在数据那么就唤醒消费锁
    return c >= 0; // 添加成功返回true,否则返回false
  }

//入队操作
private void enqueue(Node<E> node) {
     //队列尾节点指向新的node节点
     last = last.next = node;
}

//signalNotEmpty方法
private void signalNotEmpty() {
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lock();
          //唤醒获取并删除元素的线程
          notEmpty.signal();
      } finally {
          takeLock.unlock();
      }
  }

add():
add方法内部调用了offer()方法,如果队列满,则报"queue full"异常
put():
1.获取putLock锁,如果队列已满,则等待,否则入队
2.再次判断队列大小,如果没满,则唤醒添加线程notFull.singal()
3.判断c(=count.getAndIncrement())是否==0,如果是,表明可能有消费线程等待,需要唤醒notEmpty.singal(),需要获取takeLock锁

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

取数据方法:
poll():
1.如果没有数据,则返回null,如果有则返回数据
2.如果队列的大小c大于1,则唤醒notEmpty消费线程
3.如果c==capacity,表示notFull上等待有添加线程,需要唤醒,这点与前面分析if(c==0)是一样的道理。因为只有可能队列满了,notFull条件对象上才可能存在等待的添加线程
take()方法:
1.take是可阻塞、可中断的移除方法,如果队列为空,则notEmpty.await()
2.如果队列不为空(c>1),则唤醒notEmpty.singal(),唤醒其他等待的消费线程
3.if(c==capacit)表明可能有添加线程等待在notFull上,需要唤醒notFull.singal()【需要对putLock加锁】
remove()方法:
1.需要对putLock和takeLock同时加锁

public E take() throws InterruptedException {
        E x;
        int c = -1;
        //获取当前队列大小
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//可中断
        try {
            //如果队列没有数据,挂机当前线程到条件对象的等待队列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            //如果存在数据直接删除并返回该数据
            x = dequeue();
            c = count.getAndDecrement();//队列大小减1
            if (c > 1)
                notEmpty.signal();//还有数据就唤醒后续的消费线程
        } finally {
            takeLock.unlock();
        }
        //满足条件,唤醒条件对象上等待队列中的添加线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
public E poll() {
         //获取当前队列的大小
        final AtomicInteger count = this.count;
        if (count.get() == 0)//如果没有元素直接返回null
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //判断队列是否有数据
            if (count.get() > 0) {
                //如果有,直接删除并获取该元素值
                x = dequeue();
                //当前队列大小减一
                c = count.getAndDecrement();
                //如果队列未空,继续唤醒等待在条件对象notEmpty上的消费线程
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        //判断c是否等于capacity,这是因为如果满说明NotFull条件对象上
        //可能存在等待的添加线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
  private E dequeue() {
        Node<E> h = head;//获取头结点
        Node<E> first = h.next; 获取头结的下一个节点(要删除的节点)
        h.next = h; // help GC//自己next指向自己,即被删除
        head = first;//更新头结点
        E x = first.item;//获取删除节点的值
        first.item = null;//清空数据,因为first变成头结点是不能带数据的,这样也就删除队列的带数据的第一个节点
        return x;
    }

LinkedBlockingQueue和ArrayBlockingQueue迥异
1.array和linked两种队列大小不一样,array在构建时必须指定队列大小,而linked可以指定队列大小,如果没有指定,默认为integer.max_value
2.代码实现不同,array内部使用了一把ReentryLock,linked采用锁分离,使用了两把锁,putLock和takeLock,在高并发场景下,可以并行的对队列进行操作,提高了吞吐率
3.linked当添加线程快于消费线程时,会造成内存溢出等问题
4.由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。

【参考博客】
https://blog.csdn.net/javazejian/article/details/77410889?locationNum=1&fps=1

相关文章

网友评论

      本文标题:【BlockingQueue】ArrayBlockingQueu

      本文链接:https://www.haomeiwen.com/subject/sycsbftx.html