这篇分析得并不完整,还没有彻底弄明白。在分析 brpc 之前,我用过或分析过的 mutex 基本都只是对系统接口的简单封装,采用类似 scope_mutex 的 RAII 手法:构造时加锁,析构时解锁。
早期分析过一篇关于 brpc 中定时器和 futex 的文章《brpc中定时器的实现》。其中对于成员变量 internal::FastPthreadMutex _mutex 只是简单跳过、未作详细分析,因为大家都知道 mutex 用于保护临界区,使得同一时刻只有一个线程在其中操作,从而避免数据不一致。关于 Linux 中 mutex 的实现,可以参考《linux 2.6 互斥锁的实现-源码分析》(网上大部分文章都是互相转载、抄来抄去),有兴趣的可以再对着源码分析一下;这里主要分析 brpc 中的 mutex 实现。
从声明开始。这里有两种实现,由宏 BTHREAD_USE_FAST_PTHREAD_MUTEX 选择:第一种是自定义的 FastPthreadMutex 类,第二种是 butil::Mutex 的别名。先看第二种,第一种后面再简单介绍;futex 相关的实现参考上面的链接:
// Declaration of internal::FastPthreadMutex. When BTHREAD_USE_FAST_PTHREAD_MUTEX
// is defined it is a small non-copyable class built around a single unsigned
// futex word; otherwise FastPthreadMutex is just an alias of butil::Mutex.
71 namespace internal {
72 #ifdef BTHREAD_USE_FAST_PTHREAD_MUTEX
73 class FastPthreadMutex {
74 public:
75 FastPthreadMutex() : _futex(0) {} // 0 means unlocked
76 ~FastPthreadMutex() {}
77 void lock();
78 void unlock();
79 bool try_lock();
80 private:
81 DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
82 int lock_contended(); // slow path, taken when the fast-path exchange in lock() finds the lock held
83 unsigned _futex; // lock word; lock() reinterprets it as bthread::MutexInternal
84 };
85 #else
86 typedef butil::Mutex FastPthreadMutex; // fallback: plain alias
87 #endif
88 }
// BUTIL_DELETE_FUNCTION(decl): with C++11 enabled it expands to `decl = delete`;
// without C++11 it expands to the bare declaration, which is then left undefined.
26 #if !defined(BUTIL_CXX11_ENABLED)
27 #define BUTIL_DELETE_FUNCTION(decl) decl
28 #else
29 #define BUTIL_DELETE_FUNCTION(decl) decl = delete
30 #endif
// DISALLOW_COPY_AND_ASSIGN(TypeName): declares TypeName's copy constructor and
// copy-assignment operator through BUTIL_DELETE_FUNCTION. They are deleted under
// C++11, and declared-but-undefined before C++11. The classes quoted here place it
// in a private section, so pre-C++11 misuse is also rejected by access checking.
42 #define DISALLOW_COPY_AND_ASSIGN(TypeName) \
43 BUTIL_DELETE_FUNCTION(TypeName(const TypeName&)); \
44 BUTIL_DELETE_FUNCTION(void operator=(const TypeName&))
mutex 禁止拷贝和赋值。由于编译器在某些情况下(认为需要时)会隐式合成拷贝构造函数、拷贝赋值运算符等特殊成员函数,C++11 及以上用 = delete 即可显式禁止。C++11 之前只能把它们声明为 private 且不提供定义,这样误用时会在编译期(访问控制)或链接期报错。
// Mutex: non-copyable C++ wrapper around a bthread_mutex_t exposing
// lock()/unlock()/try_lock(). Failures of bthread_mutex_init() and
// bthread_mutex_lock() are reported by throwing std::system_error that carries
// the returned error code.
45 // NOTE: Not aligned to cacheline as the container of Mutex is practically aligned
46 class Mutex {
47 public:
48 typedef bthread_mutex_t* native_handler_type;
49 Mutex() {
50 int ec = bthread_mutex_init(&_mutex, NULL);
51 if (ec != 0) {
52 throw std::system_error(std::error_code(ec, std::system_category()), "Mutex constructor failed");
53 }
54 }
55 ~Mutex() { CHECK_EQ(0, bthread_mutex_destroy(&_mutex)); } // CHECK_EQ asserts the destroy call returned 0
56 native_handler_type native_handler() { return &_mutex; } // raw access to the underlying bthread_mutex_t
57 void lock() {
58 int ec = bthread_mutex_lock(&_mutex);
59 if (ec != 0) {
60 throw std::system_error(std::error_code(ec, std::system_category()), "Mutex lock failed");
61 }
62 }
63 void unlock() { bthread_mutex_unlock(&_mutex); } // return value is not checked
64 bool try_lock() { return !bthread_mutex_trylock(&_mutex); } // true iff bthread_mutex_trylock returned 0
66 private:
67 DISALLOW_COPY_AND_ASSIGN(Mutex);
68 bthread_mutex_t _mutex;
69 };
// bthread_contention_site_t: bookkeeping used by the contention profiler.
// make_contention_site_invalid() marks a site invalid by zeroing sampling_range.
// bthread_mutex_t: pointer to the butex value word plus one contention site.
155 typedef struct {
156 int64_t duration_ns;
157 size_t sampling_range;
158 } bthread_contention_site_t;
159
160 typedef struct {
161 unsigned* butex; // word obtained from butex_create_checked<unsigned>() in bthread_mutex_init
162 bthread_contention_site_t csite;
163 } bthread_mutex_t;
以上便是 mutex 的类声明。Butex 对象通过对象池 ObjectPool(butil::get_object)分配,对象池的实现后面再分析。初始化函数 bthread_mutex_init 如下:
// Initializes *m. The contention site is marked invalid, a Butex is allocated,
// and its value word is set to 0 (unlocked). Returns 0 on success, or ENOMEM if
// no Butex could be allocated. The attribute argument is unnamed and unused.
701 int bthread_mutex_init(bthread_mutex_t* __restrict m,
702 const bthread_mutexattr_t* __restrict) {
703 bthread::make_contention_site_invalid(&m->csite);
704 m->butex = bthread::butex_create_checked<unsigned>();
705 if (!m->butex) {
706 return ENOMEM;
707 }
708 *m->butex = 0; // initialize to 0 (unlocked)
709 return 0;
710 }
// Marks a contention site as invalid by setting its sampling_range to 0.
363 BUTIL_FORCE_INLINE void
364 make_contention_site_invalid(bthread_contention_site_t* cs) {
365 cs->sampling_range = 0;
366 }
// Typed wrapper over butex_create(): returns the butex value word as T*.
// The compile-time assertion requires sizeof(T) == sizeof(int), because the
// underlying word is Butex::value (a butil::atomic<int>).
39 // Check width of user type before casting.
40 template <typename T> T* butex_create_checked() {
41 BAIDU_CASSERT(sizeof(T) == sizeof(int), sizeof_T_must_equal_int);
42 return static_cast<T*>(butex_create());
43 }
// Obtains a Butex via butil::get_object<Butex>() and returns the address of its
// `value` member, or NULL if no object was obtained. Callers recover the Butex
// from that address with container_of (as butex_wait / butex_wake do).
243 void* butex_create() {
244 Butex* b = butil::get_object<Butex>();
245 if (b) {
246 return &b->value;
247 }
248 return NULL;
249 }
// Butex: a value word plus a list of waiters, with waiter_lock guarding the list.
// Declared with BAIDU_CACHELINE_ALIGNMENT.
116 struct BAIDU_CACHELINE_ALIGNMENT Butex {
117 Butex() {}
118 ~Butex() {}
119
120 butil::atomic<int> value; // the word whose address butex_create() hands out
121 ButexWaiterList waiters;//typedef butil::LinkedList<ButexWaiter> ButexWaiterList;
122 internal::FastPthreadMutex waiter_lock; // held while modifying `waiters` and waiters' `container`
123 };
上面的 Butex 结构和 Linux 内核中的 struct mutex 差不多:都由一个值(计数器)、一把保护等待队列的锁和一个等待队列组成:
// Linux kernel (2.6-era) struct mutex, quoted for comparison with Butex.
struct mutex {
atomic_t count; // lock counter: 1 = lock available; <= 0 = lock already taken, caller must wait
spinlock_t wait_lock;// spinlock that keeps access to the wait queue safe on multi-CPU systems
struct list_head wait_list;// wait queue: tasks that fail to take the lock are queued here until scheduled
};
其中 ButexWaiterList 是双向链表(butil::LinkedList<ButexWaiter>)。等待者分为 bthread 和 pthread 两种,简单起见这里只分析 pthread 的等待者 ButexPthreadWaiter:
// Base waiter node that is linked into Butex::waiters.
86 struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
87 // tids of pthreads are 0
88 bthread_t tid;
89
90 // Erasing node from middle of LinkedList is thread-unsafe, we need
91 // to hold its container's lock.
92 butil::atomic<Butex*> container; // Butex this node is queued on; set to NULL when removed
93 };
106 // pthread_task or main_task allocates this structure on stack and queue it
107 // in Butex::waiters.
108 struct ButexPthreadWaiter : public ButexWaiter {
109 butil::atomic<int> sig; // PTHREAD_NOT_SIGNALLED until wakeup_pthread() stores PTHREAD_SIGNALLED
110 };
// Excerpt of butil::LinkNode (lines 86-153 elided): a list node that stores its
// previous/next neighbours and is non-copyable.
84 template <typename T>
85 class LinkNode {
154 private:
155 LinkNode<T>* previous_;
156 LinkNode<T>* next_;
158 DISALLOW_COPY_AND_ASSIGN(LinkNode);
159 };
// Locks m. Fast path: atomically set the `locked` byte to 1; if it was 0 the
// mutex is now held and 0 is returned. Otherwise the contended path
// mutex_lock_contended() is taken, directly when the contention profiler
// (g_cp) is off. The sampling branch is elided in this excerpt.
729 int bthread_mutex_lock(bthread_mutex_t* m) {
730 bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
731 if (!split->locked.exchange(1, butil::memory_order_acquire)) {
732 return 0;
733 }
734 // Don't sample when contention profiler is off.
735 if (!bthread::g_cp) {
736 return bthread::mutex_lock_contended(m);
737 }
738 //sample code...
753 }
// Byte-level view of the butex word; the lock code casts the unsigned word to
// MutexInternal* to touch the `locked` byte alone.
615 // Implement bthread_mutex_t related functions
616 struct MutexInternal {
617 butil::static_atomic<unsigned char> locked; // 1 while the mutex is held
618 butil::static_atomic<unsigned char> contended; // 1 once a thread has entered the contended path
619 unsigned short padding; // presumably pads the struct to sizeof(unsigned) -- TODO confirm
620 };
以上是加锁的实现:把 unsigned* butex 强转为 struct MutexInternal*,按字节访问。由于 butex 初始为 0,第一次加锁时 exchange 返回的旧值为 0,于是 locked 被置为 1 并直接返回 0,加锁成功。代码 738 到 752 行是竞争采样统计相关的,这里略过,直接分析 mutex_lock_contended,即锁不可用时做了哪些工作。
// Whole-word lock states: CONTENDED = {locked=1, contended=1} and
// LOCKED = {locked=1, contended=0}. The macros reinterpret those structs as an
// unsigned value so they can be compared with / exchanged into the butex word.
622 const MutexInternal MUTEX_CONTENDED_RAW = {{1},{1},0};
623 const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};
624 // Define as macros rather than constants which can't be put in read-only
625 // section and affected by initialization-order fiasco.
626 #define BTHREAD_MUTEX_CONTENDED (*(const unsigned*)&bthread::MUTEX_CONTENDED_RAW)
627 #define BTHREAD_MUTEX_LOCKED (*(const unsigned*)&bthread::MUTEX_LOCKED_RAW)
// Contended (slow) path of bthread_mutex_lock. Each iteration exchanges the whole
// word to CONTENDED. If the previous value had the locked bit set, the mutex is
// still held, so the thread waits on the butex while the word equals CONTENDED.
// butex_wait failing with EWOULDBLOCK (value changed) or EINTR just retries; any
// other error is returned. Returns 0 once acquired. The word is then left as
// CONTENDED, so the matching unlock will call butex_wake.
632 inline int mutex_lock_contended(bthread_mutex_t* m) {
633 butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
634 while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
635 if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
636 errno != EWOULDBLOCK && errno != EINTR/*note*/) {
637 // a mutex lock should ignore interrruptions in general since
638 // user code is unlikely to check the return value.
639 return errno;
640 }
641 }
642 return 0;
643 }
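以上 while 循环会反复尝试加锁:exchange 把整个字写成 BTHREAD_MUTEX_CONTENDED 并返回旧值。若旧值的 locked 位为 1,说明锁仍被其他线程持有,此时调用 butex_wait 挂起等待。butex_wait 失败时,若 errno 为 EWOULDBLOCK(值已变化)或 EINTR(被中断),则继续循环重试,只有其他错误才直接返回 errno。当旧值的 locked 位为 0 时表示加锁成功,返回 0。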
// Waits on the butex whose value word is `arg` while that word equals
// expected_value. If the value already differs, returns -1 with
// errno = EWOULDBLOCK without blocking. pthread callers (no TaskGroup, or the
// current task is a pthread task) go through butex_wait_from_pthread; the bthread
// waiting path is elided in this excerpt.
611 int butex_wait(void* arg, int expected_value, const timespec* abstime) {
612 Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);// container_of: recover the enclosing Butex from the address of its `value` member
613 if (b->value.load(butil::memory_order_relaxed) != expected_value) {
614 errno = EWOULDBLOCK;
615 // Sometimes we may take actions immediately after unmatched butex,
616 // this fence makes sure that we see changes before changing butex.
617 butil::atomic_thread_fence(butil::memory_order_acquire);
618 return -1;
619 }
620 TaskGroup* g = tls_task_group;
621 if (NULL == g || g->is_current_pthread_task()) {
622 return butex_wait_from_pthread(g, b, expected_value, abstime);
623 }
624 //ButexBthreadWaiter bbw code...;
693 return 0;
694 }
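butex_wait 在真正等待之前会再次 load 当前值。如果它已不等于 BTHREAD_MUTEX_CONTENDED,说明锁状态已变化,于是设置 errno 为 EWOULDBLOCK 并返回 -1,由外层 while 循环重新尝试加锁。否则,若当前处于 pthread 上下文(没有 TaskGroup,或当前任务是 pthread task),就调用 butex_wait_from_pthread;在真正调用 wait_pthread 之前,还会再判断一次。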
// Blocks the calling pthread on butex b while its value equals expected_value.
// The value is re-checked under b->waiter_lock; if it changed, returns -1 with
// errno = EWOULDBLOCK. Otherwise the stack-allocated ButexPthreadWaiter is appended
// to b->waiters, its container is set to b, the lock is released, and the thread
// parks in wait_pthread(), whose rc is returned. butex_wake takes the same
// waiter_lock, so a wake cannot run between this check and the Append.
542 static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
543 const timespec* abstime) {
544 // sys futex needs relative timeout.
545 // Compute diff between abstime and now.
546 timespec* ptimeout = NULL;
547 timespec timeout;
548 if (abstime != NULL) {
549 //more code...
557 }
558
559 TaskMeta* task = NULL;
560 ButexPthreadWaiter pw;
561 pw.tid = 0;
562 pw.sig.store(PTHREAD_NOT_SIGNALLED, butil::memory_order_relaxed);
563 int rc = 0;
564
565 if (g) {
566 // (elided)
568 }
569 b->waiter_lock.lock();
570 if (b->value.load(butil::memory_order_relaxed) != expected_value) {
571 b->waiter_lock.unlock();
572 errno = EWOULDBLOCK;
573 rc = -1;
574 } else if (task != NULL && task->interrupted) {
575 // (elided)
580 } else {
581 b->waiters.Append(&pw);
582 pw.container.store(b, butil::memory_order_relaxed);
583 b->waiter_lock.unlock();
584
589 rc = wait_pthread(pw, ptimeout);
593 }
594 if (task) {
595 //more code...
607 }
608 return rc;
609 }
这里在持有 waiter_lock 的情况下,把自己加入 waiters 之前会再判断一次 b->value.load(butil::memory_order_relaxed) != expected_value。由于 butex_wake 也要先获取 waiter_lock,这次检查与入队对唤醒方来说是原子的,不会丢失唤醒。值未变化时,接着执行:
// Repeated excerpt: enqueue the waiter, record its container, release
// waiter_lock, then park in wait_pthread().
581 b->waiters.Append(&pw);
582 pw.container.store(b, butil::memory_order_relaxed);
583 b->waiter_lock.unlock();
589 rc = wait_pthread(pw, ptimeout);
// Parks on pw.sig with futex_wait_private while it equals PTHREAD_NOT_SIGNALLED.
// After every return, sig is re-read. Once it has changed, rc is returned;
// otherwise the loop waits again. Timeout handling is elided in this excerpt.
140 int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
141 while (true) {
142 const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
143 if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) {
144 // If `sig' is changed, wakeup_pthread() must be called and `pw'
145 // is already removed from the butex.
146 // Acquire fence makes this thread sees changes before wakeup.
147 return rc;
148 }
149 if (rc != 0 && errno == ETIMEDOUT) {
150 //more code...
165 }
166 }
167 }
下面是 bthread_mutex_unlock 的实现:
// Unlocks m. A valid contention site is saved and invalidated, then the whole
// word is exchanged to 0. If the previous value was exactly LOCKED (no thread
// ever entered the contended path), nothing needs waking. Otherwise one waiter
// is woken via butex_wake; the profiling branch is elided. Visible paths return 0.
787 int bthread_mutex_unlock(bthread_mutex_t* m) {
788 butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
789 bthread_contention_site_t saved_csite = {0, 0};
790 if (bthread::is_contention_site_valid(m->csite)) {
791 saved_csite = m->csite;
792 bthread::make_contention_site_invalid(&m->csite);
793 }
794 const unsigned prev = whole->exchange(0, butil::memory_order_release);
795 // CAUTION: the mutex may be destroyed, check comments before butex_create
796 if (prev == BTHREAD_MUTEX_LOCKED) {
797 return 0;// only the owner was involved; nobody to wake
798 }
799 // Wakeup one waiter
800 if (!bthread::is_contention_site_valid(saved_csite)) {
801 bthread::butex_wake(whole);
802 return 0;
803 }
804 //more code...
809 return 0;
810 }
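解锁时先用 exchange 把整个字置 0 并取得旧值。如果旧值恰好是 BTHREAD_MUTEX_LOCKED,说明自加锁以来没有其他线程(bthread)进入 mutex_lock_contended 竞争,无需唤醒,直接返回。否则旧值为 BTHREAD_MUTEX_CONTENDED,表示可能有等待者,需要调用 butex_wake 唤醒一个。注意:通过 while 循环(竞争路径)拿到锁的线程留下的值就是 BTHREAD_MUTEX_CONTENDED,因此它解锁时总会尝试唤醒,即使此时已没有等待者(butex_wake 发现 waiters 为空时直接返回 0)。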
// Wakes at most one waiter of the butex whose value word is `arg`. Under
// waiter_lock, the head waiter is removed from the list and its container is set
// to NULL; 0 is returned if the list was empty. A pthread waiter (tid == 0) is
// woken with wakeup_pthread and 1 is returned; the bthread branch is elided.
265 int butex_wake(void* arg) {
266 Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
267 ButexWaiter* front = NULL;
268 {
269 BAIDU_SCOPED_LOCK(b->waiter_lock);
270 if (b->waiters.empty()) {
271 return 0;
272 }
273 front = b->waiters.head()->value();
274 front->RemoveFromList();
275 front->container.store(NULL, butil::memory_order_relaxed);
276 }
277 if (front->tid == 0) {
278 wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
279 return 1;
280 }
281 //ButexBthreadWaiter code...
290 }
这里在 waiter_lock 保护下从 waiters 取出队首的等待者,把它从链表中移除并将其 container 置为 NULL。由于 ButexPthreadWaiter 的 tid 为 0,接着调用 wakeup_pthread 唤醒它:
// Signals a pthread waiter. PTHREAD_SIGNALLED is stored into pw->sig with
// release ordering, then one thread blocked on pw->sig is woken with
// futex_wake_private.
128 static void wakeup_pthread(ButexPthreadWaiter* pw) {
129 // release fence makes wait_pthread see changes before wakeup.
130 pw->sig.store(PTHREAD_SIGNALLED, butil::memory_order_release);
131 // At this point, wait_pthread() possibly has woken up and destroyed `pw'.
132 // In which case, futex_wake_private() should return EFAULT.
133 // If crash happens in future, `pw' can be made TLS and never destroyed
134 // to solve the issue.
135 futex_wake_private(&pw->sig, 1);
136 }
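另外,前面为了简化分析没有考虑超时参数,这里还是列一下传入超时时间且等待超时后的处理。超时后先调用 erase_from_butex 尝试把自己从 waiters 中移除;若移除失败,说明唤醒方已经把自己从队列中取走。此时若 sig 仍为 PTHREAD_NOT_SIGNALLED,则把 ptimeout 置为 NULL 继续等待,直到唤醒真正到达。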
// Excerpt of wait_pthread focusing on the ETIMEDOUT branch. On timeout the waiter
// tries to remove itself from the butex. If erase_from_butex() returns false (a
// waker already removed it) and sig is still PTHREAD_NOT_SIGNALLED, it waits
// again with no timeout for that wakeup to arrive. Otherwise rc is returned.
140 int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
141 while (true) {
142 const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
143 //more code...
149 if (rc != 0 && errno == ETIMEDOUT) {
156 if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
159 if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) {
160 ptimeout = NULL; // already timedout, ptimeout is expired.
161 continue;
162 }
163 }
164 return rc;
165 }
166 }
167 }
// Removes waiter bw from the butex it is queued on, if any. container is read
// again under that butex's waiter_lock because it can be changed concurrently
// (butex_wake sets it to NULL). If the node was removed here and `wakeup` is
// true, the waiter is also woken. errno is preserved across the call. Returns
// whether this call performed the removal.
462 inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
463 // `bw' is guaranteed to be valid inside this function because waiter
464 // will wait until this function being cancelled or finished.
465 // NOTE: This function must be no-op when bw->container is NULL.
466 bool erased = false;
467 Butex* b;
468 int saved_errno = errno;
469 while ((b = bw->container.load(butil::memory_order_acquire))) {
470 // b can be NULL when the waiter is scheduled but queued.
471 BAIDU_SCOPED_LOCK(b->waiter_lock);
472 if (b == bw->container.load(butil::memory_order_relaxed)) {
473 bw->RemoveFromList();
474 bw->container.store(NULL, butil::memory_order_relaxed);
475 if (bw->tid) {
476 //ButexBthreadWaiter code...
477 }
478 erased = true;
479 break;
480 }
481 }
482 if (erased && wakeup) {
483 if (bw->tid) {
484 //ButexBthreadWaiter code...
486 } else {
487 ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
488 wakeup_pthread(pw);
489 }
490 }
491 errno = saved_errno;
492 return erased;
493 }
如文章开头所述,由编译选项决定使用哪一种 mutex:若定义了宏 BTHREAD_USE_FAST_PTHREAD_MUTEX,则使用第一种实现 FastPthreadMutex。它直接对 _futex 调用 futex_wait_private/futex_wake_private,不经过 Butex 及其等待队列,因此可能更快一些。
// Contended path of FastPthreadMutex::lock(). It uses the same exchange-to-CONTENDED
// loop as bthread's mutex_lock_contended, but waits with futex_wait_private
// directly on _futex instead of a Butex. Only EWOULDBLOCK is retried; any other
// futex error makes it return errno. Returns 0 once the lock is acquired.
// NOTE(review): unlike mutex_lock_contended, EINTR is not retried here, and lock()
// discards this return value. An interrupted futex wait therefore appears able to
// let lock() return without owning the mutex -- confirm against current brpc.
662 int FastPthreadMutex::lock_contended() {
663 butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
664 while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
665 if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0
666 && errno != EWOULDBLOCK) {
667 return errno;
668 }
669 }
670 return 0;
671 }
672
// Fast path: set the `locked` byte of _futex to 1. If it was already set, fall
// into lock_contended(); that call's return value is discarded.
673 void FastPthreadMutex::lock() {
674 bthread::MutexInternal* split = (bthread::MutexInternal*)&_futex;
675 if (split->locked.exchange(1, butil::memory_order_acquire)) {
676 (void)lock_contended();
677 }
678 }
// Exchanges _futex to 0 with release ordering. Unless the previous value was
// exactly LOCKED (uncontended), wakes one thread blocked on the futex word.
685 void FastPthreadMutex::unlock() {
686 butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
687 const unsigned prev = whole->exchange(0, butil::memory_order_release);
688 // CAUTION: the mutex may be destroyed, check comments before butex_create
689 if (prev != BTHREAD_MUTEX_LOCKED) {
690 futex_wake_private(whole, 1);// try to wake one waiter
691 }
692 }
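_futex 共有三种可能的值:
- 0 表示未加锁;
- BTHREAD_MUTEX_LOCKED(即 {1,0,0},在小端机器上数值为 1)表示已加锁且无竞争;
- BTHREAD_MUTEX_CONTENDED(即 {1,1,0})表示已加锁且发生过竞争。任何进入 lock_contended 的线程都会把值置为它,无论是仍在等待的线程,还是刚通过该路径拿到锁的线程。

unlock 据此区分是否需要调用 futex_wake 唤醒等待者。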
总结:这部分感觉有些复杂,还有一些疑问,会在后面分析其他部分代码时再回过头来思考。目前还没有体会到这种实现方式的原因,还需要多结合项目经历,同时对比 Linux mutex 的实现源码来理解。
要学(掌握)的基础知识太多太多,brpc值得学习的东西也太多太多。
网友评论