美文网首页Java 杂谈Java总社区
并发机制的底层实现

并发机制的底层实现

作者: java劝退师图图 | 来源:发表于2019-04-02 17:16 被阅读2次

concurrent 包的实现

由于 Java 的 CAS 同时具有 volatile 读和 volatile 写的内存语义,因此 Java 线程之间的通信现在有了下面四种方式:

A 线程写 volatile 变量,随后 B 线程读这个 volatile 变量。

A 线程写 volatile 变量,随后 B 线程用 CAS 更新这个 volatile 变量。

A 线程用 CAS 更新一个 volatile 变量,随后 B 线程用 CAS 更新这个 volatile 变量。

A 线程用 CAS 更新一个 volatile 变量,随后 B 线程读这个 volatile 变量。

同时,volatile 变量的读/写和 CAS 可以实现线程之间的通信。把这些特性整合在一起,就形成了整个 concurrent 包得以实现的基石。如果我们仔细分析 concurrent 包的源代码实现,会发现一个通用化的实现模式:

首先,声明共享变量为 volatile;

然后,使用 CAS 的原子条件更新来实现线程之间的同步;

同时,配合以 volatile 的读/写和 CAS 所具有的 volatile 读和写的内存语义来实现线程之间的通信。

AQS,非阻塞数据结构和原子变量类(Java.util.concurrent.atomic 包中的类),这些 concurrent 包中的基础类都是使用这种模式来实现的,而 concurrent 包中的高层类又是依赖于这些基础类来实现的。从整体来看,concurrent 包的实现示意图如下:

synchronized

synchronized 的要点

关键字 synchronized 可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块。

synchronized 有 3 种应用方式:

同步实例方法

同步静态方法

同步代码块

同步实例方法

❌ 错误示例 - 未同步的示例

@NotThreadSafepublicclassSynchronizedDemo01implementsRunnable{staticinti=0;publicvoidincrease() {        i++;    }@Overridepublicvoidrun() {for(intj=0; j<100000; j++) {            increase();        }    }publicstaticvoidmain(String[]args)throwsInterruptedException{SynchronizedDemo01instance=newSynchronizedDemo01();Threadt1=newThread(instance);Threadt2=newThread(instance);        t1.start();        t2.start();        t1.join();        t2.join();System.out.println(i);    }}//输出结果: 小于 200000 的随机数字

Java 实例方法同步是同步在拥有该方法的对象上。这样,每个实例其方法同步都同步在不同的对象上,即该方法所属的实例。只有一个线程能够在实例方法同步块中运行。如果有多个实例存在,那么一个线程一次可以在一个实例同步块中执行操作。一个实例一个线程。

@ThreadSafepublicclassSynchronizedDemo02implementsRunnable{staticinti=0;publicsynchronizedvoidincrease() {        i++;    }@Overridepublicvoidrun() {for(intj=0; j<100000; j++) {            increase();        }    }publicstaticvoidmain(String[]args)throwsInterruptedException{SynchronizedDemo02instance=newSynchronizedDemo02();Threadt1=newThread(instance);Threadt2=newThread(instance);        t1.start();        t2.start();        t1.join();        t2.join();System.out.println(i);    }}//输出结果://2000000

同步静态方法

静态方法的同步是指同步在该方法所在的类对象上。因为在 JVM 中一个类只能对应一个类对象,所以同时只允许一个线程执行同一个类中的静态同步方法。

对于不同类中的静态同步方法,一个线程可以执行每个类中的静态同步方法而无需等待。不管类中的那个静态同步方法被调用,一个类只能由一个线程同时执行。

@ThreadSafepublicclassSynchronizedDemo03implementsRunnable{staticinti=0;publicstaticsynchronizedvoidincrease() {        i++;    }@Overridepublicvoidrun() {for(intj=0; j<100000; j++) {            increase();        }    }publicstaticvoidmain(String[]args)throwsInterruptedException{Threadt1=newThread(newSynchronizedDemo03());Threadt2=newThread(newSynchronizedDemo03());        t1.start();        t2.start();        t1.join();        t2.join();System.out.println(i);    }}//输出结果://200000

同步代码块

有时你不需要同步整个方法,而是同步方法中的一部分。Java 可以对方法的一部分进行同步。

注意 Java 同步块构造器用括号将对象括起来。在上例中,使用了 this,即为调用 add 方法的实例本身。在同步构造器中用括号括起来的对象叫做监视器对象。上述代码使用监视器对象同步,同步实例方法使用调用方法本身的实例作为监视器对象。

一次只有一个线程能够在同步于同一个监视器对象的 Java 方法内执行。

@ThreadSafepublicclassSynchronizedDemo04implementsRunnable{staticinti=0;staticSynchronizedDemo04instance=newSynchronizedDemo04();@Overridepublicvoidrun() {synchronized(instance) {for(intj=0; j<100000; j++) {                i++;            }        }    }publicstaticvoidmain(String[]args)throwsInterruptedException{Threadt1=newThread(instance);Threadt2=newThread(instance);        t1.start();        t2.start();        t1.join();        t2.join();System.out.println(i);    }}//输出结果://200000

synchronized 的原理

synchronized 实现同步的基础是:Java 中的每一个对象都可以作为锁。

对于普通同步方法,锁是当前实例对象。

对于静态同步方法,锁是当前类的 Class 对象。

对于同步方法块,锁是 Synchonized 括号里配置的对象。

👉 参考阅读:Java 并发编程:synchronized 👉 参考阅读:深入理解 Java 并发之 synchronized 实现原理

volatile

volatile 的要点

volatile 是轻量级的 synchronized,它在多处理器开发中保证了共享变量的“可见性”。

可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。

一旦一个共享变量(类的成员变量、类的静态成员变量)被 volatile 修饰之后,那么就具备了两层语义:

保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。

禁止进行指令重排序。

如果一个字段被声明成 volatile,Java 线程内存模型确保所有线程看到这个变量的值是一致的。

volatile 的原理

观察加入 volatile 关键字和没有加入 volatile 关键字时所生成的汇编代码发现,加入 volatile 关键字时,会多出一个 lock 前缀指令。

lock 前缀指令实际上相当于一个内存屏障(也成内存栅栏),内存屏障会提供 3 个功能:

它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;

它会强制将对缓存的修改操作立即写入主存;

如果是写操作,它会导致其他 CPU 中对应的缓存行无效。

volatile 的应用场景

如果 volatile 变量修饰符使用恰当的话,它比 synchronized 的使用和执行成本更低,因为它不会引起线程上下文的切换和调度。

但是,volatile 无法替代 synchronized ,因为 volatile 无法保证操作的原子性。通常来说,使用 volatile 必须具备以下 2 个条件:

对变量的写操作不依赖于当前值

该变量没有包含在具有其他变量的不变式中

应用场景:

状态标记量

volatilebooleanflag=false;while(!flag) {    doSomething();}publicvoidsetFlag() {    flag=true;}

double check

classSingleton{privatevolatilestaticSingletoninstance=null;privateSingleton() {}publicstaticSingletongetInstance() {if(instance==null) {synchronized(Singleton.class) {if(instance==null)                    instance=newSingleton();            }        }returninstance;    }}

👉 参考阅读:Java 并发编程:volatile 关键字解析

CAS

简介

CAS(Compare and Swap),字面意思为比较并交换。CAS 有 3 个操作数,内存值 V,旧的预期值 A,要修改的新值 B。当且仅当预期值 A 和内存值 V 相同时,将内存值 V 修改为 B,否则什么都不做。

操作

我们常常做这样的操作

if(a==b) {    a++;}

试想一下如果在做 a++之前 a 的值被改变了怎么办?a++还执行吗?出现该问题的原因是在多线程环境下,a 的值处于一种不定的状态。采用锁可以解决此类问题,但 CAS 也可以解决,而且可以不加锁。

intexpect=a;if(a.compareAndSet(expect,a+1)) {    doSomeThing1();}else{    doSomeThing2();}

这样如果 a 的值被改变了 a++就不会被执行。按照上面的写法,a!=expect 之后,a++就不会被执行,如果我们还是想执行 a++操作怎么办,没关系,可以采用 while 循环

while(true) {intexpect=a;if(a.compareAndSet(expect, a+1)) {        doSomeThing1();return;    }else{        doSomeThing2();    }}

采用上面的写法,在没有锁的情况下实现了 a++操作,这实际上是一种非阻塞算法。

应用

非阻塞算法 (nonblocking algorithms)

一个线程的失败或者挂起不应该影响其他线程的失败或挂起的算法。

现代的 CPU 提供了特殊的指令,可以自动更新共享数据,而且能够检测到其他线程的干扰,而 compareAndSet() 就用这些代替了锁定。

拿出 AtomicInteger 来研究在没有锁的情况下是如何做到数据正确性的。

privatevolatileintvalue;

首先毫无疑问,在没有锁的机制下可能需要借助 volatile 原语,保证线程间的数据是可见的(共享的)。

这样才获取变量的值的时候才能直接读取。

publicfinalintget() {returnvalue;}

然后来看看++i 是怎么做到的。

publicfinalintincrementAndGet() {for(;;) {intcurrent=get();intnext=current+1;if(compareAndSet(current, next))returnnext;    }}

在这里采用了 CAS 操作,每次从内存中读取数据然后将此数据和+1 后的结果进行 CAS 操作,如果成功就返回结果,否则重试直到成功为止。

而 compareAndSet 利用 JNI 来完成 CPU 指令的操作。

publicfinalbooleancompareAndSet(intexpect,intupdate) {returnunsafe.compareAndSwapInt(this, valueOffset, expect, update);}

整体的过程就是这样子的,利用 CPU 的 CAS 指令,同时借助 JNI 来完成 Java 的非阻塞算法。其它原子操作都是利用类似的特性完成的。

其中 unsafe.compareAndSwapInt(this, valueOffset, expect, update)类似:

if(this==expect) {this=updatereturntrue;}else{returnfalse;}

那么问题就来了,成功过程中需要 2 个步骤:比较 this == expect,替换 this = update,compareAndSwapInt 如何这两个步骤的原子性呢? 参考 CAS 的原理

原理

Java 代码如何确保处理器执行 CAS 操作?

CAS 通过调用 JNI(JNI:Java Native Interface 为 Java 本地调用,允许 Java 调用其他语言。)的代码实现的。JVM 将 CAS 操作编译为底层提供的最有效方法。在支持 CAS 的处理器上,JVM 将它们编译为相应的机器指令;在不支持 CAS 的处理器上,JVM 将使用自旋锁。

特点

优点

一般情况下,比锁性能更高。因为 CAS 是一种非阻塞算法,所以其避免了线程被阻塞时的等待时间。

缺点

ABA 问题

因为 CAS 需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA 问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么 A-B-A 就会变成 1A-2B-3A。

从 Java1.5 开始 JDK 的 atomic 包里提供了一个类 AtomicStampedReference 来解决 ABA 问题。这个类的 compareAndSet 方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

循环时间长开销大

自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。

比较花费 CPU 资源,即使没有任何用也会做一些无用功。

只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i = 2,j=a,合并一下 ij=2a,然后用 CAS 来操作 ij。从 Java1.5 开始 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作。

总结

可以用 CAS 在无锁的情况下实现原子操作,但要明确应用场合,非常简单的操作且又不想引入锁可以考虑使用 CAS 操作,当想要非阻塞地完成某一操作也可以考虑 CAS。不推荐在复杂操作中引入 CAS,会使程序可读性变差,且难以测试,同时会出现 ABA 问题。

免费Java资料需要自己领取,涵盖了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo/Kafka、Hadoop、Hbase、Flink等高并发分布式、大数据、机器学习等技术。

传送门:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q

相关文章

网友评论

    本文标题:并发机制的底层实现

    本文链接:https://www.haomeiwen.com/subject/gragbqtx.html