美文网首页
brpc之mutex源码分析

brpc之mutex源码分析

作者: fooboo | 来源:发表于2019-08-04 23:36 被阅读0次

    这篇分析得不完整,还没有彻底弄明白。在分析brpc之前,基本上所有使用过或者分析过的mutex都只是单纯地封装一下接口,即使用类似scope_mutex这种手法(RAII):构造时加锁,析构时解锁。

    早期分析过一篇brpc中关于定时器和futex实现的文章《brpc中定时器的实现》,其中对于成员变量internal::FastPthreadMutex _mutex只是简单地跳过,未作详细分析,因为都知道mutex用于同步临界区,使得同一时刻只有一个工作线程在里面操作,不至于出现数据不一致。对于linux版本的mutex实现,可以参考《linux 2.6 互斥锁的实现-源码分析》;网上大部分文章都是转来转去、抄来抄去,有兴趣的可以再对着源码分析下mutex。这里主要分析brpc中的mutex实现。

    从声明开始,这里有两种实现,先看第二种(即#else分支),第一种(定义了BTHREAD_USE_FAST_PTHREAD_MUTEX时的FastPthreadMutex类)后面再简单介绍下,futex相关的实现参考上面的链接:

     71 namespace internal {
     72 #ifdef BTHREAD_USE_FAST_PTHREAD_MUTEX
     73 class FastPthreadMutex {
     74 public:
     75     FastPthreadMutex() : _futex(0) {}
     76     ~FastPthreadMutex() {}
     77     void lock();
     78     void unlock();
     79     bool try_lock();
     80 private:
     81     DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
     82     int lock_contended();
     83     unsigned _futex;
     84 };  
     85 #else
     86 typedef butil::Mutex FastPthreadMutex;
     87 #endif
     88 }
    
     26 #if !defined(BUTIL_CXX11_ENABLED)
     27 #define BUTIL_DELETE_FUNCTION(decl) decl
     28 #else
     29 #define BUTIL_DELETE_FUNCTION(decl) decl = delete
     30 #endif
    
     42 #define DISALLOW_COPY_AND_ASSIGN(TypeName)                      \
     43     BUTIL_DELETE_FUNCTION(TypeName(const TypeName&));            \
     44     BUTIL_DELETE_FUNCTION(void operator=(const TypeName&))
    

    mutex是禁止拷贝和赋值的。由于编译器会在某些情况下(认为需要的时候)自动合成拷贝构造函数、拷贝赋值运算符等特殊成员函数,C++11及以上版本使用= delete即可禁用它们;否则只能把它们声明为private且不提供定义,这样类外部调用时编译报错,类内部(或友元)误用时则在链接时报错。

     45 // NOTE: Not aligned to cacheline as the container of Mutex is practically aligned
     46 class Mutex {
     47 public:
     48     typedef bthread_mutex_t* native_handler_type;
     49     Mutex() {
     50         int ec = bthread_mutex_init(&_mutex, NULL);
     51         if (ec != 0) {
     52             throw std::system_error(std::error_code(ec, std::system_category()), "Mutex construc    tor failed");
     53         }
     54     }
     55     ~Mutex() { CHECK_EQ(0, bthread_mutex_destroy(&_mutex)); }
     56     native_handler_type native_handler() { return &_mutex; }
     57     void lock() {
     58         int ec = bthread_mutex_lock(&_mutex);
     59         if (ec != 0) {
     60             throw std::system_error(std::error_code(ec, std::system_category()), "Mutex lock fai    led");
     61         }
     62     }
     63     void unlock() { bthread_mutex_unlock(&_mutex); }
     64     bool try_lock() { return !bthread_mutex_trylock(&_mutex); }
     66 private:
     67     DISALLOW_COPY_AND_ASSIGN(Mutex);
     68     bthread_mutex_t _mutex;
     69 };
    
    155 typedef struct {
    156     int64_t duration_ns;
    157     size_t sampling_range;
    158 } bthread_contention_site_t;
    159 
    160 typedef struct {
    161     unsigned* butex;
    162     bthread_contention_site_t csite;
    163 } bthread_mutex_t;
    

    以上便是mutex类的声明(对象池ObjectPool的实现留待后期分析),初始化函数bthread_mutex_init如下:

    701 int bthread_mutex_init(bthread_mutex_t* __restrict m,
    702                        const bthread_mutexattr_t* __restrict) {
    703     bthread::make_contention_site_invalid(&m->csite);
    704     m->butex = bthread::butex_create_checked<unsigned>();
    705     if (!m->butex) {
    706         return ENOMEM;
    707     }   
    708     *m->butex = 0; //初始化为0
    709     return 0;
    710 } 
    
    363 BUTIL_FORCE_INLINE void
    364 make_contention_site_invalid(bthread_contention_site_t* cs) {
    365     cs->sampling_range = 0;
    366 }
    
     39 // Check width of user type before casting.
     40 template <typename T> T* butex_create_checked() {
     41     BAIDU_CASSERT(sizeof(T) == sizeof(int), sizeof_T_must_equal_int);
     42     return static_cast<T*>(butex_create());
     43 }
    
    243 void* butex_create() {
    244     Butex* b = butil::get_object<Butex>();
    245     if (b) {
    246         return &b->value;
    247     } 
    248     return NULL;
    249 }
    
    116 struct BAIDU_CACHELINE_ALIGNMENT Butex {
    117     Butex() {}
    118     ~Butex() {}
    119     
    120     butil::atomic<int> value;
    121     ButexWaiterList waiters;//typedef butil::LinkedList<ButexWaiter> ButexWaiterList;
    122     internal::FastPthreadMutex waiter_lock;
    123 };
    

    上面的Butex结构体声明其实和linux内核中的struct mutex差不多:

    struct mutex {
       atomic_t  count; //引用计数器,1: 锁可以利用,小于等于0:该锁已被获取,需要等待
       spinlock_t  wait_lock;//自旋锁类型,保证多cpu下,对等待队列访问是安全的。
       struct list_head wait_list;//等待队列,如果该锁被获取,任务将挂在此队列上,等待调度。
    };
    

    其中ButexWaiterList为双向链表,简单起见这里只以ButexPthreadWaiter为例进行分析:

     86 struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
     87     // tids of pthreads are 0
     88     bthread_t tid;
     89 
     90     // Erasing node from middle of LinkedList is thread-unsafe, we need
     91     // to hold its container's lock.
     92     butil::atomic<Butex*> container;
     93 };
    
    106 // pthread_task or main_task allocates this structure on stack and queue it
    107 // in Butex::waiters. 
    108 struct ButexPthreadWaiter : public ButexWaiter {
    109     butil::atomic<int> sig;
    110 };
    
     84 template <typename T>
     85 class LinkNode {
    154  private:
    155   LinkNode<T>* previous_;
    156   LinkNode<T>* next_;
    158   DISALLOW_COPY_AND_ASSIGN(LinkNode);
    159 };
    
    729 int bthread_mutex_lock(bthread_mutex_t* m) { 
    730     bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
    731     if (!split->locked.exchange(1, butil::memory_order_acquire)) {
    732         return 0;
    733     }   
    734     // Don't sample when contention profiler is off.
    735     if (!bthread::g_cp) {
    736         return bthread::mutex_lock_contended(m);
    737     }
    738     //sample code...
    753 }
    
    615 // Implement bthread_mutex_t related functions
    616 struct MutexInternal {
    617     butil::static_atomic<unsigned char> locked;
    618     butil::static_atomic<unsigned char> contended;
    619     unsigned short padding;
    620 };
    

    以上是加锁实现:先把unsigned* butex强转为struct MutexInternal*。由于初始值为0,第一次加锁时exchange返回旧值0,表示加锁成功,此时locked被设置为1,函数返回0。代码738到752行是统计(采样)相关的,这里直接分析mutex_lock_contended,即当锁不可用时要做哪些工作。

    622 const MutexInternal MUTEX_CONTENDED_RAW = {{1},{1},0};
    623 const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};
    624 // Define as macros rather than constants which can't be put in read-only
    625 // section and affected by initialization-order fiasco.
    626 #define BTHREAD_MUTEX_CONTENDED (*(const unsigned*)&bthread::MUTEX_CONTENDED_RAW)
    627 #define BTHREAD_MUTEX_LOCKED (*(const unsigned*)&bthread::MUTEX_LOCKED_RAW)
    
    632 inline int mutex_lock_contended(bthread_mutex_t* m) {
    633     butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
    634     while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
    635         if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
    636             errno != EWOULDBLOCK && errno != EINTR/*note*/) {
    637             // a mutex lock should ignore interrruptions in general since
    638             // user code is unlikely to check the return value.
    639             return errno;
    640         } 
    641     }
    642     return 0;
    643 }
    

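    以上while会一直尝试加锁:exchange把整个值置为BTHREAD_MUTEX_CONTENDED并返回旧值,如果旧值中locked字节不为0,说明锁仍被别人持有,加锁失败,于是调用butex_wait等待;如果butex_wait失败且errno既不是EWOULDBLOCK也不是EINTR,则提前返回errno,否则回到循环继续尝试,直到加锁成功后返回0。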

    611 int butex_wait(void* arg, int expected_value, const timespec* abstime) {
    612     Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);//根据三个参数求出结构体首地址
    613     if (b->value.load(butil::memory_order_relaxed) != expected_value) {
    614         errno = EWOULDBLOCK;
    615         // Sometimes we may take actions immediately after unmatched butex,
    616         // this fence makes sure that we see changes before changing butex.
    617         butil::atomic_thread_fence(butil::memory_order_acquire);
    618         return -1;
    619     }
    620     TaskGroup* g = tls_task_group;
    621     if (NULL == g || g->is_current_pthread_task()) {
    622         return butex_wait_from_pthread(g, b, expected_value, abstime);
    623     }
    624     //ButexBthreadWaiter bbw code...;
    693     return 0;
    694 }
    

    这里在尝试获取锁而进入wait时,会先再次load对应的值:如果与期望值BTHREAD_MUTEX_CONTENDED不相等,表示值已经发生变化,直接返回-1(errno为EWOULDBLOCK),由调用方重新尝试加锁;否则在pthread环境下进入butex_wait_from_pthread,不过在真正调用wait_pthread之前,还会再判断一次。

    542 static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
    543                                    const timespec* abstime) {
    544     // sys futex needs relative timeout.
    545     // Compute diff between abstime and now.
    546     timespec* ptimeout = NULL;
    547     timespec timeout;
    548     if (abstime != NULL) {
    549         //more code...
    557     }       
    558             
    559     TaskMeta* task = NULL;
    560     ButexPthreadWaiter pw;
    561     pw.tid = 0;
    562     pw.sig.store(PTHREAD_NOT_SIGNALLED, butil::memory_order_relaxed);
    563     int rc = 0;
    564 
    565     if (g) {
    566         //
    568     }
    569     b->waiter_lock.lock();
    570     if (b->value.load(butil::memory_order_relaxed) != expected_value) {
    571         b->waiter_lock.unlock();
    572         errno = EWOULDBLOCK;
    573         rc = -1;
    574     } else if (task != NULL && task->interrupted) {
    575         //
    580     } else {
    581         b->waiters.Append(&pw);
    582         pw.container.store(b, butil::memory_order_relaxed);
    583         b->waiter_lock.unlock();
    584 
    589         rc = wait_pthread(pw, ptimeout);
    593     }
    594     if (task) {
    595         //more code...
    607     }
    608     return rc;
    609 }
    

    这里在把自己加到waiters之前,会在持有waiter_lock的情况下再判断一次b->value.load(butil::memory_order_relaxed) != expected_value;如果值没有变化,接着执行:

    581         b->waiters.Append(&pw);
    582         pw.container.store(b, butil::memory_order_relaxed);
    583         b->waiter_lock.unlock();
    
    589         rc = wait_pthread(pw, ptimeout);
    
    140 int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
    141     while (true) {
    142         const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
    143         if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) {
    144             // If `sig' is changed, wakeup_pthread() must be called and `pw'
    145             // is already removed from the butex.
    146             // Acquire fence makes this thread sees changes before wakeup.
    147             return rc;
    148         }   
    149         if (rc != 0 && errno == ETIMEDOUT) {
    150             //more code...
    165         }
    166     }
    167 }
    

    下面是bthread_mutex_unlock实现:

    787 int bthread_mutex_unlock(bthread_mutex_t* m) {
    788     butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
    789     bthread_contention_site_t saved_csite = {0, 0};
    790     if (bthread::is_contention_site_valid(m->csite)) {
    791         saved_csite = m->csite;
    792         bthread::make_contention_site_invalid(&m->csite);
    793     }
    794     const unsigned prev = whole->exchange(0, butil::memory_order_release);
    795     // CAUTION: the mutex may be destroyed, check comments before butex_create
    796     if (prev == BTHREAD_MUTEX_LOCKED) {
    797         return 0;//就自己不需要唤醒
    798     }
    799     // Wakeup one waiter 
    800     if (!bthread::is_contention_site_valid(saved_csite)) {
    801         bthread::butex_wake(whole);
    802         return 0;
    803     }
    804     //more code...
    809     return 0;
    810 }
    

    这里是解锁:先把值置为0并取回旧值prev。如果prev等于BTHREAD_MUTEX_LOCKED,说明期间只有自己加过锁,没有其他线程进入过mutex_lock_contended的while循环把值改成BTHREAD_MUTEX_CONTENDED,因此不需要唤醒;否则旧值为BTHREAD_MUTEX_CONTENDED,表示至少还有另一个线程(bthread)竞争过这把锁,可能正在等待,需要尝试唤醒一个等待者。

    265 int butex_wake(void* arg) { 
    266     Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    267     ButexWaiter* front = NULL;
    268     {   
    269         BAIDU_SCOPED_LOCK(b->waiter_lock);
    270         if (b->waiters.empty()) {
    271             return 0;
    272         }
    273         front = b->waiters.head()->value();
    274         front->RemoveFromList();                                              
    275         front->container.store(NULL, butil::memory_order_relaxed);
    276     } 
    277     if (front->tid == 0) {
    278         wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
    279         return 1;
    280     }
    281     //ButexBthreadWaiter code...
    290 }
    

    这里从waiters中取出第一个等待者进行唤醒,由于ButexPthreadWaiter的tid为0,所以会调用wakeup_pthread:

    128 static void wakeup_pthread(ButexPthreadWaiter* pw) {                          
    129     // release fence makes wait_pthread see changes before wakeup.
    130     pw->sig.store(PTHREAD_SIGNALLED, butil::memory_order_release);
    131     // At this point, wait_pthread() possibly has woken up and destroyed `pw'.
    132     // In which case, futex_wake_private() should return EFAULT.
    133     // If crash happens in future, `pw' can be made TLS and never destroyed
    134     // to solve the issue.
    135     futex_wake_private(&pw->sig, 1);
    136 }
    

    另外,为了简化起见,上面的分析过程没有考虑超时参数;这里还是列一下带有超时参数时,wait超过该时长后的处理逻辑:

    140 int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
    141     while (true) {
    142         const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
    143         //more code...
    149         if (rc != 0 && errno == ETIMEDOUT) {
    156             if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
    159                 if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) {
    160                     ptimeout = NULL; // already timedout, ptimeout is expired.
    161                     continue;
    162                 }
    163             }
    164             return rc;
    165         }
    166     }
    167 }
    
    462 inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
    463     // `bw' is guaranteed to be valid inside this function because waiter
    464     // will wait until this function being cancelled or finished.
    465     // NOTE: This function must be no-op when bw->container is NULL.
    466     bool erased = false;
    467     Butex* b;
    468     int saved_errno = errno;
    469     while ((b = bw->container.load(butil::memory_order_acquire))) {
    470         // b can be NULL when the waiter is scheduled but queued.
    471         BAIDU_SCOPED_LOCK(b->waiter_lock);
    472         if (b == bw->container.load(butil::memory_order_relaxed)) {
    473             bw->RemoveFromList();
    474             bw->container.store(NULL, butil::memory_order_relaxed);
    475             if (bw->tid) {
    476                 //ButexBthreadWaiter code...
    477             }
    478             erased = true;
    479             break;
    480         }
    481     }
    482     if (erased && wakeup) {
    483         if (bw->tid) {
    484             //ButexBthreadWaiter code...
    486         } else {
    487             ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
    488             wakeup_pthread(pw);
    489         }
    490     }
    491     errno = saved_errno;
    492     return erased;
    493 }
    

    如文章开始所述,通过编译选项来决定使用哪一种mutex;如果定义了宏BTHREAD_USE_FAST_PTHREAD_MUTEX,则使用第一种实现(FastPthreadMutex类),这种实现可能更快一些:

    662 int FastPthreadMutex::lock_contended() {
    663     butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
    664     while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
    665         if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0
    666             && errno != EWOULDBLOCK) {
    667             return errno;
    668         }
    669     }
    670     return 0;
    671 }
    672 
    673 void FastPthreadMutex::lock() {
    674     bthread::MutexInternal* split = (bthread::MutexInternal*)&_futex;
    675     if (split->locked.exchange(1, butil::memory_order_acquire)) {
    676         (void)lock_contended();
    677     }
    678 }
    
    685 void FastPthreadMutex::unlock() {
    686     butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
    687     const unsigned prev = whole->exchange(0, butil::memory_order_release);
    688     // CAUTION: the mutex may be destroyed, check comments before butex_create
    689     if (prev != BTHREAD_MUTEX_LOCKED) {
    690         futex_wake_private(whole, 1);//尝试唤醒
    691     }   
    692 }
    

    _futex共有三种可能的值:0表示未加锁;BTHREAD_MUTEX_LOCKED(即{1,0,0})表示无竞争情况下正常加锁;BTHREAD_MUTEX_CONTENDED(即{1,1,0})表示加锁时发生过竞争(锁是在lock_contended中获得的,或者有其他线程正在等待)。两者用于区分后面unlock时是否需要进行唤醒。

    总结,感觉有些复杂,有一些疑问,会在后面分析其他部分代码时再回过头来想想为什么;还没体会到这种实现的原因,还是要多结合项目经历,同时也要对比着linux's mutex的实现源码分析。

    要学(掌握)的基础知识太多太多,brpc值得学习的东西也太多太多。

    参考资料:
    atomic_instructions.md
    brpc的研发经历

    相关文章

      网友评论

          本文标题:brpc之mutex源码分析

          本文链接:https://www.haomeiwen.com/subject/gheedctx.html