job就是对work的封装(应该对应于咱们项目中的task,只不过facebook做了一个封装)。
job可以触发或者分派在下面三种情况:
- 立即,在调用线程中立即派发
- NBIO,进行IO相关操作,以及一些定时器相关的操作
- 普通线程池,libphenom可以有多个线程池
Job definition
struct ph_job_def {
// The callback to run when the job is dispatched.
// Will be copied to job->callback during ph_job_alloc()
ph_job_func_t callback;
// The memtype to use to allocate the job
ph_memtype_t memtype;
// Function to be called prior to freeing the job
void (*dtor)(ph_job_t *job);
};
1 ph_nbio_init()初始化NBIO。
calloc()分配num_schedulers个emitter, 定义在struct ph_nbio_emitter, 每个emitter会跟1个事件循环,和1个thread绑定。
- 初始化每个emitter及其timer wheel(ph_timerwheel_init()、ph_nbio_emitter_init())
timer_fd描述符(timerfd_create())
ph_job_set_nbio()将timer_job扔进nbio队列,这个job被调度到时执行tick_epoll - 初始化counter, 用来进行统计, 可以用ph_nbio_stat进行观察
2 ph_sched_run调度NBIO
在ph_sched_run()根据num_thread_spawn确定创建线程.每个线程对应一个emitters[i].
- emitters[0]是ph_sched_run的调用者,其他的调用ph_thread_spawn()生成新的线程,对应的线程函数是sched_loop()。
- sched_loop 调用 ph_nbio_emitter_run
- ph_nbio_emitter_run进入Reactore模式,epoll_wait得到fd, ph_job_t *job = event[i].data.ptr;
- ph_nbio_emitter_dispatch_immediate回调job之前设置的callback
- 定时器任务通过timefd来触发。
每次执行tick_epoll()判断是否过了一个tick。如果过了一个tick,则执行ph_nbio_emitter_timer_tick(),同时会把自己job调用ph_job_set_nbio加入nbio中。
ph_nbio_emitter_timer_tick()调用ph_timerwheel_tick处理每一个timer。
ph_result_t ph_sched_run(void)
{
ph_thread_t *me = ph_thread_self();
uint32_t i;
void *res;
emitters[0].thread = me;
me->is_emitter = &emitters[0];
for (i = 1; i < num_schedulers; i++) {
emitters[i].thread = ph_thread_spawn(sched_loop, &emitters[i]); //一直不太懂emitter。和time wheel有关,
emitters[i].thread->is_emitter = &emitters[i];
}
_ph_job_pool_start_threads(); //各个线程池开始工作
process_deferred(me, NULL);
gc_interval = ph_config_query_int("$.nbio.epoch_interval", 5000);
if (gc_interval > 0) {
#ifdef USE_GIMLI
if (getenv("GIMLI_HB_FD")) {
hb = gimli_heartbeat_attach();
if (hb) {
gimli_heartbeat_set(hb, GIMLI_HB_STARTING);
}
}
#endif
ph_job_init(&gc_job);
gc_job.callback = epoch_gc;
ph_job_set_timer_in_ms(&gc_job, gc_interval);
}
sched_loop(me->is_emitter);
for (i = 1; i < num_schedulers; i++) {
ph_thread_join(emitters[i].thread, &res);
}
free(emitters);
emitters = NULL;
ph_job_pool_shutdown();
ph_thread_epoch_barrier();
return PH_OK;
}
网友评论