美文网首页
JUC学习笔记(三)—同步阻塞队列

JUC学习笔记(三)—同步阻塞队列

作者: Monica2333 | 来源:发表于2018-12-14 15:34 被阅读0次

BlockingQueue
阻塞队列接口继承自Queue接口,BlockingQueue接口提供了3个添加元素方法:

  • add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常
  • offer:添加元素到队列里,添加成功返回true,添加失败返回false
  • put:添加元素到队列里,如果容量满了会阻塞直到容量不满
    3个删除方法:
  • poll:删除队列头部元素,如果队列为空,返回null。否则返回元素。
  • remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false
  • take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除
    常用的阻塞队列具体类有ArrayBlockingQueue、LinkedBlockingQueue、LinkedBlockingDeque、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue等。


ArrayBlockingQueue
由数组实现的阻塞有界队列,FIFO。支持对等待的生产者线程和使用者线程进行排序的可选公平策略。
定义

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;
    /** Main lock guarding all access */
    //一把锁控制出列和入列
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
}
//fair为true表示公平锁
 public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

LinkedBlockingQueue
由单向链表实现的无界阻塞队列(容量可选,默认为Integer.MAX_VALUE),内部入列和出列由不同的锁控制,队列大小定义为AtomicInteger 可随时可见。
定义

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
}

LinkedBlockingDeque
由双向链表构成的可选容量队列,由一把锁控制入列和出列
定义

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    /** Doubly-linked list node class */
    static final class Node<E> {
        /**
         * The item, or null if this node has been removed.
         */
        E item;

        /**
         * One of:
         * - the real predecessor Node
         * - this Node, meaning the predecessor is tail
         * - null, meaning there is no predecessor
         */
        Node<E> prev;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head
         * - null, meaning there is no successor
         */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    /**
     * Pointer to first node.
     * Invariant: (first == null && last == null) ||
     *            (first.prev == null && first.item != null)
     */
    transient Node<E> first;

    /**
     * Pointer to last node.
     * Invariant: (first == null && last == null) ||
     *            (last.next == null && last.item != null)
     */
    transient Node<E> last;

    /** Number of items in the deque */
    private transient int count;

    /** Maximum number of items in the deque */
    private final int capacity;

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();

    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

PriorityBlockingQueue
拥有优先级的阻塞队列,基于最小堆实现的有界队列。

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    /**
     * Default array capacity.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;


    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty
     */
    private final Condition notEmpty;

    /**
     * Spinlock for allocation, acquired via CAS.扩容时需要
     */
    private transient volatile int allocationSpinLock;

    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     */
    private PriorityQueue<E> q;

    public PriorityBlockingQueue() {
//11
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
}

DelayQueue
DelayQueue是一个支持延时操作的阻塞队列。列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说只有在延迟期满时才能够从队列中去元素。
它主要运用于如下场景:
缓存系统的设计:缓存是有一定的时效性的,可以用DelayQueue保存缓存的有效期,然后利用一个线程查询DelayQueue,如果取到元素就证明该缓存已经失效了。
定时任务的调度:DelayQueue保存当天将要执行的任务和执行时间,一旦取到元素(任务),就执行该任务。
DelayQueue实现的关键主要有如下几个:

  • 可重入锁ReentrantLock
  • 用于阻塞和通知的Condition对象
  • 根据Delay时间排序的优先级队列:PriorityQueue
  • 用于优化阻塞通知的线程元素leader
    定义
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private Thread leader = null;

    private final Condition available = lock.newCondition();

    public DelayQueue() {}

Delayed 接口

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     */
    long getDelay(TimeUnit unit);
}

SynchronousQueue
作为BlockingQueue中的一员,SynchronousQueue与其他BlockingQueue有着不同特性:

  • SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
  • 因为没有容量,所以对应 peek, contains, clear, isEmpty … 等方法其实是无效的。例如clear是不执行任何操作的,contains始终返回false,peek始终返回null。
  • SynchronousQueue分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。
  • 若使用 TransferQueue, 则队列中永远会存在一个 dummy node(这点后面详细阐述)。
    SynchronousQueue非常适合做交换工作,生产者的线程和消费者的线程同步以传递某些信息、事件或者任务。
    LinkedTransferQueue
    基于链表的FIFO无界阻塞队列,不会锁住整个队列。
    LinkedTransferQueue采用一种预占模式。什么意思呢:
    有就直接拿走,没有就占着这个位置直到拿到或者超时或者中断。即消费者线程到队列中取元素时,如果发现队列为空,则会生成一个null节点,然后park住等待生产者。后面如果生产者线程入队时发现有一个null元素节点,这时生产者就不会入列了,直接将元素填充到该节点上,唤醒该节点的线程,被唤醒的消费者线程拿东西走人。

相关文章

  • JUC学习笔记(三)—同步阻塞队列

    BlockingQueue阻塞队列接口继承自Queue接口,BlockingQueue接口提供了3个添加元素方法:...

  • JUC 阻塞队列

    概述 阻塞队列解决的问题:在一个容量有限的仓库里面,实现满了就挂起生产线程,空了就挂起消费线程的兼顾性能和安全的数...

  • JUC学习笔记三

    JUC学习笔记三 用于解决多线程同步问题的方式 隐式锁(synchronized) 同步代码块 同步方法 显式锁(...

  • Swift GCD 的串行队列与并行队列

    队列异步是否阻塞当前线程同步是否阻塞当前线程执行顺序串行队列否是按添加顺序并行队列否是同时执行,但会被同步阻塞 串...

  • AQS同步器学习

    AQS队列同步器学习 在学习并发的时候,我们一定会接触到 JUC 当中的工具,JUC 当中为我们准备了很多在并发中...

  • Java并发之AQS同步器学习

    AQS队列同步器学习 在学习并发的时候,我们一定会接触到 JUC 当中的工具,JUC 当中为我们准备了很多在并发中...

  • juc并发包集合整理

    JUC提供了java并发编程需要的类,主要分几个大模块1 原子类操作2 锁3 阻塞队列4 并发集合5 同步器6 线...

  • JUC--阻塞队列

    2018-10-03 原文推荐 死磕Java并发

  • JUC-阻塞队列

      阻塞队列数据结构示意图如下所示: 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞; 当阻塞队列是满时,往队...

  • JUC之阻塞队列

    阻塞队列 概念 阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,...

网友评论

      本文标题:JUC学习笔记(三)—同步阻塞队列

      本文链接:https://www.haomeiwen.com/subject/dkkshqtx.html