Google Abseil Source Code Reading Notes: Mutex

Author: 找不到工作 | Published 2020-09-11 23:17

    A mutex prevents the race conditions that arise when multiple threads access a shared resource at the same time.

    absl::Mutex vs. std::mutex

    Compared with std::mutex, absl::Mutex provides these extra features:

    • absl::Mutex adds Mutex::Await() and Mutex::LockWhen(), which let a thread wait for a condition without a condition variable, so there is no while() loop to write and no signal to send. (A usage sketch follows below.)
    • absl::Mutex supports deadlock detection.
    • absl::Mutex can act as a reader-writer lock via ReaderLock() and ReaderUnlock().

    Like std::mutex, absl::Mutex is not reentrant. Nor does it guarantee strict FIFO ordering.
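    To make the first point concrete, here is a minimal sketch of waiting on a condition with the real Mutex::LockWhen()/absl::Condition API (the names Producer/Consumer and the ready flag are invented for illustration):

    #include "absl/synchronization/mutex.h"

    absl::Mutex mu;
    bool ready = false;  // guarded by mu

    void Consumer() {
      // Blocks until ready == true, then returns with mu held exclusively.
      // No condition-variable loop and no explicit signalling required.
      mu.LockWhen(absl::Condition(&ready));
      // ... consume the shared state ...
      mu.Unlock();
    }

    void Producer() {
      absl::MutexLock lock(&mu);
      ready = true;  // threads blocked in LockWhen() are released automatically
    }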

    absl::Mutex

    The absl::Mutex class has many methods but only one data member, a std::atomic<intptr_t>:

    // constructors and destructor
    class ABSL_LOCKABLE Mutex {
     public:
      Mutex();
      explicit constexpr Mutex(absl::ConstInitType);
    
      ~Mutex();
    // other methods...
    
    private:
      std::atomic<intptr_t> mu_;  // The Mutex state.
    // other methods...
    };
    
    inline Mutex::Mutex() : mu_(0) {
      ABSL_TSAN_MUTEX_CREATE(this, __tsan_mutex_not_static);
    }
    
    inline constexpr Mutex::Mutex(absl::ConstInitType) : mu_(0) {}
    
    Mutex::~Mutex() {
      intptr_t v = mu_.load(std::memory_order_relaxed);
      if ((v & kMuEvent) != 0 && !DebugOnlyIsExiting()) {
        ForgetSynchEvent(&this->mu_, kMuEvent, kMuSpin);
      }
      if (kDebugMode) {
        this->ForgetDeadlockInfo();
      }
      ABSL_TSAN_MUTEX_DESTROY(this, __tsan_mutex_not_static);
    }
    

    mu_ is just an integer made up of flag bits; intptr_t was chosen as its type for portability. The flag bits are:

    // --------------------------Mutexes
    
    // In the layout below, the msb of the bottom byte is currently unused.  Also,
    // the following constraints were considered in choosing the layout:
    //  o Both the debug allocator's "uninitialized" and "freed" patterns (0xab and
    //    0xcd) are illegal: reader and writer lock both held.
    //  o kMuWriter and kMuEvent should exceed kMuDesig and kMuWait, to enable the
    //    bit-twiddling trick in Mutex::Unlock().
    //  o kMuWriter / kMuReader == kMuWrWait / kMuWait,
    //    to enable the bit-twiddling trick in CheckForMutexCorruption().
    static const intptr_t kMuReader      = 0x0001L;  // a reader holds the lock
    static const intptr_t kMuDesig       = 0x0002L;  // there's a designated waker
    static const intptr_t kMuWait        = 0x0004L;  // threads are waiting
    static const intptr_t kMuWriter      = 0x0008L;  // a writer holds the lock
    static const intptr_t kMuEvent       = 0x0010L;  // record this mutex's events
    // INVARIANT1:  there's a thread that was blocked on the mutex, is
    // no longer, yet has not yet acquired the mutex.  If there's a
    // designated waker, all threads can avoid taking the slow path in
    // unlock because the designated waker will subsequently acquire
    // the lock and wake someone.  To maintain INVARIANT1 the bit is
    // set when a thread is unblocked(INV1a), and threads that were
    // unblocked reset the bit when they either acquire or re-block
    // (INV1b).
    static const intptr_t kMuWrWait      = 0x0020L;  // runnable writer is waiting
                                                     // for a reader
    static const intptr_t kMuSpin        = 0x0040L;  // spinlock protects wait list
    static const intptr_t kMuLow         = 0x00ffL;  // mask all mutex bits
    static const intptr_t kMuHigh        = ~kMuLow;  // mask pointer/reader count
    

    The individual bits work as follows:

    bit   name       meaning
    0     kMuReader  a reader holds the lock
    1     kMuDesig   a designated waker exists
    2     kMuWait    threads are waiting for the lock
    3     kMuWriter  a writer holds the lock
    4     kMuEvent   events on this mutex should be recorded (e.g. via logging)
    5     kMuWrWait  a writer is waiting for the lock
    6     kMuSpin    spinlock protecting the wait list
    7     (unused)   currently unused
    8+    kMuHigh    high bits: hold the waiter-list pointer or the reader count

    The trickier bits get a fuller explanation in the appendix.
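    Purely as an illustration (this fragment is not part of Abseil), a snapshot of mu_ can be decoded with the constants above:

    // Illustrative only: inspecting a snapshot of the mutex word.
    intptr_t v = mu_.load(std::memory_order_relaxed);
    bool writer_held = (v & kMuWriter) != 0;  // bit 3
    bool reader_held = (v & kMuReader) != 0;  // bit 0
    bool has_waiters = (v & kMuWait) != 0;    // bit 2
    intptr_t high = v & kMuHigh;              // waiter-list pointer or reader count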

    Exclusive locking

    The basic operations on a Mutex are Mutex::Lock() and Mutex::Unlock().

    Lock()

    ABSL_XRAY_LOG_ARGS(1) void Mutex::Lock() {
      ABSL_TSAN_MUTEX_PRE_LOCK(this, 0);
      GraphId id = DebugOnlyDeadlockCheck(this);
      intptr_t v = mu_.load(std::memory_order_relaxed);
      // try fast acquire, then spin loop
      if ((v & (kMuWriter | kMuReader | kMuEvent)) != 0 ||
          !mu_.compare_exchange_strong(v, kMuWriter | v,
                                       std::memory_order_acquire,
                                       std::memory_order_relaxed)) {
        // try spin acquire, then slow loop
        if (!TryAcquireWithSpinning(&this->mu_)) {
          this->LockSlow(kExclusive, nullptr, 0);
        }
      }
      DebugOnlyLockEnter(this, id);
      ABSL_TSAN_MUTEX_POST_LOCK(this, 0, 0);
    }
    

    Locking proceeds in three stages:

    1. Try to acquire the lock with a CAS.
    2. On failure, retry up to N times in a spin loop.
    3. If that also fails, fall back to a slower path.

    This calls for a closer look at the rather subtle compare_exchange_strong():

    bool compare_exchange_strong( T& expected, T desired,
                                  std::memory_order success, 
                                  std::memory_order failure ) noexcept;
    

    This function packs several operations into one; the documentation says:

    Atomically compares the object representation (until C++20) value representation (since C++20) of *this with that of expected, and if those are bitwise-equal, replaces the former with desired (performs read-modify-write operation). Otherwise, loads the actual value stored in *this into expected (performs load operation).

    In the source of Lock():

    1. The current value of mu_ is first loaded into the variable v.
    2. If mu_ still equals v, the kMuWriter bit of mu_ is set (with std::memory_order_acquire), acquiring the lock, and true is returned; otherwise the latest value of mu_ is stored into v (with std::memory_order_relaxed) and false is returned.

    Note that another thread can preempt between step 1 and step 2, which is exactly why step 2 may return false.
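    The same load-then-CAS fast path can be shown standalone, independent of Abseil (kWriterBit and TryFastLock are invented names for this sketch):

    #include <atomic>
    #include <cstdint>

    std::atomic<std::intptr_t> word{0};
    constexpr std::intptr_t kWriterBit = 0x0008;  // same position as kMuWriter

    bool TryFastLock() {
      std::intptr_t v = word.load(std::memory_order_relaxed);  // step 1: snapshot
      // Step 2: install the writer bit only if word still equals v.
      // On failure, v is refreshed with the current value of word.
      return word.compare_exchange_strong(v, v | kWriterBit,
                                          std::memory_order_acquire,
                                          std::memory_order_relaxed);
    }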

    If the CAS fails, the thread keeps trying. We won't expand on the details here; the spin loop, for example, simply retries the CAS a fixed number of times (1500 on multicore machines).

    If the spin loop fails as well, we enter the most involved function:

    void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags);
    

    This function is very complex; a detailed analysis is given in the appendix.

    Broadly, the function keeps attempting to lock inside a for loop, calling synchronization_internal::MutexDelay() after each failure. The main logic (for a writer) is:

    1. If the lock is currently acquirable:
      • attempt a CAS.
    2. Otherwise:
      • take the spinlock;
      • join the wait list;
      • release the spinlock;
      • block until taken off the wait list and processed.

    The thread joins the wait list because it cannot make progress right now, perhaps because it is blocked on I/O, perhaps because it is blocked by a condition variable; either way it has to wait.

    // Attempt to acquire *mu, and return whether successful.  The implementation
    // may spin for a short while if the lock cannot be acquired immediately.
    static bool TryAcquireWithSpinning(std::atomic<intptr_t>* mu) {
      int c = GetMutexGlobals().spinloop_iterations;
      do {  // do/while somewhat faster on AMD
        intptr_t v = mu->load(std::memory_order_relaxed);
        if ((v & (kMuReader|kMuEvent)) != 0) {
          return false;  // a reader or tracing -> give up
        } else if (((v & kMuWriter) == 0) &&  // no holder -> try to acquire
                   mu->compare_exchange_strong(v, kMuWriter | v,
                                               std::memory_order_acquire,
                                               std::memory_order_relaxed)) {
          return true;
        }
      } while (--c > 0);
      return false;
    }
    

    Unlock()

    Unlock() releases the lock by clearing kMuWrWait and kMuWriter. Releasing is more involved than acquiring, because the releasing thread may need to wake waiters.

    ABSL_XRAY_LOG_ARGS(1) void Mutex::Unlock() {
      ABSL_TSAN_MUTEX_PRE_UNLOCK(this, 0);
      DebugOnlyLockLeave(this);
      intptr_t v = mu_.load(std::memory_order_relaxed);
    
      if (kDebugMode && ((v & (kMuWriter | kMuReader)) != kMuWriter)) {
        ABSL_RAW_LOG(FATAL, "Mutex unlocked when destroyed or not locked: v=0x%x",
                     static_cast<unsigned>(v));
      }
    
      // should_try_cas is whether we'll try a compare-and-swap immediately.
      // NOTE: optimized out when kDebugMode is false.
      bool should_try_cas = ((v & (kMuEvent | kMuWriter)) == kMuWriter &&
                              (v & (kMuWait | kMuDesig)) != kMuWait);
      // But, we can use an alternate computation of it, that compilers
      // currently don't find on their own.  When that changes, this function
      // can be simplified.
      intptr_t x = (v ^ (kMuWriter | kMuWait)) & (kMuWriter | kMuEvent);
      intptr_t y = (v ^ (kMuWriter | kMuWait)) & (kMuWait | kMuDesig);
      // Claim: "x == 0 && y > 0" is equal to should_try_cas.
      // Also, because kMuWriter and kMuEvent exceed kMuDesig and kMuWait,
      // all possible non-zero values for x exceed all possible values for y.
      // Therefore, (x == 0 && y > 0) == (x < y).
      if (kDebugMode && should_try_cas != (x < y)) {
        // We would usually use PRIdPTR here, but is not correctly implemented
        // within the android toolchain.
        ABSL_RAW_LOG(FATAL, "internal logic error %llx %llx %llx\n",
                     static_cast<long long>(v), static_cast<long long>(x),
                     static_cast<long long>(y));
      }
      if (x < y &&
          mu_.compare_exchange_strong(v, v & ~(kMuWrWait | kMuWriter),
                                      std::memory_order_release,
                                      std::memory_order_relaxed)) {
        // fast writer release (writer with no waiters or with designated waker)
      } else {
        this->UnlockSlow(nullptr /*no waitp*/);  // take slow path
      }
      ABSL_TSAN_MUTEX_POST_UNLOCK(this, 0);
    }
    

    It is well worth noting how kMuDesig is used here to decide whether the lock can be released quickly with nothing but a CAS. The exact criteria:

    1. the Mutex is currently held by a writer (kMuWriter) and has no logging events to handle (kMuEvent), and
    2. no other thread is waiting for the lock (kMuWait), or there are waiters but a designated waker as well (kMuDesig).
      // should_try_cas is whether we'll try a compare-and-swap immediately.
      // NOTE: optimized out when kDebugMode is false.
      bool should_try_cas = ((v & (kMuEvent | kMuWriter)) == kMuWriter &&
                              (v & (kMuWait | kMuDesig)) != kMuWait);
    

    The finer points of kMuDesig are covered in the appendix.
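    As a sanity check, the claim behind the x < y trick can be verified exhaustively over every low-byte state with a small standalone program (illustrative, not Abseil code):

    #include <cassert>
    #include <cstdint>

    int main() {
      const std::intptr_t kMuDesig = 0x0002, kMuWait = 0x0004,
                          kMuWriter = 0x0008, kMuEvent = 0x0010;
      for (std::intptr_t v = 0; v < 0x100; ++v) {  // every low-byte state
        bool should_try_cas = ((v & (kMuEvent | kMuWriter)) == kMuWriter &&
                               (v & (kMuWait | kMuDesig)) != kMuWait);
        std::intptr_t x = (v ^ (kMuWriter | kMuWait)) & (kMuWriter | kMuEvent);
        std::intptr_t y = (v ^ (kMuWriter | kMuWait)) & (kMuWait | kMuDesig);
        assert(should_try_cas == (x < y));  // the claim in Unlock() holds
      }
      return 0;
    }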

    Reader-writer locking

    absl::Mutex can also serve as a reader-writer lock, which suits read-mostly workloads. Readers use the lock through ReaderLock() and ReaderUnlock(); writers still go through Lock() and Unlock().

    At any given moment a reader-writer lock guarantees:

    • at most one writer, with no readers while a writer holds the lock;
    • any number of concurrent readers when no writer holds it.
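    A minimal usage sketch of both sides (the RAII helpers absl::ReaderMutexLock and absl::MutexLock are the real Abseil API; counter is invented for illustration):

    #include "absl/synchronization/mutex.h"

    absl::Mutex mu;
    int counter = 0;  // guarded by mu

    int Read() {
      absl::ReaderMutexLock lock(&mu);  // ReaderLock()/ReaderUnlock() via RAII
      return counter;                   // many readers may run here at once
    }

    void Increment() {
      absl::MutexLock lock(&mu);        // exclusive Lock()/Unlock()
      ++counter;
    }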

    ReaderLock()

    ABSL_XRAY_LOG_ARGS(1) void Mutex::ReaderLock() {
      ABSL_TSAN_MUTEX_PRE_LOCK(this, __tsan_mutex_read_lock);
      GraphId id = DebugOnlyDeadlockCheck(this);
      intptr_t v = mu_.load(std::memory_order_relaxed);
      // try fast acquire, then slow loop
      if ((v & (kMuWriter | kMuWait | kMuEvent)) != 0 ||
          !mu_.compare_exchange_strong(v, (kMuReader | v) + kMuOne,
                                       std::memory_order_acquire,
                                       std::memory_order_relaxed)) {
        this->LockSlow(kShared, nullptr, 0);
      }
      DebugOnlyLockEnter(this, id);
      ABSL_TSAN_MUTEX_POST_LOCK(this, __tsan_mutex_read_lock, 0);
    }
    

    Each call increments the reader count by one (by adding kMuOne); the count is kept in the high bits of mu_.

    ReaderUnlock()

    ABSL_XRAY_LOG_ARGS(1) void Mutex::ReaderUnlock() {
      ABSL_TSAN_MUTEX_PRE_UNLOCK(this, __tsan_mutex_read_lock);
      DebugOnlyLockLeave(this);
      intptr_t v = mu_.load(std::memory_order_relaxed);
      assert((v & (kMuWriter|kMuReader)) == kMuReader);
      if ((v & (kMuReader|kMuWait|kMuEvent)) == kMuReader) {
        // fast reader release (reader with no waiters)
        intptr_t clear = ExactlyOneReader(v) ? kMuReader|kMuOne : kMuOne;
        if (mu_.compare_exchange_strong(v, v - clear,
                                        std::memory_order_release,
                                        std::memory_order_relaxed)) {
          ABSL_TSAN_MUTEX_POST_UNLOCK(this, __tsan_mutex_read_lock);
          return;
        }
      }
      this->UnlockSlow(nullptr /*no waitp*/);  // take slow path
      ABSL_TSAN_MUTEX_POST_UNLOCK(this, __tsan_mutex_read_lock);
    }
    

    This function decrements the reader count by one on each call.
    Points worth noting:

    • assert((v & (kMuWriter|kMuReader)) == kMuReader);
      it may only be called while readers, and no writer, hold the lock
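    The ExactlyOneReader(v) call above decides whether the kMuReader bit should be cleared along with the count. A hedged sketch of the check it must perform (the real mutex.cc version uses an equivalent bit trick; kMuOne = 0x0100 is the reader-count unit added by ReaderLock()):

    static bool ExactlyOneReaderSketch(intptr_t v) {
      // With no waiters queued, the high bits hold only the reader count,
      // so "exactly one reader" means that portion equals kMuOne.
      return (v & kMuHigh) == kMuOne;
    }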

    Appendix

    This section records some details too fine-grained for the main text.

    Some less obvious flags

    A Mutex is really just an integer, each of whose bits is a flag. Several of these flags are not self-explanatory, so each is described in detail here.

    kMuEvent

    This flag marks whether events on this Mutex need to be recorded (for instance, via logging). It is set to 1 when debug logging is enabled:

    void Mutex::EnableDebugLog(const char *name) {
      SynchEvent *e = EnsureSynchEvent(&this->mu_, name, kMuEvent, kMuSpin);
      e->log = true;
      UnrefSynchEvent(e);
    }
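    EnableDebugLog() is part of the public absl::Mutex API; a minimal usage sketch (the mutex name "my_mu" is invented):

    absl::Mutex mu;

    void Init() {
      // After this, the kMuEvent bit is set and lock/unlock events on mu
      // are recorded under the name "my_mu".
      mu.EnableDebugLog("my_mu");
    }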
    

    kMuWait and kMuWrWait

    kMuWait flags whether any thread is waiting to acquire the lock; kMuWrWait states specifically that a waiting thread is a writer.
    This can be seen in the following code:

          if ((v & (kMuSpin|kMuWait)) == 0) {   // no waiters
            // This thread tries to become the one and only waiter.
            PerThreadSynch *new_h = Enqueue(nullptr, waitp, v, flags);
            intptr_t nv = (v & zap_desig_waker[flags & kMuHasBlocked] & kMuLow) |
                          kMuWait;
            ABSL_RAW_CHECK(new_h != nullptr, "Enqueue to empty list failed");
            if (waitp->how == kExclusive && (v & kMuReader) != 0) {
              nv |= kMuWrWait;
            }
            if (mu_.compare_exchange_strong(
                    v, reinterpret_cast<intptr_t>(new_h) | nv,
                    std::memory_order_release, std::memory_order_relaxed)) {
              dowait = true;
            } else {            // attempted Enqueue() failed
              // zero out the waitp field set by Enqueue()
              waitp->thread->waitp = nullptr;
            }
          } 
    

    This is where the first waiting thread is enqueued: note how compare_exchange_strong sets kMuWait to 1. In addition, if the thread is a writer, i.e. it must acquire the lock exclusively (no readers may coexist), kMuWrWait is set as well.

    kMuSpin

    This bit is a spinlock, taken while manipulating the linked list of per-thread waiter records (PerThreadSynch*). For example:

          } else if ((v & kMuSpin) == 0 &&  // attempt to queue ourselves
                     mu_.compare_exchange_strong(
                         v, (v & zap_desig_waker[flags & kMuHasBlocked]) | kMuSpin |
                                kMuWait,
                         std::memory_order_acquire, std::memory_order_relaxed)) {
            PerThreadSynch *h = GetPerThreadSynch(v);
            PerThreadSynch *new_h = Enqueue(h, waitp, v, flags);
            intptr_t wr_wait = 0;
            ABSL_RAW_CHECK(new_h != nullptr, "Enqueue to list failed");
            if (waitp->how == kExclusive && (v & kMuReader) != 0) {
              wr_wait = kMuWrWait;      // give priority to a waiting writer
            }
            do {                        // release spinlock
              v = mu_.load(std::memory_order_relaxed);
            } while (!mu_.compare_exchange_weak(
                v, (v & (kMuLow & ~kMuSpin)) | kMuWait | wr_wait |
                reinterpret_cast<intptr_t>(new_h),
                std::memory_order_release, std::memory_order_relaxed));
            dowait = true;
          }
    

    This is the classic spinlock pattern: first acquire the spinlock with a CAS using std::memory_order_acquire, then release it in the while loop with std::memory_order_release.
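    The same idiom can be shown on a plain atomic word, independent of Abseil (kSpinBit and WithSpinLock are invented names for this sketch):

    #include <atomic>
    #include <cstdint>

    std::atomic<std::intptr_t> word{0};
    constexpr std::intptr_t kSpinBit = 0x0040;  // same position as kMuSpin

    void WithSpinLock() {
      std::intptr_t v;
      do {  // acquire: set the spin bit, but only if it was clear
        v = word.load(std::memory_order_relaxed);
      } while ((v & kSpinBit) != 0 ||
               !word.compare_exchange_weak(v, v | kSpinBit,
                                           std::memory_order_acquire,
                                           std::memory_order_relaxed));
      // ... mutate the protected wait list here ...
      do {  // release: clear the spin bit
        v = word.load(std::memory_order_relaxed);
      } while (!word.compare_exchange_weak(v, v & ~kSpinBit,
                                           std::memory_order_release,
                                           std::memory_order_relaxed));
    }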

    kMuDesig

    This flag marks whether a waiting thread has already been woken. As mentioned earlier, once a Mutex's kMuDesig bit is set, other threads that unlock it can take the faster CAS path in Unlock(), which is a performance win.

    For example, suppose threads A, B and C all contend for a lock mu. A wins while B and C fall into sleeping waits, so at this point kMuDesig=0. When A later calls Unlock(), the CAS condition does not hold:

      bool should_try_cas = ((v & (kMuEvent | kMuWriter)) == kMuWriter &&
                              (v & (kMuWait | kMuDesig)) != kMuWait);
    

    so A goes through UnlockSlow(), sets kMuDesig=1, and wakes B and C as it unlocks.

    Suppose B then wins the lock. When B unlocks, kMuDesig is still 1, so B can release directly with a CAS. If C is still trying to acquire the lock in TryAcquireWithSpinning() and has not re-queued itself on the wait list, C can take the lock immediately.

    The benefit is that threads avoid the constant sleep-wait-wake cycle; contention that can be resolved with CAS and spinning is much faster.

    When it is set to 1

    In UnlockSlow():

    1. Find the waiting thread w to be woken after unlocking, together with its predecessor pw.
    2. Move every wakeable thread in the circular list w->...->h (excluding h.next->...->pw) onto wake_list.
    3. Set kMuDesig to 1.
    4. Break out of the for(;;) loop.
    5. Wake the threads on wake_list one by one.
          // The first (and perhaps only) waiter we've chosen to wake is w, whose
          // predecessor is pw.  If w is a reader, we must wake all the other
          // waiters with wake==true as well.  We may also need to queue
          // ourselves if waitp != null.  The spinlock and the lock are still
          // held.
    
          // This traverses the list in [ pw->next, h ], where h is the head,
          // removing all elements with wake==true and placing them in the
          // singly-linked list wake_list.  Returns the new head.
          h = DequeueAllWakeable(h, pw, &wake_list);
    
          intptr_t nv = (v & kMuEvent) | kMuDesig;
                                                 // assume no waiters left,
                                                 // set kMuDesig for INV1a
    
          if (waitp != nullptr) {  // we must queue ourselves and sleep
            h = Enqueue(h, waitp, v, kMuIsCond);
            // h is new last waiter; could be null if we queued ourselves on a
            // CondVar
          }
    
          ABSL_RAW_CHECK(wake_list != kPerThreadSynchNull,
                         "unexpected empty wake list");
    
          if (h != nullptr) {  // there are waiters left
            h->readers = 0;
            h->maybe_unlocking = false;     // finished unlocking
            nv |= wr_wait | kMuWait | reinterpret_cast<intptr_t>(h);
          }
    
          // release both spinlock & lock
          // can release with a store because there were waiters
          mu_.store(nv, std::memory_order_release);
          break;  // out of for(;;)-loop
        }
    

    How are the wakeable threads determined? There are two cases:

    • if there is no writer, all the readers can be woken;
    • if there is a writer, threads are woken up to the first writer.

    Either way, by the time UnlockSlow() finishes, waiting threads have been woken. As long as some thread is runnable, no other thread needs to walk the wait queue again. So at this point we mark "a waker has been designated" (kMuDesig), telling other threads that their Unlock() need not drop into UnlockSlow() to traverse the wait queue.

    When it is cleared to 0

    During locking, if this Mutex has just blocked the current thread, the set of waiting threads needs refreshing, so kMuDesig is cleared to 0; the next Unlock() can then go through UnlockSlow() and walk the waiting threads.

    First, a pair of bitmasks is defined, to be ANDed bitwise with mu_.

    // The zap_desig_waker bitmask is used to clear the designated waker flag in
    // the mutex if this thread has blocked, and therefore may be the designated
    // waker.
    static const intptr_t zap_desig_waker[] = {
        ~static_cast<intptr_t>(0),  // not blocked
        ~static_cast<intptr_t>(
            kMuDesig)  // blocked; turn off the designated waker bit
    };
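    To make the indexing concrete, a short illustration (kMuHasBlocked is the real flag 0x01 in mutex.cc; the variable names are invented):

    intptr_t v = kMuWriter | kMuDesig;
    intptr_t unchanged = v & zap_desig_waker[0];  // never blocked: v intact
    intptr_t cleared   = v & zap_desig_waker[1];  // blocked: kMuDesig is off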
    

    The index is always computed as [flags & kMuHasBlocked]. When acquiring the lock, if this thread has blocked on the Mutex before (kMuHasBlocked), the ~kMuDesig bitmask is chosen, clearing the kMuDesig bit; otherwise the original value is kept. For example:

    bool Mutex::LockSlowWithDeadline(MuHow how, const Condition *cond,
                                     KernelTimeout t, int flags) {
      intptr_t v = mu_.load(std::memory_order_relaxed);
      bool unlock = false;
      if ((v & how->fast_need_zero) == 0 &&  // try fast acquire
          mu_.compare_exchange_strong(
              v, (how->fast_or | (v & zap_desig_waker[flags & kMuHasBlocked])) +
                     how->fast_add,
              std::memory_order_acquire, std::memory_order_relaxed)) {
        if (cond == nullptr ||
            EvalConditionAnnotated(cond, this, true, false, how == kShared)) {
          return true;
        }
        unlock = true;
      }
      SynchWaitParams waitp(
          how, cond, t, nullptr /*no cvmu*/, Synch_GetPerThreadAnnotated(this),
          nullptr /*no cv_word*/);
      if (!Condition::GuaranteedEqual(cond, nullptr)) {
        flags |= kMuIsCond;
      }
      if (unlock) {
        this->UnlockSlow(&waitp);
        this->Block(waitp.thread);
        flags |= kMuHasBlocked;
      }
      this->LockSlowLoop(&waitp, flags);
      return waitp.cond != nullptr ||  // => cond known true from LockSlowLoop
             cond == nullptr ||
             EvalConditionAnnotated(cond, this, true, false, how == kShared);
    }
    

    This function mainly does the following:

    1. Try to acquire the lock with a CAS.
    2. If that succeeds, and the Mutex is not tied to a condition, or the condition already holds, return immediately.

    Otherwise the Mutex is bound to a condition that is currently false. In that case we must:

    1. release the lock and enqueue ourselves on the wait queue;
    2. block until woken;
    3. try to lock again, clearing kMuDesig along the way so that a later Unlock() will wake the waiting threads.

    This also illustrates how a condition variable works (lock, check fails, unlock, wait for wakeup, lock, check succeeds, run, unlock, notify); see the example in the official documentation.

    With threads main and worker and a condition variable cv (condensed in code below):

    1. worker takes the lock and checks whether ready==true; if not, it unlocks and waits.
    2. main modifies ready, then wakes worker via notify_one() and waits for processed==true.
    3. worker gets the lock, finishes its work, sets processed=true, and calls notify_one() to tell main.
    4. main runs to completion.
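    A condensed version of the cppreference std::condition_variable example these steps describe:

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    std::mutex m;
    std::condition_variable cv;
    bool ready = false, processed = false;

    void worker() {
      std::unique_lock<std::mutex> lk(m);
      cv.wait(lk, [] { return ready; });  // lock, check, unlock, sleep, relock
      processed = true;                   // do the work
      lk.unlock();
      cv.notify_one();                    // tell main we are done
    }

    int main() {
      std::thread t(worker);
      {
        std::lock_guard<std::mutex> lk(m);
        ready = true;                     // prepare data for worker
      }
      cv.notify_one();                    // wake worker
      {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [] { return processed; });
      }
      t.join();
    }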

    Lock modes

    Exclusive locks and reader-writer locks place different requirements on the current state when locking and unlocking.

    // This struct contains various bitmasks to be used in
    // acquiring and releasing a mutex in a particular mode.
    struct MuHowS {
      // if all the bits in fast_need_zero are zero, the lock can be acquired by
      // adding fast_add and oring fast_or.  The bit kMuDesig should be reset iff
      // this is the designated waker.
      intptr_t fast_need_zero;
      intptr_t fast_or;
      intptr_t fast_add;
    
      intptr_t slow_need_zero;  // fast_need_zero with events (e.g. logging)
    
      intptr_t slow_inc_need_zero;  // if all the bits in slow_inc_need_zero are
                                    // zero a reader can acquire a read share by
                                    // setting the reader bit and incrementing
                                    // the reader count (in last waiter since
                                    // we're now slow-path).  kMuWrWait be may
                                    // be ignored if we already waited once.
    };
    
    

    Here the need_zero fields mean that those bits must all be 0 before the lock can be acquired.

    Exclusive mode

    static const MuHowS kExclusiveS = {
        // exclusive or write lock
        kMuWriter | kMuReader | kMuEvent,  // fast_need_zero
        kMuWriter,                         // fast_or
        0,                                 // fast_add
        kMuWriter | kMuReader,             // slow_need_zero
        ~static_cast<intptr_t>(0),         // slow_inc_need_zero
    };
    

    Acquiring with Lock() requires that no other reader or writer exist, so kMuWriter | kMuReader must be 0.

    After acquiring, the writer bit must be set, hence fast_or = kMuWriter.

    No counting is needed, hence fast_add = 0.

    Shared (read-only) mode

    static const MuHowS kSharedS = {
        // shared or read lock
        kMuWriter | kMuWait | kMuEvent,   // fast_need_zero
        kMuReader,                        // fast_or
        kMuOne,                           // fast_add
        kMuWriter | kMuWait,              // slow_need_zero
        kMuSpin | kMuWriter | kMuWrWait,  // slow_inc_need_zero
    };
    

    Acquiring with ReaderLock() requires that no writer exist, but other readers are allowed. fast_or = kMuReader sets the reader bit, and fast_add = kMuOne bumps the reader count kept in the high bits.
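    A sketch of how the fast path consumes these masks, mirroring the CAS in LockSlowWithDeadline quoted earlier (how points at kExclusiveS or kSharedS):

    intptr_t v = mu_.load(std::memory_order_relaxed);
    if ((v & how->fast_need_zero) == 0) {  // the required bits are all clear
      intptr_t desired = (how->fast_or | v) + how->fast_add;
      // kExclusiveS: or in kMuWriter, add 0.
      // kSharedS:    or in kMuReader, add kMuOne (one more reader).
      mu_.compare_exchange_strong(v, desired, std::memory_order_acquire,
                                  std::memory_order_relaxed);
    }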

    References

    1. Google Abseil
    2. absl::Mutex Design Notes
    3. Abseil Synchronization Guide
    4. compare_exchange
