美文网首页
Android多线程同步-synchronized

Android多线程同步-synchronized

作者: Hawozhencai | 来源:发表于2021-04-08 10:21 被阅读0次

    本章节内容主要参考《深入理解Android Java虚拟机ART》进行研究,所参考ART源码版本为 Android 10,如有问题请及时指出;

    1. synchronized实现原理

    分5层逐步解析synchronized的工作原理:Java代码实现,字节码实现,JVM实现,操作系统实现,汇编实现;

    1.1 Java代码实现

    1. sychronized的定义及使用场景:
      Java提供的一种支持原子性的锁机制,可以保证多线程并发访问共享数据时是互斥的;sychronized是互斥锁,同一时刻最多只有一个线程能持有这个锁,所以被sychronized修饰的代码块可以以不可分割的方式被执行;由于synchronized机制悲观地认为,自己在使用数据的时候一定有其他线程来修改数据,所以synchronized是一种悲观锁;

      注释:锁定的是对象而不是代码:执行某一段代码时必须锁定某个对象;

    2. 示例代码:
      private static Thread[] threads = new Thread[100];
      private static int count = 0;
      
      public static void main(String[] args) {
          Object o = new Object();
          Runnable runnable = () -> {
          //    synchronized (o) {
              for (int i = 0; i < 100_0000; i++) {
                  count++;
              }
          //    }
          };
      
          for (int i = 0; i < threads.length; i++) {
              threads[i] = new Thread(runnable);
          }
      
          for (Thread t : threads) {
              t.start();
          }
          try {
              for (Thread t : threads) {
                  t.join();
              }
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      
          System.out.println("lewis : count is " + count);
      }
      

      如果不加synchronized修饰,打印出来的count!=100*100_0000;发生这种多线程并发访问出现不安全问题的原因涉及到Java线程内存模型;

    3. 简单了解一下Java线程的内存模型,这里涉及到主存和工作内存的概念:
      • CPU硬件结构如图所示:

        CPU硬件结构.png
      • CPU在读取运算所需的数据、存储运算得到的数据结果等操作的时候,会涉及到与内存的读写交互,但CPU运算速度远高于内存的读写速度,为了解决这种速度不匹配的情况,增加了多级高速缓存;这样,运算前,将数据复制到高速缓存,运算后,将结果同步写回主存(对于JVM来说,这个主存是指Java堆内存);

      • CPU在计算时,并不总是从主存中读数据(数据是指共享变量,存在线程间共享和竞争的关系),而是通过从多级缓存中拿数据,从而提高执行效率。读取数据的优先级是:寄存器 > 高速缓存(多级) > 主存;

      • 所以线程的工作内存是个抽象概念,指的是寄存器和(多级)高速缓存,线程的工作内存中保存的是主存中数据的备份;

      • 当多线程访问主存中的某个经常被访问的数据时,是先从线程的工作内存中读写数据,然后在某个合适的时候再写回主存中,所以每个工作线程中的副本变量不一定是最新的(对其他线程是不可见的),这就是上面多线程读写count字段出现线程不安全的原因;

        Java线程模型.png
      • 所以需要加锁保证多线程间数据同步,简单说一下(后续介绍),volatile字段的作用只能保证数据的可见性和有序性,即:线程A对volatile count字段执行写操作后,会立即将count值写回到主存中,并通知其他线程count值发生变化,这样count对其他线程就是可见的了,但是volatile并不会保证操作的原子性;

    1. sychronized的4种使用方式:
      public class Foo {
          private int count = 0;
          private Object o = new Object();
      
          public static synchronized void t1() {
              // 静态方法加synchronized和t2()的方式争用的都是类锁,和对象锁唯一的区别就是,内存中类锁只有一把;
          }
      
          public void t2() {
              synchronized (Foo.class) {
              }
          }
      
          public synchronized void t3() {
          }
      
          public void t4() {
              synchronized (this) {
              }
          }
      
          public void t5() {
              // 如果使用下面的o对象,多线程并发访问t5(),结果是什么?
              // Object o = new Object();
              synchronized (o) {
                  count++;
              }
          }
      }
      

    1.2 字节码实现

    1. 可以通过javac Foo.java命令生成.class文件,然后通过javap -v Foo.class将字节码反编译后显示在terminal中,或者使用idea的插件ASM Bytecode Viewer:
    2. 下面是t1(),t2(),t3()和t5()的字节码:
        public static synchronized void t1();
        descriptor: ()V
        // JVM会解析方法的符号引用,然后根据flags去锁定(类或者对象)
        flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
        Code:
          stack=0, locals=0, args_size=0
             0: return
          LineNumberTable:
             line 12: 0
            
        public void t2();
        descriptor: ()V
        flags: ACC_PUBLIC
        Code:
          stack=2, locals=3, args_size=1
             0: ldc           #5                  // class juc/Foo
             2: dup
             3: astore_1
             4: monitorenter
             5: aload_1
             6: monitorexit
             7: goto          15
            10: astore_2
            11: aload_1
            12: monitorexit
            13: aload_2
            14: athrow
            15: return
          Exception table:
             from    to  target type
                 5     7    10   any
                10    13    10   any
          LineNumberTable:
            line 15: 0
            line 16: 5
            line 17: 15
          StackMapTable: number_of_entries = 2
            frame_type = 255 /* full_frame */
              offset_delta = 10
              locals = [ class juc/Foo, class java/lang/Object ]
              stack = [ class java/lang/Throwable ]
            frame_type = 250 /* chop */
              offset_delta = 4
    
        public synchronized void t3();
        descriptor: ()V
        flags: ACC_PUBLIC, ACC_SYNCHRONIZED
        Code:
          stack=0, locals=1, args_size=1
             0: return
          LineNumberTable:
            line 20: 0
    
        public void t5();
        descriptor: ()V
        flags: ACC_PUBLIC
        Code:
          stack=3, locals=3, args_size=1
             // 偏移量: 指令
             // 将局部变量表中第一个局部变量的引用推到栈顶,对于非static方法,aload_0表示this
             0: aload_0
             1: getfield      #3                  // Field o:Ljava/lang/Object;
             // dup是复制栈顶元素,并压入栈顶
             4: dup
             // 0~5表示:将this对象入栈,使用duplicate指令复制栈顶元素并放入局部变量表位置1的地方,此时当前栈帧(方法执行的数据结构)的操作数栈(后进先出)中只有一个元素this
             5: astore_1
             // synchronized的具体实现由monitorenter和monitorexit指令完成
             6: monitorenter
             7: aload_0
             8: dup
             9: getfield      #4                  // Field count:I
            12: iconst_1
            13: iadd
            14: putfield      #4                  // Field count:I
            17: aload_1
            18: monitorexit
            19: goto          27
            // 以上是正常执行完方法的步骤,下面是编译器自动生成的异常处理步骤,必须保证不管正常执行完毕还是异常退出方法,都必须释放锁
            22: astore_2
            23: aload_1
            24: monitorexit
            25: aload_2
            26: athrow
            27: return
          Exception table:
             from    to  target type
                 7    19    22   any
                22    25    22   any
          LineNumberTable:
            line 29: 0
            line 30: 7
            line 31: 17
            line 32: 27
          StackMapTable: number_of_entries = 2
            frame_type = 255 /* full_frame */
              offset_delta = 22
              locals = [ class juc/Foo, class java/lang/Object ]
              stack = [ class java/lang/Throwable ]
            frame_type = 250 /* chop */
              offset_delta = 4
    

    1.3 JVM(ART)实现

    1. 在进入使用synchronized关键字的代码块时,会生成一条monitorenter指令,退出该代码块时生成一条monitorexit指令;

    2. 在ART虚拟机中,关于线程同步涉及到的类如下图所示,真正实现线程同步功能的是Monitor类(art/runtime/monitor.cc文件中定义);

      ART中线程同步相关类.png
      • Monitor::monitor_lock_:类型为Mutex(art/runtime/base/Mutex.h),Mutex是ART虚拟机对Linux操作系统提供的同步机制的封装,后面介绍;
      • Monitor::monitor_id_:类型为32位的无符号整型,每个Monitor对象都有一个id,这个id字段比较重要,后期就是通过这个id从Monitor统一的管理类MonitorPool获取真正的Monitor对象的;
      • Monitor::obj_:类型为GcRoot<mirror::Object>,每个Monitor对象都会关联一个Object对象,但不一定每个Object都关联一个Monitor对象【1】,后面解释;
      • MonitorPool:Monitor和OS关联紧密(因为调用了系统提供的同步方法),所以每个Monitor很重要,由专门的MonitorPool管理其内存分配和回收;
      • MonitorList:ART虚拟机用来管理虚拟机进程中分配的Monitor对象,可以看作一个list容器,这两个类是不参与线程同步的,了解一下即可;
      • Object::monitor_:类型为32位无符号整型,这个monitor_对象指向的不是Monitor对象,而是一个LockWord对象;
      • LockWord::value_:类型为32位无符号整型,存储着关于锁的信息【2】,后面介绍;
    3. 先看问题【1】,既然Monitor对象是和Object对象关联的,为什么Object.monitor_指向的却不是真正的Monitor对象呢?

      • Java层每个对象都可以作为同步锁,对应ART虚拟机层就是每个Object对象都包含一个可作为"同步锁"的成员变量;
      • 而Object::monitor_指向了一个LockWord对象?这是因为Monitor和OS关联紧密,属于重型资源,内存中有很多Object对象,但只有很少一部分被作为线程同步锁使用,所以不能每个Object对象都分配一个真正的Monitor对象;所以ART设计了LockWord类,代表Object对象中的"锁";
    4. 再看问题【2】,LockWord是干什么的。LockWord本身所需内存很小,只有一个成员变量uint32_t value_,即Object::monitor_只有32位,而这个value_包含了多种信息。当synchronized修饰的代码块只有单个线程访问时,monitor_起到轻量级保护作用(后面解释),此时monitor_字段中保存的锁的类型是"Thin Lock";当多个线程访问synchronized代码块时,monitor_字段中的锁类型标记位就需要膨胀成"Fat Lock"类型,而胖锁需要使用系统调用来确保多线程并发访问的正确性,这个膨胀的过程就是关联一个Monitor对象,通过Monitor对象实现多线程同步;LockWord的主要代码如下:

      class LockWord {
       public:
        enum SizeShiftsAndMasks : uint32_t {
          ...
          // 锁的类型:瘦、胖锁等
          kStateThinOrUnlocked = 0,
          kStateFat = 1,
          kStateHash = 2,
          kStateForwardingAddress = 3,
          ...
        };
      
        static LockWord FromThinLockId(uint32_t thread_id, uint32_t count, uint32_t gc_state) {
          ...
        }
        ...
      
        static LockWord FromDefault(uint32_t gc_state) {
          return LockWord(gc_state << kGCStateShift);
        }
      
        static LockWord Default() {
          // 默认创建一个状态为kUnlocked的瘦锁
          return LockWord();
        }
      
        // 锁的状态:表明是上锁还是没上锁
        enum LockState { 
          kUnlocked,    // No lock owners.
          kThinLocked,  // Single uncontended owner.
          kFatLocked,   // See associated monitor.
          ...
        };
        ...
      
       private:
        ...
      
        // The encoded value holding all the state.
        uint32_t value_;
      };
      }  // namespace art
      
    5. value_字段中不同bit保存不同的信息,具体看如下表格:

      锁类型\bit范围 31-30 29-28 27-16 15-0
      kStateThinOrUnlocked 00 rb Lock Count Thread Id Owner
      kStateFat 01 rb Monitor Id Monitor Id
      kStateHash 10 rb Hash Code Hash Code
      kStateForwardingAddress 11 Forwarding Address Forwarding Address Forwarding Address
      • 31-30位:表示锁的类型,目前只需要关注前两种即可;
      • kStateThinOrUnlocked类型:29-28位包含一个Read Barrier;27-16位是一个计数器(lock count),最大4096;15-0位是持有当前锁的线程ID;
      • kStateFat类型:27-0位是Monitor Id;
    6. 前面提到,默认创建的类型是瘦锁,而瘦锁是可以通过一系列步骤转换为胖锁的,具体的操作在Monitor::Inflate函数中,该函数实现在art/runtime/monitor.cc文件中:

      void Monitor::Inflate(Thread* self, Thread* owner, ObjPtr<mirror::Object> obj, int32_t hash_code) {
        // 该函数的目标是将obj对象中的monitor_(LockWord类型)描述的瘦锁,转化为一个胖锁,其实就是将monitor_.value_中27-0位设置一个Monitor Id
        // 这样就可以在省内存的情况下,给obj对象关联了一个Monitor对象
        ...
        // 根据线程和obj参数,从Monitor池中获取一个Monitor对象
        Monitor* m = MonitorPool::CreateMonitor(self, owner, obj, hash_code);
        DCHECK(m != nullptr);
        // 真正的操作是调用Install函数
        if (m->Install(self)) {
          ...
          // 把这个monitor对象添加到Monitor List中进行管理
          Runtime::Current()->GetMonitorList()->Add(m);
          CHECK_EQ(obj->GetLockWord(true).GetState(), LockWord::kFatLocked);
        } else {
          MonitorPool::ReleaseMonitor(self, m);
        }
      }
      
      bool Monitor::Install(Thread* self) {
        MutexLock mu(self, monitor_lock_);  // Uncontended mutex acquisition as monitor isn't yet public.
        LockWord lw(GetObject()->GetLockWord(false));
        switch (lw.GetState()) {
          case LockWord::kThinLocked: {
            CHECK_EQ(owner_->GetThreadId(), lw.ThinLockOwner());
            // 如果当前是瘦锁,则需要更新该锁被上锁的次数,前面提到单线程多次上锁的情况,在转化为胖锁时,需要保留原瘦锁的信息,以防后续减肥时需要还原瘦锁及其信息
            lock_count_ = lw.ThinLockCount();
            break;
          }
          ... 
          // 其他状态要么是报错,要么不处理
        }
        // 构建胖锁,this表示的是Monitor,所以胖锁创建时会关联一个Monitor对象
        LockWord fat(this, lw.GCState());
        // 将新建的胖锁,更新到obj对象的monitor_成员变量中,CasLockWord()内部最终通过AtomicInteger对象的CompareAndSet()函数,
        // 将当前obj_对象中monitor_.value_更换为新的胖锁的value_,从始至终更换的都是32位无符号整型value_的值
        bool success = GetObject()->CasLockWord(lw, fat, CASMode::kWeak, std::memory_order_release);
        ...
        return success;
      }  
      

      CAS操作:Compare And Swap,是一种"无锁"算法,不管Java层还是C层,多线程并发访问同一变量,要么用锁把这个变量锁起来,要么使用CAS算法,最终由汇编指令保证更新内存操作的原子性;当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则自旋重复上面的比较,直到达到自旋次数上限;ART虚拟机中使用AtomicXXX来实现原子操作;

    7. 当然也有胖锁转化为瘦锁的操作,在Monitor::Deflate()函数中,该函数实现在art/runtime/monitor.cc文件中:

      bool Monitor::Deflate(Thread* self, ObjPtr<mirror::Object> obj) {
        LockWord lw(obj->GetLockWord(false));
        if (lw.GetState() == LockWord::kFatLocked) {
          // 内部是根据 Monitor Id 通过MonitorPool::MonitorFromMonitorId()获取到obj对象关联的胖锁对象
          Monitor* monitor = lw.FatLockMonitor();
          MutexLock mu(self, monitor->monitor_lock_);
          // 如果发现还有其他线程等待获取该对象锁,则直接return,不进行瘦身
          if (monitor->num_waiters_ > 0) {
            return false;
          }
          Thread* owner = monitor->owner_;
          if (owner != nullptr) {
            if (monitor->HasHashCode()) {
              return false;
            }
            if (static_cast<uint32_t>(monitor->lock_count_) > LockWord::kThinLockMaxCount) {
              return false;
            }
            // 创建瘦锁
            LockWord new_lw = LockWord::FromThinLockId(owner->GetThreadId(),
                                                       monitor->lock_count_,
                                                       lw.GCState());
            // 更新obj的monitor_成员变量
            obj->SetLockWord(new_lw, false);
          }
          // 因为减肥成瘦锁,原来的obj对象不再需要Monitor对象了,所以解绑
          monitor->obj_ = GcRoot<mirror::Object>(nullptr);
        }
        return true;
      }
      

      但Deflate()函数并没有被调用到,是因为 胖->瘦 涉及到内存回收,ART放到了GC部分处理了,此处只需要了解一下就行;

    8. 有了以上基础,看一下Java对synchronized的处理,是在进入synchronized修饰的代码块之前,执行monitorenter指令,退出代码块时执行monitorexit指令。但ART运行时有2种方法:解释执行模式,机器码执行模式,不同模式下执行指令的方式不一样。

      ART运行时模式:默认是机器码执行模式,可以通过-Xint指定。【解释执行模式】是取出dex code,由解释器逐条执行;【机器码执行模式】是在安装时,把dex优化成机器码,运行时直接执行机器码;

    9. 解释执行模式:Android 10 的代码执行入口会走到MONITOR_ENTER()方法(runtime/interpreter/interpreter_switch_impl-inl.h文件中),接着走到静态方法DoMonitorEnter()中(在runtime/interpreter/interpreter_common.h文件):

      template <bool kMonitorCounting>
      static inline void DoMonitorEnter(Thread* self, ShadowFrame* frame, ObjPtr<mirror::Object> ref)
          NO_THREAD_SAFETY_ANALYSIS
          REQUIRES(!Roles::uninterruptible_) {
        ...
        StackHandleScope<1> hs(self);
        Handle<mirror::Object> h_ref(hs.NewHandle(ref));
        // 这里拿到准备被锁住的obj对象并执行Object::MonitorEnter()函数
        h_ref->MonitorEnter(self);
        ...
        if (kMonitorCounting && frame->GetMethod()->MustCountLocks()) {
          frame->GetLockCountData().AddMonitor(self, h_ref.Get());
        }
      }
      
      • 接着进入当前锁对象obj的Object::MonitorEnter()函数(位于runtime/mirror/object-inl.h文件中):
      inline ObjPtr<mirror::Object> Object::MonitorEnter(Thread* self) {
        return Monitor::MonitorEnter(self, this, /*trylock=*/false);
      }
      
      • 可以看到,传入了想要争用锁的线程对象,并调用Monitor::MonitorEnter()函数,进入查看(在runtime/monitor.cc文件中):
      ObjPtr<mirror::Object> Monitor::MonitorEnter(Thread* self,
                                                   ObjPtr<mirror::Object> obj,
                                                   bool trylock) {
        ...
        // 这个参数很重要,是ART优化时使用到的参数
        size_t contention_count = 0;
        StackHandleScope<1> hs(self);
        Handle<mirror::Object> h_obj(hs.NewHandle(obj));
        // 循环!!!
        while (true) {
          // 获取obj的monitor_(32位无符号整型),并赋值到lock_word.value_中
          LockWord lock_word = h_obj->GetLockWord(false);
          switch (lock_word.GetState()) {
            case LockWord::kUnlocked: {
              // 如果当前obj对象处于未上锁状态,则创建瘦锁并更新到obj.monitor_中
              LockWord thin_locked(LockWord::FromThinLockId(thread_id, 0, lock_word.GCState()));
              // 执行CasLockWord()函数,内部使用C++ AtomicInteger保证原子操作,将瘦锁更新到当前obj.monitor_中
              if (h_obj->CasLockWord(lock_word, thin_locked, CASMode::kWeak, std::memory_order_acquire)) {
                AtraceMonitorLock(self, h_obj.Get(), /* is_wait= */ false);
                return h_obj.Get();  // Success!
              }
              continue;  // Go again.
            }
            case LockWord::kThinLocked: {
              // 如果当前obj对象已经上了瘦锁,说明有一个线程在使用这个锁,先拿到这个线程id
              uint32_t owner_thread_id = lock_word.ThinLockOwner();
              // 如果发现准备争用对象锁的线程就是当前线程,即同一个线程多次获取obj对象锁
              if (owner_thread_id == thread_id) {
                // 持有该锁的线程再次获取锁,只需要lock count位加一即可
                uint32_t new_count = lock_word.ThinLockCount() + 1;
                if (LIKELY(new_count <= LockWord::kThinLockMaxCount)) {
                  // lock count 有个默认最大值4096,因为使用27-16位来保存的这个字段
                  // 创建新的瘦锁,并将lock count 加一即可
                  LockWord thin_locked(LockWord::FromThinLockId(thread_id,
                                                                new_count,
                                                                lock_word.GCState()));
                  if (!kUseReadBarrier) {
                    h_obj->SetLockWord(thin_locked, /* as_volatile= */ false);
                    AtraceMonitorLock(self, h_obj.Get(), /* is_wait= */ false);
                    return h_obj.Get();  // Success!
                  } else {
                    if (h_obj->CasLockWord(lock_word,
                                           thin_locked,
                                           CASMode::kWeak,
                                           std::memory_order_relaxed)) {
                      AtraceMonitorLock(self, h_obj.Get(), /* is_wait= */ false);
                      return h_obj.Get();  // Success!
                    }
                  }
                  continue;  // Go again.
                } else {
                  // 如果一个线程获取锁的次数大于4096,则需要膨胀了
                  InflateThinLocked(self, h_obj, lock_word, 0);
                }
              } else {
                // 说明是第二个线程来争用这个obj对象的锁,这时候也需要膨胀了
                ...
                contention_count++;
                ...
                // 这里的逻辑和最外层的while循环配合使用,这是ART做出的优化操作:
                if (contention_count <= runtime->GetMaxSpinsBeforeThinLockInflation()) {
                  // 1. 如果循环的次数contention_count<50(即runtime->GetMaxSpinsBeforeThinLockInflation()最终获取的是Opt::MaxSpinsBeforeThinLockInflation的值,默认50次),
                  //    则通过sched_yield系统调用,当前调用线程B(新的竞争锁的线程)会让出执行资格,操作系统将在后续某个时间恢复该新竞争线程B的执行,这就存在一个时间间隔;
                  // 2. 在这个时间间隔中,有可能正在占用锁的线程A释放了锁,那就不用使用Monitor这个重型资源了,达到优化的目的,但循环的次数上限为50次,因为多核CPU可能会导致这个时间间隔很短,
                  //    接近于0,会导致新竞争线程B不必要的一直等待;
                  // 3. 表现:新的竞争锁的线程等待一会,不继续执行,但新竞争线程并没有放弃CPU,还是会占用CPU资源;
                  sched_yield();
                } else {
                  contention_count = 0;
                  // 3. 所以如果循环了50次后新竞争线程B还是拿不到锁,那就只能将锁升级为胖锁,使用Monitor实现同步了。
                  InflateThinLocked(self, h_obj, lock_word, 0);
                }
              }
              continue;  // Start from the beginning.
            }
            case LockWord::kFatLocked: {
              // 如果是已经是胖锁了,其他线程来争用这个锁,则使用monitor->lock()函数获取锁,实现线程间同步
              std::atomic_thread_fence(std::memory_order_acquire);
              Monitor* mon = lock_word.FatLockMonitor();
              if (trylock) {
                return mon->TryLock(self) ? h_obj.Get() : nullptr;
              } else {
                // 【1】如果锁已经被占用,则新的竞争线程B会在此处等待,直到lock()函数返回,lock()内部使用操作系统的futex实现线程间同步
                mon->Lock(self);
                return h_obj.Get();  // Success!
              }
            }
            ...
            }
          }
        }
      }
      

      总结如下:

      MonitorEnter实现.png
      • 了解了ART做的一些优化,接着看瘦锁膨胀的流程,在Monitor::InflateThinLocked(art/runtime/monitor.cc文件中):
      void Monitor::InflateThinLocked(Thread* self, Handle<mirror::Object> obj, LockWord lock_word,
                                      uint32_t hash_code) {
        uint32_t owner_thread_id = lock_word.ThinLockOwner();
        if (owner_thread_id == self->GetThreadId()) {
          // 如果是同一个线程,直接调用Inflate()函数,这种情况一般是同一个线程多次获取锁并且获取次数超过4096,不能不转化为胖锁
          Inflate(self, self, obj.Get(), hash_code);
        } else {
          // 如果是多个线程竞争同一个对象锁,就需要膨胀了,过程和把大象装冰箱总共分3步一样
          ThreadList* thread_list = Runtime::Current()->GetThreadList();
          self->SetMonitorEnterObject(obj.Get());
          bool timed_out;
          Thread* owner;
          {
            ScopedThreadSuspension sts(self, kWaitingForLockInflation);
            // 1. 先将当前拥有锁的线程暂停,只是暂停当前线程的运行,并不是让线程放弃CPU的执行
            owner = thread_list->SuspendThreadByThreadId(owner_thread_id,
                                                         SuspendReason::kInternal,
                                                         &timed_out);
          }
          if (owner != nullptr) {
            lock_word = obj->GetLockWord(true);
            if (lock_word.GetState() == LockWord::kThinLocked &&
                lock_word.ThinLockOwner() == owner_thread_id) {
              // 2. 然后将瘦锁转化为胖锁
              Inflate(self, owner, obj.Get(), hash_code);
            }
            // 3. 再把线程恢复运行
            bool resumed = thread_list->Resume(owner, SuspendReason::kInternal);
          }
          self->SetMonitorEnterObject(nullptr);
        }
      }
      
      • 所以,锁膨胀的过程是需要将当前持有锁的线程暂停运行的;
      • Monitor::MonitorExit()函数的过程其实涉及到 胖->瘦 的过程,比较简单,也是判断锁状态,如果是瘦锁,那就将锁的lockcount--,但如果是胖锁,直接执行Monitor::Unlock()函数,而不是执行Monitor::Deflate()函数,因为涉及到锁的内存回收,所以放在了GC部分,就不展开讨论了;
      • 上述就是monitorenter指令获取对象锁的流程;
    10. 机器码执行模式
      机器码执行模式,放到1.5节去介绍,主要内容涉及到汇编程序代码;

    11. 总结:

      • Java层的synchronized字段的实现,是由编译器编译时,在synchronized代码块开始时添加monitorenter指令,在离开代码块时添加monitorexit指令(至于这两条指令是怎么生成但,他们的执行入口到底在哪,1.5节进行介绍),而这两条指令的JVM实现主要在art/runtime/monitor.cc中,monitor是使用ART封装的Mutex(互斥锁)实现的,而Mutex又是使用了futex系统调用或pthread中pthread_mutex_t相关函数(具体使用哪种由ART的ART_USE_FUTEXES宏控制);
      • 在ART虚拟机的两种不同的执行模式下会有不同的实现,但针对arm架构,最终,不论解释执行模式还是机器码执行模式,都会调用Monitor::MonitorEnter和Monitor::MonitorExit,处理争用锁和释放锁的流程;

    1.4 操作系统层实现

    其实Mutex(art/runtime/base/mutex.cc文件)是ART虚拟机封装的一个互斥锁,最终使用的是Android操作系统(Linux)的同步机制【futex系统调用】或【pthread中pthread_mutex_t的相关方法】实现的,先简单介绍一下Mutex:

    1. 在art/runtime/base/mutex.h文件头部中定义了宏ART_USE_FUTEXES:

      #if defined(__linux__)
      #define ART_USE_FUTEXES 1
      #else
      #define ART_USE_FUTEXES 0
      #endif
      

      Android系统是基于Linux的,所以默认使用futex机制实现线程同步;

    2. 介绍futex机制之前先看一下Mutex::ExclusiveLock()的实现,上述在1.3.9中介绍【1】位置mon->lock(),该lock()函数中会使用即使用Mutex::ExclusiveLock()进行上锁的操作:

      void Mutex::ExclusiveLock(Thread* self) {
        ...
        if (!recursive_ || !IsExclusiveHeld(self)) {
      // 从这开始!
      #if ART_USE_FUTEXES
          bool done = false;
          do {
            int32_t cur_state = state_and_contenders_.load(std::memory_order_relaxed);
            if (LIKELY((cur_state & kHeldMask) == 0) /* lock not held */) {
              // 判断如果锁没有被任何线程持有,则更新state_and_contenders的低位值,标记此时锁被持有了
              // state_and_contenders是AtomicInteger类型(32位有符号整型)的对象,低位0表示没有被持有,低位1表示被持有
              done = state_and_contenders_.CompareAndSetWeakAcquire(cur_state, cur_state | kHeldMask);
            } else {
              // 此处代表争用锁失败,则需要将线程挂起
              ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());
              // 1. 记录竞争线程的个数+1
              increment_contenders();
              cur_state += kContenderIncrement;
              // 2. 在执行futex系统调用前,当前线程需要先检查标记位as_struct类型(在art/runtime/thread.h文件中),优先执行其他事情(指的是as_struct里标记的事情)
              if (UNLIKELY(should_respond_to_empty_checkpoint_request_)) {
                self->CheckEmptyCheckpointFromMutex();
              }
              // 3. 然后执行futex()函数,此处会让新的竞争线程进入睡眠等待,对应线程将不再执行,必须唤醒后才能执行后面的操作:
              if (futex(state_and_contenders_.Address(), FUTEX_WAIT_PRIVATE, cur_state,
                        nullptr, nullptr, 0) != 0) {
                if ((errno != EAGAIN) && (errno != EINTR)) {
                  PLOG(FATAL) << "futex wait failed for " << name_;
                }
              }
              decrement_contenders();
            }
          } while (!done);
          ...
      #else
          ...
      #endif
          exclusive_owner_.store(SafeGetTid(self), std::memory_order_relaxed);
          RegisterAsLocked(self);
        }
        recursion_count_++;
        ...
      }
      

      ART虚拟机Check Point机制:Java线程执行指令时,会时常检查标记位as_struct,如果有变化,就需要让线程先去执行标记位中标记的一些事情,这些事情包括GC等等,这里表现出来的就是程序卡顿、无响应;Check Point机制是为了让线程工作过程当中,可以有转头执行更重要的事情的机会;

      • art::futex()函数使用系统调用,即syscall()函数,指令是SYS_futex(定义在art/runtime/mutex-inl.h文件头部);
      #if ART_USE_FUTEXES
      static inline int futex(volatile int *uaddr, int op, int val, const struct timespec *timeout,
                              volatile int *uaddr2, int val3) {
        return syscall(SYS_futex, uaddr, op, val, timeout, uaddr2, val3);
      }
      #endif  // ART_USE_FUTEXES
      
      • uaddr:用户态下的一块共享内存的地址,内部保存了一个计数器;
      • op:是操作类型,Mutex的实现使用了FUTEX_WAIT_PRIVATE,意思是原子性的检查if(uaddr中的计数器的值==val) ? 让线程挂起,直到timeout : null,也就是说把线程挂到一个等待队列中;
    3. Linux操作系统的【系统调用futex】:

      • Futex:Fast Userspace Mutexes的缩写,快速用户空间互斥体;
      • 传统的进程同步机制semaphores, msgqueues, sockets等,都是对内核空间中的一个内核对象(这个内核对象提供了原子操作)进行操作,这个内核对象对于所有要同步的进程都是可见的,当进程间需要同步的时候,都需要通过系统调用在内核空间完成一些操作,比如操作上述的内核对象,这种系统调用是很耗费资源的;
      • 但发现,很多时候,同步是没有竞争的,就是说一个进程进入互斥区,到退出互斥区,往往没有其他进程要抢着进入互斥区,这种情况下进程仍然需要通过系统调用,操作内核空间的对象,比较浪费,所以为了降低这种消耗,Futex机制诞生;
      • 其实说白了,Futex机制目的就是减少进程同步时对内核空间的不必要的访问,从而减少资源消耗,Futex是一种用户态和内核态混合的同步机制;
      • Futex机制是通过mmap分配一段共享内存,futex变量保存在这段共享内存中,当进程准备进入/退出互斥区时,先去共享内存中查看futex变量的值,而不是通过系统调用直接操作内核空间的对象。如果没竞争,则只需要修改共享内存中的futex变量,如果futex变量的值表示有进程间的竞争,则还是得通过系统调用去完成相应的操作;
      • 对于线程,情况比较简单,因为线程共享虚拟内存空间,虚拟地址就可以唯一的标识出futex变量,即线程用同样的虚拟地址来访问futex变量;
      • 所以Futex在程序low-contention的时候能获得比传统同步机制更好的性能;
      • 【维基百科上这么描述】:Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用CPU提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在用户空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用;
    4. 总结:

      • Android操作系统中的多线程同步,是使用Android【系统调用futex机制】实现的;
      • 系统调用futex在ART虚拟机Mutex互斥锁中的应用:1)支持对线程的睡眠与唤醒的原子操作;2)管理线程挂起时的等待队列;

    1.5 汇编实现

    1. 在前面1.3.11中留了2个问题:【1】monitorenter和monitoerexit这两条指令在哪生成的?【2】他们的执行入口到底在哪?不同CPU架构,执行不同的汇编代码,下面以ARM架构简单描述一下,有兴趣也可以对比x86的实现,更容易理解;

    2. 【1】先看如何生成这两条指令的,ART虚拟机中dex2oat工具会对dex进行优化(涉及到ART虚拟机的编译原理,足够单独拿出来一大章进行描述,也会在后续的volatile章节中介绍如何使用dex2oat工具),经过dex2oat工具编译后,monitorenter和monitorexit指令将转换为HMonitorOperation IR(Intermediate Representation: 中间代码)对象,该对象会生成对应的机器码,生成机器码的函数是InstructionCodeGeneratorARMVIXL::VisitMonitorOperation(),定义在art/compiler/optimizing/code_generator_arm_vixl.cc文件中:

      void InstructionCodeGeneratorARMVIXL::VisitMonitorOperation(HMonitorOperation* instruction) {
        // 1. 如果是monitorenter,则传入kQuickLockObject,monitorexit的话传入kQuickUnlockObject
        // 2. ARM架构下,kQuickLockObject指向汇编函数art_quick_lock_object(定义在art/runtime/entrypoints/quick/quick_default_externs.h文件中),
        //    即art_quick_lock_object函数由汇编代码实现;kQuickUnlockObject指向汇编函数art_quick_unlock_object
        codegen_->InvokeRuntime(instruction->IsEnter() ? kQuickLockObject : kQuickUnlockObject,
                                instruction,
                                instruction->GetDexPc());
        ...
      }
      

      vixl 是安卓系统使用的针对ARMv8 架构的动态代码生成工具,x86架构对应的汇编程序在code_generator_x86.cc文件中;

      • 上述两个汇编函数的原型定义在art/runtime/entrypoints/quick/quick_default_enterns.h文件中,函数的参数都是Object对象,即线程争用的对象:
      // Lock entrypoints.
      extern "C" void art_quick_lock_object(art::mirror::Object*);
      extern "C" void art_quick_unlock_object(art::mirror::Object*);
      
    3. 【2】再看在哪被执行,由于篇幅过长,下面只介绍lock过程的实现,需要继续找汇编代码,在art/runtime/arch/arm/quick_entrypoints_arm.S文件的art_quick_lock_object方法,.S文件是汇编程序文件格式:

      ENTRY art_quick_lock_object
          ldr    r1, [rSELF, #THREAD_ID_OFFSET]
          // cbz指令:把r0的值和0比较,如果相等,则执行后面的.Lslow_lock指令
          cbz    r0, .Lslow_lock
      .Lretry_lock:
          ...
      .Lnot_unlocked:  @ r2: original lock word, r1: thread_id, r3: r2 ^ r1
          ...
      .Llock_strex_fail:
          b      .Lretry_lock               @ retry
      END art_quick_lock_object
      
      ENTRY art_quick_lock_object_no_inline
          ...
      .Lslow_lock:
          SETUP_SAVE_REFS_ONLY_FRAME r1     @ save callee saves in case we block
          mov    r1, rSELF                  @ pass Thread::Current
          // 注意这里!bl指令:跳转到artLockObjectFromCode函数
          bl     artLockObjectFromCode      @ (Object* obj, Thread*)
          RESTORE_SAVE_REFS_ONLY_FRAME
          REFRESH_MARKING_REGISTER
          RETURN_IF_RESULT_IS_ZERO
          DELIVER_PENDING_EXCEPTION
      END art_quick_lock_object_no_inline
      
      • 当跳转到.Lslow_lock后,会执行artLockObjectFromCode函数,该函数在art/runtime/entrypoints/quick/quick_lock_entrypoints.cc文件中:
      extern "C" int artLockObjectFromCode(mirror::Object* obj, Thread* self)
          NO_THREAD_SAFETY_ANALYSIS
          REQUIRES(!Roles::uninterruptible_)
          REQUIRES_SHARED(Locks::mutator_lock_) /* EXCLUSIVE_LOCK_FUNCTION(Monitor::monitor_lock_) */ {
        ScopedQuickEntrypointChecks sqec(self);
        if (UNLIKELY(obj == nullptr)) {
          ...
        } else {
          // 就是这,终于连起来了...
          ObjPtr<mirror::Object> object = obj->MonitorEnter(self);  // May block
          ...
          return 0;
        }
      }
      

      x86架构的汇编代码中,使用lock cmpxchg命令:(LOCK_IF_MP)-> lock cmpxchg ,即如果是多核CPU,则在执行CAS时加上操作系统的lock指令;其中【上锁操作】最终锁的是总线,多个CPU访问同一块内存是通过总线访问;单核就不加lock命令了;意思是"lock"命令保证"cmpxchg"操作是一个原子性操作,不允许其他CPU打断cmpxchg操作(即:CAS写一块内存时,不允许其他CPU访问这块内存)

    1.6 总结

    • Java代码添加synchronized修饰符后,编译器编译成字节码时,会在进入同步代码块前添加monitorenter指令,退出代码块时添加monitorexit指令;
    • 在机器码执行模式下,monitorenter这个Java指令对应的汇编代码被执行时,执行了quick_lock_entrypoints.cc文件中的artLockObjectFromCode()函数;
    • artLockObjectFromCode()函数调用了obj->MonitorEnter(self)函数,内部调用Monitor::MonitorEnter()函数;
    • Monitor::MonitorEnter()函数根据判断,如果同一个线程获取锁,则更新lock count;如果多线程争用锁,则看情况,严重的话使用ART提供的Mutex互斥锁;
    • 而Mutex的实现是使用了系统调用futex()函数,所以在多线程争用锁的时候,最终是由Linux的系统调用futex()保证多线程同步机制的;

    2 参考

    Java核心技术 卷I
    深入理解Android Java虚拟机ART
    Linux进程同步机制-Futex

    相关文章

      网友评论

          本文标题:Android多线程同步-synchronized

          本文链接:https://www.haomeiwen.com/subject/jwumkltx.html