美文网首页
LinkedBlockingQueue分析

LinkedBlockingQueue分析

作者: gczxbb | 来源:发表于2018-05-24 18:37 被阅读1次

    LinkedBlockingQueue内部结构采用链式存储,元素节点Node,封装对象E泛型,Node#next指向下一个节点。

    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
    
    存储结构图。 阻塞队列LinkedBlockingQueue链式存储结构.jpg

    LinkedBlockingQueue#构造方法。

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    

    参数说明
    capacity:容量,默认是Integer.MAX_VALUE。
    count:当前队列元素数量,AtomicInteger线程同步,可根据count与capactiy比较值作为是否已满的依据。
    head/last指针:初始化创建第一个Node节点作为头结点与尾节点,封装内容是null。


    LinkedBlockingQueue插入节点

    put/offer方法:向尾部插入元素,加锁putLock。区别是,若队列已满,put方法线程阻塞等待,当有空余位置时,被唤醒,继续插入,offer直接返回插入失败。

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();//以下代码段加锁
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node); // 到这里插入,说明有空余至少一个。
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();//若还有至少一个空余,通知其他等待的不满信号。
        } finally {
            putLock.unlock();//解锁
        }
        if (c == 0)
            signalNotEmpty();
    }
    
    • 插入对象不能是空。

    • 封装E对象Node节点。

    • 线程并发,一定要ReentrantLock加锁,put锁,确保其他线程此刻无法访问上锁的代码段。

    • 判断count,count是AtomicInteger类型,并发保持同步。若达到最大容量capacity,此时线程将阻塞,让出锁,notFull不满条件等待,即等待不满信号唤醒。

    • 若未达到最大容量,在队列尾部enqueue插入Node节点。

    • count自增,返回的c仍是count自增前的值,若+1还小于capacity,说明此次插入后还有空余空间,发出未满通知信号。
      这里为什么会发出一个notFull未满信号呢?
      线程A1执行put方法时,获取putLock锁,其他线程无法获取该锁,当A1线程发现队列已满,在notFull上等待,出让putLock锁,线程A2获取该锁,同样在notFull上等待,此刻,存在多个阻塞的插入线程A1.2.3...都在notFull#await导致阻塞,出让putLock锁,在notFull上等待。
      直到take/poll方法获取数据,队列出现空位,signalNotFull执行notFull#signal方法,在notFull上有线程在等待,唤醒A1,signal只唤醒notFull上的一个线程。A1抢到putLock锁,执行元素插入,此刻其他线程仍休眠。enqueue插入成功,再次判断是否有多余容量,若有,notFull#signal继续唤醒在notFull上休眠的其他线程。若没有notFull上的等待线程,执行signal也没有影响,重点在于,它会继续唤醒其他notFull等待的线程。

    • ReentrantLock解锁

    • 若c==0,说明插入之前队列是空,插入后不空,一定要发出不空信号NotEmpty,让阻塞在NotEmpty上的读取线程唤醒。

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    

    offer方法比较简单,判断count未达到最大容量capactiy,插入节点。同理,根据空余空间发出未满信号。c的初始值是-1,若成功插入,则一定会赋值成一个>=0的值,因此。offer的返回的值可判断是否成功插入,offer不会阻塞。


    LinkedBlockingQueue取出节点

    take/poll方法:从队列头部加锁takeLock获取元素。若队列为空,take方法获取线程阻塞等待,有元素时被唤醒。poll返回失败。

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    分析逻辑与put一致。

    生产者与消费者模型,元素在头部获取,尾部插入。多线程并发操作时,采用两种锁保证线程同步,互相独立,并行操作队列元素,数量更新AtomicInteger同步。


    任重而道远

    相关文章

      网友评论

          本文标题:LinkedBlockingQueue分析

          本文链接:https://www.haomeiwen.com/subject/uqsudftx.html