This series of posts looks at what bthread is, how to use it, and how it is implemented; the explanation below starts from the official documentation.
The difference between coroutines and bthread: "coroutine" here means an N:1 threading library, while bthread is an M:N threading library. As analyzed before, a group of coroutines runs inside one thread and is never migrated to another thread. bthread is different: through work stealing, other worker threads can steal bthreads and run them, so a bthread that gets stuck does not affect other bthreads, whereas a stuck coroutine blocks every other coroutine in the same thread, because they never get a chance to yield.
If a bthread blocks on a bthread API, it hands the current pthread worker over to other bthreads. If a bthread blocks on a pthread API or a system call, the bthreads pending on the current pthread worker are stolen and run by other idle pthread workers. Personally I feel coroutines could also provide similar behavior, i.e. coroutines no longer bound to a fixed thread.
Before starting on bthread itself, I planned to first introduce WorkStealingQueue, and later briefly cover the special handling for bthread inside mutex. WorkStealingQueue is a kind of circular queue, similar to the lock-free ring buffer in DPDK, which uses memory barriers and CAS instructions to build a multi-producer / multi-consumer queue. brpc's implementation is not much code, but it uses C++11 memory-ordering primitives that I find a bit dizzying; the net effect is that it is correct without sacrificing performance. At the end I will come back to the difference between bthread and coroutines; plain threads are not discussed further here.
The bthread source lives in bthread.h/cpp. Like threads and coroutines, it provides locks, condition variables, thread-local storage and so on. The earlier analyses did not cover coroutine-level locks, condition variables or private variables, but they are not hard to work out from the source on your own. Every beginning is hard: starting from the examples would be simpler, but here I start from the basic code.
Below is the implementation of keys:
typedef unsigned int pthread_key_t;
50 // Key of thread-local data, created by bthread_key_create.
51 typedef struct {
52 uint32_t index; // index in KeyTable
53 uint32_t version; // ABA avoidance
54 } bthread_key_t;
408 int bthread_key_create(bthread_key_t* key, void (*dtor)(void*)) {
409 if (dtor == NULL) {
410 return bthread_key_create2(key, NULL, NULL);
411 } else {
412 return bthread_key_create2(key, bthread::arg_as_dtor, (const void*)dtor);
413 }
414 }
383 int bthread_key_create2(bthread_key_t* key,
384 void (*dtor)(void*, const void*),
385 const void* dtor_args) {
386 uint32_t index = 0;
387 {
388 BAIDU_SCOPED_LOCK(bthread::s_key_mutex);//lock
389 if (bthread::nfreekey > 0) {
390 index = bthread::s_free_keys[--bthread::nfreekey];
391 } else if (bthread::nkey < bthread::KEYS_MAX) {
392 index = bthread::nkey++;
393 } else {
394 return EAGAIN; // what pthread_key_create returns in this case.
395 }
396 }
397 bthread::s_key_info[index].dtor = dtor;
398 bthread::s_key_info[index].dtor_args = dtor_args;
399 key->index = index;
400 key->version = bthread::s_key_info[index].version;
401 if (key->version == 0) {
402 ++bthread::s_key_info[index].version;
403 ++key->version;
404 }
405 return 0;
406 }
416 int bthread_key_delete(bthread_key_t key) {
417 if (key.index < bthread::KEYS_MAX &&
418 key.version == bthread::s_key_info[key.index].version) {
419 BAIDU_SCOPED_LOCK(bthread::s_key_mutex);
420 if (key.version == bthread::s_key_info[key.index].version) {
421 if (++bthread::s_key_info[key.index].version == 0) {
422 ++bthread::s_key_info[key.index].version;//version becomes 1 (never 0)
423 }
424 bthread::s_key_info[key.index].dtor = NULL;
425 bthread::s_key_info[key.index].dtor_args = NULL;
426 bthread::s_free_keys[bthread::nfreekey++] = key.index;
427 return 0;
428 }
429 }
431 return EINVAL;
432 }
58 // destructors/version of TLS.
59 struct KeyInfo {
60 uint32_t version;
61 void (*dtor)(void*, const void*);
62 const void* dtor_args;
63 };
64 static KeyInfo s_key_info[KEYS_MAX] = {};
65
66 // For allocating keys.
67 static pthread_mutex_t s_key_mutex = PTHREAD_MUTEX_INITIALIZER;
68 static size_t nfreekey = 0;
69 static size_t nkey = 0;
70 static uint32_t s_free_keys[KEYS_MAX]; //992
249 static void arg_as_dtor(void* data, const void* arg) {
250 typedef void (*KeyDtor)(void*);
251 return ((KeyDtor)arg)(data);
252 }
When a key is created, an available index is taken from the global free list; at the beginning the free list is empty, so a fresh index is handed out by simply incrementing nkey. When a key is deleted, its index is pushed back into s_free_keys, and as long as free indices exist, later creations take them from the end of that array.
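As a quick usage sketch (not taken from the brpc source; my_dtor and the stored value are made up for illustration), a key's lifecycle looks like this:
#include <bthread/bthread.h>   // assuming the umbrella header declares the key APIs

static void my_dtor(void* data) {
    delete static_cast<int*>(data);        // called when the bthread's KeyTable is cleared
}

void key_demo() {
    bthread_key_t key;
    bthread_key_create(&key, my_dtor);     // grabs an index plus its current version
    bthread_setspecific(key, new int(42)); // stored in this bthread's KeyTable
    int* v = static_cast<int*>(bthread_getspecific(key));
    // ... use v ...
    bthread_key_delete(key);               // bumps the version; the old key no longer matches
}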
Setting and getting a key's value (the TaskGroup involved will be introduced later):
434 // NOTE: Can't borrow_keytable in bthread_setspecific, otherwise following
435 // memory leak may occur:
436 // -> bthread_getspecific fails to borrow_keytable and returns NULL.
437 // -> bthread_setspecific succeeds to borrow_keytable and overwrites old data
438 // at the position with newly created data, the old data is leaked.
439 int bthread_setspecific(bthread_key_t key, void* data) {
440 bthread::KeyTable* kt = bthread::tls_bls.keytable;
441 if (NULL == kt) {
442 kt = new (std::nothrow) bthread::KeyTable;
443 if (NULL == kt) {
444 return ENOMEM;
445 }
446 bthread::tls_bls.keytable = kt;
447 bthread::TaskGroup* const g = bthread::tls_task_group;
448 if (g) {
449 g->current_task()->local_storage.keytable = kt;
450 }
451 if (!bthread::tls_ever_created_keytable) {
452 bthread::tls_ever_created_keytable = true;
453 CHECK_EQ(0, butil::thread_atexit(bthread::cleanup_pthread, kt));//cleanup registered to run at thread exit
454 }
455 }
456 return kt->set_data(key, data);
457 }
459 void* bthread_getspecific(bthread_key_t key) {
460 bthread::KeyTable* kt = bthread::tls_bls.keytable;
461 if (kt) {
462 return kt->get_data(key);
463 }
464 bthread::TaskGroup* const g = bthread::tls_task_group;
465 if (g) {
466 bthread::TaskMeta* const task = g->current_task();
467 kt = bthread::borrow_keytable(task->attr.keytable_pool);
468 if (kt) {
469 g->current_task()->local_storage.keytable = kt;
470 bthread::tls_bls.keytable = kt;
471 return kt->get_data(key);
472 }
473 }
474 return NULL;
475 }
208 static KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
209 if (pool != NULL && pool->free_keytables) {
210 BAIDU_SCOPED_LOCK(pool->mutex);
211 KeyTable* p = (KeyTable*)pool->free_keytables;
212 if (p) {
213 pool->free_keytables = p->next;
214 return p;
215 }
216 }
217 return NULL;
218 }
As the comment above says, we "Can't borrow_keytable in bthread_setspecific", to prevent the memory leak described there.
I did not find the framework calling these APIs directly elsewhere in the source (they are used in test/example), so here are the documented caveats about thread-local data:
After calling a blocking bthread function, the pthread the code runs on has very likely changed, which makes the values of pthread_getspecific, gcc __thread and C++11 thread_local variables, pthread_self(), etc. change as well. The behavior of code like the following is unpredictable:
thread_local SomeObject obj;
SomeObject* p = &obj;
p->bar();
bthread_usleep(1000);
p->bar();
After bthread_usleep the bthread is probably running on a different pthread, so p now points into the previous pthread's thread_local storage and further access through p is undefined. This pattern typically shows up when users pass business data through thread-level variables. To avoid it, keep two rules in mind:
- Do not pass business data through thread-level variables. It is a poor design pattern, and functions that depend on thread-level data are hard to unit-test. To judge whether it is being abused: would the business logic still work without the thread-level variable? Thread-level variables should only be an optimization, and code using them must not directly or indirectly call any bthread function that may block. For example, tcmalloc's use of thread-level variables does not conflict with bthread at all.
- If you really must use thread-level variables (for business data), use bthread_key_create and bthread_getspecific (see the sketch below).
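Rewriting the problematic thread_local snippet above with a bthread key, the data travels with the bthread when it migrates (a minimal sketch; SomeObject, its deleter and the global key are assumptions of mine):
static bthread_key_t g_obj_key;  // created once, e.g. bthread_key_create(&g_obj_key, SomeObject::deleter)

void do_work() {
    SomeObject* p = static_cast<SomeObject*>(bthread_getspecific(g_obj_key));
    if (p == NULL) {
        p = new SomeObject;
        bthread_setspecific(g_obj_key, p);
    }
    p->bar();
    bthread_usleep(1000);                 // the bthread may resume on another pthread
    // still fine: the value lives in the bthread's KeyTable, not in pthread TLS
    p = static_cast<SomeObject*>(bthread_getspecific(g_obj_key));
    p->bar();
}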
Here part of the session_data_and_thread_local example is used to illustrate:
28 class DataFactory {
29 public:
30 virtual ~DataFactory() {}
31
32 // Implement this method to create a piece of data.
33 // Notice that this method is const.
34 // Returns the data, NULL on error.
35 virtual void* CreateData() const = 0;
36
37 // Implement this method to destroy a piece of data that was created
38 // by Create().
39 // Notice that this method is const.
40 virtual void DestroyData(void*) const = 0;
41 };
An abstract interface for creating pieces of data; the test classes:
71 class MyThreadLocalDataFactory : public brpc::DataFactory {
72 public:
73 void* CreateData() const {
74 return new MyThreadLocalData;
75 }
76
77 void DestroyData(void* d) const {
78 MyThreadLocalData::deleter(d);
79 }
80 };
57 struct MyThreadLocalData {
58 MyThreadLocalData() : y(0) {
60 }
61 ~MyThreadLocalData() {
63 }
64 static void deleter(void* d) {
66 }
68 int y;
69 };
352 struct ThreadLocalOptions {
353 bthread_key_t tls_key;
354 const DataFactory* thread_local_data_factory;
355
356 ThreadLocalOptions()
357 : tls_key(INVALID_BTHREAD_KEY)
358 , thread_local_data_factory(NULL) {}
359 };
790 _tl_options.thread_local_data_factory = _options.thread_local_data_factory;
791 if (bthread_key_create2(&_tl_options.tls_key, DestroyServerTLS,
792 _options.thread_local_data_factory) != 0) {
793 LOG(ERROR) << "Fail to create thread-local key";
794 return -1;
795 }
When the server receives a request from a client, the ThreadLocalOptions are assigned to the bthread:
364 if (server->thread_local_options().thread_local_data_factory) {
365 bthread_assign_data((void*)&server->thread_local_options());
366 }
Test code:
138 MyThreadLocalData* tls =
139 static_cast<MyThreadLocalData*>(brpc::thread_local_data());
140 if (tls == NULL) {
141 cntl->SetFailed("Require ServerOptions.thread_local_data_factory "
142 "to be set with a correctly implemented instance");
143 LOG(ERROR) << cntl->ErrorText();
144 return;
145 }
1683 void* thread_local_data() {
1684 const Server::ThreadLocalOptions* tl_options =
1685 static_cast<const Server::ThreadLocalOptions*>(bthread_get_assigned_data());
1686 if (tl_options == NULL) { // not in server threads.
1687 return NULL;
1688 }
1689 if (BAIDU_UNLIKELY(tl_options->thread_local_data_factory == NULL)) {
1691 return NULL;
1692 }
1693 void* data = bthread_getspecific(tl_options->tls_key);
1694 if (data == NULL) {
1695 data = tl_options->thread_local_data_factory->CreateData();
1696 if (data != NULL) {
1697 CHECK_EQ(0, bthread_setspecific(tl_options->tls_key, data));
1698 }
1699 }
1700 return data;
1701 }
In short, a bthread_key is used to index the TLS data of the bthread itself, so even when the bthread migrates to another worker thread the failure mode described above does not occur.
The local storage is switched whenever bthreads are switched:
589 if (__builtin_expect(next_meta != cur_meta, 1)) {
590 g->_cur_meta = next_meta;
591 // Switch tls_bls
595 cur_meta->local_storage = tls_bls;
596 tls_bls = next_meta->local_storage;
Below is the class declaration of KeyTable:
138 class BAIDU_CACHELINE_ALIGNMENT KeyTable {
139 public:
140 KeyTable() : next(NULL) {
141 memset(_subs, 0, sizeof(_subs));
142 nkeytable.fetch_add(1, butil::memory_order_relaxed);
143 }
144
145 ~KeyTable() {
146 nkeytable.fetch_sub(1, butil::memory_order_relaxed);
147 for (int ntry = 0; ntry < PTHREAD_DESTRUCTOR_ITERATIONS; ++ntry) {
148 for (uint32_t i = 0; i < KEY_1STLEVEL_SIZE; ++i) {
149 if (_subs[i]) {
150 _subs[i]->clear(i * KEY_2NDLEVEL_SIZE);
151 }
152 }
153 bool all_cleared = true;
154 for (uint32_t i = 0; i < KEY_1STLEVEL_SIZE; ++i) {
155 if (_subs[i] != NULL && !_subs[i]->cleared()) {
156 all_cleared = false;
157 break;
158 }
159 }
160 if (all_cleared) {
161 for (uint32_t i = 0; i < KEY_1STLEVEL_SIZE; ++i) {
162 delete _subs[i];
163 }
164 return;
165 }
166 }
168 }
170 inline void* get_data(bthread_key_t key) const {
171 const uint32_t subidx = key.index / KEY_2NDLEVEL_SIZE;
172 if (subidx < KEY_1STLEVEL_SIZE) {
173 const SubKeyTable* sub_kt = _subs[subidx];
174 if (sub_kt) {
175 return sub_kt->get_data(
176 key.index - subidx * KEY_2NDLEVEL_SIZE, key.version);
177 }
178 }
179 return NULL;
180 }
181
182 inline int set_data(bthread_key_t key, void* data) {
183 const uint32_t subidx = key.index / KEY_2NDLEVEL_SIZE;//32
184 if (subidx < KEY_1STLEVEL_SIZE &&
185 key.version == s_key_info[key.index].version) {
186 SubKeyTable* sub_kt = _subs[subidx];
187 if (sub_kt == NULL) {
188 sub_kt = new (std::nothrow) SubKeyTable;
189 if (NULL == sub_kt) {
190 return ENOMEM;
191 }
192 _subs[subidx] = sub_kt;
193 }
194 sub_kt->set_data(key.index - subidx * KEY_2NDLEVEL_SIZE,
195 key.version, data);
196 return 0;
197 }
199 return EINVAL;
200 }
201
202 public:
203 KeyTable* next;
204 private:
205 SubKeyTable* _subs[KEY_1STLEVEL_SIZE];//31
206 };
A bthread_key_t is resolved in two levels: key.index / KEY_2NDLEVEL_SIZE selects a SubKeyTable, the version is checked against s_key_info, and the value is stored at offset key.index - subidx * KEY_2NDLEVEL_SIZE inside that SubKeyTable.
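For example, assuming KEY_2NDLEVEL_SIZE is 32 and KEY_1STLEVEL_SIZE is 31 (consistent with the 992 noted for s_free_keys above, since 31 * 32 = 992), a key with index 77 is resolved like this:
const uint32_t subidx = 77 / 32;            // = 2, selects _subs[2]
const uint32_t slot   = 77 - subidx * 32;   // = 13, slot inside that SubKeyTable
// set_data writes {version, data} into _subs[2]->_data[13];
// get_data returns the pointer only if the stored version still equals key.version
The SubKeyTable declaration: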
78 class BAIDU_CACHELINE_ALIGNMENT SubKeyTable {
79 public:
80 SubKeyTable() {
81 memset(_data, 0, sizeof(_data));
82 nsubkeytable.fetch_add(1, butil::memory_order_relaxed);
83 }
84
85 // NOTE: Call clear first.
86 ~SubKeyTable() {
87 nsubkeytable.fetch_sub(1, butil::memory_order_relaxed);
88 }
89
90 void clear(uint32_t offset) {
91 for (uint32_t i = 0; i < KEY_2NDLEVEL_SIZE; ++i) {
92 void* p = _data[i].ptr;
93 if (p) {
94 // Set the position to NULL before calling dtor which may set
95 // the position again.
96 _data[i].ptr = NULL;
97
98 KeyInfo info = bthread::s_key_info[offset + i];
99 if (info.dtor && _data[i].version == info.version) {
100 info.dtor(p, info.dtor_args);
101 }
102 }
103 }
104 }
106 bool cleared() const {
107 // We need to iterate again to check if every slot is empty. An
108 // alternative is remember if set_data() was called during clear.
109 for (uint32_t i = 0; i < KEY_2NDLEVEL_SIZE; ++i) {
110 if (_data[i].ptr) {
111 return false;
112 }
113 }
114 return true;
115 }
116
117 inline void* get_data(uint32_t index, uint32_t version) const {
118 if (_data[index].version == version) {
119 return _data[index].ptr;
120 }
121 return NULL;
122 }
123 inline void set_data(uint32_t index, uint32_t version, void* data) {
124 _data[index].version = version;
125 _data[index].ptr = data;
126 }
127
128 private:
129 struct Data {
130 uint32_t version;
131 void* ptr;
132 };
133 Data _data[KEY_2NDLEVEL_SIZE];
134 };
bthread's bthread_mutex_t has already been analyzed in an earlier post, so it is skipped here. Next comes the condition variable bthread_cond_t, which reuses the Butex seen before:
116 struct BAIDU_CACHELINE_ALIGNMENT Butex {
117 Butex() {}
118 ~Butex() {}
119
120 butil::atomic<int> value;
121 ButexWaiterList waiters;
122 internal::FastPthreadMutex waiter_lock;
123 };
155 typedef struct {
156 int64_t duration_ns;
157 size_t sampling_range;
158 } bthread_contention_site_t;
160 typedef struct {
161 unsigned* butex;
162 bthread_contention_site_t csite;
163 } bthread_mutex_t;
164
168 typedef struct {
169 bthread_mutex_t* m;
170 int* seq;
171 } bthread_cond_t;
Initialization:
48 int bthread_cond_init(bthread_cond_t* __restrict c,
49 const bthread_condattr_t*) {
50 c->m = NULL;
51 c->seq = bthread::butex_create_checked<int>();
52 *c->seq = 0;
53 return 0;
54 }
55
56 int bthread_cond_destroy(bthread_cond_t* c) {
57 bthread::butex_destroy(c->seq);
58 c->seq = NULL;
59 return 0;
60 }
29 struct CondInternal {
30 butil::atomic<bthread_mutex_t*> m;
31 butil::atomic<int>* seq;
32 };
87 int bthread_cond_wait(bthread_cond_t* __restrict c,
88 bthread_mutex_t* __restrict m) {
89 bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
90 const int expected_seq = ic->seq->load(butil::memory_order_relaxed);
91 if (ic->m.load(butil::memory_order_relaxed) != m) {
92 // bind m to c
93 bthread_mutex_t* expected_m = NULL;
94 if (!ic->m.compare_exchange_strong(
95 expected_m, m, butil::memory_order_relaxed)) {
96 return EINVAL;
97 }
98 }
99 bthread_mutex_unlock(m);
100 int rc1 = 0;
101 if (bthread::butex_wait(ic->seq, expected_seq, NULL) < 0 &&
102 errno != EWOULDBLOCK && errno != EINTR/*note*/) {
103 // EINTR should not be returned by cond_*wait according to docs on
104 // pthread, however spurious wake-up is OK, just as we do here
105 // so that users can check flags in the loop often companioning
106 // with the cond_wait ASAP. For example:
107 // mutex.lock();
108 // while (!stop && other-predicates) {
109 // cond_wait(&mutex);
110 // }
111 // mutex.unlock();
112 // After interruption, above code should wake up from the cond_wait
113 // soon and check the `stop' flag and other predicates.
114 rc1 = errno;
115 }
116 const int rc2 = bthread_mutex_lock_contended(m);
117 return (rc2 ? rc2 : rc1);
118 }
62 int bthread_cond_signal(bthread_cond_t* c) {
63 bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
64 // ic is probably dereferenced after fetch_add, save required fields before
65 // this point
66 butil::atomic<int>* const saved_seq = ic->seq;
67 saved_seq->fetch_add(1, butil::memory_order_release);
68 // don't touch ic any more
69 bthread::butex_wake(saved_seq);
70 return 0;
71 }
After butex_wait returns, the mutex still has to be re-acquired via bthread_mutex_lock_contended. Usage mirrors pthread; here is a bthread_cond_t example from the bthread tests:
376 int Signal() {
377 int ret = 0;
378 bthread_mutex_lock(&_mutex);
379 _count --;
380 bthread_cond_signal(&_cond);
381 bthread_mutex_unlock(&_mutex);
382 return ret;
383 }
384
385 int Wait() {
386 int ret = 0;
387 bthread_mutex_lock(&_mutex);//lock
388 while (_count > 0) {
389 ret = bthread_cond_wait(&_cond, &_mutex);//wait
390 }
391 bthread_mutex_unlock(&_mutex);//unlock
392 return ret;
393 }
394 private:
395 int _count;
396 bthread_cond_t _cond;
397 bthread_mutex_t _mutex;
When bthread_cond_wait is called, the bthread_cond_t has to be bound to a bthread_mutex_t. It first snapshots seq, then unlocks m, then calls butex_wait. The ButexBthreadWaiter used below was roughly covered in the earlier post on brpc's mutex source:
97 struct ButexBthreadWaiter : public ButexWaiter {
98 TaskMeta* task_meta;
99 TimerThread::TaskId sleep_id;
100 WaiterState waiter_state;
101 int expected_value;
102 Butex* initial_butex;
103 TaskControl* control;
104 };
611 int butex_wait(void* arg, int expected_value, const timespec* abstime) {
612 Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
613 if (b->value.load(butil::memory_order_relaxed) != expected_value) {
614 errno = EWOULDBLOCK;
615 // Sometimes we may take actions immediately after unmatched butex,
616 // this fence makes sure that we see changes before changing butex.
617 butil::atomic_thread_fence(butil::memory_order_acquire);
618 return -1;
619 }
620 TaskGroup* g = tls_task_group;
621 if (NULL == g || g->is_current_pthread_task()) {
622 return butex_wait_from_pthread(g, b, expected_value, abstime);
623 }
624 ButexBthreadWaiter bbw;
625 // tid is 0 iff the thread is non-bthread
626 bbw.tid = g->current_tid();
627 bbw.container.store(NULL, butil::memory_order_relaxed);
628 bbw.task_meta = g->current_task();
629 bbw.sleep_id = 0;
630 bbw.waiter_state = WAITER_STATE_READY;
631 bbw.expected_value = expected_value;
632 bbw.initial_butex = b;
633 bbw.control = g->control();
635 if (abstime != NULL) {
636 // Schedule timer before queueing. If the timer is triggered before
637 // queueing, cancel queueing. This is a kind of optimistic locking.
638 if (butil::timespec_to_microseconds(*abstime) <
639 (butil::gettimeofday_us() + MIN_SLEEP_US)) {
640 // Already timed out.
641 errno = ETIMEDOUT;
642 return -1;
643 }
644 bbw.sleep_id = get_global_timer_thread()->schedule(
645 erase_from_butex_and_wakeup, &bbw, *abstime);
646 if (!bbw.sleep_id) { // TimerThread stopped.
647 errno = ESTOP;
648 return -1;
649 }
650 }
656 // release fence matches with acquire fence in interrupt_and_consume_waiters
657 // in task_group.cpp to guarantee visibility of `interrupted'.
658 bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
659 g->set_remained(wait_for_butex, &bbw);
660 TaskGroup::sched(&g);
661
662 // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
663 // running and using bbw. The chance is small, just spin until it's done.
664 BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
665 30/*nops before sched_yield*/);
666
667 // If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
668 // Spin until current_waiter != NULL.
669 BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
670 NULL, butil::memory_order_acquire) == NULL,
671 30/*nops before sched_yield*/);
676 bool is_interrupted = false;//return value, more code...
To implement waiting, the context of the currently running bthread has to be saved and the waiter hung on the butex of the condition variable. What task_group's set_remained does will be analyzed in detail later.
495 static void wait_for_butex(void* arg) {
496 ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
497 Butex* const b = bw->initial_butex;
512 {
513 BAIDU_SCOPED_LOCK(b->waiter_lock);
514 if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
515 bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
516 } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
517 !bw->task_meta->interrupted) {
518 b->waiters.Append(bw);
519 bw->container.store(b, butil::memory_order_relaxed);
520 return;
521 }
522 }
528 unsleep_if_necessary(bw, get_global_timer_thread());
529 tls_task_group->ready_to_run(bw->tid);
540 }
In the fallback path above (value mismatch or interruption), ready_to_run means the waiting bthread is about to be executed again instead of staying queued on the butex.
453 static void erase_from_butex_and_wakeup(void* arg) {
454 erase_from_butex(static_cast<ButexWaiter*>(arg), true, WAITER_STATE_TIMEDOUT);
455 }
462 inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
463 // `bw' is guaranteed to be valid inside this function because waiter
464 // will wait until this function being cancelled or finished.
465 // NOTE: This function must be no-op when bw->container is NULL.
466 bool erased = false;
467 Butex* b;
468 int saved_errno = errno;
469 while ((b = bw->container.load(butil::memory_order_acquire))) {
470 // b can be NULL when the waiter is scheduled but queued.
471 BAIDU_SCOPED_LOCK(b->waiter_lock);
472 if (b == bw->container.load(butil::memory_order_relaxed)) {
473 bw->RemoveFromList();
474 bw->container.store(NULL, butil::memory_order_relaxed);
475 if (bw->tid) {
476 static_cast<ButexBthreadWaiter*>(bw)->waiter_state = state;
477 }
478 erased = true;
479 break;
480 }
481 }
482 if (erased && wakeup) {
483 if (bw->tid) {//bthread
484 ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw);
485 get_task_group(bbw->control)->ready_to_run_general(bw->tid);
486 } else {//pthread
487 ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
488 wakeup_pthread(pw);
489 }
490 }
491 errno = saved_errno;
492 return erased;
493 }
butex_wait above may switch the current bthread out; that happens inside sched and is analyzed later. If a deadline is given: when it has already passed, ETIMEDOUT is returned directly, otherwise a timer is scheduled, and when it fires the waiter is removed from the butex as in erase_from_butex and rescheduled via ready_to_run_general.
If butex_wait returns EWOULDBLOCK right away, m is simply locked again; overall the wait implementation follows much the same idea as pthread_cond_wait.
265 int butex_wake(void* arg) {
266 Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
267 ButexWaiter* front = NULL;
268 {
269 BAIDU_SCOPED_LOCK(b->waiter_lock);
270 if (b->waiters.empty()) {
271 return 0;
272 }
273 front = b->waiters.head()->value();
274 front->RemoveFromList();
275 front->container.store(NULL, butil::memory_order_relaxed);
276 }
277 if (front->tid == 0) {//pthread
278 wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
279 return 1;
280 }
281 ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
282 unsleep_if_necessary(bbw, get_global_timer_thread());
283 TaskGroup* g = tls_task_group;
284 if (g) {
285 TaskGroup::exchange(&g, bbw->tid);
286 } else {
287 bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
288 }
289 return 1;
290 }
On wakeup, the bthread (front->tid) may have to be removed from the timer; it is taken off the wait queue and scheduled via exchange.
For reference, here is the pseudocode of the condition-variable wait/signal from the pthread man pages:
[pthread_cond_wait(mutex, cond)]
value = cond->value;
pthread_mutex_unlock(mutex);
pthread_mutex_lock(cond->mutex);
if (value == cond->value) {
me->next_cond = cond->waiter;
cond->waiter = me;
pthread_mutex_unlock(cond->mutex);
unable_to_run(me);
} else
pthread_mutex_unlock(cond->mutex);
pthread_mutex_lock(mutex);
[pthread_cond_signal(cond)]
pthread_mutex_lock(cond->mutex);
cond->value++;
if (cond->waiter) {
sleeper = cond->waiter;
cond->waiter = sleeper->next_cond;
able_to_run(sleeper);
}
pthread_mutex_unlock(cond->mutex);
The remaining interfaces: bthread_cond_timedwait is the same as bthread_cond_wait except for the extra deadline:
73 int bthread_cond_broadcast(bthread_cond_t* c) {
74 bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
75 bthread_mutex_t* m = ic->m.load(butil::memory_order_relaxed);
76 butil::atomic<int>* const saved_seq = ic->seq;
77 if (!m) {
78 return 0;
79 }
80 void* const saved_butex = m->butex;
81 // Wakeup one thread and requeue the rest on the mutex.
82 ic->seq->fetch_add(1, butil::memory_order_release);
83 bthread::butex_requeue(saved_seq, saved_butex);
84 return 0;
85 }
120 int bthread_cond_timedwait(bthread_cond_t* __restrict c,
121 bthread_mutex_t* __restrict m,
122 const struct timespec* __restrict abstime) {
123 bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
124 const int expected_seq = ic->seq->load(butil::memory_order_relaxed);
125 if (ic->m.load(butil::memory_order_relaxed) != m) {
126 // bind m to c
127 bthread_mutex_t* expected_m = NULL;
128 if (!ic->m.compare_exchange_strong(
129 expected_m, m, butil::memory_order_relaxed)) {
130 return EINVAL;
131 }
132 }
133 bthread_mutex_unlock(m);
134 int rc1 = 0;
135 if (bthread::butex_wait(ic->seq, expected_seq, abstime) < 0 &&
136 errno != EWOULDBLOCK && errno != EINTR/*note*/) {
137 // note: see comments in bthread_cond_wait on EINTR.
138 rc1 = errno;
139 }
140 const int rc2 = bthread_mutex_lock_contended(m);
141 return (rc2 ? rc2 : rc1);
142 }
412 int butex_requeue(void* arg, void* arg2) {
413 Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);//butex behind the condvar's seq
414 Butex* m = container_of(static_cast<butil::atomic<int>*>(arg2), Butex, value);//butex behind the outer mutex
415
416 ButexWaiter* front = NULL;
417 {
418 std::unique_lock<internal::FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
419 std::unique_lock<internal::FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
420 butil::double_lock(lck1, lck2);//lock in address order to avoid deadlock
421 if (b->waiters.empty()) {
422 return 0;//no waiters
423 }
424
425 front = b->waiters.head()->value();
426 front->RemoveFromList();//take the head waiter to wake up
427 front->container.store(NULL, butil::memory_order_relaxed);
428
429 while (!b->waiters.empty()) {//requeue the remaining bthreads onto the mutex for later wakeups
430 ButexWaiter* bw = b->waiters.head()->value();
431 bw->RemoveFromList();
432 m->waiters.Append(bw);
433 bw->container.store(m, butil::memory_order_relaxed);
434 }
435 }
437 if (front->tid == 0) { // which is a pthread
438 wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
439 return 1;
440 }
441 ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
442 unsleep_if_necessary(bbw, get_global_timer_thread());//cancel its timer if any
443 TaskGroup* g = tls_task_group;
444 if (g) {
445 TaskGroup::exchange(&g, front->tid);//schedule the woken bthread
446 } else {
447 bbw->control->choose_one_group()->ready_to_run_remote(front->tid);
448 }
449 return 1;
450 }
As for bthread_cond_broadcast: pthread_cond_broadcast implementations may have behaved a bit differently in early versions, with thundering-herd and performance problems to avoid. Quoting gejun, the main author of bthread, from an answer on Zhihu: "If by 'thundering herd' you mean this: calling broadcast on a condition with N waiting threads should wake threads O(N) times in total rather than O(N^2), then current broadcast implementations should not have a thundering herd. The common broadcast implementation only wakes the first thread waiting on the condition and moves the rest onto the mutex used together with the condition (a futex requeue operation on Linux), so that when the first thread finishes its work and unlocks that mutex, the next thread is woken, and so on. Other platforms work the same way."
The bthread_cond_broadcast implementation does the same: as its comment "Wakeup one thread and requeue the rest on the mutex." says, only one waiter is woken and the rest are requeued by butex_requeue; the relevant analysis is in the inline comments above.
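From the user's point of view broadcast is used exactly like its pthread counterpart: waiters loop on a predicate under the mutex, so being requeued onto the mutex is invisible to them. A minimal sketch (not taken from the brpc tests; the flag and the init placement are assumptions):
bthread_mutex_t mu;   // assumed initialized with bthread_mutex_init(&mu, NULL) at start-up
bthread_cond_t cond;  // assumed initialized with bthread_cond_init(&cond, NULL) at start-up
bool ready = false;

void* waiter(void*) {
    bthread_mutex_lock(&mu);
    while (!ready) {                    // re-check the predicate after every wakeup
        bthread_cond_wait(&cond, &mu);  // may be requeued onto mu by a broadcast
    }
    bthread_mutex_unlock(&mu);
    return NULL;
}

void notify_all() {
    bthread_mutex_lock(&mu);
    ready = true;
    bthread_mutex_unlock(&mu);
    bthread_cond_broadcast(&cond);      // wakes one waiter, requeues the rest onto mu
}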
Some other interfaces: yielding the CPU to other bthreads, sleeping (which does not really block the worker thread), and adding worker threads:
367 int bthread_yield(void) {
368 bthread::TaskGroup* g = bthread::tls_task_group;
369 if (NULL != g && !g->is_current_pthread_task()) {
370 bthread::TaskGroup::yield(&g);//the bthread yields the CPU to other bthreads; the worker thread itself is not switched away
371 return 0;
372 }
373 // pthread_yield is not available on MAC
374 return sched_yield();//the whole thread yields the CPU
375 }
886 void TaskGroup::yield(TaskGroup** pg) {
887 TaskGroup* g = *pg;
888 ReadyToRunArgs args = { g->current_tid(), false };
889 g->set_remained(ready_to_run_in_worker, &args);
890 sched(pg); //analyzed later
891 }
359 int bthread_usleep(uint64_t microseconds) {
360 bthread::TaskGroup* g = bthread::tls_task_group;
361 if (NULL != g && !g->is_current_pthread_task()) {
362 return bthread::TaskGroup::usleep(&g, microseconds);//the bthread yields to other bthreads; the worker thread keeps running
363 }
364 return ::usleep(microseconds);//the whole thread sleeps
365 }
781 // To be consistent with sys_usleep, set errno and return -1 on error.
782 int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
783 if (0 == timeout_us) {
784 yield(pg);
785 return 0;
786 }
787 TaskGroup* g = *pg;
788 // We have to schedule timer after we switched to next bthread otherwise
789 // the timer may wake up(jump to) current still-running context.
790 SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g };
791 g->set_remained(_add_sleep_event, &e);
792 sched(pg);
793 g = *pg;
794 e.meta->current_sleep = 0;
795 if (e.meta->interrupted) {
796 // Race with set and may consume multiple interruptions, which are OK.
797 e.meta->interrupted = false;
803 errno = (e.meta->stop ? ESTOP : EINTR);
804 return -1;
805 }
806 return 0;
807 }
268 int bthread_setconcurrency(int num) {
269 //more code...
308 if (num > bthread::FLAGS_bthread_concurrency) {
309 // Create more workers if needed.
310 bthread::FLAGS_bthread_concurrency +=
311 c->add_workers(num - bthread::FLAGS_bthread_concurrency);//add worker threads, analyzed later
312 return 0;
313 }
314 return (num == bthread::FLAGS_bthread_concurrency ? 0 : EPERM);
315 }
247 int bthread_join(bthread_t tid, void** thread_return) {
248 return bthread::TaskGroup::join(tid, thread_return);
249 }
467 int TaskGroup::join(bthread_t tid, void** return_value) {
468 if (__builtin_expect(!tid, 0)) { // tid of bthread is never 0.
469 return EINVAL;
470 }
471 TaskMeta* m = address_meta(tid);
472 if (__builtin_expect(!m, 0)) {
473 // The bthread is not created yet, this join is definitely wrong.
474 return EINVAL;
475 }
476 TaskGroup* g = tls_task_group;
477 if (g != NULL && g->current_tid() == tid) {
478 // joining self causes indefinite waiting.
479 return EINVAL;
480 }
481 const uint32_t expected_version = get_version(tid);
482 while (*m->version_butex == expected_version) {//wait
483 if (butex_wait(m->version_butex, expected_version, NULL) < 0 &&
484 errno != EWOULDBLOCK && errno != EINTR) {
485 return errno;
486 }
487 }
488 if (return_value) {
489 *return_value = NULL;
490 }
491 return 0;
492 }
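For completeness, a minimal sketch of creating and joining a bthread (bthread_start_background itself is analyzed later; the worker function here is made up):
void* work(void*) {
    bthread_usleep(10 * 1000);       // only this bthread sleeps, not the worker pthread
    return NULL;
}

void join_demo() {
    bthread_t tid;
    if (bthread_start_background(&tid, NULL, work, NULL) != 0) {
        LOG(ERROR) << "Fail to create bthread";
        return;
    }
    bthread_join(tid, NULL);         // waits on version_butex until `work` returns
}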
Since a bthread's id is built as make_tid(*m->version_butex, slot) when it is created, when the bthread finishes running it bumps version_butex and wakes up the joiners. Part of that code:
327 // Increase the version and wake up all joiners, if resulting version
328 // is 0, change it to 1 to make bthread_t never be 0. Any access
329 // or join to the bthread after changing version will be rejected.
330 // The spinlock is for visibility of TaskGroup::get_attr.
331 {
332 BAIDU_SCOPED_LOCK(m->version_lock);
333 if (0 == ++*m->version_butex) {
334 ++*m->version_butex;
335 }
336 }
337 butex_wake_except(m->version_butex, 0);
350 int butex_wake_except(void* arg, bthread_t excluded_bthread) {
351 Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
352
353 ButexWaiterList bthread_waiters;
354 ButexWaiterList pthread_waiters;
355 {
356 ButexWaiter* excluded_waiter = NULL;
357 BAIDU_SCOPED_LOCK(b->waiter_lock);
358 while (!b->waiters.empty()) {
359 ButexWaiter* bw = b->waiters.head()->value();
360 bw->RemoveFromList();
361
362 if (bw->tid) {
363 if (bw->tid != excluded_bthread) {
364 bthread_waiters.Append(bw);
365 bw->container.store(NULL, butil::memory_order_relaxed);
366 } else {
367 excluded_waiter = bw;
368 }
369 } else {
370 bw->container.store(NULL, butil::memory_order_relaxed);
371 pthread_waiters.Append(bw);
372 }
373 }
374
375 if (excluded_waiter) {
376 b->waiters.Append(excluded_waiter);//the excluded waiter goes back to waiting
377 }
378 }
380 int nwakeup = 0;
381 while (!pthread_waiters.empty()) {
382 ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
383 pthread_waiters.head()->value());
384 bw->RemoveFromList();//(not sure why RemoveFromList is called again here)
385 wakeup_pthread(bw);//wake up the pthread
386 ++nwakeup;
387 }
388
389 if (bthread_waiters.empty()) {
390 return nwakeup;
391 }
392 ButexBthreadWaiter* front = static_cast<ButexBthreadWaiter*>(
393 bthread_waiters.head()->value());
394
395 TaskGroup* g = get_task_group(front->control);
396 const int saved_nwakeup = nwakeup;
397 do {
398 // pop reversely
399 ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
400 bthread_waiters.tail()->value());
401 w->RemoveFromList();
402 unsleep_if_necessary(w, get_global_timer_thread());//cancel its timer if any
403 g->ready_to_run_general(w->tid, true);//make it runnable, waiting to be scheduled
404 ++nwakeup;
405 } while (!bthread_waiters.empty());
406 if (saved_nwakeup != nwakeup) {
407 g->flush_nosignal_tasks_general();
408 }
409 return nwakeup;
410 }
The two creation interfaces bthread_start_urgent/bthread_start_background are analyzed in a later post; other bthread-related details are commented inline in the relevant code and are not hard to follow.
The above only covers the basic code; some important class declarations have not been analyzed yet, such as TaskMeta/TaskControl/TaskGroup, nor has how a bthread is switched in and out; those, together with some further thoughts, will come in later posts.
Off to Shanghai today; it has been a long time since I traveled this far.
References:
bthread.md
threading_overview.md
butex_wait超时无法唤醒
linux线程私有数据详解
深入理解pthread_cond_wait、pthread_cond_signal
bthread-local