美文网首页
源码阅读笔记--job

源码阅读笔记--job

作者: sxr008 | 来源:发表于2018-09-17 18:09 被阅读0次

    job就是对work的封装(应该对应于咱们项目中的task,只不过facebook做了一个封装)。
    job可以触发或者分派在下面三种情况:

    • 立即,在调用线程中立即派发
    • NBIO,进行IO相关操作,以及一些定时器相关的操作
    • 普通线程池,libphenom可以有多个线程池

    Job definition

    struct ph_job_def {
      // The callback to run when the job is dispatched.
      // Will be copied to job->callback during ph_job_alloc()
      ph_job_func_t callback;
      // The memtype to use to allocate the job
      ph_memtype_t memtype;
      // Function to be called prior to freeing the job
      void (*dtor)(ph_job_t *job);
    };
    

    1 ph_nbio_init()初始化NBIO
    calloc()分配num_schedulers个emitter, 定义在struct ph_nbio_emitter, 每个emitter会跟1个事件循环,和1个thread绑定

    • 初始化每个emitter及其timer wheel(ph_timerwheel_init()ph_nbio_emitter_init()
      timer_fd描述符(timerfd_create())
      ph_job_set_nbio()将timer_job扔进nbio队列,这个job被调度到时执行tick_epoll
    • 初始化counter, 用来进行统计, 可以用ph_nbio_stat进行观察

    2 ph_sched_run调度NBIO
    ph_sched_run()根据num_thread_spawn确定创建线程.每个线程对应一个emitters[i].

    • emitters[0]是ph_sched_run的调用者,其他的调用ph_thread_spawn()生成新的线程,对应的线程函数是sched_loop()
    • sched_loop 调用 ph_nbio_emitter_run
    • ph_nbio_emitter_run进入Reactore模式,epoll_wait得到fd, ph_job_t *job = event[i].data.ptr;
    • ph_nbio_emitter_dispatch_immediate回调job之前设置的callback
    • 定时器任务通过timefd来触发。
      每次执行tick_epoll()判断是否过了一个tick。如果过了一个tick,则执行ph_nbio_emitter_timer_tick(),同时会把自己job调用ph_job_set_nbio加入nbio中。
      ph_nbio_emitter_timer_tick()调用ph_timerwheel_tick处理每一个timer。
    ph_result_t ph_sched_run(void)
    {
      ph_thread_t *me = ph_thread_self();
      uint32_t i;
      void *res;
    
      emitters[0].thread = me;
      me->is_emitter = &emitters[0];
    
      for (i = 1; i < num_schedulers; i++) {
        emitters[i].thread = ph_thread_spawn(sched_loop, &emitters[i]);  //一直不太懂emitter。和time wheel有关,
        emitters[i].thread->is_emitter = &emitters[i];
      }
    
      _ph_job_pool_start_threads(); //各个线程池开始工作
      process_deferred(me, NULL);
    
      gc_interval = ph_config_query_int("$.nbio.epoch_interval", 5000);
      if (gc_interval > 0) {
    #ifdef USE_GIMLI
        if (getenv("GIMLI_HB_FD")) {
          hb = gimli_heartbeat_attach();
          if (hb) {
            gimli_heartbeat_set(hb, GIMLI_HB_STARTING);
          }
        }
    #endif
    
        ph_job_init(&gc_job);
        gc_job.callback = epoch_gc;
        ph_job_set_timer_in_ms(&gc_job, gc_interval);
      }
    
      sched_loop(me->is_emitter);
    
      for (i = 1; i < num_schedulers; i++) {
        ph_thread_join(emitters[i].thread, &res);
      }
      free(emitters);
      emitters = NULL;
    
      ph_job_pool_shutdown();
      ph_thread_epoch_barrier();
      return PH_OK;
    }
    

    相关文章

      网友评论

          本文标题:源码阅读笔记--job

          本文链接:https://www.haomeiwen.com/subject/ccgsgftx.html