美文网首页
系统设计:分布式调度器(KISS面试系列)

系统设计:分布式调度器(KISS面试系列)

作者: 花花大脸猫 | 来源:发表于2022-06-05 22:21 被阅读0次

    这是系统面试准备系列的第一篇博客。我的目标是设计出KISS(keep it simple stupid)系统,可以在实际系统设计面试中花费45-60分钟进行讨论。

    介绍
    任务调度是一个常见的系统设计面试问题,下面的一些领域,可能会需要设计一个任务调度系统:

    1. 设计一个对账处理的系统(每月/周/日 进行对账处理)
    2. 设计一个代码部署的系统(定期进行代码流水线处理)

    这篇文章的目的是设计一个简单且可扩容的任务调度系统

    问题陈述
    设计一个在指定时间间隔运行的任务调度系统

    功能性需求

    1. 用户能够调度或者查看任务
    2. 用户能够列出所有已提交的任务,并显式当前任务的状态
    3. 任务能够运行一次或者重复运行。任务需要在定义的调度时间之后,再给定的X阈值时间内运行(假设X = 15分钟)
    4. 单个任务的执行时间不能超过X分钟(假设X = 5分钟)
    5. 任务也会有优先级,高优先级的任务需要比低优先级的任务先执行
    6. 任务的最终输出需要被存储至文件系统中

    非功能性需求

    1. 高可用 - 对于用户而言,系统可以一直添加/查看任务
    2. 可扩展性 - 系统可以扩展支持数百万的任务调度
    3. 可靠性 - 系统必须最少一次执行一个任务,并且相同的任务不能在同一时间
      被不同的进程调度
    4. 持久性 - 在任何失败场景下,系统都不应该丢失任务的信息
    5. 延迟 - 系统应在作业被接受后立即回馈用户。 用户不必等到工作完成。

    流量和存储估算
    假设任务调度系统的QPS设计目标:1000QPS;假设单个调度任务最多可以运行 5 分钟,因此该系统是高度受到CPU的制约的

    CPU限制(CPU Bound)
    一个现代的CPU可以有16核,单核拥有2个线程,单个调度任务最多可以运行 5 分钟。 那么单机器CPU执行任务的公式:

    16核 * 2线程 / 5min / 60s = 0.1 任务/秒 (8000 任务/天)
    

    内存限制(Memory Bound)
    假设每一个任务会占用5M的内存,对应的内存分配了16G, 那么单机器内存执行任务的公式:

      16G * 1024 / 5M / 5min / 60s = 10 任务/秒 
      如果需要达到1000QPS, 那么需要的机器数量 1000 / 10 = 100
    

    上述结果给了我们一个提示,即用于处理任务调度的单机设计既不高可用也不可扩展。 所以我们需要分布式系统来设计解决方案

    系统接口
    有如下三个接口需要暴露给用户

    1. 任务提交: submitJob(api_key, user_id, job_schedule_time, job_type, priority, result_location)
      job_type值可以是ONCE 或者 RECURRING,API可以返回HTTP Code 202代表接收到了任务
    2. 单个任务查看: viewJob(api_key, user_id, job_id)
      响应的任务状态包括NOT_STARTED STARTED COMPLETED
    3. 任务列表查看: listJobs(api_key, user_id, pagination_token)

    高层级设计

    image.png

    用户的请求流程:
    (1 & 2) 用户通过load balancer(或API 网关)提交/获取任务
    (3) 请求会被持久化到DB中,并且返回ack告知用户已处理
    (4 & 5)Job Scheduler Service会持续的从DB中拉取快到期执行的任务,并将其塞入队列中
    (6 & 7)Job Executor Service会执行实际的业务逻辑调度任务,更新最终结果进文件系统并将任务调度状态置为COMPLETED

    DB 设计
    由于我们对事务支持或任何其他 ACID 属性没有严格要求,只需牢记峰值的QPS(2 * 1000 = 2000),所以我们可以同时使用SQL或者NoSQL数据库,考虑到 NoSql 在规模、维护和成本方面的明显优势,我会选择使用 DynamoDb 的 NoSql 解决方案

    • 用户查询模式
      给定UsedId,新增任务
      给定UserId,获取所有jobIds

    • DB Schema

    Table: JOB
    +------------------------------+--------+
    |          Attribute           |  Type  |
    +------------------------------+--------+
    | user_id (partition key)      | uuid   |
    | job_id (sort key)            | uuid   |
    | actual_job_execution_time    | date   |
    | job_status                   | string |
    | job_type                     | string |
    | job_interval                 | int    |
    | result_location              | string |
    | current_retries              | int    |
    | max_retries                  | int    |
    | scheduled_job_execution_time | date   |
    | execution_status             | string |
    

    job_status:用户查看的任务状态,包含NOT_STARTED, STARTED, COMPLETED三种状态
    execution_status:当前服务维护的实际执行状态,包含NOT_STARTED, CLAIMED, PROCESSING, SUCCESS, RETRIABLE_FAILURE, FATAL_FAILURE

    除了用户之外,我们的作业调度服务将轮询数据库以获取到期的任务,可以通过不同的方式来实现这一目标:

    1. 基于X分钟大小的桶窗口分区
      我们可以创建名为 scheduleJob 的索引来检索最后 X 分钟到期的作业,将其拉出之后使用延时队列特性塞入MQ中
    Index: ScheduledJob
    +----------------------------------------------+------+
    |                  Attribute                   | Type |
    +----------------------------------------------+------+
    | scheduled_job_execution_time (partition key) | time |
    | job_id (sort key)                            | uuid |
    +----------------------------------------------+------+
    Query (SQL equivalent):
    SELECT * FROM ScheduledJob WHERE scheduled_job_execution_time == now() - X
    
    1. 基于X分钟大小的桶窗口+ share id 的分区
      很有可能,在一个特定的时间窗口内,很多任务会被接收到(假设有10万个)。在这种场景下,上述查询语句的性能会非常的慢,我们可以根据随机的Share Id(假设在1到N之间)进一步对DB进行分区
    Index: ScheduledJob
    
    +----------------------------------------------+------+
    |                  Attribute                   | Type |
    +----------------------------------------------+------+
    | scheduled_job_execution_time (partition key) | uuid |
    | shard_id (partition key)                     | int  |
    | job_id (sort key)                            | uuid |
    +----------------------------------------------+------+
    Query (SQL equivalent):
    SELECT * FROM ScheduledJob WHERE scheduled_job_execution_time == now() - X and shard_id == Y
    

    深入底层设计

    image.png
    1. 任务调度器(Job Scheduler)如何工作
      任务调度流程:
    • 每 X 分钟,Master节点创建一个权威的 UNIX 时间戳,并为每个 worker 分配一个 shard_id 和 schedule_job_execution_time。
    • Worker 节点将执行以下查询,并将任务推送到 Kafka 队列中执行。
    worker 1:
    SELECT * FROM ScheduledJob WHERE scheduled_job_execution_time == now() - X and shard_id = 1
    worker 2:
    SELECT * FROM ScheduledJob WHERE scheduled_job_execution_time == now() - X and shard_id = 2
    

    容错设计

    • Master 监控Worker的健康状况并知晓哪个Worker死亡以及如何将查询重新分配给新Worker。
    • 如果Master节点死亡,我们可以分配其他worker节点作为master
    • 此外,如果worker成功查询db,我们还可以引入本地数据库来跟踪状态并将待执行任务放入队列中
    1. 任务执行器(Job Executor)如何工作
      Job Executor存在多个Consumer从队列中拉取待消费的任务,Consumer 机器也存在Master进程与Worker进程。Master进程与Worker进程都基于Pull模型上运行。Master进程 将从队列中轮询调度任务,Worker进程将通过执行以下代码不断从 master 轮询调度任务
    while True:
      w = get_next_work()
      do_work(w)
    

    任务执行流程与容错设计

    • 当从队列中取出调度任务时,消费者的 master 更新db中JOB的属性 execution_status=CLAIMED。
    • 当Worker进程接手工作时,它会更新 execution_status=PROCESSING 并不断向本地 DB 发送健康检查。
    • 调度任务完成后,Worker进程会将结果推送到 AWS s3 中,更新db中JOB的 execution_status=COMPLETED 或 FATAL_FAILED,并使用状态更新本地 db
    • Worker进程和Master进程都将更新本地数据库中的健康检查。

    健康检查服务
    健康检查服务定期运行(比如每 x 秒),并扫描上次从Worker进程接收到的健康检查小于定义阈值的数据库。 在这种情况下,它认为调度任务未能处理并将其推送回队列。

    结论
    系统设计是一个广泛的话题,一小时的面试很难涵盖到系统设计的方方面面。在上述的设计中,我们已经达到了面试官可以进一步深究的大部分区域。

    引用

    参考链接:https://medium.com/@raxshah/system-design-design-a-distributed-job-scheduler-kiss-interview-series-753107c0104c

    相关文章

      网友评论

          本文标题:系统设计:分布式调度器(KISS面试系列)

          本文链接:https://www.haomeiwen.com/subject/lmhnprtx.html