美文网首页
找分布式工作复习学习系列---市面分布式框架解析之Ray(四)

找分布式工作复习学习系列---市面分布式框架解析之Ray(四)

作者: 加油11dd23 | 来源:发表于2021-09-28 22:50 被阅读0次

    一、背景

    Ray 的设计目标是比 Spark 、MapReduce 数据流更灵活,比 Orleans 等 Actor 模型多了 fault tolerance 和 exactly-once,比 Mesos 的两层调度更高效,比 Tensorflow 、MXNet 更易用。

    虽然 Ray 与 TensorFlow 一样,也是面向机器学习场景的框架,但是我们一般称呼它是分布式的计算框架。因为 Ray 核心部分只是提供了分布式计算的能力。虽然如此,Ray 提供的分布式计算能力非常强大,且精巧。如上述代码所示,我们将 f(x) 加上了 @ray.remote 的注解,随后利用 f.remote 进行调用。虽然看上去它与原生的 Python 函数并无二致,但是,它可以在除本机外的其他 Ray 集群中的节点中执行。所以 Ray 是在几乎不提高用户代码复杂性的情况下,实现了分布式计算的能力。这样的函数,在 Ray 的设计中被称作 Task。

    除了轻量级的 API 之外,Ray 的主要特性包括:高吞吐但低延迟的调度能力;支持任务的动态构建。这与实现息息相关,在稍后的章节再进行介绍。总而言之,对用户而言,Ray 可以被当做插上了分布式计算翅膀的 Python 加强版(这么说也不太合适,因为 Ray 目前也有 Java Worker 的支持,不过为了方便理解,可以先姑且这么认为)。

    但是,只有如此底层的能力,没有上层成熟的生态,是难以与成熟的框架相抗衡的。因此,Ray 面向强化学习场景,基于分布式计算的能力支持,实现了上层的算法库 RLLib

    image.png

    除此之外,Ray 还有一个大规模超参数搜索的支持:Tune。Ray 提供的分布式计算能力,是天生非常适合超参数搜索这样的业务场景的。在机器学习中,超参数是在开始学习过程之前需要用户(算法工程师)给定取值的参数,而不是在训练阶段学习到的参数。比如在深度学习中,Batch Size,Learning Rate(如果有的话),Dropout(如果有的话)等等,就是超参数。超参数的取值在一定程度上对模型有着一定的影响。Tune 就利用 Ray 的能力,支持并行的超参数搜索。

    二、基础架构设计

    image.png

    Ray 的节点需要运行两个进程,一个是 RayLet 进程,一个是 Plasma Store(对应图中的 Object Store)进程。其中 RayLet 进程中维护着一个 Node Manager,和一个 Object Manager。Ray 提供了 Python 的 API,而 RayLet 是用 C++ 实现的。其中的 Node Manager 充当了论文中 Local Scheduler 的角色,主要负责管理 Node 下的 Worker,调度在 Node 上的任务,管理任务间的依赖顺序等。而其中的 Object Manager,主要提供了从其他的 Object Manager Pull/Push Object 的能力。

    Plasma Store 进程,是一个共享内存的对象存储进程。原本 Plasma 是 Ray 下的,而目前已经是 Apache Arrow 的一部分了。之前介绍 Ray 在执行带有 remote 注解的函数时并不会立刻运行,而是会将其作为任务分发,而返回也会被存入 Object Store 中。这里的 Object Store 就是 Plasma[4]。

    而论文中的 Control State,在实现中被叫做 GCS,是基于 Redis 的存储。而 GCS 是运行在一类特殊的节点上的。这类特殊的节点被称作 Head Node。它不仅会运行 GCS,还会运行对其他节点的 Monitor 进程等。

    Ray 提交任务的方式与 Spark 非常类似,需要利用 Driver 来提交任务,而任务会在 Worker 上进行执行。Ray 支持的任务分为两类,分别是任务(Task)和 Actor 方法(ActorMethod)。其中任务就是之前的例子中的被打上了 remote 注解的函数。而 Actor 方法是被打上了 remote 注解的类(或叫做 Actor)的成员方法和构造方法。两者的区别在于任务都是无状态的,而 Actor 会保有自己的状态,因此所有的 Actor 方法需要在 Actor 所在的节点才能执行。

    image.png
    经过几个版本的迭代,有些内容做了一些优化,主要的结构还是如上图。GCS 作为集中的服务端,是 Worker 之间传递消息的纽带。每个 Server 都有一个共用的 Object Store,也就是用 Apache Arrow/Plasma 构建的内存数据。 Local Scheduler 是 Server 内部的调度,同时通过 GCS 来和其他 Server 上的 Worker 通信。Object Store 时间也有通信,作用是传递 Worker 之间的数据。

    在 Paper 里面描述了一个典型的远程调用流程:


    image.png

    可以看到,GCS 储存了代码、输入参数、返回值。Worker 通过 Local Scheduler 来和 GCS 通信。Local Scheduler 就是 Raylet, 是单机上的基础调度服务


    image.png

    Object > 100 kb 会通过 Object Store 之间的并行 RPC 来传输,而不通过任务调度 RPC 来实现。Apache Arrow 在 0.15 之后提供了一个 Apache Arrow Flight 的 RPC 框架,0.16 又做了强化。不知道 Ray 的 Object 的并行传递是不是采用 Arrow Flight。下图是一个 任务调度的 RPC 示例图:


    image.png

    三、Raylet

    1、Raylet 本地调度的核心

    重新画了一个简单一点的 Worker 和 GCS 的关系图:


    image.png

    Raylet 在中间的作用非常关键,Raylet 包含了几个重要内容:

    • Node Manager
    • Object Manager
    • gcs_client 或者 gcs server
      Node Manager 是基于 boost::asio 的异步通信模块,主要是通信的连接和消息处理管理;Object Manager 是 Object Store 的管理;gcs_client 是连接 GCS 客户端。如果设置RAY_GCS_SERVICE_ENABLED 为 True 的话 ,这个 Server 就是 作为 GCS 启动。

    我们先看一下 Raylet 的启动过程:


    image.png

    首先,要做 Raylet 的初始化,这里面包含很多参数,包括 Node Manager 和 gcs client 的初始化。然后 Raylet Start 之后,注册 GCS,准备接收消息。一旦有消息进来,就进入 Node Manager 的 ProcessClientMessage 过程。在解释 ProcessClientMessage 的操作之前,我们需要了解一下 Ray Worker 和 Raylet 的进程/线程和通信的模型

    2、通信模型

    Ray 采用的是 Boost::asio 的异步通信模型,这里有一个很丰富全面的关于 asio 的介绍

    image.png
    Asio 采用的是 Proactor 模型。一个操作经过 Initiator 之后分解为 Asynchronous Operation Processor(AOP) 、Asynchronouse Operation(AO) 和 Completion Hanlder(CH) 。AOP 做具体的工作,执行异步操作。执行完成之后,把结果放入Completion Event Queue(CEQ)。Asynchronous Event Demultiplexer(AED)等待 CEQ ,如果 CEQ 出现完成事件,则返回一个完成事件到 CH

    Raylet 启动了一个 main_service , 是 boost::asio::io_service 。io_service 也是 asio 运转的核心组件。前面的AOP、AED 和 Proactor 都是由 io_service 串联起来的。io_service 内部实现了一个任务队列,队列的任务就是void(void) 函数

    io_service 的接口有 run 、run_one、poll、poll_one、stop、reset 、 dispatch 、post 。run 方式就是轮询执行队列里面的所有任务,无任务执行的时候就 epoll_wait 上阻塞等待

    // Initialize the node manager.
    boost::asio::io_service main_service;
    main_service.run();
    

    Node Manager 在初始化的时候,会按照 num_initial_workers 的数量初始化 worker pool 。然后Node Manager 会按照 asio 的异步机制,分配任务到这些 worker pool 里面的进程

    接下来我们看一下 Raylet 、Worker 和 GCS 的消息传递和调度机制

    3、消息传递和调度

    Ray 后面的公司 http://Anyscale.io 的 blog 有一篇文章,叫做 Fast Scheduling in Ray 0.8 。讲了怎么在 ray 0.8 里面优化调度

    image.png
    Worker 提交 task 到 raylet,raylet 分配 task 到其他 worker。同时 raylet 还需要把 task 、相关 worker 信息提交给 GCS。task 执行的参数和返回都需要通过 Object Store 来获取

    接下来,我们看一下详细的消息传递和对应的一些执行过程

    先看一下 Submit Task 这个操作: Worker 提交一个 Task ,就调用 SubmitTask 的任务到 Raylet 。Task 在 Raylet 内部有一个 Lineage 的机制。这个也是上面 Anyscale 图里面的 task lineage

    我们先了解一下 Task Lineage 的机制

    (1)、Task Lineage

    Task Lineage 里面包含几个概念,Lineage Cache 、Lineage Entry 和 Lineage 。Lineage 是管理 Task 执行的 DAG (有向无环图) ;Lineage Entry 是对 Task 状态的一些管理;Lineage Cache 是对 Task 在本机执行缓存的管理。在上面 Fast Scheduling in Ray 0.8 文章里面,主要就是通过对 Lineage 的优化来提升 Ray 0.8 的调度性能

    image.png
    在 Ray 0.8 里面,把调用其他 Worker 的流程,从 Raylet 到 GCS 然后到 woker ,改为直接查询 Lineage Cache,如果 Worker 曾经调用过,就直接请求对应的 Worker。减少调用路径,提升效率。

    回到我们对 Lineage 的分析

    Task 在 GCS 里面有几个状态:None、Uncommitted、Committing、Committed 。None 意思是在 Lineage Cache 里面不存在;当任务从 Woker 提交之后,是 uncommited 状态了;当任务发生一些变化,经过一些操作或者重新提交,就是 Committing 状态。意思就是正在进行 Committing,等待返回状态;提交的任务得到了反馈,就是 Committed 状态。但是有一个不同,任务没有删除,当下一个任务还是调度这个 Worker 的时候,就可以直接调用这个 Task Entry 来实现。这就是上面说的优化过程

    TaskEntry 保存 Task 的状态和相关的联系。主要包含这么几个内容:

    • GcsStatus:就是上面说的 Task 的状态
    • parent_task_ids_:一个 Set ,保存了 Task 的父任务 ID 列表
    • forwarded_to_:一个 Set,保存了任务明确提交到的 Node Manager 的 ID 列表
      Lineage 维护了两个 map。一个是 Task 和 LineageEntry 的 map;一个是 TaskID 和 TaskID Set 的 Map。第二个的意思就是 Task 和它子 Task 组的映射

    LineageCache 是 Task 的 Cache Table 。包含了 Task 的信息和状态。Lineage Cache 的策略是把所有的任务成为 Uncommitted 状态。为了安全起见,只有当 Task 的父任务都删除了,子任务才能删除

    Lineage 的细节还很多,而且还处在优化的状态。我们先看看通过一个 Task 提交的过程来看看 Lineage 是怎么运转的

    (2)、提交任务

    Submit Task 之后,先记录增加了一个 Task。然后拿到需要提交的 Task Spec,就是 Task 详细信息。然后提交。Task 有几个状态:

    • Placeable:就绪的状态,可以分配到 Node, 可以是本地或者远端。分配的原则根据资源状况,例如本地的内存、是否超过 Task 最大数量等。如果本地资源不够,就会提交到其他的 Node ,也就是服务器。当然,如果其他 Node 资源也不够,就会继续分配。
    • WaitForActoreCreation:这个转改是针对 Actore Task ,代表 Actor 方法等待 Actore 完成返回
    • Waiting:Task 在等待它的参数的依赖关系满足要求。也就是 Task 的参数需要放到 local object store
    • Ready:Task 可以运行,所有的参数已经传输到 local object store 了
    • Running:Task 已经分配冰运行到一个
      worker
    • Blocked:Task 暂停。可能是因为 Task 正在等待启动其他 Task 并且等待结果返回
    • Infeasible:Task 所需要的资源所有机器都不满足
      在 design_docs/task_states.rst 文档里面有一个图:


      image.png

      在 SubmitTask 最后:

    // if the task was forwarded.
        if (forwarded) {
          // Check for local dependencies and enqueue as waiting or ready for dispatch.
          EnqueuePlaceableTask(task);
        } else {
          // (See design_docs/task_states.rst for the state transition diagram.)
          local_queues_.QueueTasks({task}, TaskState::PLACEABLE);
          ScheduleTasks(cluster_resource_map_);
        }
    

    如果要提交的 Task 需要 forward (在收到 HandleForwardTask 操作的时候),则进行 Task 如队列操作。入队的时候,如果参数都满足,也就是本地资源足够。Task 就入队列,成为 READY 状态,如果不满足,就是 WAITING 状态。同时改变 Task 状态为 Pending

    if (args_ready) {
        local_queues_.QueueTasks({task}, TaskState::READY);
        DispatchTasks(MakeTasksByClass({task}));
      } else {
        local_queues_.QueueTasks({task}, TaskState::WAITING);
      }
    
    task_dependency_manager_.TaskPending(task);
    

    (3)、调度策略

    在 SubmitTask 之后,如果不是 forward ,则执行两个操作:

    local_queues_.QueueTasks({task}, TaskState::PLACEABLE);
    ScheduleTasks(cluster_resource_map_);
    

    第一个是把 Task 放到本地,并且把 Task 状态置为 Placeable;第二是把 Task 在集群进行调度

    Ray 针对 Task 有两个 Queue:ReadyQueue、SchedulingQueue 。 ReadyQueue 是已经准备好的 Task 的队列;SchedulingQueue 是已经提交的 Task 的队列。这两个队列用来存储不同状态的 Task,实现上面说的 Task 状态变化过程。

    调度任务的步骤是这样:

    • 先尝试把 Tasks 放在 Local Node
      遍历集群的资源,查看这个 Task 需要的资源是不是可以用。把可以用的集群的资源都储存到 clients_keys 的 vector 里面
    • 如果 Local Node 有资源
      从资源里面按照均匀分布取出一个资源。均匀分布的算法用的是 C++ 的 uniform_int_distribution 。然后为集群的 Client 都准备好计算资源
    • 如果没有合适的计算资源,就采用硬分配的方式,给 Client 安排计算资源。

    四、集群架构

    按照上面的描述,Ray 集群有 Worker 、Gcs 和 Raylet 等模块。Worker 是一个执行单元。 Worker 的执行是通过 gRPC 来远程提交的。整个架构有点像 istio 的 service mesh 的结构


    image.png

    对应以上的粗粒度的组件,拆解开来就像下面这样:


    image.png
    这里面有几个关键组件。Raylet 是处理 Worker 和 GCS 的关键连接点,还有处理 Local Worker 之间的调度。Raylet 里面包含 Node Manager,这是处理消息传递和调度的基础模块;还有 Object Manager ,这是处理本机 Arrow 内存读取的组件,相对容易理解;Core worker 组件针对 Python Driver 提供支持,主要是完成 Task 的调度。就是 python 里面使用 ray 时候需要加的 remote 注解。这个是 Ray 的核心。Python Driver 主要是针对 python 提供支持,当然 Ray 也有 Java Driver ,这里没有列出

    我们先从 Raylet 看起

    1、Raylet

    在 Raylet 初始化的时候,初始化了一个 main_service 。 这是一个boost::asio::io_service 实例。这个在上面的通信模型里面简单描述了一下 asio 的机制。main_service 在 main.cc 启动(main_service.run()),main_service 的引用传递到了 Raylet ,然后 Raylet 应用传递到了 Node Manager

    Node Manager(下称 NM)是 Raylet 的一个负责通信的模块,处理 Raylet 和其他分布式节点(服务器)、Worker、Task 分配还有 GCS 的通信

    从 Raylet 到 Node Manager 的入口在 HandlerAccept :

    ClientHandler<local_stream_protocol> client_handler =
            [this](LocalClientConnection &client) { node_manager_.ProcessNewClient(client); };
        MessageHandler<local_stream_protocol> message_handler =
            [this](std::shared_ptr<LocalClientConnection> client, int64_t message_type,
                   const uint8_t *message) {
              node_manager_.ProcessClientMessage(client, message_type, message);
            };
        // Accept a new local client and dispatch it to the node manager.
        auto new_connection = LocalClientConnection::Create(
            client_handler, message_handler, std::move(socket_), "worker",
            node_manager_message_enum,
            static_cast<int64_t>(protocol::MessageType::DisconnectClient));
    

    client_handler 是处理连接请求,message_handler 是处理这个 Client 的消息。LocalClientConnection 是一个针对客户端请求到服务端的抽象,主要是基于 asio 机制把读写,和异步读写封装了一下

    ProcessNewClient 主要是记录 Client 的一些信息 ProcessClientMessage 就是上面架构图里面的消息处理,对应不同的消息处理流程。可以看一下附录:《Node Manager 处理消息的列表》

    这里面最重要的一个,是 SubmitTask。是针对 task 的处理,Task 作为主要任务调度的模块,贯穿 Ray 分布式任务调度的全过程。所以我们有必要从源头来了解和跟踪一下 Task 的发起到完成的整个过程。同时,我们也可以通过这个过程,了解从 Python Driver 到 Core Worker ,然后到 Raylet 的处理过程。

    2、Submit Task

    Task 是表示一个任务及其执行的资源等信息。Task 的发起是从 Python Driver

    @ray.remote
    def borrower(inner_ids):
         inner_id = inner_ids[0]
         ray.get(foo.remote(inner_id))
    
    inner_id = ray.put(1)
    outer_id = ray.put([inner_id])
    res = borrower.remote(outer_id)
    

    例如以上的代码,@ray.remote 注解下面的函数,就是一个执行体。对应的是 RemoteFuntion Class 。在 _remote 这一段:

    self._pickled_function = pickle.dumps(self._function)
    

    _function 序列化为 _pickled_funtion,然后再 hash 为 pickled_function_hash

    self._function_descriptor = PythonFunctionDescriptor.from_function(
                    self._function, self._pickled_function)
    
    def from_function(cls, function, pickled_function):
        pickled_function_hash = hashlib.sha1(pickled_function).hexdigest()
    

    然后就调用了 Core Worker 的 SubmitTask

    Status SubmitTask(const RayFunction &function, const std::vector<TaskArg> &args,const TaskOptions &task_options, std::vector<ObjectID> *return_ids,int max_retries);
    

    在 SubmitTask 里面。生成一个 task id ,然后通过 BuildCommonTaskSpec 函数,把 Task 所有信息封装成一个 TaskSpecification 实例。然后把这个 TaskSpecification 提交到 Task 的任务队列里面。

    if (task_options.is_direct_call) {
        task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, max_retries);
        return direct_task_submitter_->SubmitTask(task_spec);
      } else {
        return local_raylet_client_->SubmitTask(task_spec);
      }
    

    这里面 Task Manager 是对 Task 管理的一个封装。包含了对应的内存 in_memory_store_、引用计数 reference_counter_(主要用作对 ObjectID 的管理,用在 GC 上),任务的状态和 Retry 次数等。这里面 task_manager->AddPendingTask ,主要是针对 Task 提交前做了记录,记录 Task ID,为了 reference_manager_ 之后的 GC 用

    is_direct_call 是针对 Actor worker 的直接调用。local_raylet_client_ 就是上面提到的 Raylet Client,Core worker 把接收到的 remote 调用提交到 Raylet ,Raylet 来做调度。就是下图红色的那一段:


    image.png

    在 Raylet 的 Node Manager 接收到 SubmitTask 消息,按照 Task 的依赖次序来提交 Task。意思就是,如果一个 Task B 依赖于另外一个 Task A,那就先提交 Task A

    如果任务是提交到另外一个 Node(这个取决于 Lineage 调度,forwarded 是 True,forwarded 是 SubmitTask 的最后一个参数),则在 Lineage Cache 增加一个 UncommittedLineage

    lineage_cache_.AddUncommittedLineage(task_id, uncommitted_lineage)
    

    这里面第二个参数,是 SubmitTask 的时候,生成的一个 Lineage 的实例。

    如果任务是提交到本地(forwarded 是 False,默认),则异步 commit task 到 GCS:

    lineage_cache_.CommitTask(task)
    

    调用的是 Lineage 的 CommitTask,然后调用 Lineage 的 FlushTask,接着调用 gcs_client_->Tasks().AsyncAdd 把 Task 状态提交到 GCS,然后根据返回状态更新本地 Lineage 的 Task 状态为 GcsStatus::COMMITTED。同时 Evict Task 和 UnSubscribeTask

    整个过程流程图如下:


    image.png

    Node Manager 处理消息的列表:
    ![

    ](https://img.haomeiwen.com/i17624987/e16a293393459d2e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

    Reference

    https://zhuanlan.zhihu.com/p/111340572
    https://zhuanlan.zhihu.com/p/104022670
    https://zhuanlan.zhihu.com/p/41994061
    https://zhuanlan.zhihu.com/p/41994061
    https://zhuanlan.zhihu.com/p/357182462
    https://zhuanlan.zhihu.com/p/370915277
    https://zhuanlan.zhihu.com/p/344736949
    https://zhuanlan.zhihu.com/p/372646991
    https://zhuanlan.zhihu.com/p/32474974
    https://www.zhihu.com/question/265485941/answer/298033190
    https://zhuanlan.zhihu.com/p/75496780
    https://zhuanlan.zhihu.com/p/412312837
    留个坑,以后更

    相关文章

      网友评论

          本文标题:找分布式工作复习学习系列---市面分布式框架解析之Ray(四)

          本文链接:https://www.haomeiwen.com/subject/lekdnltx.html