前言
线程并发系列文章:
Java 线程基础
Java “优雅”地中断线程
Java 线程状态
真正理解Java Volatile的妙用
Java ThreadLocal你之前了解的可能有误
Java Unsafe/CAS/LockSupport 应用与原理
Java 并发"锁"的本质(一步步实现锁)
学过C/C++都应该对指针不陌生,指针指向了一个内存块,通过指针就可以轻易地修改内存。而Java已经没有指针这概念,取而代之的是引用,通过引用访问对象里的字段。实际上Java还是提供了操作内存的类,该类即是Unsafe。
通过本篇文章,你将了解到:
1、Unsafe 有哪些功能
2、Unsafe 操作对象
3、CAS 原理及应用
4、LockSupport 挂起/唤醒 线程
5、总结
1、Unsafe 有哪些功能
如何查看Unsafe 内容
Unsafe.java 在sun.misc包下,并不是Java标准里的类,但是很多基础类库,比如Netty、Hadoop、Spock、并发库下的锁等依靠它提升运行效率,提升操作底层的能力。正因为它能绕过JVM操作内存,一旦使用不当将造成严重后果,因此一般程序应该尽量避免使用它。
在Android Studio里查看并没有发现有Unsafe.java类,而仅仅查到Unsafe.class,其路径为:
rt.jar->sun.misc.Unsafe.class
.class文件里的变量不直观,对阅读不友好。因此我们需要找到Unsafe.java文件。
该文件需要查看JVM源码,源码网址为:
http://hg.openjdk.java.net/
该网址里的源码包含JDK和JVM,下载到本地即可查阅对应的文件。
此处下载的是jdk8u60,后续的一些列文章皆以此为基础分析。
获取Unsafe引用
截取部分源码查看:
#Unsafe.java
private Unsafe() {}//----->(1)
private static final Unsafe theUnsafe = new Unsafe();//-------->(2)
@CallerSensitive
public static Unsafe getUnsafe() {//-------->(3)
Class<?> caller = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(caller.getClassLoader()))
throw new SecurityException("Unsafe");
return theUnsafe;
}
从(1)可知,外界无法通过构造方法直接构造Unsafe对象。
从(2)可知,Unsafe定义了一个静态引用。
从(3)可知,提供了静态方法用以返回静态引用,不过该静态方法是有条件的:
判断调用者是否是使用"启动(Bootstrap)类加载器"加载的,显然我们调用者不是,而是使用系统(System)类加载器加载的,因此也无法通过该方法获取Unsafe引用。
无法正常调用,理所当然想到反射,以下获取"theUnsafe"变量:
private static final Unsafe unsafe;
static {
try {
//指定要反射的字段
final Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
拿到Unsafe引用后,就可以调用它的实例方法了。
Unsafe 提供的功能
用一张图表示:
该图片来源于:https://tech.meituan.com/2019/02/14/talk-about-java-magic-class-unsafe.html
可以看出,Unsafe提供了8大功能,我们本篇主要关注三个功能:
1、对象操作
2、CAS
3、线程挂起/唤醒
接下来一一分析三者的原理与应用。
2、Unsafe 操作对象
Java平时都是通过操作对象来访问字段的,接触不到指针,Unsafe提供了类似指针的操作。
class Student {
int age;
char name;
}
想要通过Unsafe访问age字段,先来看看Student对象在内存的分布:
image.png
我们知道,Java 虽然屏蔽了指针,但是底层还是通过指针访问的。因此,只要获取了对象在内存中的地址,找到其中字段在对象里的偏移,就可以访问相应的字段了。
class Student {
int age;
char name;
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long AGE;
static {
try {
//AGE 为age变量在Student对象里的偏移量
AGE = U.objectFieldOffset
(Student.class.getDeclaredField("age"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
//改变age的值
private void setAge(int age) {
U.putInt(this, AGE, age);
}
}
从上可知:
1、通过Unsafe获取age字段在Student对象里的偏移量
2、通过对象基准地址+偏移量就可以定位到age字段,进而可以访问(读/写)
3、此处是拿到偏移量后通过Unsafe修改
3、CAS 原理及应用
CAS 原理
Unsafe最常用的功能或许就是CAS了,CAS=Compare And Swap 简称,顾名思义:先比较,再交换。
CAS是Java并发的基础,之前提到过并发的重要条件:原子性。
试想多个线程同时访问同一个共享变量,怎么确保满足原子性呢?你可能想到了锁,锁就是依靠CAS实现互斥的。
先来看看Unsafe里提供的CAS方法:
#Unsafe.java
//假设想要改变的变量名为:int a
//o----->表示当前需要改变的变量a所在的对象
//offset----->表示当前需要改变的变量a在o里的偏移量
//expected---->表示a当前的预期值
//x----------->表示要更改a的值为x
//返回true表示更改a=x 成功
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
意思是要比较和交换一个整形值。
compareAndSwapInt 是个native方法,其对应的文件是Unsafe.cpp:
#Unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
//获取对象里变量的指针
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
//调用原子方法比较和交换,若返回的值与期望值一致,则认为修改成功
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
可以看出,compareAndSwapInt里传入的o/offset 确定了变量在对象里的地址,此处用指针指向它。然后调用了Atomic的cmpxchg(xx)方法,该方法是将汇编语句插入到C++文件里的。
image.png
可以看出,该方法在不同的系统下有不一样的实现方式,此处以Linux x86为例:
#atomic_linux_x86.inline.hpp
//内联函数
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
//是否是多处理器
int mp = os::is_MP();
//__asm__ -->内联汇编代码
//volatile 禁止指令重排
//LOCK_IF_MP--->如果是多处理器则加锁(锁总线或者锁缓存行cache line)
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
//cmpxchgl 表示汇编指令。%表示取第几个参数,从输出项数起,
//%1--> exchange_value %3--> dest 这俩取自输入项
: "=a" (exchange_value)
//输出项:a表示eax(累加寄存器),表示将eax赋值给exchange_value
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
//输入项:r表示将值读入通用寄存器,其中compare_value 放入eax
return exchange_value;//最后返回exchange_value
}
再来看看cmpxchgl 指令语法:
cmpxchgl 源操作数,目标操作数
先比较目标操作数与eax寄存器里的值是否相同,有两种结果:
1、如果相同,则将源操作数装载给目标操作数
2、如果不相同,则将目标操作数写入到eax里
结合上面的汇编,有如下指令:
cmpxchgl exchange_value,dest
先比较dest所指向内存的值与期望值是否一致(期望值compare_value 存放在eax里),有两种结果:
1、如果相同,则修改dest指向的值为新的值:exchange_value
2、如果不相同,则将dest指向的值写入到eax里
最后返回exchange_value,exchange_value 由eax赋值,若上面修改成功,则exchange_value==compare_value 否则exchange_value==dest指向的值。
因此最后判断内存是否修改成功可以通过比较exchange_value与compare_value,相等则认为成功。
(jint)(Atomic::cmpxchg(x, addr, e)) == e,返回true则认为成功。
因为涉及到修改内存,因此需要保证其原子性,cmpxchgl指令并没有保证原子性,LOCK_IF_MP 正是用来解决此问题的。
##atomic_linux_x86.inline.hpp
// Adding a lock prefix to an instruction on MP machine
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
若是多处理器,则需要添加lock前缀,该前缀的作用是在CPU访问主存前,先锁住总线/缓存行,这样其它CPU就无法更新缓存行/主存,解决了原子性问题。
因此当调用Unsafe.java方法:compareAndSwapInt(xx),其底层是上了锁保证了原子性,只是这个锁是由CPU实现的(硬件层面)。
此外,由于需要读取变量的值,由上篇文章可知,不同线程之间变量是不可见的,因此需要对变量加volatile修饰。
最后,用图表示流程:
image.png
CAS 应用
原理说了,来看看其应用,依然是以两个线程同时修改一个共享变量a为例:
public class TestCAS {
int volatile a = 0;
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long A;
static {
try {
A = U.objectFieldOffset
(TestCAS.class.getDeclaredField("a"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
private void changeValue(int newValue) {
boolean ret = U.compareAndSwapObject(TestCAS.this, A, a, newValue);
if (ret) {
System.out.println("change suc");
} else {
System.out.println("change fail");
}
}
}
其中changeValue(xx)是两个线程同时访问的方法。
- a=0(初始值),现在有两个线程想要更改a,线程1要将a改为1,线程2要将a改为2。
- 两个线程同时调用Unsafe. compareAndSwapInt(xx)方法,传入的参数o/offset固定不变。其它参数如下:
线程1: expected = 0, x = 1; 线程当前取得的a=0,因此expected = 0,想要更改a为1,因此x=1。
线程2: expected = 0, x = 2;原理同线程1。
可以看出,要想实现多个线程正确访问共享变量,借助于Unsafe.java,我们需要先计算出共享变量的偏移,再调用compareAndSwapObject(xx)方法,比较繁琐,并且这些步骤是可以提取出来作为公共方法的,最重要的是Unsafe.java不对应用层开放,也不建议应用层访问。
还好,JUC下对如上步骤进行了封装,并且提供了各个基础类:
image.png
如上图,不仅可以对基本类型如int、boolean、long等类型进行并发修改,也可以对引用类型进行修改,还是以上面修改共享变量a为例,使用AtomicInteger.java:
public class TestDemo {
static AtomicInteger a = new AtomicInteger(0);
public static void main(String args[]) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
int count = 0;
while (count < 100) {
int value = a.addAndGet(1);
System.out.println("in thread1 a = " + value);
count++;
}
} catch (Exception e) {
}
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
int count = 0;
while (count < 100) {
int value = a.addAndGet(1);
System.out.println("in thread2 a = " + value);
count++;
}
} catch (Exception e) {
}
}
});
t2.start();
}
}
上面两个线程同时对AtomicInteger a 进行修改操作,每次都+1,循环100次,结束后每次都是稳定输出a=200,说明线程并发访问a结果正确。
来瞧一瞧addAndGet(xx)调用:
#AtomicInteger.java
public final int addAndGet(int delta) {
return U.getAndAddInt(this, VALUE, delta) + delta;
}
#Unsafe.java
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
//获取共享变量的值,通过偏移量获取
//getIntVolatile获取的变量是volatile修饰的,因此每次都能够拿到最新值
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));//不成功,则再次尝试
return v;
}
可以看到,Unsafe. getAndAddInt(xx)里有个死循环,一直尝试修改变量的值,不成功就一直去尝试,成功则退出循环。
若是共享变量一直被其它线程修改,则本线程则一直需要轮询,若是竞争不激烈,则本线程立马修改了值并返回。
4、LockSupport 挂起/唤醒 线程
Unsafe 里的挂起/唤醒
Unsafe.java 里有两个方法:
#Unsafe.java
//调用该方法的线程会挂起
//isAbsolute--->是否使用绝对时间,会影响time的单位
//time--->指定最多挂起多长的时间
//isAbsolute=true -->绝对时间,则time单位为毫秒,表示线程将被挂起到time这个时间点
//isAbsolute=false--->相对时间,则time单位为纳秒,如time =1000表示线程将被挂起1000纳秒
public native void park(boolean isAbsolute, long time);
//唤醒线程,thread表示待唤醒的线程
public native void unpark(Object thread);
这俩方法用来操作线程挂起与唤醒,当线程调用park(xx)挂起自身时,线程就阻塞于此,什么时候结束阻塞呢?
1、其它线程调用unpark(xx)唤醒它
2、其它线程中断了它
3、发生了不可预料的事情
4、分两种情况:如果是绝对时间,那么截止时间到了即结束;如果是相对时间,那么过期时间到了即结束
明显的,两者是本地方法,继续来看看其实现。
先看park(xx)对应的方法:
##Unsafe.cpp
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
UnsafeWrapper("Unsafe_Park");
EventThreadPark event;
...
JavaThreadParkedState jtps(thread, time != 0);
//调用parker的park方法
thread->parker()->park(isAbsolute != 0, time);
...
...
UNSAFE_END
thread指的是JavaThread。
image.png
JavaThread 继承自Thread类(在Thread.cpp里)。
image.png
而parker()函数返回Parker指针,继续往下走看看Parker内容:
#Park.hpp
class Parker : public os::PlatformParker {
private:
//许可计数
volatile int _counter ;
...
public:
//对应挂起、唤醒
void park(bool isAbsolute, jlong time);
void unpark();
...
};
看到os::PlatformParker就意识到park(xx)/unpark()不同系统有不一样的实现了。
此处选择Linux下的实现查看:
#os_linux.cpp
void Parker::park(bool isAbsolute, jlong time) {
//原子操作交换0和counter的值,也就是给counter赋0,返回值是counter原来的值
//不管原来counter是多少,只要counter>0,说明有许可,因此直接返回,无需挂起线程
if (Atomic::xchg(0, &_counter) > 0) return;
//判断线程中断与不合理的时间,将时间封装等,此处省略
//如果线程中断了,直接返回,否则尝试去获取_mutex锁,获取锁失败直接返回
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
int status ;
//再次判断许可是否可用,如果可用则不用挂起,直接返回
if (_counter > 0) { // no wait needed
_counter = 0;
//释放锁
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
OrderAccess::fence();
return;
}
...
if (time == 0) {
//如果时间为0,则调用pthread_cond_wait 挂起线程等待
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
//否则调用safe_cond_timedwait 等待一段时间
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
...
//许可置为0
_counter = 0 ;
//释放锁
status = pthread_mutex_unlock(_mutex) ;
...
}
这段代码是挂起的核心,其步骤如下:
1、先判断许可是否可用,若是则直接返回
2、尝试获取_mutex锁(互斥锁),获取成功则挂起线程等待,这里等待分无限/有限等待。safe_cond_timedwait 最终调用pthread_cond_timedwait
3、pthread_cond_wait/pthread_cond_wait 返回后修改许可并释放锁
pthread_cond_wait/pthread_cond_wait 在Native POSIX Thread Library (POSIX 原生线程库)里实现,里面用的是futex(fast userspace mutex)。
代码可查看:https://code.woboq.org/userspace/glibc/nptl/
再来看看unpark(xx)方法
与park(xx)调用类似,从Java->JNI->C++,调用如下:
Unsafe.java-->unpark(xx)
Unsafe.cpp-->Unsafe_Unpark(xx)
Parker->unpark();
重点来看Parker里的方法:
#os_linux.cpp
void Parker::unpark() {
int s, status ;
//先获取锁
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
//许可+1
_counter = 1;
if (s < 1) {
//当前没有许可,可能有线程在挂起
// thread might be parked
//_cur_index 可取三个值
//-1 ---> 表示没有挂起的线程
//0---->有挂起,使用相对时间
//1---->有挂起,使用绝对时间
if (_cur_index != -1) {
// thread is definitely parked
if (WorkAroundNPTLTimedWaitHang) {
//唤醒挂起的线程
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
//释放锁
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
} else {
//此处是反过来,先释放锁,再唤醒线程
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
} else {
//当前已有许可,说明没有线程挂起,因此不做唤醒操作,释放锁后退出
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
结合Parker park(xx)与unpark()可知:
两者是通过监控许可的数量(counter)进行交互的
底层调用futex挂起与唤醒线程
以下这几个疑惑就可以解开了。
为什么需要加锁?
因为可能会有多个线程调用unpark()修改许可值,因此需要加锁来保证counter的正确性。
可以先unpark(),再park(xx)吗?
可以。park(xx)之前先去检测counter>0,若是直接返回。
许可的数量
counter值不会大于1。也就是即使多次unpark(),也只能产生1个许可。
LockSupport 里的挂起/唤醒
上面分析了Unsafe挂起/唤醒线程,同样的Unsafe不对普通应用开放,还好JUC下提供了LockSupport 类。
public static void park() {
U.park(false, 0L);
}
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
//blocker 指的是任意对象,表示线程因为某个对象挂起,多用于排查原因
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
//记录到Thread.java里的parkBlocker字段
setBlocker(t, blocker);
U.park(false, nanos);
//线程唤醒后,置空
setBlocker(t, null);
}
}
LockSupport 里的方法不多,还有一些其它方法无非就是给Unsafe.java构造不同参数,最终还是依赖调用Unsafe.park(xx)/unpark(xx)。
5、总结
本次挑选了Unsafe里的三个功能进行分析,Unsafe借助Atomic能实现CAS、借助pthread实现线程挂起/唤醒。由于不推荐直接使用Unsafe本身,因此面向上层提供了AtomicInteger/LockSupport等类,总结关系如下:
image.png
本篇文章的内容是Java 线程并发的基础,夯实了基础之后,接下来深入聊聊并发常用的神器-->锁(AQS/Synchronized)前世今生,恩恩怨怨。
本文基于JDK1.8。
参考文章:
https://tech.meituan.com/2019/02/14/talk-about-java-magic-class-unsafe.html
https://segmentfault.com/a/1190000023381653
您若喜欢,请点赞、关注,您的鼓励是我前进的动力
持续更新中,和我一起步步为营系统、深入学习Android
1、Android各种Context的前世今生
2、Android DecorView 一窥全貌(上)
3、Android DecorView 一窥全貌(下)
4、Window/WindowManager 不可不知之事
5、View Measure/Layout/Draw 真明白了
6、Android事件分发全套服务
7、Android invalidate/postInvalidate/requestLayout 彻底厘清
8、Android Window 如何确定大小/onMeasure()多次执行原因
9、Android事件驱动Handler-Message-Looper解析
10、Android 键盘一招搞定
11、Android 各种坐标彻底明了
12、Android Activity/Window/View 的background
13、Android IPC 之Service 还可以这么理解
14、Android IPC 之Binder基础
15、Android IPC 之Binder应用
16、Android IPC 之AIDL应用(上)
17、Android IPC 之AIDL应用(下)
18、Android IPC 之Messenger 原理及应用
19、Android IPC 之获取服务(IBinder)
20、Android 存储基础
21、Android 10、11 存储完全适配(上)
22、Android 10、11 存储完全适配(下)
23、Java 并发系列不再疑惑
网友评论