美文网首页
JAVA并发梳理(四) 各种安全队列

JAVA并发梳理(四) 各种安全队列

作者: 萌妈码码 | 来源:发表于2018-09-23 09:40 被阅读0次

在 Java 多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。
Java提供的线程安全的 Queue 可以分为

  • 阻塞队列,典型例子是 LinkedBlockingQueue
    适用阻塞队列的好处:多线程操作共同的队列时不需要额外的同步,另外就是队列会自动平衡负载,即那边(生产与消费两边)处理快了就会被阻塞掉,从而减少两边的处理速度差距。
  • 非阻塞队列,典型例子是 ConcurrentLinkedQueue
    当许多线程共享访问一个公共集合时,ConcurrentLinkedQueue 是一个恰当的选择。

LinkedBlockingQueue 多用于任务队列
ConcurrentLinkedQueue 多用于消息队列

下面分别介绍下JDK中阻塞队列和非阻塞队列的各种实现。

Concurrent Queue的两种实现(非阻塞队列)
1. ConcurrentLinkedQueue

a) 基于链表节点, 无界队列。注意size需要遍历整个链表,且如果有其他修改的操作会导致size不准确;尽量使用isEmpty。
b) 通过无锁CAS的方式操作元素,实现了高并发状态下的高性能。

2. ConcurrentLinkedDeque

线程安全双端队列,实现方式基本同ConcurrentLinkedQueue。

BlockingQueue的各种实现(阻塞队列)

BlockingQueue扩展Queue的方法,使其在在队列已满的情况下添加元素时有了4种模式:
其一,抛异常;boolean add(E e);
其二,立即返回false;boolean offer(E e);
其三,等待;void put(E e) throws InterruptedException;
其四,等待一段时间。 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

JDK中对BlockingQueue提供了近十种实现,学习的时候可以从两个主要方面入手,1) 有界还是无界 2)安全性是如何保证的。

1. ArrayBlockingQueue

a) 内部维护了一个定长数组,以便缓存队列中的数据对象。有界队列。
b) 从其关键属性可以看出来,安全访问控制通过ReentrantLock和Condition配合实现,固每次入列和出列时需要获得全局的锁,因此是不能完全并行的。也有人称其为内部没有实现读写分离。

   /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
2. LinkedBlockingQueue

基于链表的阻塞队列。
a) 其内部也维持着一个数据缓冲队列〈该队列由一个链表构成);可以指定容量,也可以不指定,不指定的话,默认最大Integer.MAX_VALUE。也可以算是个无界队列吧。
b) 其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行,因此能够高效的处理并发数据。

 /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
3. SynchronousQueue

一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费。内部提供了TransferQueueTransferStack,其实是对应FIFO, FILO,模拟公平锁和非公平锁。注意,在没有需要的生产者或者消费者的时候,SynchronousQueue会通过park阻塞线程。

4. PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Comparator对象来决定,传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。
a) 底层是HEAP实现,数组可扩展,因此是无界队列,因此put永远不会阻塞。
b) 锁使用ReentrantLock,以及一个nonEmpty Condition,只有在队列为空,take的时候会阻塞。

/**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty
     */
    private final Condition notEmpty;
5. 双端队列BlockingDeque及实现LinkedBlockingDeque
    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();

可以看出来由于只有一个锁,其实也是不能做到真正意义上的两头并行操作的。

LinkedBlockingDeque的基本方法
5. DelayQueue

带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。
对这个队列的实现可以结合ScheduledThreadPoolExecutor相关来看。

终于基本把线程安全Queue相关的实现类都过了一遍了,项目中要结合应用场景选择最合适的使用。这需要建立在对各种实现都很熟悉的基础上。

引用
LinkedBlockingQueue 和 ConcurrentLinkedQueue的用法及区别
java挑战高并发(14): LinkedBlockingQueue和ConcurrentLinkedQueue的区别及用法
java线程安全之并发Queue(十三)

相关文章

网友评论

      本文标题:JAVA并发梳理(四) 各种安全队列

      本文链接:https://www.haomeiwen.com/subject/ruqfoftx.html