这里准备分析下bthread的创建流程,以及bthread之间的切换,从multi_threaded_echo_c++代码实现分析。
直接贴上部分代码:
87 int main(int argc, char* argv[]) {
88 //more code...
123 std::vector<bthread_t> bids;
124 std::vector<pthread_t> pids;
125 if (!FLAGS_use_bthread) {
126 pids.resize(FLAGS_thread_num);
127 for (int i = 0; i < FLAGS_thread_num; ++i) {
128 if (pthread_create(&pids[i], NULL, sender, &channel) != 0) {
130 return -1;
131 }
132 }
133 } else {
134 bids.resize(FLAGS_thread_num);
135 for (int i = 0; i < FLAGS_thread_num; ++i) {
136 if (bthread_start_background(
137 &bids[i], NULL, sender, &channel) != 0) {
139 return -1;
140 }
141 }
142 }
144 while (!brpc::IsAskedToQuit()) {
145 sleep(1);
148 }
149
151 for (int i = 0; i < FLAGS_thread_num; ++i) {
152 if (!FLAGS_use_bthread) {
153 pthread_join(pids[i], NULL);
154 } else {
155 bthread_join(bids[i], NULL);
156 }
157 }
159 return 0;
160 }
48 static void* sender(void* arg) {
51 example::EchoService_Stub stub(static_cast<google::protobuf::RpcChannel*>(arg));
52 //这里是进行rpc请求操作
84 return NULL;
85 }
以上根据选项参数决定创建pthread还是bthread,这里以后者为例进行说明,主线程最后逐个join这些bthread。
185 int bthread_start_background(bthread_t* __restrict tid,
186 const bthread_attr_t* __restrict attr,
187 void * (*fn)(void*),
188 void* __restrict arg) {
189 bthread::TaskGroup* g = bthread::tls_task_group;
190 if (g) {
192 // start from worker
193 return g->start_background<false>(tid, attr, fn, arg);
194 }
196 return bthread::start_from_non_worker(tid, attr, fn, arg);
197 }
61 __thread TaskGroup* tls_task_group = NULL;
62 // Sync with TaskMeta::local_storage when a bthread is created or destroyed.
63 // During running, the two fields may be inconsistent, use tls_bls as the
64 // groundtruth.
65 __thread LocalStorage tls_bls = BTHREAD_LOCAL_STORAGE_INITIALIZER;
因为主线程不是工作线程(tls_task_group只在worker_thread中被赋值),所以主线程调用bthread_start_background时,tls_task_group肯定为NULL,于是走start_from_non_worker:
124 BUTIL_FORCE_INLINE int
125 start_from_non_worker(bthread_t* __restrict tid,
126 const bthread_attr_t* __restrict attr,
127 void * (*fn)(void*),
128 void* __restrict arg) {
129 TaskControl* c = get_or_new_task_control();
130 if (NULL == c) {
131 return ENOMEM;
132 }
133 if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
134 //more code...
144 }
145 return c->choose_one_group()->start_background<true>(
146 tid, attr, fn, arg);
147 }
214 TaskGroup* TaskControl::choose_one_group() {
215 const size_t ngroup = _ngroup.load(butil::memory_order_acquire);
216 if (ngroup != 0) {
217 return _groups[butil::fast_rand_less_than(ngroup)];
218 }
220 return NULL;
221 }
choose_one_group随机选择一个TaskGroup。其中首次调用get_or_new_task_control时会创建全局的任务控制对象TaskControl,它管理着若干个TaskGroup,而每个工作线程各拥有一个TaskGroup。
74 inline TaskControl* get_or_new_task_control() {
75 butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
76 TaskControl* c = p->load(butil::memory_order_consume);
77 if (c != NULL) {
78 return c;
79 }
80 BAIDU_SCOPED_LOCK(g_task_control_mutex);
81 c = p->load(butil::memory_order_consume);
82 if (c != NULL) {
83 return c;
84 }
85 c = new (std::nothrow) TaskControl;
86 if (NULL == c) {
87 return NULL;
88 }
89 int concurrency = FLAGS_bthread_min_concurrency > 0 ?
90 FLAGS_bthread_min_concurrency :
91 FLAGS_bthread_concurrency;
92 if (c->init(concurrency) != 0) {
94 delete c;
95 return NULL;
96 }
97 p->store(c, butil::memory_order_release);
98 return c;
99 }
TaskControl默认构造时,_groups被初始化为_groups((TaskGroup**)calloc(BTHREAD_MAX_CONCURRENCY, sizeof(TaskGroup*))),其中BTHREAD_MAX_CONCURRENCY=1024;接着在init中创建concurrency个工作线程:
147 int TaskControl::init(int concurrency) {
148 if (_concurrency != 0) {
150 return -1;
151 }
152 if (concurrency <= 0) {
154 return -1;
155 }
156 _concurrency = concurrency;
157
158 // Make sure TimerThread is ready.
159 if (get_or_create_global_timer_thread() == NULL) {
161 return -1;
162 }
163
164 _workers.resize(_concurrency);
165 for (int i = 0; i < _concurrency; ++i) {
166 const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
167 if (rc) {
169 return -1;
170 }
171 }
177 // Wait for at least one group is added so that choose_one_group()
178 // never returns NULL.
179 // TODO: Handle the case that worker quits before add_group
180 while (_ngroup == 0) {
181 usleep(100); // TODO: Elaborate
182 }
183 return 0;
184 }
以上创建一个全局定时器和若干个工作线程,工作线程的callback为:
59 void* TaskControl::worker_thread(void* arg) {
64 //more code...
65 TaskControl* c = static_cast<TaskControl*>(arg);
66 TaskGroup* g = c->create_group();
68 if (NULL == g) {
70 return NULL;
71 }
74
75 tls_task_group = g;
77 g->run_main_task();
78
83 tls_task_group = NULL;
84 g->destroy_self();
86 return NULL;
87 }
每个工作线程创建一个TaskGroup
,接着run_main_task
,结束时销毁destroy_self
。
89 TaskGroup* TaskControl::create_group() {
90 TaskGroup* g = new (std::nothrow) TaskGroup(this);
91 if (NULL == g) {
93 return NULL;
94 }
95 if (g->init(FLAGS_task_group_runqueue_capacity) != 0) {
97 delete g;
98 return NULL;
99 }
100 if (_add_group(g) != 0) {
101 delete g;
102 return NULL;
103 }
104 return g;
105 }
214 int TaskGroup::init(size_t runqueue_capacity) {
215 if (_rq.init(runqueue_capacity) != 0) {
217 return -1;
218 }
219 if (_remote_rq.init(runqueue_capacity / 2) != 0) {
221 return -1;
222 }
223 ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
224 if (NULL == stk) {
226 return -1;
227 }
228 butil::ResourceId<TaskMeta> slot;
229 TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
230 if (NULL == m) {
232 return -1;
233 }
234 m->stop = false;
235 m->interrupted = false;
236 m->about_to_quit = false;
237 m->fn = NULL;
238 m->arg = NULL;
239 m->local_storage = LOCAL_STORAGE_INIT;
240 m->cpuwide_start_ns = butil::cpuwide_time_ns();
241 m->stat = EMPTY_STAT;
242 m->attr = BTHREAD_ATTR_TASKGROUP;
243 m->tid = make_tid(*m->version_butex, slot);
244 m->set_stack(stk);
245
246 _cur_meta = m;
247 _main_tid = m->tid;
248 _main_stack = stk;
249 _last_run_ns = butil::cpuwide_time_ns();
250 return 0;
251 }
以上创建main_bthread,设置main_bthread的栈信息和其他一些数据。下面分别介绍,其中栈结构类型:
57 enum StackType {
58 STACK_TYPE_MAIN = 0,
59 STACK_TYPE_PTHREAD = BTHREAD_STACKTYPE_PTHREAD,
60 STACK_TYPE_SMALL = BTHREAD_STACKTYPE_SMALL,
61 STACK_TYPE_NORMAL = BTHREAD_STACKTYPE_NORMAL,
62 STACK_TYPE_LARGE = BTHREAD_STACKTYPE_LARGE
63 };
64
65 struct ContextualStack {
66 bthread_fcontext_t context;
67 StackType stacktype;
68 StackStorage storage;
69 };
71 typedef void* bthread_fcontext_t;
34 struct StackStorage {
35 int stacksize;
36 int guardsize;
37 // Assume stack grows upwards.
38 // http://www.boost.org/doc/libs/1_55_0/libs/context/doc/html/context/stack.html
39 void* bottom;
40 unsigned valgrind_stack_id;
41
42 // Clears all members.
43 void zeroize() {
44 stacksize = 0;
45 guardsize = 0;
46 bottom = NULL;
47 valgrind_stack_id = 0;
48 }
49 };
97 inline ContextualStack* get_stack(StackType type, void (*entry)(intptr_t)) {
98 switch (type) {
99 case STACK_TYPE_PTHREAD:
100 return NULL;
101 case STACK_TYPE_SMALL:
102 return StackFactory<SmallStackClass>::get_stack(entry);
103 case STACK_TYPE_NORMAL:
104 return StackFactory<NormalStackClass>::get_stack(entry);
105 case STACK_TYPE_LARGE:
106 return StackFactory<LargeStackClass>::get_stack(entry);
107 case STACK_TYPE_MAIN:
108 return StackFactory<MainStackClass>::get_stack(entry);
109 }
110 return NULL;
111 }
80 template <> struct StackFactory<MainStackClass> {
81 static ContextualStack* get_stack(void (*)(intptr_t)) {
82 ContextualStack* s = new (std::nothrow) ContextualStack;
83 if (NULL == s) {
84 return NULL;
85 }
86 s->context = NULL;
87 s->stacktype = STACK_TYPE_MAIN;
88 s->storage.zeroize();
89 return s;
90 }
91
92 static void return_stack(ContextualStack* s) {
93 delete s;
94 }
95 };
每个bthread关联一个struct TaskMeta
,部分类型声明如下:
52 struct TaskMeta {
74 bthread_t tid;
77 void* (*fn)(void*);
78 void* arg;
81 ContextualStack* stack;
84 bthread_attr_t attr;
93 LocalStorage local_storage;
126 };
接着工作线程run_main_task
:
146 void TaskGroup::run_main_task() {
151 TaskGroup* dummy = this;
152 bthread_t tid;
153 while (wait_task(&tid)) {
154 TaskGroup::sched_to(&dummy, tid);
155 DCHECK_EQ(this, dummy);
156 DCHECK_EQ(_cur_meta->stack, _main_stack);
157 if (_cur_meta->tid != _main_tid) {
158 TaskGroup::task_runner(1/*skip remained*/);
159 }
172 }
173 // stop_main_task() was called.
176 }
119 bool TaskGroup::wait_task(bthread_t* tid) {
120 do {
130 const ParkingLot::State st = _pl->get_state();
131 if (st.stopped()) {
132 return false;
133 }
134 if (steal_task(tid)) {
135 return true;
136 }
137 _pl->wait(st);
139 } while (true);
140 }
215 bool steal_task(bthread_t* tid) {
216 if (_remote_rq.pop(tid)) {
217 return true;
218 }
222 return _control->steal_task(tid, &_steal_seed, _steal_offset);
223 }
以上等待一个任务:先从自己的_remote_rq取一个任务;如果没有,再通过_control->steal_task依次尝试从各个TaskGroup的_rq和_remote_rq中偷一个;如果仍然没有,则在ParkingLot上wait:
336 bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
337 // 1: Acquiring fence is paired with releasing fence in _add_group to
338 // avoid accessing uninitialized slot of _groups.
339 const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
340 if (0 == ngroup) {
341 return false;
342 }
343
344 // NOTE: Don't return inside `for' iteration since we need to update |seed|
345 bool stolen = false;
346 size_t s = *seed;
347 for (size_t i = 0; i < ngroup; ++i, s += offset) {
348 TaskGroup* g = _groups[s % ngroup];
349 // g is possibly NULL because of concurrent _destroy_group
350 if (g) {
351 if (g->_rq.steal(tid)) {
352 stolen = true;
353 break;
354 }
355 if (g->_remote_rq.pop(tid)) {
356 stolen = true;
357 break;
358 }
359 }
360 }
361 *seed = s;
362 return stolen;
363 }
59 // If the `expected_state' does not match, wait() may finish directly.
60 void wait(const State& expected_state) {
61 futex_wait_private(&_pending_signal, expected_state.val, NULL);
62 }
接着主线程继续执行start_background,它在后台创建bthread并把它放入队列等待调度;与之对应的是创建后立即切换过去执行的start_foreground:
415 template <bool REMOTE>
416 int TaskGroup::start_background(bthread_t* __restrict th,
417 const bthread_attr_t* __restrict attr,
418 void * (*fn)(void*),
419 void* __restrict arg) {
421 if (__builtin_expect(!fn, 0)) {
422 return EINVAL;
423 }
424 const int64_t start_ns = butil::cpuwide_time_ns();
425 const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
426 butil::ResourceId<TaskMeta> slot;
427 TaskMeta* m = butil::get_resource(&slot);
428 if (__builtin_expect(!m, 0)) {
429 return ENOMEM;
430 }
431 CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
432 m->stop = false;
433 m->interrupted = false;
434 m->about_to_quit = false;
435 m->fn = fn;//callback
436 m->arg = arg;//arg
437 CHECK(m->stack == NULL);
438 m->attr = using_attr;
439 m->local_storage = LOCAL_STORAGE_INIT;
440 m->cpuwide_start_ns = start_ns;
441 m->stat = EMPTY_STAT;
442 m->tid = make_tid(*m->version_butex, slot);
443 *th = m->tid;
444 if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
446 }
448 if (REMOTE) {
449 ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
450 } else {
451 ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
452 }
453 return 0;
454 }
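这里的bthread是由非工作线程(主线程)创建的,REMOTE为true,所以走ready_to_run_remote。attr为NULL时使用BTHREAD_ATTR_NORMAL,即为bthread指定BTHREAD_STACKTYPE_NORMAL类型的栈(此时m->stack仍为NULL,栈还没有真正分配),这会在后面分析;接着把任务投递到所选TaskGroup的_remote_rq,并唤醒工作线程: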
673 void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
674 _remote_rq._mutex.lock();
675 while (!_remote_rq.push_locked(tid)) {
676 flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
679 ::usleep(1000);
680 _remote_rq._mutex.lock();
681 }
682 if (nosignal) {
683 ++_remote_num_nosignal;
684 _remote_rq._mutex.unlock();
685 } else {
686 const int additional_signal = _remote_num_nosignal;
687 _remote_num_nosignal = 0;
688 _remote_nsignaled += 1 + additional_signal;
689 _remote_rq._mutex.unlock();
690 _control->signal_task(1 + additional_signal);
691 }
692 }
33 // A queue for storing bthreads created by non-workers. Since non-workers
34 // randomly choose a TaskGroup to push which distributes the contentions,
35 // this queue is simply implemented as a queue protected with a lock.
36 // The function names should be self-explanatory.
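以上是_remote_rq实现处的注释,这里简单地使用_mutex来保护队列。如果队列满了,则休眠一段时间后重试;入队成功后,若没有设置nosignal,则调用signal_task通知有任务到来,起始的ParkingLot由当前线程id哈希后对PARKING_LOT_NUM取模决定;如果通知之后num_task仍大于0,且并发数还未达到FLAGS_bthread_concurrency,则可能会增加工作线程: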
367 void TaskControl::signal_task(int num_task) {
368 if (num_task <= 0) {
369 return;
370 }
371 // TODO(gejun): Current algorithm does not guarantee enough threads will
372 // be created to match caller's requests. But in another side, there's also
373 // many useless signalings according to current impl. Capping the concurrency
374 // is a good balance between performance and timeliness of scheduling.
375 if (num_task > 2) {
376 num_task = 2;
377 }
378 int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
379 num_task -= _pl[start_index].signal(1);
380 if (num_task > 0) {
381 for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
382 if (++start_index >= PARKING_LOT_NUM) {
383 start_index = 0;
384 }
385 num_task -= _pl[start_index].signal(1);
386 }
387 }
388 if (num_task > 0 &&
389 FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance
390 _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
391 // TODO: Reduce this lock
392 BAIDU_SCOPED_LOCK(g_task_control_mutex);
393 if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
394 add_workers(1);
395 }
396 }
397 }
当工作线程被唤醒时,从wait_task
醒来,接着sched_to
:
64 TaskMeta* next_meta = address_meta(next_tid);
65 if (next_meta->stack == NULL) {
66 ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
67 if (stk) {
68 next_meta->set_stack(stk);
69 } else {
70 // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
71 // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
72 // This basically means that if we can't allocate stack, run
73 // the task in pthread directly.
74 next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
75 next_meta->set_stack((*pg)->_main_stack);
76 }
77 }
78 // Update now_ns only when wait_task did yield.
79 sched_to(pg, next_meta);
80 }
此时才真正开始创建栈内存,因为是STACK_TYPE_NORMAL
类型,这里以堆上为例,另外还有mmap;bthread_make_fcontext
是一段汇编,暂时可认为是一些上下文的状态数据,晚些再分析:
40 struct NormalStackClass {
41 static int* stack_size_flag;
42 static const int stacktype = (int)STACK_TYPE_NORMAL;
43 };
50 template <typename StackClass> struct StackFactory {
51 struct Wrapper : public ContextualStack {
52 explicit Wrapper(void (*entry)(intptr_t)) {
53 if (allocate_stack_storage(&storage, *StackClass::stack_size_flag,
54 FLAGS_guard_page_size) != 0) {
55 storage.zeroize();
56 context = NULL;
57 return;
58 }
59 context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);
60 stacktype = (StackType)StackClass::stacktype;
61 }
62 ~Wrapper() {
63 if (context) {
64 context = NULL;
65 deallocate_stack_storage(&storage);
66 storage.zeroize();
67 }
68 }
69 };
70
71 static ContextualStack* get_stack(void (*entry)(intptr_t)) {
72 return butil::get_object<Wrapper>(entry);
73 }
74
75 static void return_stack(ContextualStack* sc) {
76 butil::return_object(static_cast<Wrapper*>(sc));
77 }
78 };
57 int allocate_stack_storage(StackStorage* s, int stacksize_in, int guardsize_in) {
58 const static int PAGESIZE = getpagesize();
59 const int PAGESIZE_M1 = PAGESIZE - 1;
60 const int MIN_STACKSIZE = PAGESIZE * 2;
61 const int MIN_GUARDSIZE = PAGESIZE;
62
63 // Align stacksize
64 const int stacksize =
65 (std::max(stacksize_in, MIN_STACKSIZE) + PAGESIZE_M1) &
66 ~PAGESIZE_M1;
67
68 if (guardsize_in <= 0) {
69 void* mem = malloc(stacksize);
70 if (NULL == mem) {
73 return -1;
74 }
76 s->bottom = (char*)mem + stacksize;
77 s->stacksize = stacksize;
78 s->guardsize = 0;
83 s->valgrind_stack_id = 0;
85 return 0;
131 }
132 }
134 void deallocate_stack_storage(StackStorage* s) {
138 const int memsize = s->stacksize + s->guardsize;
139 if ((uintptr_t)s->bottom <= (uintptr_t)memsize) {
140 return;
141 }
143 if (s->guardsize <= 0) {
144 free((char*)s->bottom - memsize);
145 } else {
146 munmap((char*)s->bottom - memsize, memsize);
147 }
148 }
以上根据要分配的栈类型分配栈,STACK_TYPE_NORMAL默认大小为1024KB。栈准备好之后,进入真正执行切换的sched_to:
567 void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
568 TaskGroup* g = *pg;
575 // Save errno so that errno is bthread-specific.
576 const int saved_errno = errno;
577 void* saved_unique_user_ptr = tls_unique_user_ptr;
578
579 TaskMeta* const cur_meta = g->_cur_meta;
580 const int64_t now = butil::cpuwide_time_ns();
581 const int64_t elp_ns = now - g->_last_run_ns;
582 g->_last_run_ns = now;
583 cur_meta->stat.cputime_ns += elp_ns;
584 if (cur_meta->tid != g->main_tid()) {
585 g->_cumulated_cputime_ns += elp_ns;
586 }
587 ++cur_meta->stat.nswitch;
588 ++ g->_nswitch;
589 // Switch to the task
590 if (__builtin_expect(next_meta != cur_meta, 1)) {
591 g->_cur_meta = next_meta;
592 // Switch tls_bls
595
596 cur_meta->local_storage = tls_bls;
597 tls_bls = next_meta->local_storage;
598
599 // Logging must be done after switching the local storage, since the logging lib
600 // use bthread local storage internally, or will cause memory leak.
601 if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
602 (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
605 }
607 if (cur_meta->stack != NULL) {
608 if (next_meta->stack != cur_meta->stack) {
609 jump_stack(cur_meta->stack, next_meta->stack);
610 // probably went to another group, need to assign g again.
611 g = tls_task_group;
612 }
613 #ifndef NDEBUG
614 else {
615 // else pthread_task is switching to another pthread_task, sc
616 // can only equal when they're both _main_stack
617 CHECK(cur_meta->stack == g->_main_stack);
618 }
619 #endif
620 }
621 // else because of ending_sched(including pthread_task->pthread_task)
622 }
625
626 while (g->_last_context_remained) {
627 RemainedFn fn = g->_last_context_remained;
628 g->_last_context_remained = NULL;
629 fn(g->_last_context_remained_arg);
630 g = tls_task_group;
631 }
633 // Restore errno
634 errno = saved_errno;
635 tls_unique_user_ptr = saved_unique_user_ptr;
640 *pg = g;
641 }
jump_stack
的实现后面再分析,这里可以认为是保存cur_meta->stack
当前的寄存器状态, 并恢复next_meta->stack
状态至寄存器,反正一堆push/pop操作,跟libco协程有些类似但有些不同。后面会拿这两个作些对比和调试过程中数据的变化。最终会执行到task_runner
:
253 void TaskGroup::task_runner(intptr_t skip_remained) {
254 // NOTE: tls_task_group is volatile since tasks are moved around
255 // different groups.
256 TaskGroup* g = tls_task_group;
257
258 if (!skip_remained) {
259 while (g->_last_context_remained) {
260 RemainedFn fn = g->_last_context_remained;
261 g->_last_context_remained = NULL;
262 fn(g->_last_context_remained_arg);
263 g = tls_task_group;
264 }
269 }
271 do {
280 // Meta and identifier of the task is persistent in this run.
281 TaskMeta* const m = g->_cur_meta;
290
291 // Not catch exceptions except ExitException which is for implementing
292 // bthread_exit(). User code is intended to crash when an exception is
293 // not caught explicitly. This is consistent with other threading
294 // libraries.
295 void* thread_return;
296 try {
297 thread_return = m->fn(m->arg);
298 } catch (ExitException& e) {
299 thread_return = e.value();
300 }
301
302 // Group is probably changed
303 g = tls_task_group;
316 // Clean tls variables, must be done before changing version_butex
317 // otherwise another thread just joined this thread may not see side
318 // effects of destructing tls variables.
319 KeyTable* kt = tls_bls.keytable;
320 if (kt != NULL) {
321 return_keytable(m->attr.keytable_pool, kt);
322 // After deletion: tls may be set during deletion.
323 tls_bls.keytable = NULL;
324 m->local_storage.keytable = NULL; // optional
325 }
331 {
332 BAIDU_SCOPED_LOCK(m->version_lock);
333 if (0 == ++*m->version_butex) {
334 ++*m->version_butex;
335 }
336 }
337 butex_wake_except(m->version_butex, 0);
338
340 g->set_remained(TaskGroup::_release_last_context, m);
341 ending_sched(&g);
342
343 } while (g->_cur_meta->tid != g->_main_tid);
347 }
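以上在真正开始执行用户回调前,先执行切换前通过set_remained设置的函数(即_last_context_remained)。其实这里我也不是很清楚为什么要这么做;从后文TaskGroup::usleep处的源码注释来看,原因之一是这类操作必须等切换到下一个bthread之后才能执行,否则当前仍在运行的上下文可能被提前唤醒(跳入)。相应地,bthread结束返回时设置的函数,会在切换到的目标上下文里(sched_to末尾的循环)再执行,即_release_last_context: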
626 while (g->_last_context_remained) {
627 RemainedFn fn = g->_last_context_remained;
628 g->_last_context_remained = NULL;
629 fn(g->_last_context_remained_arg);
630 g = tls_task_group;
631 }
349 void TaskGroup::_release_last_context(void* arg) {
350 TaskMeta* m = static_cast<TaskMeta*>(arg);
351 if (m->stack_type() != STACK_TYPE_PTHREAD) {
352 return_stack(m->release_stack()/*may be NULL*/);
353 } else {
354 // it's _main_stack, don't return.
355 m->set_stack(NULL);
356 }
357 return_resource(get_slot(m->tid));
358 }
接着执行回调函数,并清理bthread的tls相关keytable,然后唤醒可能正在等待(join)该bthread的其他bthreads,再通过set_remained设置_release_last_context,与开始时形成呼应,最后调用ending_sched完成调度。
510 void TaskGroup::ending_sched(TaskGroup** pg) {
511 TaskGroup* g = *pg;
512 bthread_t next_tid = 0;
513 // Find next task to run, if none, switch to idle thread of the group.
520 const bool popped = g->_rq.steal(&next_tid);
522 if (!popped && !g->steal_task(&next_tid)) {
523 // Jump to main task if there's no task to run.
524 next_tid = g->_main_tid;
525 }
527 TaskMeta* const cur_meta = g->_cur_meta;
528 TaskMeta* next_meta = address_meta(next_tid);
529 if (next_meta->stack == NULL) {
530 if (next_meta->stack_type() == cur_meta->stack_type()) {
531 // also works with pthread_task scheduling to pthread_task, the
532 // transfered stack is just _main_stack.
533 next_meta->set_stack(cur_meta->release_stack());
534 } else {
535 ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
536 if (stk) {
537 next_meta->set_stack(stk);
538 } else {
539 // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
540 // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
541 // This basically means that if we can't allocate stack, run
542 // the task in pthread directly.
543 next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
544 next_meta->set_stack(g->_main_stack);
545 }
546 }
547 }
548 sched_to(pg, next_meta);
549 }
以上尝试跑下一个bthread,没有可运行的任务时切回main task。另外,TaskGroup::usleep、TaskGroup::yield以及TaskGroup::start_foreground都会先调用set_remained设置切换之后要执行的函数,然后再进行切换:
399 RemainedFn fn = NULL;
400 if (g->current_task()->about_to_quit) {
401 fn = ready_to_run_in_worker_ignoresignal;
402 } else {
403 fn = ready_to_run_in_worker;
404 }
405 ReadyToRunArgs args = {
406 g->current_tid(),
407 (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
408 };
409 g->set_remained(fn, &args);
410 TaskGroup::sched_to(pg, m->tid); //TaskGroup::start_foreground
795 TaskGroup* g = *pg;
796 // We have to schedule timer after we switched to next bthread otherwise
797 // the timer may wake up(jump to) current still-running context.
798 SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g };
799 g->set_remained(_add_sleep_event, &e);
800 sched(pg);
801 g = *pg;
802 e.meta->current_sleep = 0;//TaskGroup::usleep
894 void TaskGroup::yield(TaskGroup** pg) {
895 TaskGroup* g = *pg;
896 ReadyToRunArgs args = { g->current_tid(), false };
897 g->set_remained(ready_to_run_in_worker, &args);
898 sched(pg);//TaskGroup::yield
899 }
贴上来自brpc_internal ppt中的taskcontrol的介绍:
- Singleton to manage all TaskGroups
  - start, stop, add
- What if we create a bthread in non-worker (normal) pthreads
  - non-worker doesn't have TaskGroup
  - TaskControl choose a TaskGroup
  - push into its RemoteTaskQueue
    - A queue for tasks from non-workers
    - use mutex to protect from concurrent push/pop/steal
- signal_task -- only notify some ParkingLots (thus some TaskGroups)
- steal_task -- steal bthreads from all TaskGroups to prevent starvation
- Overall scheduling order
  - local run queue
  - local remote queue
  - other workers' run queue
  - other workers' remote queue
网友评论