美文网首页
brpc之bthread源码浅析(二)

brpc之bthread源码浅析(二)

作者: fooboo | 来源:发表于2019-10-12 23:24 被阅读0次

这里准备以multi_threaded_echo_c++示例为入口,分析bthread的创建流程以及bthread之间的切换。
直接贴上部分代码:

 87 int main(int argc, char* argv[]) {
 88     //more code...
123     std::vector<bthread_t> bids;
124     std::vector<pthread_t> pids;
125     if (!FLAGS_use_bthread) {
126         pids.resize(FLAGS_thread_num);
127         for (int i = 0; i < FLAGS_thread_num; ++i) {
128             if (pthread_create(&pids[i], NULL, sender, &channel) != 0) {
130                 return -1;
131             }
132         }
133     } else {
134         bids.resize(FLAGS_thread_num);
135         for (int i = 0; i < FLAGS_thread_num; ++i) {
136             if (bthread_start_background(
137                     &bids[i], NULL, sender, &channel) != 0) {
139                 return -1;
140             }
141         }
142     }
144     while (!brpc::IsAskedToQuit()) {
145         sleep(1);
148     }
149 
151     for (int i = 0; i < FLAGS_thread_num; ++i) {
152         if (!FLAGS_use_bthread) {
153             pthread_join(pids[i], NULL);
154         } else {
155             bthread_join(bids[i], NULL);
156         }
157     }
159     return 0;
160 }

 48 static void* sender(void* arg) {
 51     example::EchoService_Stub stub(static_cast<google::protobuf::RpcChannel*>(arg));
 52     //这里是进行rpc请求操作
 84     return NULL;
 85 }

以上代码根据选项参数决定创建pthread还是bthread,这里以后者为例说明;最后主线程通过bthread_join等待各个bthread(bthread_t)结束。

185 int bthread_start_background(bthread_t* __restrict tid,
186                              const bthread_attr_t* __restrict attr,
187                              void * (*fn)(void*),
188                              void* __restrict arg) {
189     bthread::TaskGroup* g = bthread::tls_task_group;
190     if (g) {
192         // start from worker
193         return g->start_background<false>(tid, attr, fn, arg);
194     }   
196     return bthread::start_from_non_worker(tid, attr, fn, arg);
197 }

 61 __thread TaskGroup* tls_task_group = NULL;
 62 // Sync with TaskMeta::local_storage when a bthread is created or destroyed.
 63 // During running, the two fields may be inconsistent, use tls_bls as the
 64 // groundtruth.              
 65 __thread LocalStorage tls_bls = BTHREAD_LOCAL_STORAGE_INITIALIZER;

主线程不是工作线程,tls_task_group只会在工作线程的入口函数worker_thread中被设置(见下文),所以主线程调用bthread_start_background时tls_task_group为NULL,会走start_from_non_worker分支。

124 BUTIL_FORCE_INLINE int       
125 start_from_non_worker(bthread_t* __restrict tid,
126                       const bthread_attr_t* __restrict attr,
127                       void * (*fn)(void*),
128                       void* __restrict arg) {
129     TaskControl* c = get_or_new_task_control();
130     if (NULL == c) {
131         return ENOMEM;
132     }
133     if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
134         //more code...
144     }   
145     return c->choose_one_group()->start_background<true>(
146         tid, attr, fn, arg);
147 }
214 TaskGroup* TaskControl::choose_one_group() {
215     const size_t ngroup = _ngroup.load(butil::memory_order_acquire);
216     if (ngroup != 0) {
217         return _groups[butil::fast_rand_less_than(ngroup)];
218     }
220     return NULL;
221 }

choose_one_group通过butil::fast_rand_less_than随机选择一个TaskGroup。

其中TaskControl是全局的任务控制对象,在第一次调用get_or_new_task_control时才创建(先无锁读取,为空时加锁后再检查一次),它管理着若干个TaskGroup,而每个工作线程各拥有一个TaskGroup:

 74 inline TaskControl* get_or_new_task_control() {
 75     butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
 76     TaskControl* c = p->load(butil::memory_order_consume);
 77     if (c != NULL) {
 78         return c;
 79     }
 80     BAIDU_SCOPED_LOCK(g_task_control_mutex);
 81     c = p->load(butil::memory_order_consume);
 82     if (c != NULL) {
 83         return c;
 84     }
 85     c = new (std::nothrow) TaskControl;
 86     if (NULL == c) {
 87         return NULL;
 88     }
 89     int concurrency = FLAGS_bthread_min_concurrency > 0 ?
 90         FLAGS_bthread_min_concurrency :
 91         FLAGS_bthread_concurrency;
 92     if (c->init(concurrency) != 0) {
 94         delete c;
 95         return NULL;
 96     }
 97     p->store(c, butil::memory_order_release);
 98     return c;
 99 }

TaskControl默认构造时会初始化_groups((TaskGroup**)calloc(BTHREAD_MAX_CONCURRENCY, sizeof(TaskGroup*))),其中BTHREAD_MAX_CONCURRENCY=1024;并发度concurrency取FLAGS_bthread_min_concurrency(大于0时)或FLAGS_bthread_concurrency,接着在init中创建concurrency个工作线程:

147 int TaskControl::init(int concurrency) {
148     if (_concurrency != 0) {
150         return -1;
151     }
152     if (concurrency <= 0) {
154         return -1;
155     }
156     _concurrency = concurrency;
157 
158     // Make sure TimerThread is ready.
159     if (get_or_create_global_timer_thread() == NULL) {
161         return -1;
162     }
163 
164     _workers.resize(_concurrency);
165     for (int i = 0; i < _concurrency; ++i) {
166         const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
167         if (rc) {
169             return -1;
170         }
171     }
177     // Wait for at least one group is added so that choose_one_group()
178     // never returns NULL.
179     // TODO: Handle the case that worker quits before add_group
180     while (_ngroup == 0) {
181         usleep(100);  // TODO: Elaborate
182     }   
183     return 0;
184 }

以上先确保全局定时器线程(TimerThread)已经创建,再创建concurrency个工作线程,最后等待至少有一个TaskGroup被添加进来,以保证choose_one_group()不会返回NULL。工作线程的入口函数为:

 59 void* TaskControl::worker_thread(void* arg) {
 64     //more code...
 65     TaskControl* c = static_cast<TaskControl*>(arg);
 66     TaskGroup* g = c->create_group();
 68     if (NULL == g) {
 70         return NULL;
 71     }
 74     
 75     tls_task_group = g;
 77     g->run_main_task();
 78     
 83     tls_task_group = NULL;
 84     g->destroy_self();
 86     return NULL;
 87 }

每个工作线程创建一个TaskGroup并保存到线程局部变量tls_task_group中,接着调用run_main_task进入调度循环;循环结束后将tls_task_group置为NULL,并调用destroy_self销毁该TaskGroup。

 89 TaskGroup* TaskControl::create_group() {
 90     TaskGroup* g = new (std::nothrow) TaskGroup(this);
 91     if (NULL == g) {
 93         return NULL;
 94     }
 95     if (g->init(FLAGS_task_group_runqueue_capacity) != 0) {
 97         delete g;
 98         return NULL;
 99     }
100     if (_add_group(g) != 0) {
101         delete g;
102         return NULL;
103     }
104     return g;
105 }
214 int TaskGroup::init(size_t runqueue_capacity) {
215     if (_rq.init(runqueue_capacity) != 0) {
217         return -1; 
218     }   
219     if (_remote_rq.init(runqueue_capacity / 2) != 0) {
221         return -1; 
222     }   
223     ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
224     if (NULL == stk) {
226         return -1; 
227     }   
228     butil::ResourceId<TaskMeta> slot;
229     TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
230     if (NULL == m) {
232         return -1; 
233     }   
234     m->stop = false;
235     m->interrupted = false;
236     m->about_to_quit = false;
237     m->fn = NULL;
238     m->arg = NULL;
239     m->local_storage = LOCAL_STORAGE_INIT;
240     m->cpuwide_start_ns = butil::cpuwide_time_ns();
241     m->stat = EMPTY_STAT; 
242     m->attr = BTHREAD_ATTR_TASKGROUP;
243     m->tid = make_tid(*m->version_butex, slot);
244     m->set_stack(stk);
245     
246     _cur_meta = m;
247     _main_tid = m->tid;
248     _main_stack = stk;
249     _last_run_ns = butil::cpuwide_time_ns();
250     return 0;
251 }

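以上先初始化本地运行队列_rq和远程队列_remote_rq(容量为前者的一半),再为该TaskGroup创建main bthread对应的TaskMeta,设置其栈(STACK_TYPE_MAIN类型)和其他一些数据,并记录到_cur_meta、_main_tid和_main_stack中。下面分别介绍,首先是栈结构类型: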

 57 enum StackType {
 58     STACK_TYPE_MAIN = 0,
 59     STACK_TYPE_PTHREAD = BTHREAD_STACKTYPE_PTHREAD,
 60     STACK_TYPE_SMALL = BTHREAD_STACKTYPE_SMALL,
 61     STACK_TYPE_NORMAL = BTHREAD_STACKTYPE_NORMAL, 
 62     STACK_TYPE_LARGE = BTHREAD_STACKTYPE_LARGE
 63 };      
 64     
 65 struct ContextualStack { 
 66     bthread_fcontext_t context;
 67     StackType stacktype;
 68     StackStorage storage;
 69 };

 71 typedef void* bthread_fcontext_t;

 34 struct StackStorage {
 35      int stacksize;
 36      int guardsize;
 37     // Assume stack grows upwards.
 38     // http://www.boost.org/doc/libs/1_55_0/libs/context/doc/html/context/stack.html
 39     void* bottom;
 40     unsigned valgrind_stack_id;
 41 
 42     // Clears all members.
 43     void zeroize() {
 44         stacksize = 0;
 45         guardsize = 0;
 46         bottom = NULL;
 47         valgrind_stack_id = 0;
 48     }
 49 };
 97 inline ContextualStack* get_stack(StackType type, void (*entry)(intptr_t)) {
 98     switch (type) {
 99     case STACK_TYPE_PTHREAD:
100         return NULL;
101     case STACK_TYPE_SMALL:
102         return StackFactory<SmallStackClass>::get_stack(entry);
103     case STACK_TYPE_NORMAL:
104         return StackFactory<NormalStackClass>::get_stack(entry);
105     case STACK_TYPE_LARGE:
106         return StackFactory<LargeStackClass>::get_stack(entry);
107     case STACK_TYPE_MAIN:
108         return StackFactory<MainStackClass>::get_stack(entry);
109     }
110     return NULL;
111 }

 80 template <> struct StackFactory<MainStackClass> {
 81     static ContextualStack* get_stack(void (*)(intptr_t)) {
 82         ContextualStack* s = new (std::nothrow) ContextualStack;
 83         if (NULL == s) {
 84             return NULL;
 85         }
 86         s->context = NULL;
 87         s->stacktype = STACK_TYPE_MAIN;
 88         s->storage.zeroize();
 89         return s;
 90     }
 91 
 92     static void return_stack(ContextualStack* s) {
 93         delete s;
 94     }
 95 };

每个bthread关联一个struct TaskMeta,其部分字段如下:

 52 struct TaskMeta {
 74     bthread_t tid;
 77     void* (*fn)(void*);
 78     void* arg;
 81     ContextualStack* stack;
 84     bthread_attr_t attr;
 93     LocalStorage local_storage;
126 };

接着工作线程执行run_main_task:

146 void TaskGroup::run_main_task() {
151     TaskGroup* dummy = this;
152     bthread_t tid;
153     while (wait_task(&tid)) {
154         TaskGroup::sched_to(&dummy, tid);
155         DCHECK_EQ(this, dummy);
156         DCHECK_EQ(_cur_meta->stack, _main_stack);
157         if (_cur_meta->tid != _main_tid) {
158             TaskGroup::task_runner(1/*skip remained*/);
159         }
172     }
173     // stop_main_task() was called.
176 }

119 bool TaskGroup::wait_task(bthread_t* tid) {
120     do {
130         const ParkingLot::State st = _pl->get_state();
131         if (st.stopped()) {
132             return false;
133         }
134         if (steal_task(tid)) {
135             return true;
136         }
137         _pl->wait(st);
139     } while (true);
140 }

215     bool steal_task(bthread_t* tid) {
216         if (_remote_rq.pop(tid)) {
217             return true;
218         }
222         return _control->steal_task(tid, &_steal_seed, _steal_offset);
223     }

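以上是等待任务的过程:先从本TaskGroup的_remote_rq取任务,取不到再通过_control->steal_task,从_steal_seed对应的位置开始、以_steal_offset为步长遍历各个TaskGroup,依次尝试从其_rq和_remote_rq中窃取;如果都没有,就在ParkingLot上wait(底层是futex_wait_private),被唤醒后再重试: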

336 bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
337     // 1: Acquiring fence is paired with releasing fence in _add_group to
338     // avoid accessing uninitialized slot of _groups.
339     const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
340     if (0 == ngroup) {
341         return false; 
342     }
343     
344     // NOTE: Don't return inside `for' iteration since we need to update |seed|
345     bool stolen = false;
346     size_t s = *seed;
347     for (size_t i = 0; i < ngroup; ++i, s += offset) {
348         TaskGroup* g = _groups[s % ngroup];
349         // g is possibly NULL because of concurrent _destroy_group
350         if (g) {
351             if (g->_rq.steal(tid)) {
352                 stolen = true;
353                 break;
354             }
355             if (g->_remote_rq.pop(tid)) {
356                 stolen = true;
357                 break;
358             }
359         }
360     }
361     *seed = s;
362     return stolen;
363 }
 59     // If the `expected_state' does not match, wait() may finish directly.
 60     void wait(const State& expected_state) {
 61         futex_wait_private(&_pending_signal, expected_state.val, NULL);
 62     }

回到主线程,继续看start_background:它只创建bthread并放入队列等待调度,不会立即执行;与之相对的start_foreground则会立即切换到新创建的bthread上执行(见后文)。

415 template <bool REMOTE>
416 int TaskGroup::start_background(bthread_t* __restrict th,
417                                 const bthread_attr_t* __restrict attr,
418                                 void * (*fn)(void*),
419                                 void* __restrict arg) {
421     if (__builtin_expect(!fn, 0)) {
422         return EINVAL;
423     }
424     const int64_t start_ns = butil::cpuwide_time_ns();
425     const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
426     butil::ResourceId<TaskMeta> slot;
427     TaskMeta* m = butil::get_resource(&slot);
428     if (__builtin_expect(!m, 0)) { 
429         return ENOMEM;
430     }
431     CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
432     m->stop = false;
433     m->interrupted = false;
434     m->about_to_quit = false;
435     m->fn = fn;//callback
436     m->arg = arg;//arg
437     CHECK(m->stack == NULL);
438     m->attr = using_attr;
439     m->local_storage = LOCAL_STORAGE_INIT;
440     m->cpuwide_start_ns = start_ns;
441     m->stat = EMPTY_STAT;
442     m->tid = make_tid(*m->version_butex, slot);
443     *th = m->tid;
444     if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
446     }
448     if (REMOTE) {
449         ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
450     } else {
451         ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
452     }
453     return 0;
454 }

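start_background的模板参数REMOTE区分调用方:在工作线程中创建时(REMOTE为false)调用ready_to_run,将任务放入本TaskGroup的队列;像本例这样由非工作线程(主线程)创建时(REMOTE为true),调用ready_to_run_remote将任务放入所选TaskGroup的_remote_rq。由于attr为NULL,使用默认属性BTHREAD_ATTR_NORMAL,即栈类型为BTHREAD_STACKTYPE_NORMAL。注意此时并不分配栈(m->stack为NULL),栈会在该bthread第一次被调度时才创建,这会在后面分析。入队后会通知工作线程来处理: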

673 void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
674     _remote_rq._mutex.lock();
675     while (!_remote_rq.push_locked(tid)) {
676         flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
679         ::usleep(1000);
680         _remote_rq._mutex.lock();
681     }
682     if (nosignal) {
683         ++_remote_num_nosignal;
684         _remote_rq._mutex.unlock();
685     } else {
686         const int additional_signal = _remote_num_nosignal;
687         _remote_num_nosignal = 0;
688         _remote_nsignaled += 1 + additional_signal;
689         _remote_rq._mutex.unlock();
690         _control->signal_task(1 + additional_signal);
691     }
692 }
 33 // A queue for storing bthreads created by non-workers. Since non-workers
 34 // randomly choose a TaskGroup to push which distributes the contentions,
 35 // this queue is simply implemented as a queue protected with a lock.
 36 // The function names should be self-explanatory.

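以上是_remote_rq的实现注释:它只是一个用_mutex保护的简单队列。在ready_to_run_remote中,如果队列满了,会先flush已累积的nosignal任务,然后usleep(1000)后重试;入队成功后,如果没有设置BTHREAD_NOSIGNAL,就调用signal_task通知有任务到来,通知数量包含之前累积的nosignal任务。signal_task一次最多通知2个任务。它根据当前线程id的哈希值(而不是随机)选择起始ParkingLot,并依次signal。如果signal之后num_task仍大于0,并且设置了bthread_min_concurrency、当前并发度小于bthread_concurrency,则会增加一个工作线程: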

367 void TaskControl::signal_task(int num_task) {
368     if (num_task <= 0) {
369         return;
370     }
371     // TODO(gejun): Current algorithm does not guarantee enough threads will
372     // be created to match caller's requests. But in another side, there's also
373     // many useless signalings according to current impl. Capping the concurrency
374     // is a good balance between performance and timeliness of scheduling.
375     if (num_task > 2) {
376         num_task = 2;
377     }   
378     int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
379     num_task -= _pl[start_index].signal(1);
380     if (num_task > 0) {
381         for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
382             if (++start_index >= PARKING_LOT_NUM) {
383                 start_index = 0;
384             }
385             num_task -= _pl[start_index].signal(1); 
386         }
387     }
388     if (num_task > 0 &&
389         FLAGS_bthread_min_concurrency > 0 &&    // test min_concurrency for performance
390         _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
391         // TODO: Reduce this lock
392         BAIDU_SCOPED_LOCK(g_task_control_mutex);
393         if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
394             add_workers(1);
395         }
396     }
397 }

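工作线程被唤醒后从wait_task返回,接着调用sched_to(&dummy, tid)。该函数先取得目标bthread的TaskMeta,如果它还没有栈则先分配栈;分配失败时,退化为直接在pthread的_main_stack上运行。之后再调用另一个重载sched_to(pg, next_meta):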

 64     TaskMeta* next_meta = address_meta(next_tid);
 65     if (next_meta->stack == NULL) { 
 66         ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
 67         if (stk) {
 68             next_meta->set_stack(stk);
 69         } else {
 70             // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
 71             // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
 72             // This basically means that if we can't allocate stack, run
 73             // the task in pthread directly.
 74             next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
 75             next_meta->set_stack((*pg)->_main_stack);
 76         }            
 77     }
 78     // Update now_ns only when wait_task did yield.
 79     sched_to(pg, next_meta); 
 80 }

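直到这里才真正分配栈内存。因为是STACK_TYPE_NORMAL类型,会走StackFactory<NormalStackClass>,通过butil::get_object获取Wrapper对象,Wrapper的构造函数中调用allocate_stack_storage分配栈。其中guardsize<=0时用malloc从堆上分配,否则用mmap分配(对应释放时的munmap),这里以堆上分配为例。bthread_make_fcontext是一段汇编,暂时可以认为它在栈上初始化了上下文数据,使第一次切换过来时从entry(即task_runner)开始执行,晚些再分析: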

 40 struct NormalStackClass {
 41     static int* stack_size_flag;
 42     static const int stacktype = (int)STACK_TYPE_NORMAL;
 43 };

 50 template <typename StackClass> struct StackFactory {
 51     struct Wrapper : public ContextualStack {
 52         explicit Wrapper(void (*entry)(intptr_t)) {
 53             if (allocate_stack_storage(&storage, *StackClass::stack_size_flag,
 54                                        FLAGS_guard_page_size) != 0) {
 55                 storage.zeroize();
 56                 context = NULL;
 57                 return;
 58             }
 59             context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);
 60             stacktype = (StackType)StackClass::stacktype;
 61         }
 62         ~Wrapper() {
 63             if (context) {
 64                 context = NULL;
 65                 deallocate_stack_storage(&storage);
 66                 storage.zeroize();
 67             }
 68         }
 69     };  
 70 
 71     static ContextualStack* get_stack(void (*entry)(intptr_t)) {
 72         return butil::get_object<Wrapper>(entry);
 73     }
 74 
 75     static void return_stack(ContextualStack* sc) {
 76         butil::return_object(static_cast<Wrapper*>(sc));
 77     }
 78 };

 57 int allocate_stack_storage(StackStorage* s, int stacksize_in, int guardsize_in) {
 58     const static int PAGESIZE = getpagesize();
 59     const int PAGESIZE_M1 = PAGESIZE - 1;
 60     const int MIN_STACKSIZE = PAGESIZE * 2;
 61     const int MIN_GUARDSIZE = PAGESIZE;
 62 
 63     // Align stacksize
 64     const int stacksize =
 65         (std::max(stacksize_in, MIN_STACKSIZE) + PAGESIZE_M1) &
 66         ~PAGESIZE_M1;
 67 
 68     if (guardsize_in <= 0) {
 69         void* mem = malloc(stacksize);
 70         if (NULL == mem) {
 73             return -1;
 74         }
 76         s->bottom = (char*)mem + stacksize;
 77         s->stacksize = stacksize;
 78         s->guardsize = 0;
 83         s->valgrind_stack_id = 0;
 85         return 0;
131     }
132 }

134 void deallocate_stack_storage(StackStorage* s) {
138     const int memsize = s->stacksize + s->guardsize;
139     if ((uintptr_t)s->bottom <= (uintptr_t)memsize) {
140         return;
141     }
143     if (s->guardsize <= 0) {
144         free((char*)s->bottom - memsize);
145     } else {
146         munmap((char*)s->bottom - memsize, memsize);
147     }   
148 }

以上根据栈类型分配栈内存,栈大小会按页对齐且不小于两个页,STACK_TYPE_NORMAL的默认大小为1024KB。接下来看sched_to(pg, next_meta)的实现:

567 void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
568     TaskGroup* g = *pg;
575     // Save errno so that errno is bthread-specific.
576     const int saved_errno = errno;
577     void* saved_unique_user_ptr = tls_unique_user_ptr;
578     
579     TaskMeta* const cur_meta = g->_cur_meta;
580     const int64_t now = butil::cpuwide_time_ns();
581     const int64_t elp_ns = now - g->_last_run_ns;
582     g->_last_run_ns = now;
583     cur_meta->stat.cputime_ns += elp_ns; 
584     if (cur_meta->tid != g->main_tid()) {
585         g->_cumulated_cputime_ns += elp_ns;
586     }
587     ++cur_meta->stat.nswitch;
588     ++ g->_nswitch;
589     // Switch to the task
590     if (__builtin_expect(next_meta != cur_meta, 1)) {
591         g->_cur_meta = next_meta;
592         // Switch tls_bls
595 
596         cur_meta->local_storage = tls_bls;
597         tls_bls = next_meta->local_storage;
598 
599         // Logging must be done after switching the local storage, since the logging lib 
600         // use bthread local storage internally, or will cause memory leak.
601         if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
602             (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
605         }
607         if (cur_meta->stack != NULL) {
608             if (next_meta->stack != cur_meta->stack) {
609                 jump_stack(cur_meta->stack, next_meta->stack);
610                 // probably went to another group, need to assign g again.
611                 g = tls_task_group;
612             }
613 #ifndef NDEBUG
614             else {
615                 // else pthread_task is switching to another pthread_task, sc
616                 // can only equal when they're both _main_stack
617                 CHECK(cur_meta->stack == g->_main_stack);
618             }
619 #endif
620         }
621         // else because of ending_sched(including pthread_task->pthread_task)
622     }
625 
626     while (g->_last_context_remained) {
627         RemainedFn fn = g->_last_context_remained;
628         g->_last_context_remained = NULL;
629         fn(g->_last_context_remained_arg);
630         g = tls_task_group;
631     }
633     // Restore errno
634     errno = saved_errno;
635     tls_unique_user_ptr = saved_unique_user_ptr;
640     *pg = g;
641 }

jump_stack的实现后面再分析。这里可以认为它把当前寄存器状态保存到cur_meta->stack的上下文中,再从next_meta->stack的上下文中恢复寄存器,本质上是一系列push/pop操作,跟libco协程的切换有些类似,但也有不同。后面会拿这两者做些对比,并观察调试过程中数据的变化。对于新创建的bthread,第一次切换过去后会从task_runner开始执行:

253 void TaskGroup::task_runner(intptr_t skip_remained) {
254     // NOTE: tls_task_group is volatile since tasks are moved around
255     //       different groups.
256     TaskGroup* g = tls_task_group;
257 
258     if (!skip_remained) {
259         while (g->_last_context_remained) {
260             RemainedFn fn = g->_last_context_remained;
261             g->_last_context_remained = NULL;
262             fn(g->_last_context_remained_arg);
263             g = tls_task_group;
264         }
269     }

271     do {
280         // Meta and identifier of the task is persistent in this run.
281         TaskMeta* const m = g->_cur_meta;
290 
291         // Not catch exceptions except ExitException which is for implementing
292         // bthread_exit(). User code is intended to crash when an exception is 
293         // not caught explicitly. This is consistent with other threading 
294         // libraries.
295         void* thread_return;
296         try {
297             thread_return = m->fn(m->arg);
298         } catch (ExitException& e) {
299             thread_return = e.value();
300         }
301 
302         // Group is probably changed
303         g = tls_task_group;
316         // Clean tls variables, must be done before changing version_butex
317         // otherwise another thread just joined this thread may not see side
318         // effects of destructing tls variables.
319         KeyTable* kt = tls_bls.keytable;
320         if (kt != NULL) {
321             return_keytable(m->attr.keytable_pool, kt);
322             // After deletion: tls may be set during deletion.
323             tls_bls.keytable = NULL;
324             m->local_storage.keytable = NULL; // optional
325         }
331         {
332             BAIDU_SCOPED_LOCK(m->version_lock);
333             if (0 == ++*m->version_butex) {
334                 ++*m->version_butex;
335             }
336         }
337         butex_wake_except(m->version_butex, 0);
338 
340         g->set_remained(TaskGroup::_release_last_context, m);
341         ending_sched(&g);
342 
343     } while (g->_cur_meta->tid != g->_main_tid);
347 }

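以上在真正开始执行用户回调之前,先执行切换前通过set_remained设置的函数。之所以要这样做,是因为这些函数必须在切换到新的上下文之后才能安全执行。例如_release_last_context要归还刚结束的bthread的栈,显然不能在仍运行于该栈上时归还。又如ready_to_run_in_worker要把当前bthread重新入队:如果在切换前就入队,它可能被其他工作线程取走,并跳转到一个仍在运行的上下文(usleep中的注释说明的正是这一点)。新bthread第一次运行时是直接从task_runner开始,而不是从sched_to里jump_stack之后返回,所以需要在task_runner开头执行这些函数。对于从jump_stack返回的情况,则由sched_to中的如下循环执行,因此run_main_task在sched_to之后调用task_runner时传入skip_remained=1。bthread结束时,task_runner再通过set_remained设置_release_last_context,交给下一个运行的上下文来执行: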

626     while (g->_last_context_remained) {
627         RemainedFn fn = g->_last_context_remained;
628         g->_last_context_remained = NULL;
629         fn(g->_last_context_remained_arg);
630         g = tls_task_group;
631     }

349 void TaskGroup::_release_last_context(void* arg) {
350     TaskMeta* m = static_cast<TaskMeta*>(arg);
351     if (m->stack_type() != STACK_TYPE_PTHREAD) {
352         return_stack(m->release_stack()/*may be NULL*/);
353     } else {
354         // it's _main_stack, don't return.
355         m->set_stack(NULL);
356     }
357     return_resource(get_slot(m->tid));
358 }

接着执行用户回调函数(通过捕获ExitException来实现bthread_exit),然后清理该bthread的tls keytable,递增version_butex并唤醒所有等待(join)该bthread的bthread。之后通过set_remained设置_release_last_context,与开头形成呼应,最后调用ending_sched完成调度。

510 void TaskGroup::ending_sched(TaskGroup** pg) {
511     TaskGroup* g = *pg;
512     bthread_t next_tid = 0;
513     // Find next task to run, if none, switch to idle thread of the group.
520     const bool popped = g->_rq.steal(&next_tid);
522     if (!popped && !g->steal_task(&next_tid)) {
523         // Jump to main task if there's no task to run.
524         next_tid = g->_main_tid;
525     }

527     TaskMeta* const cur_meta = g->_cur_meta;
528     TaskMeta* next_meta = address_meta(next_tid);
529     if (next_meta->stack == NULL) {
530         if (next_meta->stack_type() == cur_meta->stack_type()) {
531             // also works with pthread_task scheduling to pthread_task, the
532             // transfered stack is just _main_stack.
533             next_meta->set_stack(cur_meta->release_stack());
534         } else {
535             ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
536             if (stk) {
537                 next_meta->set_stack(stk);
538             } else {
539                 // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
540                 // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
541                 // This basically means that if we can't allocate stack, run
542                 // the task in pthread directly.
543                 next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
544                 next_meta->set_stack(g->_main_stack);
545             }
546         }
547     }
548     sched_to(pg, next_meta);
549 }

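以上选择下一个要运行的bthread:先从本地_rq取,再通过steal_task从_remote_rq和其他TaskGroup中取,都没有则切回main bthread。如果下一个bthread还没有栈,且栈类型与当前bthread相同,则直接复用当前bthread的栈。除了task_runner,TaskGroup::usleep、TaskGroup::yield以及TaskGroup::start_foreground等也都会先set_remained再切换: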

399         RemainedFn fn = NULL;
400         if (g->current_task()->about_to_quit) {
401             fn = ready_to_run_in_worker_ignoresignal;
402         } else {
403             fn = ready_to_run_in_worker;
404         }
405         ReadyToRunArgs args = {
406             g->current_tid(),
407             (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
408         };
409         g->set_remained(fn, &args);
410         TaskGroup::sched_to(pg, m->tid); //TaskGroup::start_foreground

795     TaskGroup* g = *pg;
796     // We have to schedule timer after we switched to next bthread otherwise
797     // the timer may wake up(jump to) current still-running context.
798     SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g };
799     g->set_remained(_add_sleep_event, &e);
800     sched(pg);
801     g = *pg;
802     e.meta->current_sleep = 0;//TaskGroup::usleep

894 void TaskGroup::yield(TaskGroup** pg) {
895     TaskGroup* g = *pg;
896     ReadyToRunArgs args = { g->current_tid(), false };
897     g->set_remained(ready_to_run_in_worker, &args);
898     sched(pg);//TaskGroup::yield
899 }

下面贴上brpc_internal PPT中对TaskControl的介绍:

  • Singleton to manage all TaskGroups
    -start, stop, add

  • What if we create a bthread in non-worker (normal) pthreads
    -non-worker doesn’t have TaskGroup
    -TaskControl choose a TaskGroup
    -push into its RemoteTaskQueue
     A queue for tasks from non-workers
     use mutex to protect from concurrent push/pop/steal

  • signal_task -- only notify some ParkingLots (thus some TaskGroups)

  • steal_task -- steal bthreads from all TaskGroups to prevent starvation

  • Overall scheduling order
    -local run queue
    -local remote queue
    -other workers’ run queue
    -other workers’ remote queue

butex_wait超时无法唤醒

相关文章

  • brpc之bthread源码浅淅(二)

    这里准备分析下bthread的创建流程,以及bthread之间的切换,从multi_threaded_echo_c...

  • brpc之bthread源码浅淅(一)

    这几篇主要是分析bthread是什么,怎么用,和实现原理,这里先使用官方链接中的参考来说明。协程和bthread区...

  • brpc之rpc流程分析(上)

    之前关于brpc的几篇分析:brpc之mutex源码分析brpc之ResourcePool源码分析brpc之bth...

  • brpc之bthread_id源码分析

    在之前分析rpc时,发现在代码中使用到此结构,当时看到时候感觉挺复杂的,从名字上看还以为仅仅是个uint64_t的...

  • brpc之mutex源码分析

    这篇分析的不完整,没有彻底弄明白。在此brpc之前,基本所有使用过或者分析过的mutex只是纯粹的封装下接口,使用...

  • brpc之ResourcePool源码分析

    该类在多个模块中使用到,是一种资源预分配的获取及回收,value是uint64_t一般作为index。从memor...

  • brpc之负载均衡源码分析

    在分析bthread相关实现时,从使用中发现有LoadBalancerWithNaming类的使用,这块也比较独立...

  • brpc之消息处理流程

    中间大约有段时间没有继续分析brpc源码,因为有些其他事情,这里分析下当client发送消息后,server收到请...

  • Netty 源码分析系列

    Netty 源码分析系列 Netty入门简介 深入浅出NIO之Channel、Buffer 深入浅出NIO之Sel...

  • java源码分析之LinkedHashMap

    相关文章java源码分析之HashMap(一)java源码分析之HashMap(二)java源码分析之HashMa...

网友评论

      本文标题:brpc之bthread源码浅析(二)

      本文链接:https://www.haomeiwen.com/subject/vexxpctx.html