美文网首页
JAVA 多线程与高并发学习笔记(十五)——JUC容器

JAVA 多线程与高并发学习笔记(十五)——JUC容器

作者: 简单一点点 | 来源:发表于2022-10-12 18:42 被阅读0次

Java 中的容器类主要有 List、Set、Queue和Map,但他们的基础实现比如 ArrayList、HashMap 是线程不安全的。Java内置锁提供了一套线程安全的同步容器类虽然同步容器类解决了线程安全问题,不过性能不高。JUC提供了一套高并发容器类。

线程安全的同步容器类

Java 同步容器类通过 Synchronized(内置锁)来实现同步的容器,比如Vector、HashTable等。另外,Java还可以通过包装方法将一个普通的基础容器包装成一个线程安全的同步容器。

下面是通过 synchronizedSortedSet 静态方法包装出一个同步容器的具体实现:

public class CollectionDemo {

    public static void main(String[] args) throws InterruptedException {
        // 创建一个基础的有序集合
        SortedSet<String> elementSet = new TreeSet<>();

        // 增加元素
        elementSet.add("element 1");
        elementSet.add("element 2");

        // 将 elementSet 包装成一个同步容器
        SortedSet sorSet = Collections.synchronizedSortedSet(elementSet);
        // 输出容器中的元素
        System.out.println("SortedSet is : " + sorSet);
        CountDownLatch latch = new CountDownLatch(5);

        for(int i = 0; i < 5; i++) {
            int finalI = i;
            ExecutorService executor = Executors.newCachedThreadPool();
            executor.submit(() -> {
                sorSet.add("element " + (3 + finalI));
                System.out.println("add element" + (3 + finalI));
                latch.countDown();
            });
        }
        latch.await();
        System.out.println("SortedSet is : " + sorSet);
    }
}

运行程序,输出结果如下:

synchronizedSortedSet.png

另外,还包含 synchronizedList、synchronizedMap 等方法。

同步容器使用了关键字 synchronized,它如前面所介绍,在线程没有发生争用的场景下处于偏向锁的状态,其性能是非常高的。但是一旦发生了线程争用,synchronized 会由偏向锁膨胀成重量级锁,在抢占和释放时发生 CPU 内核态与用户态切换,所以削弱了并发性,降低了吞吐量,而且会严重影响性能。

为了解决同步容器的性能问题,有了JUC高并发容器。

JUC高并发容器

JUC 高并发容器是基于非阻塞算法(或无锁编程算法)实现的容器类,无锁编程算法主要通过CAS + volatile 组合实现,通过CAS保障操作的原子性,通过 volatile 保障变量内存内的可见性,它主要有以下优点:

  1. 开销比较小,不需要在内核态和用户态之间切换进程。
  2. 读写不互斥,只有写操作需要使用基于CAS机制的乐观锁,读操作之间可以不用互斥。

List

JUC 包中的高并发 List 主要有 CopyOnWriteArrayList,对应的基础容器为 ArrayList。在读多写少的场景下,其性能远远高于 ArrayList 的同步包装器。

Set

JUC 包中的 Set 主要有 CopyOnWriteArraySet 和 CocurrentSkipListSet。

  • CopyOnWriteArraySet 对应的基础容器为 HashSet,其内部组合了一个 CopyOnWriteArrayList 对象,它的核心操作是基于 CopyOnWriteArrayList 实现的。

  • CocurrentSkipListSet是线程安全的有序集合,对应的基础容器为 TreeSet。它是通过 CocurrentSkipListMap 实现的。

Map

JUC 包中 Map 主要有 ConcurrentHashMap 和 ConcurrentSkipListMap。

  • ConcurrentHashMap 对应的基础容器为 HashMap。JDK6 中的 ConcurrentHashMap 采用了一种更加细粒度的“分段锁”加锁机制,JDK8采用CAS无锁算法。

  • ConcurrentSkipListMap 对应的基础容器为 TreeMap。其内部的 SkipList(跳表)结构是一种可以代替平衡树的数据结构,默认是按照Key值升序的。

Queue

JUC包中的Queue的实现类包括三类,单向队列、双向队列和阻塞队列。

  • ConcurrentLinkedQueue 是基于列表实现的单向队列,按照FIFO(先进先出)原则对元素进行排序,新元素从队列尾部插入,而获取队列元素则需要性队列头部获取。

  • ConcurrentLinkedDeque 是基于链表的双向队列,但是该队列不允许 null 元素,作为双向队列,ConcurrentLinkedDeque 可以作为栈使用,并且高效的支持并发环境。

除了单向队列和双向队列,JUC拓展了队列,增加了可阻塞的插入和获取等操作,提供了一组阻塞队列,具体如下:

  • ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列。
  • LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列。
  • PriorityBlockingQueue:按优先级排序的队列。
  • DelayQueue:按照元素的delay时间进行排除的队列。
  • SynchronousQueue:无缓冲的等待队列。

CopyOnWriteArrayList

在很多应用场景中,读操作会远远大于写操作。由于读操作不修改数据,我们可以允许多个线程同时访问。

写时复制(Copy On Write,COW)思想是计算程序设计领域的一种优化策略,具体思想是,如果有多个访问器(Accessor)访问一个资源,他们会共同获取相同的指针指向相同的资源,只要有一个修改器(Mutator)需要修改该资源,系统就会复制一个专用副本(Private Copy)给该修改器,而其它访问器所见到的最初资源仍然保持不变,修改的过程对其他访问器都是透明的(Transparently)。

CopyOnWriteArrayList 原理

CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

CopyOnWriteArrayList 的原理如下图所示:

copyonwritearraylist.jpeg

CopyOnWriteArrayList 是一个满足 CopyOnWrite 思想并使用 Array 数组存储数据的线程安全 List,CopyOnWriteArrayList 的核心成员如下:

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    private static final long serialVersionUID = 8673264195747942595L;

    /** 对所有的修改器方法进行保护,访问器方法并不需要保护*/
    final transient ReentrantLock lock = new ReentrantLock();

    /** 内部对象数组 */
    private transient volatile Object[] array;

    /**
     * 获取内部对象数组
     */
    final Object[] getArray() {
        return array;
    }

    /**
     * 设置内部对象数组
     */
    final void setArray(Object[] a) {
        array = a;
    }

...
}

CopyOnWriteArrayList 读取操作

访问器的读取操作并没有同步控制和锁操作,理由是内部数组 array 不会发生修改,只会被另一个 array 替换,因此可以保证数据安全。

 private E get(Object[] a, int index) {
        return (E) a[index];
    }

    /**
     * {@inheritDoc}
     *
     * @throws IndexOutOfBoundsException {@inheritDoc}
     */
    public E get(int index) {
        return get(getArray(), index);
    }

CopyOnWriteArrayList 写入操作

CopyOnWriteArrayList 的写入操作 add 方法在执行时加了独占锁以确保只能有一个线程进行写入操作,避免多线程写的时候会复制出多个副本。

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁
    try {
        Object[] elements = getArray();
        int len = elements.length;

        // 复制新数组
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock(); // 释放锁
    }
}

可以看到其中重新复制一份数组,再往新的数组添加元素,待添加完了,再将新的 array 引用指向新的数组。

CopyOnWriteArrayList 的迭代器实现

CopyOnWriteArrayList 有自己的迭代器,该迭代器不会检查修改状态,也无需加您查状态,因为 array 数组是只读的。

static final class COWIterator<E> implements ListIterator<E> {
    /** 对象数组的快照 */
    private final Object[] snapshot;
    /** Index of element to be returned by subsequent call to next.  */
    private int cursor;

    private COWIterator(Object[] elements, int initialCursor) {
        cursor = initialCursor;
        snapshot = elements;
    }

    public boolean hasNext() {
        return cursor < snapshot.length;
    }

    public boolean hasPrevious() {
        return cursor > 0;
    }

    @SuppressWarnings("unchecked")
    public E next() {
        if (! hasNext())
            throw new NoSuchElementException();
        return (E) snapshot[cursor++];
    }

    @SuppressWarnings("unchecked")
    public E previous() {
        if (! hasPrevious())
            throw new NoSuchElementException();
        return (E) snapshot[--cursor];
    }
    ...
}

迭代器的快照成员会在构造迭代器的时候使用 CopyOnWriteArrayList 的 array 成员去初始化。

public ListIterator<E> listIterator() {
    return new COWIterator<E>(getArray(), 0);
}

CopyOnWriteArrayList 和 ReentrantReadWriteLock 读写锁的思想非常类似,即读读共享、写写互斥、读写互斥、写读互斥。但前者的读取完全不用加锁,写入也不会阻塞读取操作,提升了性能。

BlockingQueue

在多线程环境中,通过 BlockingQueue 可以很容易实现多线程之间的数据共享和通信。

BlockingQueue 特点

阻塞队列和普通队列最大不同是阻塞队列提供了阻塞式的添加和删除方法。

  1. 阻塞添加。当阻塞队列元素已满时,队列会阻塞添加元素的线程,直到对了元素不满时,才重新唤醒线程执行元素添加操作。

  2. 阻塞删除。阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空时,才重新唤醒删除线程,再执行删除操作。

阻塞队列的常用方法

public interface BlockingQueue<E> extends Queue<E> {
    // 将指定的元素添加到此队列尾部
    // 在成功时返回true,如果队列已满,就抛出 IllegalStateException
    boolean add(E e);

    // 非阻塞添加,将指定的元素添加到此队列的尾部
    // 如果该队列已满,就直接返回
    boolean offer(E e);

    // 限时阻塞式添加,将指定的元素添加到此队列的尾部
    // 如果该队列已满,那么在到达指定的等待时间之前,添加线程会则色,等待可用的空间。该方法可中断
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    // 阻塞式添加:将指定的元素添加到此队列的尾部,如果队列已满,就一直等待
    void put(E e) throws InterruptedException;

    // 阻塞式删除:获取并移除此队列的头部,如果没有元素就等待
    // 直到有元素,将唤醒等待线程执行该操作
    E take() throws InterruptedException;

    // 非阻塞式删除:获取并移除此队列的头部,如果没有元素就直接返回null
    E poll() throws InterruptedException;

    // 限时阻塞式删除:获取并移除此队列的头部,在指定的等待时间前一直等待获取元素,超过时间,方法结束
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    // 获取但不移除此队列的头元素,没有则抛出异常NoSuchElementException
    E element();

    // 获取但不溢出此队列的头元素,如果此队列为空,就返回null
    E peek();

    // 从此队列中移除指定元素,返回删除是否成功
    boolean remove(Object o);
}

方法总结:

抛出异常 特殊值 阻塞 限时阻塞
添加 add(e) offer(e) put(e) offer(e, time, unit)
删除 remove() poll() take() poll(time, unit)
获取 element() peek()

特征说明:

  • 抛出异常:如果操作无法立即执行,就抛出一个异常。
  • 特殊值,如果操作无法立即执行,就返回一个特殊值(一般是true、false)。
  • 阻塞,如果操作无法立即执行,就会发生阻塞,直到能够执行。
  • 限时阻塞,如果操作无法立即执行,就会发生阻塞,直到能够执行,但等待时间不会超过设置的上限值。

常见的 BlockingQueue

ArrayBlockingQueue

ArrayBlockingQueue 是一个常用的阻塞队列,基于数组实现。它的内部还保存着两个整形变量,标识队列的头部和尾部在数组中的位置。

ArrayBlockingQueue 的添加和删除操作共用同一个锁对象,这意味着添加和删除操作无法并行运行。

由于 ArrayBlockingQueue 在添加或删除元素时不会产生或销毁任何额外的Node实例,所以相比 LinkedBlockingQueue 更加常用。

LinkedBlockedQueue

LinkedBlockedQueue 是一个基于链表的阻塞队列,它对于添加和删除元素分别采用了独立的锁来控制。

需要注意的是,在新建 LinkedBlockedQueue 对象是,如果没有指定容量,则默认容量为 Intger.MAX_VALUE。

DelayQueue

DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取该元素,它没有大小限制。

DelayQueue 使用场景较少,常见的例子是使用 DelayQueue 来管理一个超时未响应的连接队列。

PriorityBlockingQueue

基于优先级的阻塞队列。

SynchronousQueue

无缓冲的等待队列。

ArrayBlockingQueue 介绍

ArrayBlockingQueue 可以用作公平队列和非公平队列。

  1. 对于公平队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。

  2. 对于非公平队列,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序。

ArrayBlockingQueue 构造器

创建公平和非公平阻塞队列的示例代码如下:

// 默认非公平阻塞队列
ArrayBlockingQueue queue = new ArrayBlockingQueue(10);

// 公平阻塞队列
ArrayBlockingQueue queue = new ArrayBlockingQueue(10, true);

两个构造器的源码:

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    // 根据fair参数构造公平锁/非公平锁
    lock = new ReentrantLock(fair);
    // 有元素加入,队列为非空
    notEmpty = lock.newCondition();
    // 有元素被取出,队列为未满
    notFull =  lock.newCondition();
}

ArrayBlockingQueue 内部的阻塞队列是通过重入锁 ReentrantLock 和 Condition 条件队列实现的。

ArrayBlockingQueue 内部成员变量

ArrayBlockingQueue 成员如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {


    /** 存储数据的数组 */
    final Object[] items;

    /** 获取、删除元素的索引,主要用于take、pool、peek、remove方法 */
    int takeIndex;

    /** 添加元素的索引,主要用于put、offer、add方法 */
    int putIndex;

    /** 队列元素的个数 */
    int count;

    /** 控制并发访问的显式锁 */
    final ReentrantLock lock;

    /** notEmpty条件对象,用于通知消费线程可执行删除操作 */
    private final Condition notEmpty;

    /** notNull条件对象,用于通知生产线程可执行添加操作 */
    private final Condition notFull;

    /**
     * 迭代器
     */
    transient Itrs itrs = null;
    ...
        
}

其中数组对象 items 存储说所有数据, ReentrantLock类型成员 lock控制线程并发。notEmpty成员来存放或唤醒被阻塞的消费线程,当数组对象items有元素时,告诉take线程可以执行删除操作。notFull成员来存放或唤醒被阻塞的生产线程,当队列未满时,高速线程可以执行添加元素操作。

非阻塞式添加元素:add、offer方法

非阻塞式添加元素的方法会立即返回,所以其执行线程不会被阻塞。

add方法

add方法调用了父类的add方法,实现如下。

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

可以看到add方法是调用offer方法实现的。

offer方法

offer方法流程如下:

  1. 如果数组满了,就直接释放锁,然后返回false。
  2. 如果数组没满,就将元素入队(加入数组),然后返回true。
public boolean offer(E e) {
    checkNotNull(e); // 检查元素是否为null
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

我们注意到offer方法调用了enqueue方法。

enqueue方法
// 入队操作
 private void enqueue(E x) {
    // 获取当前数组
    final Object[] items = this.items;
    // 赋值
    items[putIndex] = x;
    // 索引自增,如果已经是最后一个位置,重新设置putIndex = 0
    if (++putIndex == items.length)
        putIndex = 0;
    // 队列中元素数量加1
    count++;
    // 唤醒调用take方法的线程,执行元素获取操作
    notEmpty.signal();
}

阻塞式添加元素:put方法

阻塞式添加元素时,在队列满而不能添加元素时,执行添加操作的线程会被阻塞。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 该方法可中断
    try {
        // 当队列元素个数与数组长度相等时,无法添加元素
        while (count == items.length)
            // 将当前调用线程挂起,添加到notFull队列条件中,等待被唤醒
            notFull.await();
        enqueue(e); // 如果队列没有满,就直接添加
    } finally {
        lock.unlock();
    }
}

put方法的添加操作流程:

  1. 获取lock锁。
  2. 如果队列已满,就被阻塞,put线程进入notFull的等待队列中,等待被唤醒。
  3. 如果队列未满,元素通过enqueue方法入队。
  4. 释放lock锁。

非阻塞式删除元素:poll方法

在队列空而不能删除元素时,非阻塞式删除元素的方法会立即返回,所以其执行线程不会被阻塞。

poll方法
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

poll 方法删除此队列的头元素,若队列为空,则立即返回null。poll方法调用了dequeue元素出队方法。

dequeue方法
// 删除队列头元素
private E dequeue() {
    // 拿到当前数组的数据
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // 获取要删除的对象
    E x = (E) items[takeIndex];
    // 清空位置,将数组中的takeIndex索引位置设置为null
    items[takeIndex] = null;
    // takeIndex索引加1并判断是否与数组长度相等
    // 如果相等就说明已到尽头,恢复为0
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--; // 元素减1
    if (itrs != null)
        itrs.elementDequeued(); // 同时更新迭代器中的元素数据
    // 删除了元素说明队列有空位,唤醒notFUll条件等待队列中的put线程,执行添加操作
    notFull.signal();
    return x;
}

阻塞式删除元素:take方法

take方法是一个可阻塞、可中断的删除方法,主要做两件事:

  1. 如果队列没有数据,就将线程加入notEmpty等待队列并阻塞线程,一直到有元素插入数据并通过notEmpty发送一个消息,notEmpty将从等待队列唤醒一个消费节点,同时启动消费线程。

  2. 如果队列有数据,就通过dequeue方法执行元素的删除操作。

// 从队列头部移除元素,队列没有元素就阻塞,可中断
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 中断
    try {
        // 如果队列没有元素
        while (count == 0)
            // 执行阻塞操作
            notEmpty.await();
        return dequeue(); // 如果队列有元素就执行删除操作
    } finally {
        lock.unlock();
    }
}

peek方法返回队列头元素

peek方法从takeIndex直接可以获取最早被添加的元素,不存在就返回null。

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

ConcurrentHashMap

ConcurrentHashMap 是一个常用的高并发容器类,也是一种线程安全的哈希表。

HashMap 和 HashTable 的问题

基础容器 HashMap 是线程不安全的,在多线程环境下,使用HashMap进行put操作时,可能会引起死循环,导致CPU利用率飙升。

HashTable是JDK提供的线程安全的Map,它在HashMap基础上有两点改进:

  • HashTable不允许key和value为null。
  • HashTable使用synchronized来保证线程安全,所有相关需要同步执行的方法都要加上synchronized关键字,对Hash表进行行锁定。

Hashtable线程安全代价非常大,相当于给整个哈希表加了一把锁,效率很低。

JDK 1.7中的ConcurrentHashMap

在JDK1.7中ConcurrentHashMap采用了数组+Segment+分段锁的方式实现。分段锁并不是锁,而是一种锁的设计,用来提升并发线程性能的重要手段,好玩 LongAddr 一样,所以热点分散型的削峰手段。

ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表,同时又是一个ReentrantLock(Segment继承了ReentrantLock)。

ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。如下图是ConcurrentHashMap的内部结构图:

concurrentHashMap_jdk17.png

JDK1.8中的ConcurrentHashMap

JDK1.8中ConcurrentHashMap参考了JDK8 HashMap的实现,采用了数组+链表+红黑树的实现方式来设计,内部大量采用CAS操作。并发控制使synchronized 和 CAS 来操作。

相比JDK1.7默认情况下将一个table分裂成16个Segement,JDK1.8直接将并发粒度细化到每一个桶,不再需要Segemnt。

JDK1.8中的ConcurrentHashMap内部结构

concurrentHashMap_jdk18.png

其中主要包含如下结构:

  • Node,这是currentHashMap的核心内部类,它包装了“Key-Value对”。
  • TreeBin, Node子类,当数据链表(链式桶)长度大于8时,会转换为TreeBin(树状桶),TreeBin作为根节点,可以认为时红黑树对象。在ConcurrentHashMap的table“数组”中,存放的就是TreeBin对象,而不是TreeNode对象。
  • TreeNode,树状桶的节点类。

JDK1.8中的ConcurrentHashMap主要成员

JDK1.8中的ConcurrentHashMap通过一个Node<K,V>[]数组table来保存添加到哈希表中的桶,而在每一个桶位置是通过链表和红黑树的形式来保存的。数组table是懒加载的,只有第一次添加元素的时候才会初始化。

ConcurrentHashMap主要成员大致如下:

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    private static final int MAXIMUM_CAPACITY = 1 << 30;
    private static final int DEFAULT_CAPACITY = 16;
   
    static final int TREEIFY_THRESHOLD = 8;
    static final int UNTREEIFY_THRESHOLD = 6;
    static final int MIN_TREEIFY_CAPACITY = 64;

    =
     /*
     * Encodings for Node hash fields. See above for explanation.
     */
    static final int MOVED     = -1; // 表示正在转移
    static final int TREEBIN   = -2; // 表示已经转换成树
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
    // 数组,用来保存元素
    transient volatile Node<K,V>[] table;
    // 转移时用的数组
    private transient volatile Node<K,V>[] table;
    // 用来控制表初始化和扩容的控制属性
    private transient volatile int sizeCtl;

    ...
}

重要属性如下:

  • table,用于保存添加到哈希表中的桶。
  • DEFAULT_CAPACITY,table的默认长度,默认初始长度是16,即在第一次添加元素时,会将table初始化为16个元素的数组。
  • MIN_TREEIFY_CAPACITY,链式桶转换为红黑树的阈值,当链表长度大于该值是,将链表转换为红黑树。
  • UNTREEIFY_THRESHOLD,红黑树桶还原回链式桶的阈值,当红黑树内节点数量小于6时,将红黑树转换成链表。
  • MIN_TREEIFY_CAPACITY,链式桶转换为红黑树桶还有一个要求,table的容量达到最小树形化容量的阈值,只有当哈希表中的table容量大于该值时,才允许将链表转换成红黑树的操作。
  • sizeCtl,用来控制table的初始化和扩容操作的过程,其值大致如下:
    • -1代表table正在初始化,其它线程应该交出CPU时间片。
    • -N表示有N-1个线程正在进行扩容操作,严格来说,当其为负数时,只用到其低16位,如果其低16位数值为M,此时有M-1个线程进行扩容。
    • 大于0分两种情况:如果table未初始化,sizeCtl表示table需要初始化的大小;如果table初始化完成,sizeCtl表示table的容量,默认是table大小的0.75倍。

涉及修改sizeCtl的方法有5个:

  • initTable,初始化哈希表。
  • addCount,增加容量。
  • tryPresize,扩容方法之一。
  • transfer,数据转移到nextTable。
  • helpTransfer,并发添加元素时,如果正在扩容,其它线程会帮助扩容,也就是多线程扩容。

JDK1.8中的ConcurrentHashMap核心源码

put操作源码如下:

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 自旋:并发情况下,也可以保障安全添加成功
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 第一次添加,先初始化node数组
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 计算出table[i]无节点,创建节点
            // 使用Unsafe.compareAndSwapObject原子操作table[i]的位置
            // 如果为null,就添加新建的node节点,跳出循环
            // 反之,再循环进入执行添加操作
            if (casTabAt(tab, i, null,
                            new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // 如果当前处于转移状态,返回新的tab内部表,然后进入循环执行添加操作
            tab = helpTransfer(tab, f);
        else {
            // 在链表或红黑树中追加节点
            V oldVal = null;
            // 使用synchronized对f对象加锁
            // 在争用不激烈的场景中,synchronized和ReetrantLock性能不相上下
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // 在链表上追加节点
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                    (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                            value, null);
                                break;
                            }
                        }
                    }
                    // 在红黑树上追加节点
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                        value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 节点数大于临界值,转换成红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

从put源码可以发现,JDK1.8版本在使用CAS自旋完成桶的设置时,使用synchronized内置锁保证桶内并发操作的线程安全。尽管对同一个Map操作的线程争用会非常激烈,但是在同一个桶内的线程争用通常不会很激烈,所以使用CAS自旋、synchronized偏向锁不降低性能。

不使用ReentrantLock是因为为每个桶都创建一个ReentrantLock实例会带来大量的内存消耗。

get方法源码如下:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

其中并不涉及加锁操作。

相关文章

网友评论

      本文标题:JAVA 多线程与高并发学习笔记(十五)——JUC容器

      本文链接:https://www.haomeiwen.com/subject/defxzrtx.html