美文网首页
百度 Apollo 8.0 Cyber 源代码分析(五)

百度 Apollo 8.0 Cyber 源代码分析(五)

作者: RonZheng2010 | 来源:发表于2024-11-27 08:00 被阅读0次

相关链接

百度 Apollo 8.0 Cyber 源代码分析(一)
百度 Apollo 8.0 Cyber 源代码分析(二)
百度 Apollo 8.0 Cyber 源代码分析(三)
百度 Apollo 8.0 Cyber 源代码分析(四)
百度 Apollo 8.0 Cyber 源代码分析(五)

9 cyber协程调度

9.1 CRoutine

CRoutine定义协程。

  • 成员name_ 是协程名字,可以转成一个uint64数字,也就是成员id_

  • 成员func_是协程函数

  • 成员state_是协程状态,IO_WAIT/DATA_WAIT是等待状态,READY是准备好执行,FINISHED是已经执行完成。

  • 成员priority_是执行优先级

  • static thread_local成员current_routine_ scheduler的当前执行的CRoutine实例。而成员main_stack_ 是没有协程执行时的执行位置。

  • 成员context_ 是RoutineContext实例。它保存协程的执行栈,成员sp是sp指针,成员stack是栈。在CRoutine的构造函数中,会从CRoutineContext的CObjectPool池中,得到这个实例。

  • HangUp()置DATA_WAIT,Wake() 将状态置READY,Sleep()设置等待时间wake_time_,状态置SLEEP。

  • UpdateState()中,如果wait_time_超时,则置状态为READY;如果状态为IO_WAIT,则置READY。

  • Stop()中,置状态为FINISHED。

  • Resume()中,如果状态为FINISHED,则直接返回;如果状态为READY,调用SwapContext()将当前的调用栈换成协程的调用栈,开始执行。在如下条件满足前,SwapContext()不会返回。

    • 协程函数执行完成
    • 协程调用Yield()。Yield()的作用是将协程的调用栈换出,所以协程不再执行。

9.2 Processor/ProcessorContext

Processor为协程提供处理器资源。ProcessorContext为它提供执行上下文。

ProcessorContext定义processor执行context接口。

  • 虚拟成员函数NextRoutine()得到下一个READY状态的CRoutine实例。
  • Wait()等待协程READY。

关于Processor,

  • 成员thread_ 是执行线程。
  • 成员context_是ProcessorContext实例。成员函数BindContext()将指定的ProcessorContext实例绑定到context_,并且启动线程thread_,线程函数是Run()
  • Run()在while()循环中,调用ProcessorContext::NextRoutine(),得到CRoutine实例;如果有READY的CRoutine实例的话,调用CRoutineL::Resume()执行它;如果没哟,则调用功能ProcessorContext::Wait()等待。

9.3 ClassicContext

ClassicContext实现经典模式的context接口。

  • 成员current_group是当前processor。由于ClassiscContext实例与Processor是一一绑定的,所以一个processor实例,就只能服务与一个组的CRoutine。

  • static成员cr_group_是一个从group名字到array的映射。这个array的大小是MAX_PRIO=20。这是协程优先级最大值。也就是说array中每个元素对应一个优先级。array的元素是一个CRoutine实例的vector。也就是说,CRoutien实例是按照组名、优先级放在对应的vector中。

  • 成员multi_pri_rq_ 是cr_group_ 中,当前组对应的array。这是在成员函数InitGroup()中设置的。

  • static成员notify_grp_是一个从组名到int值的映射。这个int值是当前READY状态的协程个数。调用成员函数Notify(),这个值加1,而调用成员函数Wait(),这个值减1。

  • 成员函数NextRoutine(), 按照优先级从高到低的顺序,遍历multi_pri_rq_的array,看看其中的CRoutine是否状态为READY,如果是,就获得所有权,并返回它。

  • 成员Wait(),等待notify_grp_[]中的协程READY,等待成功则返回。成员Notify()则用于通知READY协程增加了一个。

9.4 Scheduler

Scheduler定义调度器接口。有两种调度器SchedulerClassisc和SchedulerChoreography。这里以SchedulerClassic为例说明。

SchedulerFactory::Instance() 根据配置文件的配置,创建唯一的Scheduler实例。

Scheduler定义调度器接口。

  • 成员processors_ 是使用的处理器Processor实例,而成员pctxs_是一一对应的上下文ClassicContext实例。
  • 成员id_cr_ 是从routine id到CRoutine实例的映射。routine id是基于协程名字计算得到的hash值。
  • 成员函数ProcessLevelResourceControl()根据设置当前线程的cpuset。
  • 成员函数CreateTask()的步骤如下。
    • 创建CRoutine实例
    • 调用虚拟成员函数DispatchTask(),将它推送到调度队列。
    • 如果这个任务指定了DataVisitor参数,则定义一个函数作为它的回调函数,以便在消息到达时,得到通知。这个函数的实现是调用虚拟成员函数NotifyProcessor()。

9.5 SchedulerClassic

SchedulerClassic实现经典的调度器接口。

  • 成员classic_conf_是配置数据。在构造函数中,从配置文件中读入这个配置;然后调用CreateProcessor()。在CreateProcessor()中,对classic_conf配置的每个组,创建一对Processor/ClassicContext实例并绑定,这样协程就开始调度了。

  • 虚拟成员函数DispatchTask()的实现:

    • 在cr_confs_中,根据任务名查找这个任务的配置,如果有的话,使用它配置priority和组名;如果没有,则priority使用默认值0,组名置为classsic_conf_中第一个组classic_conf_.groups(0)。
    • 将这个CRoutine实例放入ClassicContext::cr_group_中指定组/priority的vector。
  • 虚拟成员函数NotifyProcessor()的实现:

    • 在id_cr_按照routine id查找CRoutine实例。找到的话,调用CRoutine::SetUpdateFlag(),将状态从DATA_WAIT改成READY,准备执行;
    • 调用ClassicContext::Notify(),如果processor还处于等待状态,则唤醒它。

9.6 ClassicConf

ClassicConf是SchedulerClassic的配置定义。
在SchedulerClassic的构造函数中,打开配置文件,如/cyber/conf/pv_nn.conf,得到SchedulerConf实例。

其中的Scheduler_conf是给SchedulerClassic准备的。

  • 成员groups是SchedGroup实例,包括协程分组。

关于SchedGroup,

  • 成员name是组名,
  • 成员processor_num是这个组占用的处理器个数,
  • cpuset是处理器编号,
  • 某些任务需要指定更详细的选项,如priority。成员tasks[]可以配置这样的任务。

9.7 CreateRoutineFactory()

CreateRoutineFactory()有多个版本。它们工作都是创建一个RoutineFactory实例,用于创建一个协程。

当消息到达时,这个协程被唤醒。调用DataVisitor::TryFetch()读取消息,如果读到了,就调用用户指定的回调函数处理,并且置协程为READY状态,这样可以继续被调度;如果没有读到,则放弃时间片,协程进入DATA_WAIT状态,等待再次被唤醒。

template <typename M0, typename M1, typename F>
RoutineFactory CreateRoutineFactory(
    F&& f, const std::shared_ptr<data::DataVisitor<M0, M1>>& dv) {
  RoutineFactory factory;
  factory.SetDataVisitor(dv);
  factory.create_routine = [=]() {
    return [=]() {
      std::shared_ptr<M0> msg0;
      std::shared_ptr<M1> msg1;
      for (;;) {
        CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
        if (dv->TryFetch(msg0, msg1)) {
          f(msg0, msg1);
          CRoutine::Yield(RoutineState::READY);
        } else {
          CRoutine::Yield();
        }   
      }   
    };  
  };  
  return factory;
}

如下是涉及的类。

这里说明CreateRoutineFactory()创建协程的运行逻辑。

  • 协程在一个for()循环中运行,自身不会中止执行。需要外部调用者执行CRoutine::Stop()将它置为FINISHED状态,并通过ClassicContext::RemoveTask(),从调度队列移除。

  • 一开始它将自身置为DATA_WAIT状态。这是为下一轮”等待 - 执行”作准备。

  • 调用DataVisitor::TryFetch()得到消息。
    +如果得到,调用使用者的处理函数;调用CRoutine::Yield(READY),这是假设还有消息没处理完,所以回到调度队列后,下次可以继续被调度,不用等着被唤醒了。

    • 如果没得到,则调用Yield(),将自身的调用栈交换出去。这次回到调度队列,协程处于DATA_WAIT状态,需要被唤醒。
  • 当消息到达时,会调用DataVisitor的成员notifier_,这个回调函数会调用SchedulerClassic::NotifyProcessor()。其中调用CRoutine::SetUpdateFlag(),将CRoutine实例状态从DATA_WAIT改回READY,并调用ClassicContext::Notify()唤醒相应的条件变量。

bool SchedulerClassic::NotifyProcessor(uint64_t crid) {
......
  {
    ReadLockGuard<AtomicRWLock> lk(id_cr_lock_);
    if (id_cr_.find(crid) != id_cr_.end()) {
      auto cr = id_cr_[crid];
      if (cr->state() == RoutineState::DATA_WAIT ||
          cr->state() == RoutineState::IO_WAIT) {
        cr->SetUpdateFlag();
      }

      ClassicContext::Notify(cr->group_name());
      return true;
    }
  }
  return false;
}
  • Processor的处理线程可能正调用ClassicContext::Wait(),等在这个条件变量上,这时就被唤醒了。
  • 之后它调用ClassicContext::NextRoutine()得到下一个CRoutine实例 。因为前面的CRoutine实例状态已经是READY了,所以现在它可以被选中。
  • 然后调用CRoutine::Resume(),将它的调用栈交换进来,所以它就又开始执行了。
void Processor::Run() {
......
  while (cyber_likely(running_.load())) {
    if (cyber_likely(context_ != nullptr)) {
      auto croutine = context_->NextRoutine();
      if (croutine) {
        snap_shot_->execute_start_time.store(cyber::Time::Now().ToNanosecond());
        snap_shot_->routine_name = croutine->name();
        croutine->Resume();
        croutine->Release();
      } else {
        snap_shot_->execute_start_time.store(0);
        context_->Wait();
      }
    } else {
      std::unique_lock<std::mutex> lk(mtx_ctx_);
      cv_ctx_.wait_for(lk, std::chrono::milliseconds(10));
    }
  }
}

10 cyber::Async()

关于TaskManager,

  • 成员task_queue_保存低优先级的任务,成员task_queue_hight_prio_保存高优先级的任务。
  • 成员tasks_保存了所有的任务。
  • 在TaskManager的构造函数中,创建指定数量的协程,这个数量在进程的cpu调度配置文件中指定的CPU核数相同。在协程的处理函数中,会等待task_queue_high_prio_和task_queue_中的任务,有就执行它。task_queue_hight_prio_中的任务会优先执行。

cyber::Async()给使用者一个接口,方便地创建一个函数在协程中执行。它的实现就是向TaskManager的低优先级队列推送一个任务。

相关文章

网友评论

      本文标题:百度 Apollo 8.0 Cyber 源代码分析(五)

      本文链接:https://www.haomeiwen.com/subject/gvpzjjtx.html