思考问题
- 首先请您思考下面的问题:
- Synchronized锁同步机制性能不好嘛?
- 一个对象天生对应一个monitor锁吗?
- 为什么说synchronized是非公平锁?
synchronized字节码
- 使用java反编译,javap -c -p -v class文件
- 使用jclasslib插件,更加方便快捷
public synchronized int getAge(){ return 18 ; } //synchronized使用在实例方法上标记为ACC_SYNCHRONIZED,如果是类方法ACC_STATIC public synchronized int getAge(); descriptor: ()I flags: ACC_PUBLIC, ACC_SYNCHRONIZED Code: stack=1, locals=1, args_size=1 0: bipush 18 2: ireturn public String setName(){ synchronized (this){ return "shiq"; } } public java.lang.String setName(); Code: 0: aload_0 //一个局部变量下标0加载到操作栈即将this加载到操作数栈 1: dup //复制 2: astore_1 //数值从操作数栈存储到局部变量表 3: monitorenter //对操作数栈顶元素加锁即this 4: ldc #2 // String shiq 6: aload_1 7: monitorexit 8: areturn 9: astore_2 10: aload_1 11: monitorexit 12: aload_2 13: athrow Exception table: //方法异常表,保证能够释放锁 from to target type 4 8 9 any 9 12 9 any //如果将上方替换成反编译后 synchronized (SynchronizedTest.class) 0: ldc #2 // class SynchronizedTest 2: dup 3: astore_1 4: monitorenter //0行表示将一个常量池中位置为2的常量加载到操作数栈,查询常量池 #2 = Class #24 // SynchronizedTest
- 以上我们可以看到synchronized修饰this表示对象本身加锁,修饰class表示对类加锁
对象内存构成
- 一个对象的内存构成有对象头,实例数据,对齐填充
oop层级
- oopDesc是对象类的顶层基类,每个Java Object 在 JVM 内部都有一个 native 的 C++ 对象 oop/oopDesc 与之对应。源码位置 openjdk\hotspot\src\share\vm\oops\oop.hpp
- 每个对象头由两部分组成,klass pointer和Mark Word和Array length(只有是数组才会有)源码在vm/oops/oop.hpp中定义的OopDes
- _mark表示对象标记、属于markOop类型,也就是接下来要讲解的Mark World,它记录了对象和锁有关的信息
- Klass Pointer对象指向它的类元数据的指针,虚拟机通过这个指针来确定对象是哪个类的实例。即下方的_metadata表示类元信息,类元信息存储的是对象指向它的类元数据(Klass)的首地址,其中Klass表示普通指针、 _compressed_klass表示压缩类指针
- 实例数据:按对象头为基址做相对偏移后操作 obj_field_addr(offset偏移量)
- 对齐填充 :java默认的8字节对齐规则,如果占位不足要补齐
class oopDesc {
friend class VMStructs;
private:
volatile markOop _mark; ////markOop:Mark Word标记字段
union _metadata {
Klass* _klass; ////对象类型元数据的指针
narrowKlass _compressed_klass;
} _metadata;
public:
markOop mark() const { return _mark; } //返回Mark Word数值
markOop* mark_addr() const { return (markOop*) &_mark; } //返回一个地址值为_mark
public:
// Need this as public for garbage collection.
template <class T> T* obj_field_addr(int offset) const;
}
oopDes结构图:
image
对象头中Mark World
- markOopDesc 位于openjdk\hotspot\src\share\vm\oops\markOop.hpp 继承oopDesc
//mark Word 在markOopDesc中有注释
// 32 bits:
// --------
// hash:25 ------------>| age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:23 epoch:2 age:4 biased_lock:1 lock:2 (biased object)
// size:32 ------------------------------------------>| (CMS free block)
// PromotedObject*:29 ---------->| promo_bits:3 ----->| (CMS promoted object)
//
// 64 bits:
// --------
// unused:25 hash:31 -->| unused:1 age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:54 epoch:2 unused:1 age:4 biased_lock:1 lock:2 (biased object)
// PromotedObject*:61 --------------------->| promo_bits:3 ----->| (CMS promoted object)
// size:64 ----------------------------------------------------->| (CMS free block)
/**
通过以上可知是个32bit数值,不太可能是指针
*/
//观察其中的方法 ,是否是重量级锁monitor_value = 2 => 锁标记为 10 , 如果value()为重量级锁标记则***10 & 10 != 0成立
//因此value()函数代表的即为当前obj的mark Word数值
bool has_monitor() const {
return ((value() & monitor_value) != 0);
}
private:
// Conversion
uintptr_t value() const { return (uintptr_t) this; }
//是否设置了偏向标志 biased_lock_mask_in_place = 3
bool has_bias_pattern() const {
return (mask_bits(value(), biased_lock_mask_in_place) == biased_lock_pattern);
}
-
mark word 图解
image - 以上分析也可以很容易理解了markOopDesc中函数monitor: 返回一个ObjectMonitor指针对象 这个ObjectMonitor 其实就是对象监视器
ObjectMonitor* monitor() const {
assert(has_monitor(), "check"); //断定为重量级锁
// Use xor instead of &~ to provide one extra tag-bit check.
return (ObjectMonitor*) (value() ^ monitor_value); //将Mark Word值转化成指针,指向的就是obj的ObjectMonitor锁对象
}
- 用于返回一个ObjectMonitor锁对象,方法在vm/runtime/synchronizer.cpp中
锁入口
- 无论是synchronized代码块和synchronized方法,其底层获取锁的逻辑都是一样的。我们以代码块的monitorentor和monitorexit为例
- monitorenter指令在HotSpot的中有两处地方对monitorenter指令进行解析:一个是在bytecodeInterpreter.cpp ,另一个是在templateTable_x86_64.cpp
- 前者是JVM字节码解释器,有C++书写,优点是实现相对简单且容易理解,缺点是执行慢;
- 后者是模板解释器:对每个指令都写了一段对应的汇编代码,启动时将每个指令与对应汇编代码入口绑定,效率极高但理解不易;
偏向锁
- 首先理解两个类BasicObjectLock,BasicLock BasicObjectLock将特定的Java对象与BasicLock关联。
class BasicObjectLock VALUE_OBJ_CLASS_SPEC {
friend class VMStructs;
private:
BasicLock _lock; //BasicLock对象
oop _obj; //与上方_lock关联的java对象
public:
oop obj() const { return _obj; }
void set_obj(oop obj) { _obj = obj; }
BasicLock* lock() { return &_lock; }
class BasicLock VALUE_OBJ_CLASS_SPEC {
friend class VMStructs;
private:
volatile markOop _displaced_header; //mark word字段
public:
markOop displaced_header() const { return _displaced_header; }
void set_displaced_header(markOop header) { _displaced_header = header; }
void print_on(outputStream* st) const;
// move a basic lock (used during deoptimization
void move_to(oop obj, BasicLock* dest);
static int displaced_header_offset_in_bytes(){ return offset_of(BasicLock, _displaced_header); }
};
- BasicLock中的markOop其实就是偏向锁中保存当前mark word字段使用的
偏向锁加锁
- 偏向锁入口函数_monitorenter字节码
CASE(_monitorenter): {
//当前锁对象
oop lockee = STACK_OBJECT(-1);
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
while (most_recent != limit ) {
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
if (entry != NULL) { //找到entry即可用的最高位的Lock Record
entry->set_obj(lockee); //Lock Record的obj指向锁对象
int success = false; //标记位: 用于确定是否需要升级锁为轻量级锁
uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
markOop mark = lockee->mark();
intptr_t hash = (intptr_t) markOopDesc::no_hash;
/**
1. 可以使用偏向锁
*/
if (mark->has_bias_pattern()) {
uintptr_t thread_ident;
uintptr_t anticipated_bias_locking_value;
thread_ident = (uintptr_t)istate->thread();
anticipated_bias_locking_value =
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
~((uintptr_t) markOopDesc::age_mask_in_place);
/**
* 1.2 : 当前偏向线程是自己,什么都不做,设置success = true,防止进入下方锁升级
*/
if (anticipated_bias_locking_value == 0) {
// already biased towards this thread, nothing to do
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::biased_lock_entry_count_addr())++;
}
success = true;
}
//批量重偏向 klass()->prototype_header不支持偏向锁
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
// try revoke bias
markOop header = lockee->klass()->prototype_header();
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
}
/**
* 1.3 当前偏向锁的epoch失效,则重新偏向
*/
else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
// 1.3.1 构造一个偏向当前线程切epoch为class最新的mark word
markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
if (hash != markOopDesc::no_hash) {
new_header = new_header->copy_set_hash(hash);
}
// 1.3.2 CAS 替换 mark word
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::rebiased_lock_entry_count_addr())++;
}
else {
/**
* /替换ThreadID失败,有两种可能:
1. 撤销原有偏向ID,重新设置
2. 有竞争,升级锁,当下方success=true 所以在monitorenter中升级的
*/
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
else {
/**
* 1.1 当前无锁或 1.3 当前偏向其他线程
*/
markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
(uintptr_t)markOopDesc::age_mask_in_place |
epoch_mask_in_place));
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
/**
* 1. 构造一个偏向当前线程的mark Word
2. 设置首位的Lock Record lock即BasicLock为线程ID默认 0xdeaddead
3. 使用CAS替换操作
*/
markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
// debugging hint
DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
}
else {
/**
* 替换ThreadID失败,有两种可能:
1. 撤销原有偏向ID,重新设置
2. 有竞争,升级锁,当下方success=true 所以在monitorenter中升级的
*/
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
}
-
偏向锁示意图:
image -
整个过程示意图:
image
偏向锁撤销
- 在获取偏向锁的过程因为不满足条件导致要将锁对象改为非偏向锁状态
- 调取过程在 InterpreterRuntime::monitorenter -> 开启JVM偏向锁 ObjectSynchronizer::fast_enter -> java线程 BiasedLocking::revoke_and_rebias
static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread, JavaThread** biased_locker) {
markOop mark = obj->mark();
//如果不是偏向模式,返回
if (!mark->has_bias_pattern()) {
return BiasedLocking::NOT_BIASED;
}
//创建两个mark word,一个是匿名偏向模式(101),一个是无锁模式(001)
uint age = mark->age();
markOop biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);
...
JavaThread* biased_thread = mark->biased_locker();
if (biased_thread == NULL) {
// 匿名偏向。当调用锁对象的hashcode()方法可能会导致走到这个逻辑
// 如果不允许重偏向,则将对象的mark word设置为无锁模式
if (!allow_rebias) {
obj->set_mark(unbiased_prototype);
}
return BiasedLocking::BIAS_REVOKED;
}
//0. 判断偏向线程是否还存活
bool thread_is_alive = false;
//0.1 如果当前线程就是偏向线程
if (requesting_thread == biased_thread) {
thread_is_alive = true;
} else {
// 0.2 遍历当前jvm的所有线程,如果能找到,则说明偏向的线程还存活
for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) {
if (cur_thread == biased_thread) {
thread_is_alive = true; //偏向线程还在
break;
}
}
}
/**
* 1. 如果偏向线程不存在了
*/
if (!thread_is_alive) {
// 1.1 允许重偏向则将对象mark word设置为匿名偏向状态101,否则设置为无锁状态001
if (allow_rebias) {
obj->set_mark(biased_prototype);
} else {
obj->set_mark(unbiased_prototype);
}
return BiasedLocking::BIAS_REVOKED;
}
/**
* 2. 如果偏向线程存在,则遍历该线程栈中所有的Lock Record
*/
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread); //获取线程中的栈 All Lock Record
BasicLock* highest_lock = NULL;
for (int i = 0; i < cached_monitor_info->length(); i++) { //从上向下遍历
MonitorInfo* mon_info = cached_monitor_info->at(i);
/**
* 2.1 判断当前Lock Record 的 owner是否指向锁对象,如果指向说明当前线程在偏向,这个时候需要升级为轻量级锁啦
*/
if (TraceBiasedLocking && Verbose) {
tty->print_cr(" mon_info->owner (" PTR_FORMAT ") == obj (" PTR_FORMAT ")",
p2i((void *) mon_info->owner()),
p2i((void *) obj));
}
/** 2.2 低位lock 直接修改偏向线程栈中的Lock Record。为了处理锁重入的case,在这里将Lock Record的Displaced Mark Word设置为null,
* 第一个Lock Record会在下面的代码中再处理
*/
markOop mark = markOopDesc::encode((BasicLock*) NULL);
highest_lock = mon_info->lock(); //
highest_lock->set_displaced_header(mark);
} else {
...
}
}
/**
* 2.3 对高位进行处理,此时Displaced Mark Word即lock不能在设置成null
*/
if (highest_lock != NULL) {
// 2.3.1 修改第一个Lock Record为unbiased_prototype无锁状态,
highest_lock->set_displaced_header(unbiased_prototype);
//2.3.2 将锁对象obj obj的mark word设置为指向该Lock Record的指针
obj->release_set_mark(markOopDesc::encode(highest_lock));
assert(!obj->mark()->has_bias_pattern(), "illegal mark state: stack lock used bias bit");
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of currently-locked object");
}
} else {
/**
* 3. 走到这里说明偏向线程已经不在同步块中了
*/
if (allow_rebias) {
//3.1 设置为匿名偏向状态
obj->set_mark(biased_prototype);
} else {
// 3.2 设置成无锁状态
obj->set_mark(unbiased_prototype);
}
}
- 查看偏向的线程是否存活,如果已经不存活了,则直接撤销偏向锁。JVM维护了一个集合存放所有存活的线程,通过遍历该集合判断某个线程是否存活。
- 偏向的线程是否还在同步块中,如果不在了,则撤销偏向锁。我们回顾一下偏向锁的加锁流程:每次进入同步块(即执行monitorenter)的时候都会以从高往低的顺序在栈中找到第一个可用的Lock Record,将其obj字段指向锁对象。每次解锁(即执行monitorexit)的时候都会将最低的一个相关Lock Record移除掉。所以可以通过遍历线程栈中的Lock Record来判断线程是否还在同步块中。
- 将偏向线程所有相关Lock Record的Displaced Mark Word设置为null,然后将最高位的Lock Record的Displaced Mark Word 设置为无锁状态,最高位的Lock Record也就是第一次获得锁时的Lock Record(这里的第一次是指重入获取锁时的第一次),然后将对象头指向最高位的Lock Record,这里不需要用CAS指令,因为是在safepoint。 执行完后,就升级成了轻量级锁。原偏向线程的所有Lock Record都已经变成轻量级锁的状态。
偏向锁的释放
- 释放过程相对比较简单
CASE(_monitorexit): {
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
// derefing's lockee ought to provoke implicit null check
// find our monitor slot
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
// 从低往高遍历栈的Lock Record
while (most_recent != limit ) {
// 如果Lock Record关联的是该锁对象
if ((most_recent)->obj() == lockee) {
BasicLock* lock = most_recent->lock();
markOop header = lock->displaced_header();
// 释放Lock Record
most_recent->set_obj(NULL);
// 如果是偏向模式,仅仅释放Lock Record就好了。否则要走轻量级锁or重量级锁的释放流程
if (!lockee->mark()->has_bias_pattern()) {
bool call_vm = UseHeavyMonitors;
// header!=NULL说明不是重入,则需要将Displaced Mark Word CAS到对象头的Mark Word
if (header != NULL || call_vm) {
if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
// CAS失败或者是重量级锁则会走到这里,先将obj还原,然后调用monitorexit方法
most_recent->set_obj(lockee);
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
}
//执行下一条命令
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
//处理下一条Lock Record
most_recent++;
}
// Need to throw illegal monitor state exception
CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
ShouldNotReachHere();
}
- 偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动去释放偏向锁。
- 偏向锁的撤销,需要等待全局安全点,它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态,撤销偏向锁后恢复到未锁定(标志位为“01”)或轻量级锁(标志位为“00”)的状态
- -XX:+PrintGCApplicationStoppedTime -XX:+PrintSafepointStatistics -XX:PrintSafepointStatisticsCount=1 查看停顿时间详细信息
- JDK1.6以后默认开启偏向锁UseBiasedLocking,对于高并发提升效率-XX:-UseBiasedLocking
轻量级锁
轻量级锁加锁
- 入口函数同上
CASE(_monitorenter): { oop lockee = STACK_OBJECT(-1); ... if (entry != NULL) { ... // 上面省略的代码中如果CAS操作失败也会调用到InterpreterRuntime::monitorenter if (!success) { // 构建一个无锁状态的Displaced Mark Word markOop displaced = lockee->mark()->set_unlocked(); // 设置到Lock Record中去 entry->lock()->set_displaced_header(displaced); bool call_vm = UseHeavyMonitors; if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) { // 如果CAS替换不成功,代表锁对象不是无锁状态,这时候判断下是不是锁重入 // Is it simple recursive case? if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) { entry->lock()->set_displaced_header(NULL); } else { // CAS操作失败则调用monitorenter CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception); } } } UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1); } else { istate->set_msg(more_monitors); UPDATE_PC_AND_RETURN(0); // Re-execute } }
- 构建一个无锁的mark Work 设置到Lock Record 中lock中去
- CAS替换锁对象的Mark word为指向1中的Lock Record,成功轻量级锁完成
- 如果失败,两种情况
- 是否是锁重入,如果是则设置lock为null
- 否则,调取monitorenter 自旋锁或者升级锁
-
过程示意图
image
轻量级解锁
- 解锁过程由下往上查找当前线程的Lock Record 中owner为锁对象后判断BasicLock
- 如果BasicLock == null ,表示一个锁重入,当前线程还在同步块中,不做操作
- 如果BasicLock有值,即为锁对象指向的Display Mark Word ,则表示这时候需要真正释放锁,将DisPlay Mark Word替换回锁对象的Mark Word中去,如果失败,则调用InterpreterRuntime::monitorexit
- 总结:轻量级锁解锁时,把复制的对象头替换回去(cas)如果替换成功(就是要把无所的状态放回去给对象头,之后锁继续被拿还是轻量级锁,但是如果锁已经是重量级锁了,那么就失败,之后锁就是重量级的锁了),锁结束,之后别的线程来拿还是轻量级锁,如果失败,说明已有竞争,释放锁,此时把对象头设为重量级锁,并notify 唤醒其他等待线程。
重量级锁
重量级锁加锁
- 入口函数,当存在竞争时,调用ObjectSynchronizer::slow_enter() 最后一行
//需要膨胀为重量级锁,膨胀前,设置Displaced Mark Word为一个特殊值,代表该锁正在用一个重量级锁的monitor
lock->set_displaced_header(markOopDesc::unused_mark());
//先调用inflate膨胀为重量级锁,该方法返回一个ObjectMonitor对象,然后调用其enter方法
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
- 锁膨胀,即获取一个ObjectMonitor对象inflate()
//结构体如下objectMonitor.hpp中定义
ObjectMonitor::ObjectMonitor() {
_header = NULL; //对象头的Mark Word
_count = 0;
_waiters = 0,
_recursions = 0; //线程的重入次数
_object = NULL;
_owner = NULL; //标识拥有该monitor的线程
_WaitSet = NULL; //等待线程组成的双向循环链表,_WaitSet是第一个节点
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ; //多线程竞争锁进入时的单向链表
FreeNext = NULL ;
_EntryList = NULL ; //_owner从该双向循环链表中唤醒线程结点,_EntryList是第一个节点
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
}
inflate膨胀
- inflate膨胀有四种情况:
- Inflated(重量级锁状态) - 直接返回
- INFLATING(膨胀中) - 忙等待直到膨胀完成
- Stack-locked(轻量级锁状态) - 膨胀
- Neutral(无锁状态) - 膨胀
- Inflated 重量级锁状态: 直接返回
/**
* 1. 已经是重量级锁了,直接返回obj ->mark word指向的monitor对象
*/
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor() ;
return inf ;
}
- INFLATING 其他线程正在膨胀中,则当前线程等待
/**
* 2. 有其他线程正在膨胀中,这个时候忙等待后continue
*/
if (mark == markOopDesc::INFLATING()) {
TEVENT (Inflate: spin while INFLATING) ;
//3.1 在该方法中会进行spin/yield/park等操作完成自旋动作
ReadStableMark(object) ;
continue ;
}
- Stack-locked(轻量级锁状态升级) - 膨胀
/**
* 3. 轻量级锁升级
*/
if (mark->has_locker()) {
// 3.1 omAlloc从当前线程的Self->omFreeList的 ObjectMonitor数组中获取,成功加入全局中管理,否则从全局中获取
ObjectMonitor * m = omAlloc (Self) ;
// 初始化ObjectMonitor对象
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
// 将锁对象的mark word设置为INFLATING (0)状态
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
if (cmp != mark) {
omRelease (Self, m, true) ;
continue ; // Interference -- just retry
}
// 栈中的displaced mark word
markOop dmw = mark->displaced_mark_helper() ;
// monitor中的header设置为Displaced mark Word
m->set_header(dmw) ;
//将原有的Lock Record设置给monitor的owner
m->set_owner(mark->locker());
//monitor的obj设置锁对象
m->set_object(object);
//将锁对象头设置为重量级锁状态
object->release_set_mark(markOopDesc::encode(m));
...
return m ;
}
- 无锁状态
/**
* 4. 无锁状态
*/
//获取一个ObjectMonitor对象
ObjectMonitor * m = omAlloc (Self) ;
// 初始化操作
m->Recycle();
m->set_header(mark); //设置header为mark Word
m->set_owner(NULL); //Lock Record不存在
m->set_object(object);
m->OwnerIsThread = 1 ;
m->_recursions = 0 ;
m->_Responsible = NULL ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class
//用CAS替换对象头的mark word为重量级锁状态
if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
m->set_object (NULL) ;
m->set_owner (NULL) ;
m->OwnerIsThread = 0 ;
m->Recycle() ;
omRelease (Self, m, true) ;
m = NULL ;
continue ;
}
...
return m ;
}
获取锁->enter(THREAD)
- 在ObjectMonitor::enter()四种情况
- 无锁直接可以获取到
- 重入情况
- 轻量级升级情况
- 重量级锁竞争情况
void ATTR ObjectMonitor::enter(TRAPS) {
Thread * const Self = THREAD ;
void * cur ;
/**
* 1. owner为null代表无锁状态,如果能CAS设置成功,则当前线程直接获得锁
*/
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) {
...
return ;
}
/**
* 2. 如果是重入的情况
*/
if (cur == Self) {
// TODO-FIXME: check for integer overflow! BUGID 6557169.
_recursions ++ ;
return ;
}
/**
* 3. 当前线程是之前持有轻量级锁的线程。由轻量级锁膨胀且第一次调用enter方
* 法,那cur是指向Lock Record的指针
*/
if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
// 重入计数重置为1
_recursions = 1 ;
// 设置owner字段为当前线程(之前owner是指向Lock Record的指针)
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}
...
// 在调用系统的同步操作之前,先尝试自旋获得锁
if (Knob_SpinEarly && TrySpin (Self) > 0) {
...
//自旋的过程中获得了锁,则直接返回
Self->_Stalled = 0 ;
return ;
}
...
{
...
/**
* 4. 重量级锁竞争情况
*/
for (;;) {
jt->set_suspend_equivalent();
// 在该方法中调用系统同步操作,真正的竞争锁
EnterI (THREAD) ;
...
}
Self->set_current_pending_monitor(NULL);
}
...
}
重量级锁竞争
-
首先分析objectMonitor中的元素: cxq(下图中的ContentionList),EntryList ,WaitSet,owner, 前三个是由ObjectWaiter的链表结构, owner是当前持有锁的线程
image - ObjectWaiter结构
class ObjectWaiter : public StackObj {
public:
enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ;
enum Sorted { PREPEND, APPEND, SORTED } ;
ObjectWaiter * volatile _next;
ObjectWaiter * volatile _prev;
Thread* _thread;
jlong _notifier_tid;
ParkEvent * _event;
volatile int _notified ;
volatile TStates TState ;
Sorted _Sorted ; // List placement disposition
bool _active ; // Contention monitoring is enabled
public:
ObjectWaiter(Thread* thread); //对应获取锁的线程
void wait_reenter_begin(ObjectMonitor *mon);
void wait_reenter_end(ObjectMonitor *mon);
};
- 重量级加锁过程
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
...
// 尝试获得锁
if (TryLock (Self) > 0) {
...
return ;
}
DeferredInitialize () ;
// 自旋
if (TrySpin (Self) > 0) {
...
return ;
}
...
// 将线程封装成node节点ObjectWaiter中
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
/**
* 1. 将node节点插入到_cxq队列的头部,cxq是一个单向链表
*/
ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// CAS失败的话 再尝试获得锁,这样可以降低插入到_cxq队列的频率
if (TryLock (Self) > 0) {
...
return ;
}
}
// SyncFlags默认为0,如果没有其他等待的线程,则将_Responsible设置为自己
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
TEVENT (Inflated enter - Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;
for (;;) {
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
...
// park self
if (_Responsible == Self || (SyncFlags & 1)) {
// 当前线程是_Responsible时,调用的是带时间参数的park
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
// Increase the RecheckInterval, but clamp the value.
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
//否则直接调用park挂起当前线程
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;
}
if (TryLock(Self) > 0) break ;
...
if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;
...
/**
* 2. 在当前线程释放锁时,_succ会被设置为EntryList或_cxq中的一个线程:表示假定继承人
* _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ; 在ExitEpilog中设置
* ExitEpilog(w) 调用w = _cxq 或者 w = _EntryList
*/
if (_succ == Self) _succ = NULL ;
// Invariant: after clearing _succ a thread *must* retry _owner before parking.
OrderAccess::fence() ;
}
// 走到这里说明已经获得锁了
assert (_owner == Self , "invariant") ;
assert (object() != NULL , "invariant") ;
// 将当前线程的node从cxq或EntryList中移除
UnlinkAfterAcquire (Self, &node) ;
if (_succ == Self) _succ = NULL ;
if (_Responsible == Self) {
_Responsible = NULL ;
OrderAccess::fence();
}
...
return ;
}
- 当一个线程尝试获得锁时,如果该锁已经被占用,则会将该线程封装成一个ObjectWaiter对象插入到cxq的队列的队首,然后调用park函数挂起当前线程。
- 当线程释放锁时,会从cxq或EntryList中挑选一个线程唤醒,被选中的线程叫做Heir presumptive即假定继承人(Ready Thread),假定继承人被唤醒后会尝试获得锁,但synchronized是非公平的,所以假定继承人不一定能获得锁;
- 如果线程获得锁后调用Object#wait方法,则会将线程加入到WaitSet中
- 当被Object#notify唤醒后,会将线程从WaitSet移动到cxq或EntryList中去。
- 调用一个锁对象的wait或notify方法时,如当前锁的状态是偏向锁或轻量级锁则会先膨胀成重量级锁。
重量级锁解锁
- 解锁包括: 1. 当前是可重入锁 2. 正常释放锁
- 可重入锁
// 重入计数器还不为0,则计数器-1后返回
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT (Inflated exit - recursive) ;
return ;
}
- 正常释放锁,可能需要唤醒其他线程
if (Knob_ExitPolicy == 0) { //Knob_ExitPolicy默认为0
/**
* 1. synchronized非公平锁原因: 当前线程先释放锁,这时如果有其他线程进入同步块则能获得锁,不管刚刚上面的
假定继承人及_succ是否存在,因此它是非公平锁
*/
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
//内存屏障,是否需要唤醒继任者
OrderAccess::storeload() ; //See if we need to wake a successor
/**
* 2. 如果没有等待线程_EntryList和_cxq均为空,或者_succ假定继承人存在,这个时候是没有竞争的,可以运行,不需要唤醒,直接返回就好了
*/
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
TEVENT (Inflated exit - complex egress) ;
/**
* 3. 走到这里表示,当前_EntryList ,_cxq不为空,且没有假定继承人,需要唤醒操作了
* 而要执行之后的操作就需要重新获得锁,即设置_owner为当前线程
*/
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
return ;
}
...
/**
* 4. 根据不同的QMode执行不同的唤醒策略,默认为0
*/
ObjectWaiter * w = NULL ;
// code 4:根据QMode的不同会有不同的唤醒策略,默认为0
int QMode = Knob_QMode ;
if (QMode == 2 && _cxq != NULL) {
/**
* QMode == 2 : cxq中的线程有更高优先级,直接唤醒cxq的队首线程,返回了,最终唤醒的就是这个队首线程_succ == w
*/
w = _cxq ;
assert (w != NULL, "invariant") ;
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
ExitEpilog (Self, w) ;
return ;
}
f (QMode == 3 && _cxq != NULL) {
/**
* QMode == 3: 将cxq中的元素插入到EntryList的末尾,继续向下执行程序
*/
...
}
if (QMode == 4 && _cxq != NULL) {
/**
* QMode == 4: 将cxq插入到EntryList的队首
*/
...
}
/**
* 默认QMode == 0 : 什么也不做
*/
/**
* 3. 如果_EntryList不为null ,取队首w ,唤醒ObjectWaiter对象的线程,然后立即返回
*/
w = _EntryList ;
if (w != NULL) {
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
/**
* 4. 如果EntryList的首元素为空,就将cxq的所有元素放入到EntryList中
* 然后再从EntryList中取出来队首元素执行ExitEpilog方法,然后立即返回
*
* 使用w保存_cxq队列后,将_cxq设置null
*/
w = _cxq ;
if (w == NULL) continue ;
// Drain _cxq into EntryList - bulk transfer.
// First, detach _cxq.
// The following loop is tantamount to: w = swap (&cxq, NULL)
for (;;) {
assert (w != NULL, "Invariant") ;
//下方用到w替代_cxq
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
if (QMode == 1) {
// QMode == 1 : 将cxq中的元素转移到EntryList,并反转顺序
...
} else {
// QMode == 0 or QMode == 2‘
// 将cxq中的元素转移到EntryList
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
//如果当前有个新增的假定继承人,所以不需要当前线程去唤醒,以减少上下文切换的比率
if (_succ != NULL) continue;
/**
* 4.1 将上方有cxq转移到_EntryList首位唤醒即可
*/
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
//ExitEpilog用于唤醒线程操作
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
assert (_owner == Self, "invariant") ;
/**
* 假定继承人的设置,查看调用方法在解锁exit中设置w = _EntryList
* 只有QMode == 2 时 w = _cxq 中取
*/
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;
Wakee = NULL ;
//如果有其他线程进来,将会直接运行,不会唤醒操作了
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ;
if (SafepointSynchronize::do_call_back()) {
TEVENT (unpark before SAFEPOINT) ;
}
DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
Trigger->unpark() ; //唤醒线程
// Maintain stats and report events to JVMTI
if (ObjectMonitor::_sync_Parks != NULL) {
ObjectMonitor::_sync_Parks->inc() ;
}
}
- 释放锁,这个时刻其他的线程能获取到锁,synchronized是非公平锁的原因
- 如果当前没有等待的线程或succ != null 有一个假定继承人存在(可以运行它) ,则直接返回就好了,因为不需要唤醒其他线程。
- 当前线程重新获得锁,因为之后要操作cxq和EntryList队列以及唤醒线程
- 根据QMode的不同,会执行不同的唤醒策略: 默认为0
- QMode = 2且cxq非空:取cxq队列队首的ObjectWaiter对象,调用ExitEpilog方法,该方法会唤醒ObjectWaiter对象的线程,然后立即返回,后面的代码不会执行了;
- QMode = 3且cxq非空:把cxq队列插入到EntryList的尾部;
- QMode = 4且cxq非空:把cxq队列插入到EntryList的头部;
- QMode = 0:默认,暂时什么都不做,继续往下看;
- 如果EntryList的首元素非空,就取出来调用ExitEpilog方法,该方法会唤醒ObjectWaiter对象的线程,然后立即返回;
- 如果EntryList的首元素为空,就将cxq的所有元素放入到EntryList中,然后再从EntryList中取出来队首元素执行ExitEpilog方法,然后立即返回;
wait ,notify唤醒操作
- wait将当前Thread构造ObjectWaiter后通过AddWaiter加入_waitSet队列
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
// 从waitSet头部添加
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
- notify唤醒wait
//获取等待队列中一个ObjectWaiter
ObjectWaiter * iterator = DequeueWaiter() ; //也是从头部取值呀
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
// dequeue the very first waiter
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
DequeueSpecificWaiter(waiter);
}
return waiter;
}
...
int Policy = Knob_MoveNotifyee ; //默认为2
ObjectWaiter * List = _EntryList ;
if (Policy == 2) { // prepend to cxq
// 如果_EntryList为null,将唤醒的加入_EntryList很大几率运行
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else { //否则加入_cxq队首,等待_EntryList为null后加入运行,QMode =默认0时
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
}
- 加入等待队列从队首添加,取出也是从队首,notify只唤醒一个添加到_cxq或者_EntryList中,notifyAll()逻辑一致,只是使用for循环获取waitSet中所有数据添加而已
思考
- 对于下方代码,请您分析其执行顺序:
public class SyncDemo {
public static void main(String[] args) throws InterruptedException {
SyncDemo syncDemo = new SyncDemo();
syncDemo.startThreadA();
Thread.sleep(100);
syncDemo.startThreadB();
Thread.sleep(100);
syncDemo.startThreadC();
}
final Object lock = new Object();
public void startThreadA() {
new Thread(() -> {
synchronized (lock) {
System.out.println("ThreadA get lock");
try {
Thread.sleep(500);
} catch (Exception e) {}
System.out.println("ThreadA release lock");
}
}, "thread-A").start();
}
public void startThreadB() {
new Thread(() -> {
synchronized (lock) {
System.out.println("ThreadB get lock");
}
}, "thread-B").start();
}
public void startThreadC() {
new Thread(() -> {
synchronized (lock) {
System.out.println("ThreadC get lock");
}
}, "thread-C").start();
}
}
//最终打印结果一定是 A -> C -> B
下载链接
网友评论