美文网首页Flinkflink
Flink源码4-Slot分配和Task执行

Flink源码4-Slot分配和Task执行

作者: fat32jin | 来源:发表于2020-12-20 15:52 被阅读0次

    接上期:——》JobMaster#startJobExecution()
    resetAndStartScheduler();
    ——》JobMaster#resetAndStartScheduler
    schedulerAssignedFuture.thenRun(this::startScheduling);
    ——》JobMaster#startScheduling()
    schedulerNG.startScheduling();

    SchedulerBase# startScheduling();

    startAllOperatorCoordinators();
    * 注释: 开始调度
    */
    startSchedulingInternal();

    DefaultScheduler#startSchedulingInternal
    schedulingStrategy.startScheduling();

    LazyFromSourcesSchedulingStrategy#startScheduling();

    * 注释: 申请 Slot 并且开始 部署 ExecutionVertices
    */
    allocateSlotsAndDeployExecutionVertices(schedulingTopology.getVertices());
    ——》LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices

    * 注释: 申请 slot 和 部署
    * schedulerOperations = DefaultScheduler
    */
    schedulerOperations.allocateSlotsAndDeploy(Collections.

    DefaultScheduler#allocateSlotsAndDeploy()

    *******来到正式入口:DefaultScheduler#allocateSlotsAndDeploy()***********
    流程:
    1、JobMaster 发送请求申请 slot
    2、ResourceManager 接收到请求,执行 slot请求处理
    3、TaskManager 处理 ResourceManager 发送过来的 Slot 请求
    4、JobMaster 接收到 TaskManager 发送过来的 Slot 申请处理结果

    1、JobMaster 发送请求申请 slot 0:31 ~

    —— 5 》DefaultScheduler#allocateSlotsAndDeploy()

    * 1 注释: 申请Slot
    final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);
    .............
    * 2 注释: 部署运行
    */
    waitForAllSlotsAndDeploy(deploymentHandles);

    ——先走 1 》DefaultScheduler#allocateSlots() 0:21
    * 注释: 申请Slot
    */
    final List<SlotExecutionVertexAssignment>
    slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);
    ——》DefaultScheduler#allocateSlots()

    ——4 》DefaultExecutionSlotAllocator#allocateSlotsFor

    • 注释: NormalSlotProviderStrategy */
      slotProviderStrategy.allocateSlot

      SlotProviderStrategy#NormalSlotProviderStrategy#allocateSlot()

      SchedulerImpl#allocateSlot()
      ——》SchedulerImpl#allocateSlotInternal
      ——》SchedulerImpl#internalAllocateSlot
      —— 》SchedulerImpl# allocateSingleSlot

      //第1 步: 从池中获取
      Optional<SlotAndLocality> slotAndLocality = tryAllocateFromAvailable
      if(slotAndLocality.isPresent()) {
      return //如果slot池里面有直接分配
      CompletableFuture.completedFuture(completeAllocationByAssigningPayload
      else{//否则new 一个
      return requestNewAllocatedSlot(slotRequestId, slotProfile
      }
      ——》SchedulerImpl# requestNewAllocatedSlot
      return slotPool.requestNewAllocatedSlot(slotRequestId,

      SlotPoolImpl#requestNewAllocatedSlot()
      ——》 SlotPoolImpl# requestNewAllocatedSlotInternal()
      ——》 SlotPoolImpl# requestSlotFromResourceManager()
      //真正发送请求给resourceManager
      CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway
      .requestSlot

      ResourceManager#requestSlot()
      slotManager.registerSlotRequest(slotRequest);

      SlotManagerImpl#registerSlotRequest()
      ——0 》SlotManagerImpl#internalRequestSlot()

    OptionalConsumer.of(findMatchingSlot(resourceProfile))
    .ifPresent(
    101:) taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
    .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));

      ——1  有空闲 slot》SlotManagerImpl#findMatchingSlot
      ——2  没有slot 》    fulfillPendingSlotRequestWithPendingTaskManagerSlot        
    

    pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
    ——》 SlotManagerImpl# allocateResource();
    if(!resourceActions.allocateResource(defaultWorkerResourceSpec))

    ResourceManager#ResourceActionsImpl#allocateResource()
    // 这里2个实现 ,第一个 StandaloneResourceManager 直接返回false
    // 第二个 startNewWorker ,申请新的 YarnContainer
    return startNewWorker(workerResourceSpec);

    返回上面101:) taskManagerSlot -> allocateSlot
    —— 》 SlotManagerImpl#allocateSlot() 1:18:00

    * 注释: 申请 slot
    CompletableFuture<Acknowledge> requestFuture = gateway
    .requestSlot(slotId, pendingSlotRequest.getJobId(),

    102:) TaskExecutor#requestSlot()

    allocateSlot(slotId, jobId, allocationId, resourceProfile);
    ——》 TaskExecutor#allocateSlot
    if(taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId

    TaskSlotTableImpl#allocateSlot()
    slots.add(allocationId);
    ▲▲▲▲回到上面102:)行方法 ▲▲▲▲▲▲▲▲

           offerSlotsToJobManager(jobId);
    

    ——》 TaskExecutor#offerSlotsToJobManager
    —— 》 TaskExecutor# internalOfferSlotsToJobManager


    image.png
    image.png

    acceptedSlotsFuture .whenCompleteAsync(handleAcceptedSlotOffers
    ——》 TaskExecutor#handleAcceptedSlotOffers()
    ▲▲▲▲回到上面 ——4 》▲▲ 1:48
    ——4 》DefaultExecutionSlotAllocator#allocateSlotsFor()

    image.png
    ▲▲▲▲一直回到最上面 ——5 》 ▲▲
    —— 5 》DefaultScheduler#allocateSlotsAndDeploy()
    * 再走 2 注释: 部署运行
    * 1、申请到slot了
    * 2、构建好了hander
    * 3、 执行部署
    */
    waitForAllSlotsAndDeploy(deploymentHandles); 1:52
    ▲▲▲▲至此 slot 已经 获取 ,下来将进入 task部署和 提交 》▲▲ 1:48

    4.3. Task 部署和提交 2:02 ~

    入口:
    —— 5 》DefaultScheduler#waitForAllSlotsAndDeploy()


    image.png

    —— 》DefaultScheduler#deployAll()

    • 注释: 通过 deployOrHandleError 来进行部署
      * 当然,部署 Task 的时候,也有可能会报错!
      */ FutureUtils.assertNoException(slotAssigned.handle(deployOrHandleError(deploymentHandle)));

    —— 》DefaultScheduler#deployOrHandleError()

    • 注释: 部署 Task(到时候根据 ExecutionVertexID 来确定 Task)
      */
      deployTaskSafe(executionVertexId);
      —— 》DefaultScheduler#deployTaskSafe()

      image.png
          /*************************************************
             注释: 根据 ExecutionVertexId 获取 ExecutionVertex
           */
          final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
      
          /*************************************************
             注释: 一个 Task 执行一个 ExecutionVertex
           *  executionVertexOperations = DefaultExecutionVertexOperations
           */
          executionVertexOperations.deploy(executionVertex);
                                                                            ↓
                 DefaultExecutionVertexOperations#deploy();
        —— 》ExecutionVertex#deploy
               *  注释: 调用 Execution 的 deploy() 方法部署
       */
      currentExecution.deploy();
            —— 》Execution#deploy();
                                              ▼  
      
    image.png

    CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask
    (deployment, rpcTimeout), executor).
    thenCompose(Function.identity())

    —— 》RpcTaskManagerGateway#submitTask()

                  *  注释: TaskExecutor
         *  提交到对应的 slot 所在节点的 TaskExecutor 中来执行该 ExecutionVertex,其实已经变成: Task
         *  关于 Client 提交 Job 到最后变成分布式 Task 物理执行图的所有细节到此为止,结束了。
         *  从这以后,就是去到了 TaskManager 中的 某个节点 TaskExecutor 
                *  中来执行 Task 
         */
        return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
                                                                ↓   
    

    —— 》TaskExecutor#submitTask()

    * 注释: 提交 Task 让 TaskManager 启动
    * 第一个参数: TaskDeploymentDescriptor 包含启动当前这个 Task 所需要的一切信息
    * 注释: 构建 Task
    * 内部会初始化一个执行线程。一个Task 是线程级别的执行粒度
    */
    Task task = new Task(jobInformation, taskInformation, tdd.getExecutionAttemptId(),

          ——》Task 构造函数()
                                            ▼  
                  *  注释: 初始化 ResultPartitionerWriter
         */
        // produced intermediate result partitions
        final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment
               ..............
               *  注释: 初始化 InputGate
         */ shuffleEnvironment.createInputGates(taskShuffleContext, this, inputGateDeploymentDescriptors)
            .toArray(new IndexedInputGate[0]);
                .............
                *  注释: 执行 Task 的线程 但是不启动 
         *  转到 Task 的 run() 方法
         */
        // finally, create the executing thread, but do not start it
        executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
                 .............
    
    image.png
    * 注释: 注册 Task
    */
    taskAdded = taskSlotTable.addTask(task);
    ................
    * 注释: 如果注册成功,则通过一个线程来运行 Task
    */
    task.startTaskThread();
    ——》Task # startTaskThread

    * 注释: 这个线程,在创建 Task 对象的时候,就已经会初始化好了
    * 经过转换,最终,就是调用当前类的 run() 方法
    */
    executingThread.start();
    —— 》Task #run()
    ——aa ★★》Task #dorun()

    image.png
    a:)      setupPartitionsAndGates(consumableNotifyingPartitionWriters, inputGates);
         ——a 》Task # setupPartitionsAndGates()
    
    image.png
    ——a:1 》 partition.setup();

    ResultPartition# setup();
    partitionManager.registerResultPartition(this);
    image.png
                                        ——a: 2》 启动
                                            // we are requesting partitions
                       for(InputGate gate : inputGates) {
                                 gate.setup();
                                                      ↓ 
                                   ——》 SingleInputGate#.setup();
                                                 ▼       
    

    BufferPool bufferPool = bufferPoolFactory.get();
    setBufferPool(bufferPool);
    ▲回到最上面 ——aa 》Task #dorun ▲
    ——aa 》Task #dorun ()
    ................
    b:) * 注释: 构建一个环境对象
    */

    Environment env = new RuntimeEnvironment(jobId, vertexId,
    c:) invokable = loadAndInstantiateInvokable(userCodeClassLoader,

    d:) invokable.invoke();

    StreamTask#invoke();

    ... beforeInvoke();
    ...★runMailboxLoop();
    ...afterInvoke();

    相关文章

      网友评论

        本文标题:Flink源码4-Slot分配和Task执行

        本文链接:https://www.haomeiwen.com/subject/ylsqnktx.html