本章节内容主要参考《深入理解Android Java虚拟机ART》进行研究,所参考ART源码版本为 Android 10,如有问题请及时指出;
1. synchronized实现原理
分5层逐步解析synchronized的工作原理:Java代码实现,字节码实现,JVM实现,操作系统实现,汇编实现;
1.1 Java代码实现
- sychronized的定义及使用场景:
Java提供的一种支持原子性的锁机制,可以保证多线程并发访问共享数据时是互斥的;sychronized是互斥锁,同一时刻最多只有一个线程能持有这个锁,所以被sychronized修饰的代码块可以以不可分割的方式被执行;由于synchronized机制悲观地认为,自己在使用数据的时候一定有其他线程来修改数据,所以synchronized是一种悲观锁;注释:锁定的是对象而不是代码:执行某一段代码时必须锁定某个对象;
- 示例代码:
private static Thread[] threads = new Thread[100]; private static int count = 0; public static void main(String[] args) { Object o = new Object(); Runnable runnable = () -> { // synchronized (o) { for (int i = 0; i < 100_0000; i++) { count++; } // } }; for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(runnable); } for (Thread t : threads) { t.start(); } try { for (Thread t : threads) { t.join(); } } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("lewis : count is " + count); }
如果不加synchronized修饰,打印出来的count!=100*100_0000;发生这种多线程并发访问出现不安全问题的原因涉及到Java线程内存模型;
- 简单了解一下Java线程的内存模型,这里涉及到主存和工作内存的概念:
-
CPU硬件结构如图所示:
CPU硬件结构.png
-
CPU在读取运算所需的数据、存储运算得到的数据结果等操作的时候,会涉及到与内存的读写交互,但CPU运算速度远高于内存的读写速度,为了解决这种速度不匹配的情况,增加了多级高速缓存;这样,运算前,将数据复制到高速缓存,运算后,将结果同步写回主存(对于JVM来说,这个主存是指Java堆内存);
-
CPU在计算时,并不总是从主存中读数据(数据是指共享变量,存在线程间共享和竞争的关系),而是通过从多级缓存中拿数据,从而提高执行效率。读取数据的优先级是:寄存器 > 高速缓存(多级) > 主存;
-
所以线程的工作内存是个抽象概念,指的是寄存器和(多级)高速缓存,线程的工作内存中保存的是主存中数据的备份;
-
当多线程访问主存中的某个经常被访问的数据时,是先从线程的工作内存中读写数据,然后在某个合适的时候再写回主存中,所以每个工作线程中的副本变量不一定是最新的(对其他线程是不可见的),这就是上面多线程读写count字段出现线程不安全的原因;
Java线程模型.png
-
所以需要加锁保证多线程间数据同步,简单说一下(后续介绍),volatile字段的作用只能保证数据的可见性和有序性,即:线程A对volatile count字段执行写操作后,会立即将count值写回到主存中,并通知其他线程count值发生变化,这样count对其他线程就是可见的了,但是volatile并不会保证操作的原子性;
-
- sychronized的4种使用方式:
public class Foo { private int count = 0; private Object o = new Object(); public static synchronized void t1() { // 静态方法加synchronized和t2()的方式争用的都是类锁,和对象锁唯一的区别就是,内存中类锁只有一把; } public void t2() { synchronized (Foo.class) { } } public synchronized void t3() { } public void t4() { synchronized (this) { } } public void t5() { // 如果使用下面的o对象,多线程并发访问t5(),结果是什么? // Object o = new Object(); synchronized (o) { count++; } } }
1.2 字节码实现
- 可以通过
javac Foo.java
命令生成.class文件,然后通过javap -v Foo.class
将字节码反编译后显示在terminal中,或者使用idea的插件ASM Bytecode Viewer: - 下面是t1(),t2(),t3()和t5()的字节码:
public static synchronized void t1();
descriptor: ()V
// JVM会解析方法的符号引用,然后根据flags去锁定(类或者对象)
flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
Code:
stack=0, locals=0, args_size=0
0: return
LineNumberTable:
line 12: 0
public void t2();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=3, args_size=1
0: ldc #5 // class juc/Foo
2: dup
3: astore_1
4: monitorenter
5: aload_1
6: monitorexit
7: goto 15
10: astore_2
11: aload_1
12: monitorexit
13: aload_2
14: athrow
15: return
Exception table:
from to target type
5 7 10 any
10 13 10 any
LineNumberTable:
line 15: 0
line 16: 5
line 17: 15
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 10
locals = [ class juc/Foo, class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4
public synchronized void t3();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=0, locals=1, args_size=1
0: return
LineNumberTable:
line 20: 0
public void t5();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=3, locals=3, args_size=1
// 偏移量: 指令
// 将局部变量表中第一个局部变量的引用推到栈顶,对于非static方法,aload_0表示this
0: aload_0
1: getfield #3 // Field o:Ljava/lang/Object;
// dup是复制栈顶元素,并压入栈顶
4: dup
// 0~5表示:将this对象入栈,使用duplicate指令复制栈顶元素并放入局部变量表位置1的地方,此时当前栈帧(方法执行的数据结构)的操作数栈(后进先出)中只有一个元素this
5: astore_1
// synchronized的具体实现由monitorenter和monitorexit指令完成
6: monitorenter
7: aload_0
8: dup
9: getfield #4 // Field count:I
12: iconst_1
13: iadd
14: putfield #4 // Field count:I
17: aload_1
18: monitorexit
19: goto 27
// 以上是正常执行完方法的步骤,下面是编译器自动生成的异常处理步骤,必须保证不管正常执行完毕还是异常退出方法,都必须释放锁
22: astore_2
23: aload_1
24: monitorexit
25: aload_2
26: athrow
27: return
Exception table:
from to target type
7 19 22 any
22 25 22 any
LineNumberTable:
line 29: 0
line 30: 7
line 31: 17
line 32: 27
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 22
locals = [ class juc/Foo, class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4
1.3 JVM(ART)实现
-
在进入使用synchronized关键字的代码块时,会生成一条monitorenter指令,退出该代码块时生成一条monitorexit指令;
-
在ART虚拟机中,关于线程同步涉及到的类如下图所示,真正实现线程同步功能的是Monitor类(art/runtime/monitor.cc文件中定义);
ART中线程同步相关类.png
- Monitor::monitor_lock_:类型为Mutex(art/runtime/base/Mutex.h),Mutex是ART虚拟机对Linux操作系统提供的同步机制的封装,后面介绍;
- Monitor::monitor_id_:类型为32位的无符号整型,每个Monitor对象都有一个id,这个id字段比较重要,后期就是通过这个id从Monitor统一的管理类MonitorPool获取真正的Monitor对象的;
- Monitor::obj_:类型为GcRoot<mirror::Object>,每个Monitor对象都会关联一个Object对象,但不一定每个Object都关联一个Monitor对象【1】,后面解释;
- MonitorPool:Monitor和OS关联紧密(因为调用了系统提供的同步方法),所以每个Monitor很重要,由专门的MonitorPool管理其内存分配和回收;
- MonitorList:ART虚拟机用来管理虚拟机进程中分配的Monitor对象,可以看作一个list容器,这两个类是不参与线程同步的,了解一下即可;
- Object::monitor_:类型为32位无符号整型,这个monitor_对象指向的不是Monitor对象,而是一个LockWord对象;
- LockWord::value_:类型为32位无符号整型,存储着关于锁的信息【2】,后面介绍;
-
先看问题【1】,既然Monitor对象是和Object对象关联的,为什么Object.monitor_指向的却不是真正的Monitor对象呢?
- Java层每个对象都可以作为同步锁,对应ART虚拟机层就是每个Object对象都包含一个可作为"同步锁"的成员变量;
- 而Object::monitor_指向了一个LockWord对象?这是因为Monitor和OS关联紧密,属于重型资源,内存中有很多Object对象,但只有很少一部分被作为线程同步锁使用,所以不能每个Object对象都分配一个真正的Monitor对象;所以ART设计了LockWord类,代表Object对象中的"锁";
-
再看问题【2】,LockWord是干什么的。LockWord本身所需内存很小,只有一个成员变量
uint32_t value_
,即Object::monitor_只有32位,而这个value_包含了多种信息。当synchronized修饰的代码块只有单个线程访问时,monitor_起到轻量级保护作用(后面解释),此时monitor_字段中保存的锁的类型是"Thin Lock";当多个线程访问synchronized代码块时,monitor_字段中的锁类型标记位就需要膨胀成"Fat Lock"类型,而胖锁需要使用系统调用来确保多线程并发访问的正确性,这个膨胀的过程就是关联一个Monitor对象,通过Monitor对象实现多线程同步;LockWord的主要代码如下:class LockWord { public: enum SizeShiftsAndMasks : uint32_t { ... // 锁的类型:瘦、胖锁等 kStateThinOrUnlocked = 0, kStateFat = 1, kStateHash = 2, kStateForwardingAddress = 3, ... }; static LockWord FromThinLockId(uint32_t thread_id, uint32_t count, uint32_t gc_state) { ... } ... static LockWord FromDefault(uint32_t gc_state) { return LockWord(gc_state << kGCStateShift); } static LockWord Default() { // 默认创建一个状态为kUnlocked的瘦锁 return LockWord(); } // 锁的状态:表明是上锁还是没上锁 enum LockState { kUnlocked, // No lock owners. kThinLocked, // Single uncontended owner. kFatLocked, // See associated monitor. ... }; ... private: ... // The encoded value holding all the state. uint32_t value_; }; } // namespace art
-
value_字段中不同bit保存不同的信息,具体看如下表格:
锁类型\bit范围 31-30 29-28 27-16 15-0 kStateThinOrUnlocked 00 rb Lock Count Thread Id Owner kStateFat 01 rb Monitor Id Monitor Id kStateHash 10 rb Hash Code Hash Code kStateForwardingAddress 11 Forwarding Address Forwarding Address Forwarding Address - 31-30位:表示锁的类型,目前只需要关注前两种即可;
- kStateThinOrUnlocked类型:29-28位包含一个Read Barrier;27-16位是一个计数器(lock count),最大4096;15-0位是持有当前锁的线程ID;
- kStateFat类型:27-0位是Monitor Id;
-
前面提到,默认创建的类型是瘦锁,而瘦锁是可以通过一系列步骤转换为胖锁的,具体的操作在
Monitor::Inflate
函数中,该函数实现在art/runtime/monitor.cc文件中:void Monitor::Inflate(Thread* self, Thread* owner, ObjPtr<mirror::Object> obj, int32_t hash_code) { // 该函数的目标是将obj对象中的monitor_(LockWord类型)描述的瘦锁,转化为一个胖锁,其实就是将monitor_.value_中27-0位设置一个Monitor Id // 这样就可以在省内存的情况下,给obj对象关联了一个Monitor对象 ... // 根据线程和obj参数,从Monitor池中获取一个Monitor对象 Monitor* m = MonitorPool::CreateMonitor(self, owner, obj, hash_code); DCHECK(m != nullptr); // 真正的操作是调用Install函数 if (m->Install(self)) { ... // 把这个monitor对象添加到Monitor List中进行管理 Runtime::Current()->GetMonitorList()->Add(m); CHECK_EQ(obj->GetLockWord(true).GetState(), LockWord::kFatLocked); } else { MonitorPool::ReleaseMonitor(self, m); } } bool Monitor::Install(Thread* self) { MutexLock mu(self, monitor_lock_); // Uncontended mutex acquisition as monitor isn't yet public. LockWord lw(GetObject()->GetLockWord(false)); switch (lw.GetState()) { case LockWord::kThinLocked: { CHECK_EQ(owner_->GetThreadId(), lw.ThinLockOwner()); // 如果当前是瘦锁,则需要更新该锁被上锁的次数,前面提到单线程多次上锁的情况,在转化为胖锁时,需要保留原瘦锁的信息,以防后续减肥时需要还原瘦锁及其信息 lock_count_ = lw.ThinLockCount(); break; } ... // 其他状态要么是报错,要么不处理 } // 构建胖锁,this表示的是Monitor,所以胖锁创建时会关联一个Monitor对象 LockWord fat(this, lw.GCState()); // 将新建的胖锁,更新到obj对象的monitor_成员变量中,CasLockWord()内部最终通过AtomicInteger对象的CompareAndSet()函数, // 将当前obj_对象中monitor_.value_更换为新的胖锁的value_,从始至终更换的都是32位无符号整型value_的值 bool success = GetObject()->CasLockWord(lw, fat, CASMode::kWeak, std::memory_order_release); ... return success; }
CAS操作:Compare And Swap,是一种"无锁"算法,不管Java层还是C层,多线程并发访问同一变量,要么用锁把这个变量锁起来,要么使用CAS算法,最终由汇编指令保证更新内存操作的原子性;当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则自旋重复上面的比较,直到达到自旋次数上限;ART虚拟机中使用AtomicXXX来实现原子操作;
-
当然也有胖锁转化为瘦锁的操作,在Monitor::Deflate()函数中,该函数实现在art/runtime/monitor.cc文件中:
bool Monitor::Deflate(Thread* self, ObjPtr<mirror::Object> obj) { LockWord lw(obj->GetLockWord(false)); if (lw.GetState() == LockWord::kFatLocked) { // 内部是根据 Monitor Id 通过MonitorPool::MonitorFromMonitorId()获取到obj对象关联的胖锁对象 Monitor* monitor = lw.FatLockMonitor(); MutexLock mu(self, monitor->monitor_lock_); // 如果发现还有其他线程等待获取该对象锁,则直接return,不进行瘦身 if (monitor->num_waiters_ > 0) { return false; } Thread* owner = monitor->owner_; if (owner != nullptr) { if (monitor->HasHashCode()) { return false; } if (static_cast<uint32_t>(monitor->lock_count_) > LockWord::kThinLockMaxCount) { return false; } // 创建瘦锁 LockWord new_lw = LockWord::FromThinLockId(owner->GetThreadId(), monitor->lock_count_, lw.GCState()); // 更新obj的monitor_成员变量 obj->SetLockWord(new_lw, false); } // 因为减肥成瘦锁,原来的obj对象不再需要Monitor对象了,所以解绑 monitor->obj_ = GcRoot<mirror::Object>(nullptr); } return true; }
但Deflate()函数并没有被调用到,是因为 胖->瘦 涉及到内存回收,ART放到了GC部分处理了,此处只需要了解一下就行;
-
有了以上基础,看一下Java对synchronized的处理,是在进入synchronized修饰的代码块之前,执行monitorenter指令,退出代码块时执行monitorexit指令。但ART运行时有2种方法:解释执行模式,机器码执行模式,不同模式下执行指令的方式不一样。
ART运行时模式:默认是机器码执行模式,可以通过-Xint指定。【解释执行模式】是取出dex code,由解释器逐条执行;【机器码执行模式】是在安装时,把dex优化成机器码,运行时直接执行机器码;
-
解释执行模式:Android 10 的代码执行入口会走到MONITOR_ENTER()方法(runtime/interpreter/interpreter_switch_impl-inl.h文件中),接着走到静态方法DoMonitorEnter()中(在runtime/interpreter/interpreter_common.h文件):
template <bool kMonitorCounting> static inline void DoMonitorEnter(Thread* self, ShadowFrame* frame, ObjPtr<mirror::Object> ref) NO_THREAD_SAFETY_ANALYSIS REQUIRES(!Roles::uninterruptible_) { ... StackHandleScope<1> hs(self); Handle<mirror::Object> h_ref(hs.NewHandle(ref)); // 这里拿到准备被锁住的obj对象并执行Object::MonitorEnter()函数 h_ref->MonitorEnter(self); ... if (kMonitorCounting && frame->GetMethod()->MustCountLocks()) { frame->GetLockCountData().AddMonitor(self, h_ref.Get()); } }
- 接着进入当前锁对象obj的Object::MonitorEnter()函数(位于runtime/mirror/object-inl.h文件中):
inline ObjPtr<mirror::Object> Object::MonitorEnter(Thread* self) { return Monitor::MonitorEnter(self, this, /*trylock=*/false); }
- 可以看到,传入了想要争用锁的线程对象,并调用Monitor::MonitorEnter()函数,进入查看(在runtime/monitor.cc文件中):
ObjPtr<mirror::Object> Monitor::MonitorEnter(Thread* self, ObjPtr<mirror::Object> obj, bool trylock) { ... // 这个参数很重要,是ART优化时使用到的参数 size_t contention_count = 0; StackHandleScope<1> hs(self); Handle<mirror::Object> h_obj(hs.NewHandle(obj)); // 循环!!! while (true) { // 获取obj的monitor_(32位无符号整型),并赋值到lock_word.value_中 LockWord lock_word = h_obj->GetLockWord(false); switch (lock_word.GetState()) { case LockWord::kUnlocked: { // 如果当前obj对象处于未上锁状态,则创建瘦锁并更新到obj.monitor_中 LockWord thin_locked(LockWord::FromThinLockId(thread_id, 0, lock_word.GCState())); // 执行CasLockWord()函数,内部使用C++ AtomicInteger保证原子操作,将瘦锁更新到当前obj.monitor_中 if (h_obj->CasLockWord(lock_word, thin_locked, CASMode::kWeak, std::memory_order_acquire)) { AtraceMonitorLock(self, h_obj.Get(), /* is_wait= */ false); return h_obj.Get(); // Success! } continue; // Go again. } case LockWord::kThinLocked: { // 如果当前obj对象已经上了瘦锁,说明有一个线程在使用这个锁,先拿到这个线程id uint32_t owner_thread_id = lock_word.ThinLockOwner(); // 如果发现准备争用对象锁的线程就是当前线程,即同一个线程多次获取obj对象锁 if (owner_thread_id == thread_id) { // 持有该锁的线程再次获取锁,只需要lock count位加一即可 uint32_t new_count = lock_word.ThinLockCount() + 1; if (LIKELY(new_count <= LockWord::kThinLockMaxCount)) { // lock count 有个默认最大值4096,因为使用27-16位来保存的这个字段 // 创建新的瘦锁,并将lock count 加一即可 LockWord thin_locked(LockWord::FromThinLockId(thread_id, new_count, lock_word.GCState())); if (!kUseReadBarrier) { h_obj->SetLockWord(thin_locked, /* as_volatile= */ false); AtraceMonitorLock(self, h_obj.Get(), /* is_wait= */ false); return h_obj.Get(); // Success! } else { if (h_obj->CasLockWord(lock_word, thin_locked, CASMode::kWeak, std::memory_order_relaxed)) { AtraceMonitorLock(self, h_obj.Get(), /* is_wait= */ false); return h_obj.Get(); // Success! } } continue; // Go again. } else { // 如果一个线程获取锁的次数大于4096,则需要膨胀了 InflateThinLocked(self, h_obj, lock_word, 0); } } else { // 说明是第二个线程来争用这个obj对象的锁,这时候也需要膨胀了 ... contention_count++; ... // 这里的逻辑和最外层的while循环配合使用,这是ART做出的优化操作: if (contention_count <= runtime->GetMaxSpinsBeforeThinLockInflation()) { // 1. 如果循环的次数contention_count<50(即runtime->GetMaxSpinsBeforeThinLockInflation()最终获取的是Opt::MaxSpinsBeforeThinLockInflation的值,默认50次), // 则通过sched_yield系统调用,当前调用线程B(新的竞争锁的线程)会让出执行资格,操作系统将在后续某个时间恢复该新竞争线程B的执行,这就存在一个时间间隔; // 2. 在这个时间间隔中,有可能正在占用锁的线程A释放了锁,那就不用使用Monitor这个重型资源了,达到优化的目的,但循环的次数上限为50次,因为多核CPU可能会导致这个时间间隔很短, // 接近于0,会导致新竞争线程B不必要的一直等待; // 3. 表现:新的竞争锁的线程等待一会,不继续执行,但新竞争线程并没有放弃CPU,还是会占用CPU资源; sched_yield(); } else { contention_count = 0; // 3. 所以如果循环了50次后新竞争线程B还是拿不到锁,那就只能将锁升级为胖锁,使用Monitor实现同步了。 InflateThinLocked(self, h_obj, lock_word, 0); } } continue; // Start from the beginning. } case LockWord::kFatLocked: { // 如果是已经是胖锁了,其他线程来争用这个锁,则使用monitor->lock()函数获取锁,实现线程间同步 std::atomic_thread_fence(std::memory_order_acquire); Monitor* mon = lock_word.FatLockMonitor(); if (trylock) { return mon->TryLock(self) ? h_obj.Get() : nullptr; } else { // 【1】如果锁已经被占用,则新的竞争线程B会在此处等待,直到lock()函数返回,lock()内部使用操作系统的futex实现线程间同步 mon->Lock(self); return h_obj.Get(); // Success! } } ... } } } }
总结如下:
MonitorEnter实现.png
- 了解了ART做的一些优化,接着看瘦锁膨胀的流程,在Monitor::InflateThinLocked(art/runtime/monitor.cc文件中):
void Monitor::InflateThinLocked(Thread* self, Handle<mirror::Object> obj, LockWord lock_word, uint32_t hash_code) { uint32_t owner_thread_id = lock_word.ThinLockOwner(); if (owner_thread_id == self->GetThreadId()) { // 如果是同一个线程,直接调用Inflate()函数,这种情况一般是同一个线程多次获取锁并且获取次数超过4096,不能不转化为胖锁 Inflate(self, self, obj.Get(), hash_code); } else { // 如果是多个线程竞争同一个对象锁,就需要膨胀了,过程和把大象装冰箱总共分3步一样 ThreadList* thread_list = Runtime::Current()->GetThreadList(); self->SetMonitorEnterObject(obj.Get()); bool timed_out; Thread* owner; { ScopedThreadSuspension sts(self, kWaitingForLockInflation); // 1. 先将当前拥有锁的线程暂停,只是暂停当前线程的运行,并不是让线程放弃CPU的执行 owner = thread_list->SuspendThreadByThreadId(owner_thread_id, SuspendReason::kInternal, &timed_out); } if (owner != nullptr) { lock_word = obj->GetLockWord(true); if (lock_word.GetState() == LockWord::kThinLocked && lock_word.ThinLockOwner() == owner_thread_id) { // 2. 然后将瘦锁转化为胖锁 Inflate(self, owner, obj.Get(), hash_code); } // 3. 再把线程恢复运行 bool resumed = thread_list->Resume(owner, SuspendReason::kInternal); } self->SetMonitorEnterObject(nullptr); } }
- 所以,锁膨胀的过程是需要将当前持有锁的线程暂停运行的;
- Monitor::MonitorExit()函数的过程其实涉及到 胖->瘦 的过程,比较简单,也是判断锁状态,如果是瘦锁,那就将锁的lockcount--,但如果是胖锁,直接执行Monitor::Unlock()函数,而不是执行Monitor::Deflate()函数,因为涉及到锁的内存回收,所以放在了GC部分,就不展开讨论了;
- 上述就是monitorenter指令获取对象锁的流程;
-
机器码执行模式:
机器码执行模式,放到1.5节去介绍,主要内容涉及到汇编程序代码; -
总结:
- Java层的synchronized字段的实现,是由编译器编译时,在synchronized代码块开始时添加monitorenter指令,在离开代码块时添加monitorexit指令(至于这两条指令是怎么生成但,他们的执行入口到底在哪,1.5节进行介绍),而这两条指令的JVM实现主要在art/runtime/monitor.cc中,monitor是使用ART封装的Mutex(互斥锁)实现的,而Mutex又是使用了futex系统调用或pthread中pthread_mutex_t相关函数(具体使用哪种由ART的ART_USE_FUTEXES宏控制);
- 在ART虚拟机的两种不同的执行模式下会有不同的实现,但针对arm架构,最终,不论解释执行模式还是机器码执行模式,都会调用Monitor::MonitorEnter和Monitor::MonitorExit,处理争用锁和释放锁的流程;
1.4 操作系统层实现
其实Mutex(art/runtime/base/mutex.cc文件)是ART虚拟机封装的一个互斥锁,最终使用的是Android操作系统(Linux)的同步机制【futex系统调用】或【pthread中pthread_mutex_t的相关方法】实现的,先简单介绍一下Mutex:
-
在art/runtime/base/mutex.h文件头部中定义了宏ART_USE_FUTEXES:
#if defined(__linux__) #define ART_USE_FUTEXES 1 #else #define ART_USE_FUTEXES 0 #endif
Android系统是基于Linux的,所以默认使用futex机制实现线程同步;
-
介绍futex机制之前先看一下Mutex::ExclusiveLock()的实现,上述在1.3.9中介绍【1】位置
mon->lock()
,该lock()函数中会使用即使用Mutex::ExclusiveLock()进行上锁的操作:void Mutex::ExclusiveLock(Thread* self) { ... if (!recursive_ || !IsExclusiveHeld(self)) { // 从这开始! #if ART_USE_FUTEXES bool done = false; do { int32_t cur_state = state_and_contenders_.load(std::memory_order_relaxed); if (LIKELY((cur_state & kHeldMask) == 0) /* lock not held */) { // 判断如果锁没有被任何线程持有,则更新state_and_contenders的低位值,标记此时锁被持有了 // state_and_contenders是AtomicInteger类型(32位有符号整型)的对象,低位0表示没有被持有,低位1表示被持有 done = state_and_contenders_.CompareAndSetWeakAcquire(cur_state, cur_state | kHeldMask); } else { // 此处代表争用锁失败,则需要将线程挂起 ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid()); // 1. 记录竞争线程的个数+1 increment_contenders(); cur_state += kContenderIncrement; // 2. 在执行futex系统调用前,当前线程需要先检查标记位as_struct类型(在art/runtime/thread.h文件中),优先执行其他事情(指的是as_struct里标记的事情) if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) { self->CheckEmptyCheckpointFromMutex(); } // 3. 然后执行futex()函数,此处会让新的竞争线程进入睡眠等待,对应线程将不再执行,必须唤醒后才能执行后面的操作: if (futex(state_and_contenders_.Address(), FUTEX_WAIT_PRIVATE, cur_state, nullptr, nullptr, 0) != 0) { if ((errno != EAGAIN) && (errno != EINTR)) { PLOG(FATAL) << "futex wait failed for " << name_; } } decrement_contenders(); } } while (!done); ... #else ... #endif exclusive_owner_.store(SafeGetTid(self), std::memory_order_relaxed); RegisterAsLocked(self); } recursion_count_++; ... }
ART虚拟机Check Point机制:Java线程执行指令时,会时常检查标记位as_struct,如果有变化,就需要让线程先去执行标记位中标记的一些事情,这些事情包括GC等等,这里表现出来的就是程序卡顿、无响应;Check Point机制是为了让线程工作过程当中,可以有转头执行更重要的事情的机会;
- art::futex()函数使用系统调用,即syscall()函数,指令是SYS_futex(定义在art/runtime/mutex-inl.h文件头部);
#if ART_USE_FUTEXES static inline int futex(volatile int *uaddr, int op, int val, const struct timespec *timeout, volatile int *uaddr2, int val3) { return syscall(SYS_futex, uaddr, op, val, timeout, uaddr2, val3); } #endif // ART_USE_FUTEXES
- uaddr:用户态下的一块共享内存的地址,内部保存了一个计数器;
- op:是操作类型,Mutex的实现使用了FUTEX_WAIT_PRIVATE,意思是原子性的检查
if(uaddr中的计数器的值==val) ? 让线程挂起,直到timeout : null
,也就是说把线程挂到一个等待队列中;
-
Linux操作系统的【系统调用futex】:
- Futex:Fast Userspace Mutexes的缩写,快速用户空间互斥体;
- 传统的进程同步机制semaphores, msgqueues, sockets等,都是对内核空间中的一个内核对象(这个内核对象提供了原子操作)进行操作,这个内核对象对于所有要同步的进程都是可见的,当进程间需要同步的时候,都需要通过系统调用在内核空间完成一些操作,比如操作上述的内核对象,这种系统调用是很耗费资源的;
- 但发现,很多时候,同步是没有竞争的,就是说一个进程进入互斥区,到退出互斥区,往往没有其他进程要抢着进入互斥区,这种情况下进程仍然需要通过系统调用,操作内核空间的对象,比较浪费,所以为了降低这种消耗,Futex机制诞生;
- 其实说白了,Futex机制目的就是减少进程同步时对内核空间的不必要的访问,从而减少资源消耗,Futex是一种用户态和内核态混合的同步机制;
- Futex机制是通过mmap分配一段共享内存,futex变量保存在这段共享内存中,当进程准备进入/退出互斥区时,先去共享内存中查看futex变量的值,而不是通过系统调用直接操作内核空间的对象。如果没竞争,则只需要修改共享内存中的futex变量,如果futex变量的值表示有进程间的竞争,则还是得通过系统调用去完成相应的操作;
- 对于线程,情况比较简单,因为线程共享虚拟内存空间,虚拟地址就可以唯一的标识出futex变量,即线程用同样的虚拟地址来访问futex变量;
- 所以Futex在程序low-contention的时候能获得比传统同步机制更好的性能;
- 【维基百科上这么描述】:Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用CPU提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在用户空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用;
-
总结:
- Android操作系统中的多线程同步,是使用Android【系统调用futex机制】实现的;
- 系统调用futex在ART虚拟机Mutex互斥锁中的应用:1)支持对线程的睡眠与唤醒的原子操作;2)管理线程挂起时的等待队列;
1.5 汇编实现
-
在前面1.3.11中留了2个问题:【1】monitorenter和monitoerexit这两条指令在哪生成的?【2】他们的执行入口到底在哪?不同CPU架构,执行不同的汇编代码,下面以ARM架构简单描述一下,有兴趣也可以对比x86的实现,更容易理解;
-
【1】先看如何生成这两条指令的,ART虚拟机中dex2oat工具会对dex进行优化(涉及到ART虚拟机的编译原理,足够单独拿出来一大章进行描述,也会在后续的volatile章节中介绍如何使用dex2oat工具),经过dex2oat工具编译后,monitorenter和monitorexit指令将转换为HMonitorOperation IR(Intermediate Representation: 中间代码)对象,该对象会生成对应的机器码,生成机器码的函数是InstructionCodeGeneratorARMVIXL::VisitMonitorOperation(),定义在art/compiler/optimizing/code_generator_arm_vixl.cc文件中:
void InstructionCodeGeneratorARMVIXL::VisitMonitorOperation(HMonitorOperation* instruction) { // 1. 如果是monitorenter,则传入kQuickLockObject,monitorexit的话传入kQuickUnlockObject // 2. ARM架构下,kQuickLockObject指向汇编函数art_quick_lock_object(定义在art/runtime/entrypoints/quick/quick_default_externs.h文件中), // 即art_quick_lock_object函数由汇编代码实现;kQuickUnlockObject指向汇编函数art_quick_unlock_object codegen_->InvokeRuntime(instruction->IsEnter() ? kQuickLockObject : kQuickUnlockObject, instruction, instruction->GetDexPc()); ... }
vixl 是安卓系统使用的针对ARMv8 架构的动态代码生成工具,x86架构对应的汇编程序在code_generator_x86.cc文件中;
- 上述两个汇编函数的原型定义在art/runtime/entrypoints/quick/quick_default_enterns.h文件中,函数的参数都是Object对象,即线程争用的对象:
// Lock entrypoints. extern "C" void art_quick_lock_object(art::mirror::Object*); extern "C" void art_quick_unlock_object(art::mirror::Object*);
-
【2】再看在哪被执行,由于篇幅过长,下面只介绍lock过程的实现,需要继续找汇编代码,在art/runtime/arch/arm/quick_entrypoints_arm.S文件的art_quick_lock_object方法,.S文件是汇编程序文件格式:
ENTRY art_quick_lock_object ldr r1, [rSELF, #THREAD_ID_OFFSET] // cbz指令:把r0的值和0比较,如果相等,则执行后面的.Lslow_lock指令 cbz r0, .Lslow_lock .Lretry_lock: ... .Lnot_unlocked: @ r2: original lock word, r1: thread_id, r3: r2 ^ r1 ... .Llock_strex_fail: b .Lretry_lock @ retry END art_quick_lock_object ENTRY art_quick_lock_object_no_inline ... .Lslow_lock: SETUP_SAVE_REFS_ONLY_FRAME r1 @ save callee saves in case we block mov r1, rSELF @ pass Thread::Current // 注意这里!bl指令:跳转到artLockObjectFromCode函数 bl artLockObjectFromCode @ (Object* obj, Thread*) RESTORE_SAVE_REFS_ONLY_FRAME REFRESH_MARKING_REGISTER RETURN_IF_RESULT_IS_ZERO DELIVER_PENDING_EXCEPTION END art_quick_lock_object_no_inline
- 当跳转到.Lslow_lock后,会执行artLockObjectFromCode函数,该函数在art/runtime/entrypoints/quick/quick_lock_entrypoints.cc文件中:
extern "C" int artLockObjectFromCode(mirror::Object* obj, Thread* self) NO_THREAD_SAFETY_ANALYSIS REQUIRES(!Roles::uninterruptible_) REQUIRES_SHARED(Locks::mutator_lock_) /* EXCLUSIVE_LOCK_FUNCTION(Monitor::monitor_lock_) */ { ScopedQuickEntrypointChecks sqec(self); if (UNLIKELY(obj == nullptr)) { ... } else { // 就是这,终于连起来了... ObjPtr<mirror::Object> object = obj->MonitorEnter(self); // May block ... return 0; } }
x86架构的汇编代码中,使用lock cmpxchg命令:(LOCK_IF_MP)-> lock cmpxchg ,即如果是多核CPU,则在执行CAS时加上操作系统的lock指令;其中【上锁操作】最终锁的是总线,多个CPU访问同一块内存是通过总线访问;单核就不加lock命令了;意思是"lock"命令保证"cmpxchg"操作是一个原子性操作,不允许其他CPU打断cmpxchg操作(即:CAS写一块内存时,不允许其他CPU访问这块内存)
1.6 总结
- Java代码添加synchronized修饰符后,编译器编译成字节码时,会在进入同步代码块前添加monitorenter指令,退出代码块时添加monitorexit指令;
- 在机器码执行模式下,monitorenter这个Java指令对应的汇编代码被执行时,执行了quick_lock_entrypoints.cc文件中的artLockObjectFromCode()函数;
- artLockObjectFromCode()函数调用了obj->MonitorEnter(self)函数,内部调用Monitor::MonitorEnter()函数;
- Monitor::MonitorEnter()函数根据判断,如果同一个线程获取锁,则更新lock count;如果多线程争用锁,则看情况,严重的话使用ART提供的Mutex互斥锁;
- 而Mutex的实现是使用了系统调用futex()函数,所以在多线程争用锁的时候,最终是由Linux的系统调用futex()保证多线程同步机制的;
网友评论