美文网首页
JDK并发工具类源码--LinkedBlockingQueue

JDK并发工具类源码--LinkedBlockingQueue

作者: shoulda | 来源:发表于2018-06-27 21:28 被阅读0次

    1.简介

    LinkedBlockingQueue常用于生产者/消费者模式中,他可以作为生产者和消费者的桥梁。LinkedBlockingQueue,ConcurrentLinkedQueue,PriorityBlockingQueue功能相似,都是Queue的一种。其中LinkedBlockingQueue,PriorityBlockingQueue是阻塞的,而ConcurrentLinkedQueue是非阻塞的。LinkedBlockingQueue和PriorityBlockingQueue是通过加锁实现线程安全的,而ConcurrentLinkedQueue使用CAS实现实现线程安全;PriorityBlockingQueue还支持优先级。

    2.原理

    LinkedBlockingQueue内部实现相对简单,直接使用一个链表存储数据,通过加锁实现线程安全。其内部类Node代表节点。

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
    

    3主要方法

    3.1 入队(offer())

    /**
     * 入队,无阻塞,队列未满则直接入队,否则直接返回false
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;// 保存当前队列的长度
        // 这里因为count是Atomic的,所以有类似volatile的内存可见性效果
        // 即对count的修改能够立即被其他线程可见,所以此处不加锁的情况下读取count值是会读取到最新值的
        // 然后根据此值进行前置判断,避免不必要的加锁操作
        if (count.get() == capacity)// 队列已满直接返回false
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;// 获取putLock,加锁
        putLock.lock();
        try {
            if (count.get() < capacity) {// 队列未满则插入
                insert(e);
                c = count.getAndIncrement();// 更新count值
                if (c + 1 < capacity)// 未满则唤醒等待在notFull上的线程
                    // 此处有点怪异,入队唤醒notFull~
                    // 此处唤醒notFull是考虑有可能如果多个线程同时出队,由于出队唤醒notFull时也需要对putLock进行加锁
                    // 所以有可能一个线程出队,唤醒notFull,但是被另一个出队线程抢到了锁,所以入队线程依旧在等待
                    // 当另一个线程也唤醒了notFull,释放了putLock后,只能唤醒一个入队线程,所以其他线程依旧在等待
                    // 所以此处需要再次唤醒notFull
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        // c==0表示队列在插入之前是空的,所以需要唤醒等待在notEmpty上的线程
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    
    /**
     *唤醒notEmpty,需对takeLock进行加锁,因为notEmpty与takeLock相关
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    

    3.2出队(poll())

    /**
     * 出队,无阻塞,队列为空则直接返回null
     */
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)// 队列为空,直接返回
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {// 不为空,获取一个元素
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)// 同offer(),此处需唤醒notEmpty
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();// 同offer(),此处需唤醒notFull
        return x;
    }
    
    /**
     *出队,将head指向head.next
     * @return
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    
    /**
     * 唤醒notFull,需对putLock进行加锁,因为notFull与putLock相关
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
    

    3.3 删除(remove())

    /**
     * 删除指定元素
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();// 同时对takeLock和pullLock加锁,避免任何的入队和出队操作
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {// 从队列的head开始循环查找与o相同的元素
                if (o.equals(p.item)) {// 找到相同的元素则设置remove为true
                    removed = true;
                    break;
                }
                trail = p;// 继续循环
                p = p.next;
            }
            if (removed) {
                // remove==true,则表示查找到待删除元素,即p,将trail的next指向p的next,即将p从队列移除及完成删除
                p.item = null;
                trail.next = p.next;
                if (last == p)
                    last = trail;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }
    

    相关文章

      网友评论

          本文标题:JDK并发工具类源码--LinkedBlockingQueue

          本文链接:https://www.haomeiwen.com/subject/ebpiyftx.html