美文网首页
JAVA并发编程 并发容器 线程安全——Concurrent

JAVA并发编程 并发容器 线程安全——Concurrent

作者: 咕噜咕噜_f443 | 来源:发表于2020-03-29 22:04 被阅读0次

    ConcurrentHashMap

    使用 

    除了 Map 系列应该有的线程安全的 get,put 等方法外,ConcurrentHashMap 还提供了一个在并发下比较有用的方法putIfAbsent,如果传入 key 对应的 value 已经存在,就返回存在的 value,不进行替换。如果不存在,就添加 key 和 value, 返回 null。

    public class UseMap {

    public static void main(String[] args) {

    ConcurrentHashMap map =new ConcurrentHashMap<>();

    KeyVo keyVo =new KeyVo(1,"A");

    System.out.println("put不存在的值------");

    System.out.println(map.put(keyVo,"AA"));

    System.out.println(map.get(keyVo));

    System.out.println("put已存在的key-------------");

    System.out.println(map.put(keyVo,"BB"));

    System.out.println(map.get(keyVo));

    System.out.println("putIfAbsent已存在的key-------------");

    System.out.println(map.putIfAbsent(keyVo,"CC"));

    System.out.println(map.get(keyVo));

    System.out.println("putIfAbsent不存在的key-------------");

    KeyVo keyVo2 =new KeyVo(2,"B");

    System.out.println(map.putIfAbsent( keyVo2,"CC"));

    System.out.println(map.get(keyVo2));

    ConcurrentHashMap map2

    =new ConcurrentHashMap(2,

    0.75f,16);

    }

    }

    在代码层面它的作用类似于:  

    synchronized(map) {

    if(map.get(key) ==null) {   

     returnmap.put(key, value);          

      }else{                

    returnmap.get(key);          

      }        

    }

    它让上述代码的整个操作是线程安全的

    ConcurrentHashMap实现分析

    在 1.7 的实现

    ​​​​

    ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。 Segment 是一种可重入锁(ReentrantLock),在 ConcurrentHashMap 里扮演锁的 角色;HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组。Segment 的结构和 HashMap 类似,是一种数组和链表结构。一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据 进行修改时,必须首先获得与它对应的 Segment 锁。

    构造方法和初始化

    ConcurrentHashMap 初始化方法是通过 initialCapacity、loadFactor 和 concurrencyLevel(参数 concurrencyLevel 是用户估计的并发级别,就是说你觉得最 多有多少线程共同修改这个 map,根据这个来确定 Segment 数组的大小 concurrencyLevel 默认是 DEFAULT_CONCURRENCY_LEVEL=16;)等几个参数来初始 化 segment 数组、段偏移量 segmentShift、段掩码 segmentMask 和每个 segment 里的 HashEntry 数组来实现的。

    并发级别可以理解为程序运行时能够同时更新 ConccurentHashMap 且不产 生锁竞争的最大线程数,实际上就是 ConcurrentHashMap 中的分段锁个数,即 Segment[]的数组长度。ConcurrentHashMap 默认的并发度为 16,但用户也可以 在构造函数中设置并发度。当用户设置并发度时,ConcurrentHashMap 会使用大 于等于该值的最小 2 幂指数作为实际并发度(假如用户设置并发度为 17,实际 并发度则为 32)。

    如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大, 原本位于同一个 Segment 内的访问会扩散到不同的 Segment 中,CPUcache 命中 率会下降,从而引起程序性能下降。 (文档的说法是根据你并发的线程数量决定, 太多会导性能降低)

    segments 数组的长度 ssize 是通过 concurrencyLevel 计算得出的。为了能通 过按位与的散列算法来定位 segments 数组的索引,必须保证 segments 数组的长 度是 2 的 N 次方(power-of-twosize),所以必须计算出一个大于或等于 concurrencyLevel 的最小的 2 的 N 次方值来作为 segments 数组的长度。假如 concurrencyLevel 等于 14、15 或 16,ssize 都会等于 16,即容器里锁的个数也是 16。

    以下知识了解即可,无需深究

    初始化 segmentShift 和 segmentMask

    这 两 个 全 局 变 量 需 要 在 定 位 segment时 的 散 列 算 法 里 使 用 , sshift等 于 ssize从 1向 左 移 位 的 次 数 , 在 默 认 情 况 下 concurrencyLevel等 于 16 , 1需 要 向 左 移 位移 动 4次 , 所 以 sshift等 于 4 。 segmentShift用 于 定 位 参 与 散 列 运 算 的 位 数 ,segmentShift等 于 32减 sshift , 所 以 等 于 28 , 这 里 之 所 以 用 32是 因 为ConcurrentHashMap里 的 hash() 方 法 输 出 的 最 大 数 是 32位 的 。 segmentMask是 散列 运 算 的 掩 码 , 等 于 ssize减 1 , 即 15 , 掩 码 的 二 进 制 各 个 位 的 值 都 是 1 。 因 为ssize的 最 大 长 度 是 65536 ,所 以 segmentShift最 大 值 是 16 , segmentMask最 大 值是 65535 , 对 应 的 二 进 制 是 16位 , 每 个 位 都 是 1 。

    ​输入参数 initialCapacity 是 ConcurrentHashMap 的初始化容量,loadfactor 是 每个 segment 的负载因子,在构造方法里需要通过这两个参数来初始化数组中的 每个 segment。上面代码中的变量 cap 就是 segment 里 HashEntry 数组的长度, 它等于 initialCapacity 除以 ssize 的倍数 c,如果 c 大于 1,就会取大于等于 c 的 2 的 N 次方值,所以 segment 里 HashEntry 数组的长度不是 1,就是 2 的 N 次方。

    在默认情况下, ssize=16, initialCapacity=16,loadFactor=0.75f,那么 cap =1,threshold=(int)cap*loadFactor=0。

    既然 ConcurrentHashMap 使用分段锁 Segment 来保护不同段的数据,那么在 插入和获取元素的时候,必须先通过散列算法定位到 Segment。 ConcurrentHashMap 会首先使用 Wang/Jenkinshash 的变种算法对元素的 hashCode 进行一次再散列。

    ConcurrentHashMap 完全允许多个读操作并发进行,读操作并不需要加锁。 ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的以及volatile关键 字。

    get操作

    get 操作先经过一次再散列,然后使用这个散列值通过散列运算定位到 Segment(使用了散列值的高位部分),再通过散列算法定位到 table(使用了散列值 的全部)。整个 get 过程,没有加锁,而是通过 volatile 保证 get 总是可以拿到最 新值。

    put操作

    ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他 槽,在插入第一个值的时候再进行初始化。

    ensureSegment 方法考虑了并发情况,多个线程同时进入初始化同一个槽 segment[k],但只要有一个成功就可以了。

    具体实现是

    put 方法会通过 tryLock()方法尝试获得锁,获得了锁,node 为 null 进入 try 语句块,没有获得锁,调用 scanAndLockForPut 方法自旋等待获得锁。

    scanAndLockForPut 方法里在尝试获得锁的过程中会对对应 hashcode 的链表 进行遍历,如果遍历完毕仍然找不到与 key 相同的 HashEntry 节点,则为后续的 put 操作提前创建一个 HashEntry。当 tryLock 一定次数后仍无法获得锁,则通过 lock 申请锁。

    在获得锁之后,Segment 对链表进行遍历,如果某个 HashEntry 节点具有相 同的 key,则更新该 HashEntry 的 value 值,

    否则新建一个 HashEntry 节点,采用头插法,将它设置为链表的新 head 节 点并将原头节点设为新 head 的下一个节点。新建过程中如果节点总数(含新建 的 HashEntry)超过 threshold,则调用 rehash()方法对 Segment 进行扩容,最后 将新建 HashEntry 写入到数组中。

    rehash操作

    扩容是新创建了数组,然后进行迁移数据,最后再将 newTable 设置给属性 table。

    为了避免让所有的节点都进行复制操作:由于扩容是基于 2 的幂指来操作, 假设扩容前某 HashEntry 对应到 Segment 中数组的 index 为 i,数组的容量为 capacity,那么扩容后该 HashEntry 对应到新数组中的 index 只可能为 i 或者 i+capacity,因此很多 HashEntry 节点在扩容前后 index 可以保持不变。

    假设原来 table 长度为 4,那么元素在 table 中的分布是这样的

    扩容后 table 长度变为 8,那么元素在 table 中的分布变成

    可以看见 hash 值为 34 和 56 的下标保持不变,而 15,23,77 的下标都是在原 来下标的基础上+4 即可,可以快速定位和减少重排次数。 

    该方法没有考虑并发,因为执行该方法之前已经获取了锁

    remove操作 

    与 put 方法类似,都是在操作前需要拿到锁,以保证操作的线程安全性

    ConcurrentHashMap的弱一致性 

    对链表遍历判断是否存在 key 相同的节点以及获得该节点的 value。但由于遍历过程中其他线程可能对链表结构做了调整,因此 get 和 containsKey 返 回的可能是过时的数据,这一点是 ConcurrentHashMap 在弱一致性上的体现。如 果要求强一致性,那么必须使用 Collections.synchronizedMap()方法。

    size、containsValue

    这些方法都是基于整个 ConcurrentHashMap 来进行操作的,他们的原理也基 本类似:首先不加锁循环执行以下操作:循环所有的 Segment,获得对应的值以 及所有 Segment 的 modcount 之和。在 put、remove 和 clean 方法里操作元素 前都会将变量 modCount 进行变动,如果连续两次所有 Segment 的 modcount 和相等,则过程中没有发生其他线程修改 ConcurrentHashMap 的情况,返回获得 的值。

    当循环次数超过预定义的值时,这时需要对所有的 Segment 依次进行加锁, 获取返回值后再依次解锁。所以一般来说,应该避免在多线程环境下使用 size 和 containsValue 方法。

    在 1.8 的实现

    改进

    改进一:取消 segments 字段,直接采用 transientvolatileHashEntry[] table 保存数据,采用 table 数组元素作为锁,从而实现了对缩小锁的粒度,进一 步减少并发冲突的概率,并大量使用了采用了 CAS+synchronized 来保证并发安 全性。

    改进二:将原先 table 数组+单向链表的数据结构,变更为 table 数组+单 向链表+红黑树的结构。对于 hash 表来说,最核心的能力在于将 keyhash 之后 能均匀的分布在数组中。如果 hash 之后散列的很均匀,那么 table 数组中的每个 队列长度主要为 0 或者 1。但实际情况并非总是如此理想,虽然 ConcurrentHashMap 类默认的加载因子为 0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式, 那么查询某个节点的时间复杂度为 O(n);因此,对于个数超过 8(默认值)的列表, jdk1.8 中采用了红黑树的结构,那么查询的时间复杂度可以降低到 O(logN),可以改进性能。

    使用 Node(1.7 为 Entry) 作为链表的数据结点,仍然包含 key,value, hash 和 next 四个属性。 红黑树的情况使用的是 TreeNode(extendsNode)。

    根据数组元素中,第一个结点数据类型是 Node 还是 TreeNode 可以判断 该位置下是链表还是红黑树。

    用于判断是否需要将链表转换为红黑树的阈值

    用于判断是否需要将红黑树转换为链表的阈值

    核心数据结构和属性

    Node

    Node 是最核心的内部类,它包装了 key-value 键值对。

    定义基本和 1.7 中的 HashEntry 相同。而这个 map 本身所持有的也是一个Node 型的数组

    增加了一个 find 方法来用以辅助 map.get()方法。其实就是遍历链表,子类 中会覆盖这个方法。

    在 map 中还定义了 Segment 这个类,不过只是为了向前兼容而已,不做过多考虑。

    TreeNode

    树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为 TreeNode。

    与 1.8 中 HashMap 不同点:

    1、它并不是直接转换为红黑树,而是把这些结点放在 TreeBin 对象中,由 TreeBin 完成对红黑树的包装。

    2、TreeNode 在 ConcurrentHashMap 扩展自 Node 类,而并非 HashMap 中的 扩展自 LinkedHashMap.Entry类,也就是说 TreeNode 带有 next 指针。

    TreeBin

    负责 TreeNode 节点。它代替了 TreeNode 的根节点,也就是说在实际的 ConcurrentHashMap“数组”中,存放的是 TreeBin 对象,而不是 TreeNode 对象。 另外这个类还带有了读写锁机制。

    特殊的 ForwardingNode

    一个特殊的 Node 结点,hash 值为 -1,其中存储 nextTable 的引用。有 table 发生扩容的时候,ForwardingNode 发挥作用,作为一个占位符放在 table 中表示当前结点为 null 或者已经被移动。

    sizeCtl

    用来控制 table 的初始化和扩容操作。

    负数代表正在进行初始化或扩容操作

    -1 代表正在初始化

    -N 表示有 N-1 个线程正在进行扩容操作

    0 为默认值,代表当时的 table 还没有被初始化 正数表示初始化大小或 Map 中的元素达到这个数量时,需要进行扩容了。

    核心方法

    构造方法

    可以发现,在 new 出一个 map 的实例时,并不会创建其中的数组等等相关 的部件,只是进行简单的属性设置而已,同样的,table 的大小也被规定为必须 是 2 的乘方数。

    真正的初始化在放在了是在向 ConcurrentHashMap 中插入元素的时候发生 的。如调用 put、computeIfAbsent、compute、merge 等方法的时候,调用时机 是检查 table==null。

    get操作

    get 方法比较简单,给定一个 key 来确定 value 的时候,必须满足两个条件 key 相同 hash 值相同,对于节点可能在链表或树上的情况,需要分别去查找。

    put操作

    总结来说,put 方法就是,沿用 HashMap 的 put 方法的思想,根据 hash 值 计算这个新插入的点在 table 中的位置 i,如果 i 位置是空的,直接放进去,否则 进行判断,如果 i 位置是树节点,按照树的方式插入新的节点,否则把 i 插入到 链表的末尾。

    整体流程上,就是首先定义不允许 key 或 value 为 null 的情况放入 对于每 一个放入的值,首先利用 spread 方法对 key 的 hashcode 进行一次 hash 计算,由 此来确定这个值在 table 中的位置。

    如果这个位置是空的,那么直接放入,而且不需要加锁操作。

    如果这个位置存在结点,说明发生了 hash 碰撞,首先判断这个节点的类型。 如果是链表节点,则得到的结点就是 hash 值相同的节点组成的链表的头节点。需 要依次向后遍历确定这个新加入的值所在位置。如果遇到 hash 值与 key 值都与 新加入节点是一致的情况,则只需要更新 value 值即可。否则依次向后遍历,直 到链表尾插入这个结点。如果加入这个节点以后链表长度大于 8,就把这个链表 转换成红黑树。如果这个节点的类型已经是树节点的话,直接调用树节点的插入 方法进行插入新的值。

    初始化 

    前面说过,构造方法中并没有真正初始化,真正的初始化在放在了是在向 ConcurrentHashMap 中插入元素的时候发生的。具体实现的方法就是 initTable

    transfer

    当 ConcurrentHashMap 容量不足的时候,需要对 table 进行扩容。这个方法 的基本思想跟 HashMap 是很像的,但是由于它是支持并发扩容的,所以要复杂的多。

    为何要并发扩容?

    因为在扩容的时候,总是会涉及到从一个“数组”到另一 个“数组”拷贝的操作,如果这个操作能够并发进行,就能利用并发处理去减少 扩容带来的时间影响。

    整个扩容操作分为两个部分:

    第一部分是构建一个 nextTable,它的容量是原来的 2 倍。

    第二个部分就是将原来 table 中的元素复制到 nextTable 中,这里允许多线程进行操作。

    整个扩容流程就是遍历和复制:

    为 null 或者已经处理过的节点,会被设置为 forwardNode 节点,当线程准备 扩容时,发现节点是 forwardNode 节点,跳过这个节点,继续寻找未处理的节点, 找到了,对节点上锁,

    如果这个位置是 Node 节点(fh>=0),说明它是一个链表,就构造一个反序链表,把他们分别放在 nextTable 的 i 和 i+n 的位置上

    如果这个位置是 TreeBin 节点(fh<0),也做一个反序处理,并且判断是否 需要红黑树转链表,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上

    遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table, 并且更新 sizeCtl 为新容量的 0.75 倍 ,完成扩容。

    并发扩容其实就是将数据迁移任务拆分成多个小迁移任务,在实现上使用了 一个变量 stride 作为步长控制,每个线程每次负责迁移其中的一部分。

    remove

    移除方法的基本流程和 put 方法很类似,只不过操作由插入数据变为移除数据而已,而且如果存在红黑树的情况下,会检查是否需要将红黑树转为链表的步骤。

    treeifyBin

    用于将过长的链表转换为 TreeBin 对象。但是他并不是直接转换,而是进行 一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果 满足条件才将链表的结构转换为 TreeBin ,这与 HashMap 不同的是,它并没有 把 TreeNode 直接放入红黑树,而是利用了 TreeBin 这个小容器来封装所有的 TreeNode。

    size

    在 JDK1.8 版本中,对于 size 的计算,在扩容和 addCount()方法就已经有处理 了,可以注意一下 Put 函数,里面就有 addCount()函数,早就计算好的,然后你 size 的时候直接给你。 JDK1.7 是在调用 size()方法才去计算,其实在并发集合中去 计算 size 是没有多大的意义的,因为 size 是实时在变的。

    在具体实现上,计算大小的核心方法都是 sumCount()

    可以看见,统计数量时使用了baseCount、和 CounterCell 类型的变量 counterCells 。其实 baseCount 就是记录容器数量的,而 counterCells 则是记录 CAS 更新 baseCounter 值时,由于高并发而导致失败的值。这两个变量的变化在 addCount() 方法中有体现,大致的流程就是:

    1、对 baseCount 做 CAS 自增操作。

    2、如果并发导致 baseCountCAS 失败了,则使用 counterCells。

    3、如果 counterCellsCAS 失败了,在 fullAddCount 方法中,会继续死循环 操作,直到成功。

    相关文章

      网友评论

          本文标题:JAVA并发编程 并发容器 线程安全——Concurrent

          本文链接:https://www.haomeiwen.com/subject/pdjpuhtx.html