美文网首页flink实时数据相关
Flink源码解析之JobManager启动

Flink源码解析之JobManager启动

作者: 小C菜鸟 | 来源:发表于2018-03-18 12:12 被阅读326次

    JobManager职责

    JobManager的职责主要是接收Flink作业,调度Task,收集作业状态和管理TaskManager。它包含一个Actor,并且接收如下信息:

    • RegisterTaskManager: 它由想要注册到JobManager的TaskManager发送。注册成功会通过AcknowledgeRegistration消息进行Ack。
    • SubmitJob: 由提交作业到系统的Client发送。提交的信息是JobGraph形式的作业描述信息。
    • CancelJob: 请求取消指定id的作业。成功会返回CancellationSuccess,否则返回CancellationFailure。
    • UpdateTaskExecutionState: 由TaskManager发送,用来更新执行节点(ExecutionVertex)的状态。成功则返回true,否则返回false。
    • RequestNextInputSplit: TaskManager上的Task请求下一个输入split,成功则返回NextInputSplit,否则返回null。
    • JobStatusChanged: 它意味着作业的状态(RUNNING, CANCELING, FINISHED,等)发生变化。这个消息由ExecutionGraph发送。

    JobManager启动过程

    代码在org.apache.flink.runtime.jobmanager.JobManager.scala文件中,入口是main方法,通过脚本启动JobManager时,调用的就是main方法。main方法是通过调用runJobManager方法来启动JobManager的,下面我们主要看下runJobManager方法。

    def runJobManager(
                         configuration: Configuration,
                         executionMode: JobManagerMode,
                         listeningAddress: String,
                         listeningPort: Int)
      : Unit = {
    
        ....
    
        // 首先启动JobManager的ActorSystem,因为如果端口号是0,它决定了使用哪个端口,并会更新相应的配置。
        val jobManagerSystem = startActorSystem(
          configuration,
          listeningAddress,
          listeningPort)
    
        // 创建高可靠的服务,比如通过ZooKeeper配置了多个JobManager
        val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
          configuration,
          ioExecutor,
          AddressResolution.NO_ADDRESS_RESOLUTION)
        
        ...
     
        //启动JobManager的所有组件,包括library缓存,实例管理和调度器,最终启动 JobManager Actor。
        val (_, _, webMonitorOption, _) = try {
          startJobManagerActors(
            jobManagerSystem,
            configuration,
            executionMode,
            listeningAddress,
            futureExecutor,
            ioExecutor,
            highAvailabilityServices,
            metricRegistry,
            classOf[JobManager],
            classOf[MemoryArchivist],
            Option(classOf[StandaloneResourceManager])
          )
        } catch {
          case t: Throwable =>
            futureExecutor.shutdownNow()
            ioExecutor.shutdownNow()
    
            throw t
        }
    
        // 阻塞,直到系统退出
        jobManagerSystem.awaitTermination()
    
        webMonitorOption.foreach {
          webMonitor =>
            try {
              webMonitor.stop()
            } catch {
              case t: Throwable =>
                LOG.warn("Could not properly stop the web monitor.", t)
            }
        }
    
        try {
          highAvailabilityServices.close()
        } catch {
          case t: Throwable =>
            LOG.warn("Could not properly stop the high availability services.", t)
        }
    
        try {
          metricRegistry.shutdown()
        } catch {
          case t: Throwable =>
            LOG.warn("Could not properly shut down the metric registry.", t)
        }
    
        ExecutorUtils.gracefulShutdown(
          timeout.toMillis,
          TimeUnit.MILLISECONDS,
          futureExecutor,
          ioExecutor)
      }
    

    JobManager高可用性

    目前JobManager的高可用性模式分为两种:

    • NONE:意味着没有高可用性,只有一个JobManager节点。
    • ZooKeeper:通过ZooKeeper实现高可用性,多个JobManager节点组成一个集群,通过ZooKeeper选举出master节点,由master节点提供服务,其它节点作为备份。

    当使用NONE模式时,只有一个JobManager节点提供服务,且JobManager不会保存提交的jar包信息,将Checkpoint和metadata信息保存在Java堆或者本地文件系统中,因此意味着没有搞可用性。

    而使用ZooKeeper模式时,有一个Master和多个Standby节点,当Master故障时,Standby节点会通过选举产生新的Master节点。这样不会产生单点故障,只要有新的Master生成,程序可以继续执行。Standby JobManager和Master JobManager实例之间没有明确区别。 每个JobManager可以成为Master或Standby节点。

    举例,使用三个JobManager节点的情况下,进行以下设置:


    使用NONE或ZooKeeper模式,通过如下配置进行设置:

      high-availability: none/zookeeper
    

    高可用服务创建过程

    public static HighAvailabilityServices createHighAvailabilityServices(
            Configuration configuration,
            Executor executor,
            AddressResolution addressResolution) throws Exception {
    
            HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
    
            switch(highAvailabilityMode) {
                case NONE:     //NONE模式
                    final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
    
                    final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                        hostnamePort.f0,
                        hostnamePort.f1,
                        JobMaster.JOB_MANAGER_NAME,
                        addressResolution,
                        configuration);
                    final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                        hostnamePort.f0,
                        hostnamePort.f1,
                        ResourceManager.RESOURCE_MANAGER_NAME,
                        addressResolution,
                        configuration);
                    final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                        hostnamePort.f0,
                        hostnamePort.f1,
                        Dispatcher.DISPATCHER_NAME,
                        addressResolution,
                        configuration);
    
                    return new StandaloneHaServices(
                        resourceManagerRpcUrl,
                        dispatcherRpcUrl,
                        jobManagerRpcUrl);
                case ZOOKEEPER:     //ZOOKEEPER模式
                    //存储JobManager Metadata 数据的Service
                    BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
    
                    //基于ZooKeeper的JobManager 高可用Service
                    return new ZooKeeperHaServices(
                        ZooKeeperUtils.startCuratorFramework(configuration),
                        executor,
                        configuration,
                        blobStoreService);
                default:
                    throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
            }
        }
    

    相关文章

      网友评论

        本文标题:Flink源码解析之JobManager启动

        本文链接:https://www.haomeiwen.com/subject/cdkeqftx.html