美文网首页
TaskFlow DAG部分源码阅读

TaskFlow DAG部分源码阅读

作者: JimmyPan | 来源:发表于2020-08-28 10:39 被阅读0次

    Taskflow有很多功能,例如动态时构建子图,条件节点,并行运算,静态图。这次学习着重于Taskflow对于静态图DAG的调度

    TaskFlow 核心结构

    DAG图:


      Class Node{
        public:...
        private:
        PassiveVector<Node*> _successors; //保存所有的后继节点
        PassiveVector<Node*> _dependents;// 保存所有的前趋节点
        std::atomic<size_t> _join_counter {0};  // 维护在运行中,当前结点还剩几个依赖没有完成
    }
    
    class Graph {
      private:
        std::vector<Node*> _nodes; //保存所有的节点
    };
    
    class Topology { 
      private:
        Taskflow& _taskflow;
        std::promise<void> _promise; // 该拓扑结构的promise,返回的future从这里获得
        PassiveVector<Node*> _sources; // 存储一个图中所有入度为0的节点
        std::function<bool()> _pred; //结束条件 返回bool  ,返回false时,重新被调用
        std::function<void()> _call; // 结束后调用
        std::atomic<size_t> _join_counter {0}; //维护在运行中,当前拓扑中还剩入度为0的节点没有完成,当该值到0时,则终止任务
    };
    
    class Taskflow : public FlowBuilder {
      private:
        std::string _name;
        Graph _graph; //DAG图
        std::list<Topology> _topologies; //在run中创建,例如当run一个taskflow多次,一个taskflow可能会有多个Topology,全部的Topology都运行完,才会结束
    };
    

    调度器


    class Executor{
        const size_t _VICTIM_BEG;//vicitm(被偷取的线程) 
        const size_t _VICTIM_END;// 这两个标志,用来表示被偷取线程的范围
        const size_t _MAX_STEALS; //最多尝试偷取多次后调用 yield
        const size_t _MAX_YIELDS;//最多让出多少次线程后 返回
       
        std::condition_variable _topology_cv; //该cv在析构函数中wait,任务完成时进行通知
        std::mutex _topology_mutex; //和cv一起用
        std::mutex _wsq_mutex; // 偷取队列,push的时候需要加锁保护?
    
        size_t _num_topologies {0}; //计数器,run的时候+1,可能有多个任务,到0时才会析构executor
        
        std::vector<Worker> _workers; // 维护worker生命周期,工作线程的local_thread维护其中worker的指针
        std::vector<std::thread> _threads; //创建线程
    
        Notifier _notifier[NUM_DOMAINS]; // 用于做cv通知
    
        TaskQueue<Node*> _wsq[NUM_DOMAINS]; //共享队列
    
        size_t _id_offset[NUM_DOMAINS] = {0};
    
        std::atomic<size_t> _num_actives[NUM_DOMAINS]; //偷窃态的线程数 (正在尝试偷取其它任务的线程)
        std::atomic<size_t> _num_thieves[NUM_DOMAINS]; //激活态的线程数(正在执行节点中任务的线程)
        std::atomic<bool>   _done {0}; // 标志任务是否已经完成
        std::unordered_set<std::shared_ptr<ObserverInterface>> _observers; //任务开始前后有回调函数,记录任务执行进度
    
    }
    

    工作线程的本地thread_local


      struct Worker {
        size_t id;
        size_t vtm; // 受害者,表示偷的是谁
        Domain domain;
        Executor* executor;
        Notifier::Waiter* waiter; //用作cv通知和阻塞
        std::mt19937 rdgen { std::random_device{}() }; //偷取时利用这随机选择
        TaskQueue<Node*> wsq[NUM_DOMAINS]; //本地的偷窃队列
      };
    

    拓扑图调度流程


    以一个例子做演示一个DAG的调度流程,如下图,taskflow维护了两个计数器
    1.每个任务节点维护一个join_counter,表示该节点还有几个依赖未完成(入度为0)。
    2.拓扑结构维护一个join_counter,表示该拓扑图还是有几个入度为0的节点

    1. 调度器创建拓扑结构,并将入度为0的节点(A,D)加入到队列中
    2. 线程中某个线程抢到A,执行A,并将它的两个后继节点(B,C)的join_counter减1,查看A后继节点,判断是否有join_counter为0的节点,并加入到队列中(B的join_counter为0,加入队列), C的join_counter不为0,则不管C
    3. 某个线程在A运行的同时抢到D,并将它的两个后继节点(C)的join_counter减1,此时C的Join_counter为0,则将C加入队列。(哪个工作线程最后一个把C的join_counter减到0,就由谁把C加入队列)
    4. B,C被在队列中被线程池中的线程抢到并执行
    5. 执行B,C的线程中,哪个线程最后一个运行完,谁把E的join_counter减到0,谁会把E加入到任务队列
    6. 某个线程抢到E,并把E运行完,此时整个拓扑图没有入度为0的几点存在了,结束运行


      image.png

    线程池中任务的分配和偷取

    核心代码分析

    • 调度器添加graph到任务队列中:

    调度器线程通过将初始入度为0的节点放入shared_queue,并唤醒任务来开启整个任务流程
    topology 中保存所有入度为0的节点,并保存着整个拓扑结构中当前入度为0的节点数,来控制流程的结束。还保存着拓扑图promise,返回给用户future

    template <typename P, typename C>
    std::future<void> Executor::run_until(Taskflow& taskflow) {
    
        std::future<void> future;
        {
            lock(mutex);
            taskflow._topologies.emplace_back(taskflow);//创建一个topology
            tpg = &(f._topologies.back()); //获得刚创建的topology
            future = tpg->_promise.get_future(); //获得topology中promise的future当返回值
        }
        //设置tgp,主要找所有入度为0的点放入topology的soucre,并且为图的所有节点设置join_counter
        for(auto node : taskflow->graph->nodes ){
            if(node->dependts() == 0 ){ //如果入度为0
                tgp->source.push_back()/node;
            }
            node->set_join_counter();//设置每个节点的join_counter,作用如前文所述
        }
        //将上面找出的所有入度为0的节点,加入到队列中
         shared_queue.push_back(tgp->sourece);
    
    
        notfiy_n(tgp->source.size()) //唤醒对应个数的工作线程
        return future;
    }
    
    • 工作线程运行循环:

    工作线程负责任务的执行和获取。taskflow有一个设计目标就是,要保持一个当一个线程激活时的时候,有个一个线程正在偷取,因为激活的线程可能会添加新的任务到队列中,需要一个线程去偷取来减少任务等待。

    为了达到这个目标,taskflow设计了两个状态,进入exploit_task的线程,是激活态(active),负责进行任务的运行和调度新任务。进入wait_for_task的线程,是偷窃态(thief),负责偷取任务。
    之后设计了用了一种自适应策略:当最后一个偷窃态的线程编程激活状态时,会唤醒一个新的线程来取代自己偷窃线程的角色,

    worker_thread([](worker* w){
    while(true){
      expliot_task(w,t); //线程进入激活态,运行任务并将后继加入队列
      if(wait_for_task(w,t)) == false)//线程进入偷窃态,不停的去偷取任任务
           then break;
    }
    }
    
    

    1.exploit_task.进入激活态,运行当前任务并且添加当前任务的后继:

    void Executor::_exploit_task(Worker& w, Node*& t) {
        if(t) {  //如果任务存在
            if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) { //active + 1 表示激活态的线程 + 1 ,如果有一个激活态线程,并且目前没有偷窃态线程,要唤醒一个线程去偷窃
                _notifier.notify_one; //激活一个睡眠线程,去偷取可能会到来的任务
            }
            while(t) {
                _invoke(w, t);// 运行任务,调用t中的回调函数,并且如果后继的话在本地队列继续添加新任务
                t = local_taskqueue.pop(); // 该线程自己从自己的local queue中拿,其它线程也会去偷取。自己拿完或者被偷完时,结束循环
            }
            --_num_actives; // 离开激活态,active-1
        }
    }
    

    2.wait_for_task.进入偷取态,尝试去偷取任务

    bool Executor::_wait_for_task(Worker& worker, Node*& t) {
        wait_for_task:
    
        ++_num_thieves[d];
    
        explore_task:
    
        _explore_task(worker, t);
    
        if(t) {
            if(_num_thieves.fetch_sub(1) == 1) {
                _notifier.notify(false);
            }
            return true;
        }
    
        _notifier[d].prepare_wait(worker.waiter);
        
        if(!shared_queue.empty()) {
            _notifier[d].cancel_wait(worker.waiter);
            t = shared_queue.steal();  
            if(t) {
                if(_num_thieves.fetch_sub(1) == 1) {
                    _notifier.notify(false);
                }
                return true;
            }
            else {
                goto explore_task;
            }
        }
        if(_done) {
            _notifier.cancel_wait(worker.waiter);
            for(int i=0; i<NUM_DOMAINS; ++i) {
                _notifier.notify(true);
            }
            --_num_thieves;
            return false;
        }
    
        if(_num_thieves[d].fetch_sub(1) == 1) {
            if(_num_actives) {
                _notifier.cancel_wait(worker.waiter);
                goto wait_for_task;
            }
            // check all domain queue again
            for(auto& w : _workers) {
                if(!w.local_queue.empty()) {
                    worker.vtm = w.id;
                    _notifier[d].cancel_wait(worker.waiter);
                    goto wait_for_task;
                }
            }
        }
        
        _notifier[d].commit_wait(worker.waiter);
    
        return true;
    }
    
    
    

    当工作线程完成了本地队列中全部的任务(exploit_task),接下来就会去运行wait_for_task。
    首先该工作线程会进入explore_task去尝试偷取。当这个工作线程偷到了一个任务,并且当前线程是最后一个正在尝试偷取的线程,它会去唤醒一个新的工作线程去取代它自己去偷取新任务,并让自己返回激活状态(运行任务),否则的话它会去成为一个睡眠线程的候选者(尚未真正睡眠,如果没有任务的话,并且不是最后一个偷窃态的线程,它会真正睡眠)

    然而必须避免,没有充分利用cpu的并行(underutilized paralleism)的情况,因为任务可能会在我们把工作线程睡眠时到来,所以用2PC(两阶段提交)调整活动工作线程的数量以调整可用任务的并行性。尽量减少无用的睡眠动作。

    运行态后工作线程已经用将本地队列中的任务运行干净,并且投入了很多精力在偷取线程上。任务队列大概率已经是空的了,我们用prepare_wait将它变为一个候选的睡眠线程,从现在开始,所有来自其它线程通知对这个候选睡眠线程都是可见的了。

    首先我们再次检查共享队列,因为其它线程可能会在prepare_wait后再次插入新任务,如果共享队列不为空,则此刻就不能睡觉了,立即偷取并返回。
    接着,如果该线程是最后一个偷窃态线程,它只有在两种情况下能逃脱睡眠的命运,1.当前还有其它运行态的线程(说明可能会有新任务被加入到本地队列,该线程还不能休息,要继续努力偷取) 2.检查其它线程的队列不为空(不为空说明还有任务,还不能睡觉,必须要去偷了)
    如果以上检查都没能阻拦它去阻塞,那它就真的要去阻塞了。

    2.explore_task

    
    inline void Executor::_explore_task(Worker& w, Node*& t) {
      
      //assert(_workers[w].wsq.empty());
      assert(!t);
    
      const auto d = w.domain;
    
      size_t num_steals = 0;
      size_t num_yields = 0;
    
      std::uniform_int_distribution<size_t> rdvtm(_VICTIM_BEG, _VICTIM_END); //随机产生受害者(被偷取的线程)
      do {
        t = (w.id == w.vtm) ? _wsq[d].steal() : _workers[w.vtm].wsq[d].steal();  // 如果被偷取者的id等于自己的id则从,shared_queue中偷取,否则从对应线程的local_queue中偷取
        if(t) {
          break;
        }    
        if(num_steals++ > _MAX_STEALS) { //如果偷了8次都没偷到,就让出自己的时间片,让其它任务先执行
          std::this_thread::yield();
          if(num_yields++ > _MAX_YIELDS) { // 如果让出了100次,还没有偷到任务,则停止循环
            break;
          }
        }    
        w.vtm = rdvtm(w.rdgen);
      } while(!_done);
    
    }
    

    在explore_task中,偷窃态的工作线程随机从所有的工作线程中选择一个进行偷取,如果被选到的使它自己,则从shared_queue中进行偷取。
    在多个线程争抢中,偷取可能失败,所以用两个参数来控制,一直偷取还是让出cpu资源。
    如果偷到了max_steal次还是没有偷到,则让出自己的cpu资源,等待cpu下次调度
    如果已经让了max_yield次,则不再进行偷取,返回函数

    Work steal Queue

    相关文章

      网友评论

          本文标题:TaskFlow DAG部分源码阅读

          本文链接:https://www.haomeiwen.com/subject/pvqudktx.html