Flink Source Code: TaskManager Startup Process


    Author: AlienPaul | Published: 2020-09-14 14:54

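    Flink blog series

    Flink QuickStart
    Flink dual-stream operations
    Kerberos configuration for Flink on Yarn
    Flink on Yarn deployment and job submission
    Configuring Prometheus monitoring for Flink
    Deploying Flink in Docker
    Flink HA deployment
    Summary of common Flink tuning parameters
    Flink Source Code: Job Submission Process
    Flink Source Code: Basic Operators
    Flink Source Code: Trigger
    Flink Source Code: Evictor
    Flink Source Code: Window
    Flink Source Code: WindowOperator
    Flink Source Code: StreamGraph Generation
    Flink Source Code: JobGraph Generation
    Flink Source Code: Two-Phase Commit
    Flink Source Code: Distributed Snapshots
    Flink Source Code: Time Handling
    Flink Source Code: Communication Between Nodes
    Flink Source Code: Credit-Based Backpressure
    Flink Source Code: Snapshots
    Flink Source Code: FlinkKafkaConsumer
    Flink Source Code: Memory Management
    Flink Source Code: Unaligned Checkpoints, New in 1.11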

    Startup script analysis

    The TaskManager is started with taskmanager.sh start, so to find the program's entry class we first have to analyze this script.

    The script is fairly long; we focus on the following fragment:

    # ...
    # Set ENTRYPOINT to taskexecutor
    ENTRYPOINT=taskexecutor
    # ...
    if [[ $STARTSTOP == "start-foreground" ]]; then
        exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
    else
        if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
            # Start a single TaskManager
            # This is the branch we follow: background start without NUMA
            "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        else
            # Example output from `numactl --show` on an AWS c4.8xlarge:
            # policy: default
            # preferred node: current
            # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
            # cpubind: 0 1
            # nodebind: 0 1
            # membind: 0 1
            read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
            for NODE_ID in "${NODE_LIST[@]:1}"; do
                # Start a TaskManager for each NUMA node
                numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
            done
        fi
    fi
    

    The script shows that unless start-foreground (foreground mode) is used, it actually invokes the flink-daemon.sh script. Its usage and arguments are:

    flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]
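    To make the three branches of taskmanager.sh explicit, here is a hypothetical Java restatement of the script logic (not Flink code). The class TaskManagerLaunchPlan is invented for illustration, the script arguments ("${ARGS[@]}") and the ${FLINK_BIN_DIR} prefix are omitted, and the numactlNodebindLine parameter stands in for the output of numactl --show | grep "^nodebind: ".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical restatement of the branching in taskmanager.sh; not part of Flink.
// Script arguments and the ${FLINK_BIN_DIR} prefix are deliberately omitted.
final class TaskManagerLaunchPlan {

    private TaskManagerLaunchPlan() {
    }

    /**
     * Returns the command lines taskmanager.sh would run.
     *
     * @param startStop           e.g. "start" or "start-foreground"
     * @param computeNuma         value of FLINK_TM_COMPUTE_NUMA
     * @param numactlNodebindLine the "nodebind: ..." line of `numactl --show`; required when computeNuma is true
     */
    static List<String> commands(String startStop, boolean computeNuma, String numactlNodebindLine) {
        final String entrypoint = "taskexecutor";
        List<String> result = new ArrayList<>();
        if (startStop.equals("start-foreground")) {
            // Foreground mode: exec flink-console.sh in the current process
            result.add("flink-console.sh " + entrypoint);
        } else if (!computeNuma) {
            // Default: a single TaskManager started in the background via flink-daemon.sh
            result.add("flink-daemon.sh " + startStop + " " + entrypoint);
        } else {
            // NUMA mode: one TaskManager per NUMA node. The first token ("nodebind:")
            // is skipped, mirroring ${NODE_LIST[@]:1} in the script.
            String[] tokens = numactlNodebindLine.trim().split("\\s+");
            for (String nodeId : Arrays.copyOfRange(tokens, 1, tokens.length)) {
                result.add("numactl --membind=" + nodeId + " --cpunodebind=" + nodeId
                        + " -- flink-daemon.sh " + startStop + " " + entrypoint);
            }
        }
        return result;
    }
}
```

    For example, commands("start", true, "nodebind: 0 1") yields one numactl command per NUMA node, matching the loop over ${NODE_LIST[@]:1} in the script.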

    To find the Java entry class, we next analyze flink-daemon.sh and find the following fragment:

    # ...
    STARTSTOP=$1
    # As analyzed above, when invoked from taskmanager.sh, DAEMON is taskexecutor
    DAEMON=$2
    # ...
    case $DAEMON in
        (taskexecutor)
            CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
        ;;
    
        (zookeeper)
            CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
        ;;
    
        (historyserver)
            CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
        ;;
    
        (standalonesession)
            CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
        ;;
    
        (standalonejob)
            CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
        ;;
    
        (*)
            echo "Unknown daemon '${DAEMON}'. $USAGE."
            exit 1
        ;;
    esac
    # ...
    

    As shown, DAEMON is taskexecutor, so the entry class that starts the TaskManager is org.apache.flink.runtime.taskexecutor.TaskManagerRunner.
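    The case statement in flink-daemon.sh is simply a lookup table from daemon name to main class. A minimal Java sketch of the same lookup; the class DaemonEntryPoints is hypothetical, and the class names are copied from the script above.

```java
// Hypothetical mirror of the `case $DAEMON in ... esac` block of flink-daemon.sh; not Flink code.
final class DaemonEntryPoints {

    private DaemonEntryPoints() {
    }

    /** Maps a daemon name to the main class the script would launch. */
    static String classToRun(String daemon) {
        switch (daemon) {
            case "taskexecutor":
                return "org.apache.flink.runtime.taskexecutor.TaskManagerRunner";
            case "zookeeper":
                return "org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer";
            case "historyserver":
                return "org.apache.flink.runtime.webmonitor.history.HistoryServer";
            case "standalonesession":
                return "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint";
            case "standalonejob":
                return "org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint";
            default:
                // The script prints a usage message and exits with status 1 instead
                throw new IllegalArgumentException("Unknown daemon '" + daemon + "'.");
        }
    }
}
```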

    Next, we analyze the relevant TaskManagerRunner source code.

    TaskManagerRunner

    TaskManagerRunner is the TaskManager's startup class in both YARN mode and standalone mode.

    Let's look at the main method:

    public static void main(String[] args) throws Exception {
        // startup checks and logging
        // Log environment information
        EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
        
        // Register signal handlers
        // On Linux these handle TERM, HUP and INT
        SignalHandler.register(LOG);
        
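        // Install a shutdown hook that runs when the JVM shuts down
        // If shutdown hangs, it forcibly halts the JVM after a delay (5 seconds by default)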
        JvmShutdownSafeguard.installAsShutdownHook(LOG);
    
        // Get and log the maximum number of open file handles
        long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
    
        if (maxOpenFileHandles != -1L) {
            LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
        } else {
            LOG.info("Cannot determine the maximum number of open file descriptors");
        }
    
        // Start the TaskManager
        runTaskManagerSecurely(args, ResourceID.generate());
    }
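    The safeguard installed in main protects against a hanging shutdown rather than delaying it: as far as I understand JvmShutdownSafeguard, its shutdown hook starts a timer and forcibly halts the JVM if shutdown has not finished within the delay. A minimal sketch of that idea, assuming a hypothetical class ShutdownSafeguardSketch and an arbitrary exit code of 1 (Flink's actual class, thread names and exit code may differ). The halt action is injectable only so that the sketch can be tested without killing the JVM.

```java
// Hypothetical sketch of the idea behind JvmShutdownSafeguard; not Flink's source.
final class ShutdownSafeguardSketch extends Thread {

    private final long delayMillis;
    private final Runnable haltAction;

    ShutdownSafeguardSketch(long delayMillis, Runnable haltAction) {
        super("shutdown-safeguard-sketch");
        this.delayMillis = delayMillis;
        this.haltAction = haltAction;
    }

    /** Runs as a shutdown hook: starts a daemon timer that halts the JVM if shutdown takes too long. */
    @Override
    public void run() {
        Thread terminator = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                // Still running after the delay: another shutdown hook is probably blocked
                haltAction.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "delayed-terminator-sketch");
        // Daemon thread, so the timer itself never keeps the JVM alive
        terminator.setDaemon(true);
        terminator.start();
    }

    /** Installs the safeguard; Runtime.halt terminates without running the remaining hooks. */
    static void install(long delayMillis) {
        Runtime.getRuntime().addShutdownHook(
                new ShutdownSafeguardSketch(delayMillis, () -> Runtime.getRuntime().halt(1)));
    }
}
```

    Calling ShutdownSafeguardSketch.install(5000) at startup gives every other shutdown hook five seconds before the JVM is halted.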
    

    Following runTaskManagerSecurely, its content is:

    public static void runTaskManagerSecurely(String[] args, ResourceID resourceID) {
        try {
            // Read flink-conf.yaml and the dynamic properties passed on the command line as the configuration
            final Configuration configuration = loadConfiguration(args);
    
            // Initialize the shared file system configuration (FileSystemFactory), e.g. for HDFS
            FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
    
            // Install the security configuration, covering Flink, JAAS, Hadoop and ZooKeeper security settings
            SecurityUtils.install(new SecurityConfiguration(configuration));
    
            // Call runTaskManager inside the installed security context
            SecurityUtils.getInstalledContext().runSecured(() -> {
                runTaskManager(configuration, resourceID);
                return null;
            });
        } catch (Throwable t) {
            final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("TaskManager initialization failed.", strippedThrowable);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }
    }
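    The catch block passes the error through ExceptionUtils.stripException before logging it, presumably because the secured execution path (for example Hadoop's UserGroupInformation.doAs) can wrap exceptions in UndeclaredThrowableException. A from-scratch sketch of that unwrapping, assuming it simply follows getCause() while the current exception is of the stripped type; the class name is hypothetical.

```java
// Hypothetical sketch of exception stripping; not Flink's ExceptionUtils source.
final class StripExceptionSketch {

    private StripExceptionSketch() {
    }

    /**
     * Unwraps throwables of the given type (e.g. java.lang.reflect.UndeclaredThrowableException)
     * as long as they carry a cause; any other throwable is returned unchanged.
     */
    static Throwable strip(Throwable throwable, Class<? extends Throwable> typeToStrip) {
        Throwable current = throwable;
        while (typeToStrip.isInstance(current) && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}
```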
    

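    This method loads Flink's main configuration file and initializes the file systems and the security configuration. The TaskManager itself is started in runTaskManager, shown below: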
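    loadConfiguration combines flink-conf.yaml with dynamic properties from the command line. The sketch below models only the override rule, using plain maps; the class DynamicPropertiesSketch is hypothetical, it accepts only the -Dkey=value form, and Flink's real command-line parser supports more options than this.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "file configuration + -D overrides"; not Flink's parser.
final class DynamicPropertiesSketch {

    private DynamicPropertiesSketch() {
    }

    /** Copies the file-based settings, then lets each -Dkey=value argument override them. */
    static Map<String, String> merge(Map<String, String> fileConfig, List<String> args) {
        Map<String, String> merged = new HashMap<>(fileConfig); // the input map is not modified
        for (String arg : args) {
            if (arg.startsWith("-D")) {
                String property = arg.substring(2);
                int eq = property.indexOf('=');
                if (eq > 0) {
                    merged.put(property.substring(0, eq).trim(), property.substring(eq + 1).trim());
                }
            }
            // Any other argument (e.g. a config directory option) is ignored in this sketch
        }
        return merged;
    }
}
```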

    public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
        // Create a TaskManagerRunner with the given resourceId (generated randomly in main)
        final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);
    
        // Call start
        taskManagerRunner.start();
    }
    

    We need to analyze the TaskManagerRunner constructor and its start method.

    First, the constructor:

    public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception {
        this.configuration = checkNotNull(configuration);
        this.resourceId = checkNotNull(resourceId);
    
        // Get the Akka timeout
        timeout = AkkaUtils.getTimeoutAsTime(configuration);
    
        // Create the TaskManager's thread pool
        // corePoolSize equals the number of CPU cores
        this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
            Hardware.getNumberCPUCores(),
            new ExecutorThreadFactory("taskmanager-future"));
    
        // Create the high-availability services
        // Responsible for JobManager and ResourceManager leader election and for retrieving leader information
        highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
            configuration,
            executor,
            HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
    
        // Create the RPC service used to communicate with other Flink processes
        // It connects to an RpcEndpoint and returns an RpcGateway once the connection succeeds
        rpcService = createRpcService(configuration, highAvailabilityServices);
    
        // Create the heartbeat services
        HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
    
        // Create the metric registry
        // Keeps track of all metrics and connects MetricGroups with MetricReporters
        metricRegistry = new MetricRegistryImpl(
            MetricRegistryConfiguration.fromConfiguration(configuration),
            ReporterSetup.fromConfiguration(configuration));
    
        // Start the metric query service
        // Returns the metrics registered in Flink as key-value pairs
        final RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(configuration, rpcService.getAddress());
        metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);
    
        // Create the BlobCache service
        blobCacheService = new BlobCacheService(
            configuration, highAvailabilityServices.createBlobStore(), null
        );
        
        // Create the external resource info provider
        // (pluginManager is not declared in this excerpt)
        final ExternalResourceInfoProvider externalResourceInfoProvider =
            ExternalResourceUtils.createStaticExternalResourceInfoProvider(
                ExternalResourceUtils.getExternalResourceAmountMap(configuration),
                ExternalResourceUtils.externalResourceDriversFromConfig(configuration, pluginManager));
    
        // Start the TaskManager (analyzed below)
        taskManager = startTaskManager(
            this.configuration,
            this.resourceId,
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            blobCacheService,
            false,
            externalResourceInfoProvider,
            this);
    
        this.terminationFuture = new CompletableFuture<>();
        this.shutdown = false;
    
        MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
    }
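    The taskmanager-future pool created in the constructor is a scheduled thread pool whose core size equals the number of CPU cores, with threads named after the pool. A simplified stand-in, assuming that Runtime.availableProcessors() is an acceptable substitute for Hardware.getNumberCPUCores() and that daemon threads named "<pool>-thread-<n>" approximate what ExecutorThreadFactory produces; the class NamedThreadFactory is hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Flink's ExecutorThreadFactory; the naming scheme is an assumption.
final class NamedThreadFactory implements ThreadFactory {

    private final String poolName;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    NamedThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable, poolName + "-thread-" + threadNumber.getAndIncrement());
        // Daemon threads, so the pool alone does not keep the JVM alive
        thread.setDaemon(true);
        return thread;
    }

    /** Scheduled pool whose core size equals the number of available processors. */
    static ScheduledExecutorService newCpuSizedScheduledPool(String poolName) {
        return Executors.newScheduledThreadPool(
                Runtime.getRuntime().availableProcessors(), new NamedThreadFactory(poolName));
    }
}
```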
    

    After peeling away these layers of wrapping, we finally reach startTaskManager. This method creates a TaskExecutor object:

    public static TaskExecutor startTaskManager(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            BlobCacheService blobCacheService,
            boolean localCommunicationOnly,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            FatalErrorHandler fatalErrorHandler) throws Exception {
    
        checkNotNull(configuration);
        checkNotNull(resourceID);
        checkNotNull(rpcService);
        checkNotNull(highAvailabilityServices);
    
        LOG.info("Starting TaskManager with ResourceID: {}", resourceID.getStringWithMetadata());
    
        // Get the externally reachable address
        String externalAddress = rpcService.getAddress();
    
        // Get the TaskExecutor resource spec
        // Includes CPU cores, task heap memory, task off-heap memory, network memory and managed memory
        final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
    
        // Get the TaskManager services configuration
        TaskManagerServicesConfiguration taskManagerServicesConfiguration =
            TaskManagerServicesConfiguration.fromConfiguration(
                configuration,
                resourceID,
                externalAddress,
                localCommunicationOnly,
                taskExecutorResourceSpec);
    
        // Create the TaskManager's MetricGroup
        Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
            metricRegistry,
            externalAddress,
            resourceID,
            taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
    
        // Create the thread pool for I/O tasks
        final ExecutorService ioExecutor = Executors.newFixedThreadPool(
            taskManagerServicesConfiguration.getNumIoThreads(),
            new ExecutorThreadFactory("flink-taskexecutor-io"));
    
        // Create TaskManagerServices, a container for many other resources and services
        // It involves many services, so we analyze it separately below
        TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
            taskManagerServicesConfiguration,
            blobCacheService.getPermanentBlobService(),
            taskManagerMetricGroup.f1,
            ioExecutor,
            fatalErrorHandler);
    
        // Create the TaskManager configuration
        TaskManagerConfiguration taskManagerConfiguration =
            TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec, externalAddress);
    
        // Get the address of the metric query service
        String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();
    
        // Create the TaskExecutor
        return new TaskExecutor(
            rpcService,
            taskManagerConfiguration,
            highAvailabilityServices,
            taskManagerServices,
            externalResourceInfoProvider,
            heartbeatServices,
            taskManagerMetricGroup.f0,
            metricQueryServiceAddress,
            blobCacheService,
            fatalErrorHandler,
            new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
            createBackPressureSampleService(configuration, rpcService.getScheduledExecutor()));
    }
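    startTaskManager reads the TaskManager's total resources into a TaskExecutorResourceSpec, which later reaches createTaskSlotTable together with the number of slots. My understanding is that, in the Flink versions covered here, the default per-slot resource profile divides each component evenly by the slot count. The sketch models only that arithmetic with a hypothetical value class; it is not Flink's ResourceProfile or TaskExecutorResourceSpec.

```java
// Hypothetical value class approximating the components of TaskExecutorResourceSpec; not Flink's API.
final class SlotProfileSketch {

    final double cpuCores;
    final long taskHeapBytes;
    final long taskOffHeapBytes;
    final long managedBytes;
    final long networkBytes;

    SlotProfileSketch(double cpuCores, long taskHeapBytes, long taskOffHeapBytes,
                      long managedBytes, long networkBytes) {
        this.cpuCores = cpuCores;
        this.taskHeapBytes = taskHeapBytes;
        this.taskOffHeapBytes = taskOffHeapBytes;
        this.managedBytes = managedBytes;
        this.networkBytes = networkBytes;
    }

    /** Splits every resource evenly across the slots; byte sizes use integer division. */
    SlotProfileSketch perSlot(int numberOfSlots) {
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("numberOfSlots must be positive: " + numberOfSlots);
        }
        return new SlotProfileSketch(
                cpuCores / numberOfSlots,
                taskHeapBytes / numberOfSlots,
                taskOffHeapBytes / numberOfSlots,
                managedBytes / numberOfSlots,
                networkBytes / numberOfSlots);
    }
}
```

    With 4 CPU cores, 1 GiB of task heap, 512 MiB of managed memory, 128 MiB of network memory and 4 slots, each slot gets 1 core, 256 MiB of task heap, 128 MiB of managed memory and 32 MiB of network memory.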
    

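    At this point a complete TaskExecutor has been created. The creation process initializes many related services; a mind map at the end of the article summarizes them.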

    The fromConfiguration method of TaskManagerServices

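    TaskManagerServices is a container for a set of TaskManager services, including memory management, the I/O manager, the shuffle environment and more. I plan to cover what each service does in later posts.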

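    Here we focus on its fromConfiguration method. This excerpt takes three parameters, whereas the call in startTaskManager above passes five, so the two excerpts appear to come from different Flink versions: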

    public static TaskManagerServices fromConfiguration(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            MetricGroup taskManagerMetricGroup,
            Executor taskIOExecutor) throws Exception {
    
        // pre-start checks
        // Check that each temp dir (YARN's local dirs in YARN mode, java.io.tmpdir in standalone mode) exists, creating it if missing,
        // and that it is a writable directory
        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
    
        // 创建TaskEventDispatcher
        // Task event dispatcher, used by consuming tasks to send TaskEvents to producing tasks
        final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
    
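        // Create the asynchronous I/O manager
        // It creates reader and writer threads according to the number of configured tmp dirs, which read and write data asynchronously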
        // start the I/O manager, it will create some temp directories.
        final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
    
        // 创建ShuffleEnvironment
        // Provides a local shuffle environment that uses memory segments for data storage
        // It can create the writer side (ResultPartitionWriter) and the consumer side (InputGate)
        final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
            taskManagerServicesConfiguration,
            taskEventDispatcher,
            taskManagerMetricGroup);
        // Start the ShuffleEnvironment
        // Returns the TaskManager's data port (taskmanager.data.port)
        final int dataPort = shuffleEnvironment.start();
    
        // Create and start the key-value state service
        final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
        kvStateService.start();
    
        // Wrap the TaskManager's connection information in a TaskManagerLocation
        final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
            taskManagerServicesConfiguration.getResourceID(),
            taskManagerServicesConfiguration.getTaskManagerAddress(),
            dataPort);
    
        // Create the broadcast variable manager
        final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
    
        // Create the TaskSlotTable, which tracks the assignment of tasks to slots
        final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
            taskManagerServicesConfiguration.getNumberOfSlots(),
            taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
            taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
            taskManagerServicesConfiguration.getPageSize());
    
        // Create the JobManagerTable, which maps JobIDs to JobManagers
        final JobManagerTable jobManagerTable = new JobManagerTable();
    
        // Create the job leader service. A job leader is the JobManager that leads a job.
        // Whenever a JobManager gains or loses leadership, the JobLeaderListener (in TaskExecutor.java) is notified
        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());
    
        // Read the root directories for local state
        // taskmanager.state.local.root-dirs
        final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
    
        final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
    
        // Build File objects for the local state directories (no directories are created on disk here)
        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
            stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }
    
        // Create the local state stores manager
        final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
            taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
            stateRootDirectoryFiles,
            taskIOExecutor);
    
        // Build the TaskManagerServices
        return new TaskManagerServices(
            taskManagerLocation,
            taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
            ioManager,
            shuffleEnvironment,
            kvStateService,
            broadcastVariableManager,
            taskSlotTable,
            jobManagerTable,
            jobLeaderService,
            taskStateManager,
            taskEventDispatcher);
    }
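    checkTempDirs is the first step of fromConfiguration. The sketch below reproduces the checks described in its comments (create the directory if it is missing, then require a writable directory). It is a from-scratch illustration with a hypothetical class name, not Flink's source.

```java
import java.io.File;
import java.io.IOException;

// Hypothetical sketch of the temp directory pre-start check; not Flink's source.
final class TempDirCheckSketch {

    private TempDirCheckSketch() {
    }

    static void checkTempDirs(String[] tmpDirs) throws IOException {
        for (String dir : tmpDirs) {
            if (dir == null || dir.isEmpty()) {
                continue; // empty entries are skipped in this sketch
            }
            File file = new File(dir);
            // Create the directory (including parents) if it does not exist yet
            if (!file.exists() && !file.mkdirs()) {
                throw new IOException("Temporary directory " + file.getAbsolutePath()
                        + " does not exist and could not be created.");
            }
            if (!file.isDirectory()) {
                throw new IOException("Temporary directory " + file.getAbsolutePath() + " is not a directory.");
            }
            if (!file.canWrite()) {
                throw new IOException("Temporary directory " + file.getAbsolutePath() + " is not writable.");
            }
        }
    }
}
```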
    

    Starting the TaskManager

    The code above only creates the services; the logic that starts them lives in the start method.

    The start method of TaskManagerRunner is shown below:

    public void start() throws Exception {
        // Call TaskExecutor's start method,
        // i.e. the start method of its parent class RpcEndpoint
        taskManager.start();
    }
    

    When an RpcEndpoint starts, it calls the onStart lifecycle method.
    TaskExecutor's onStart method:

    @Override
    public void onStart() throws Exception {
        try {
            // Start the TaskExecutor services
            startTaskExecutorServices();
        } catch (Exception e) {
            final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
    
        // Start the registration timeout check
        startRegistrationTimeout();
    }
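    The start/onStart split and the registration timeout can be illustrated together with a heavily simplified model. Everything here is hypothetical: EndpointSketch stands in for RpcEndpoint, a single-threaded scheduled executor stands in for the endpoint's main thread, and the timeout handling only approximates what startRegistrationTimeout does (in Flink the limit comes from configuration, taskmanager.registration.timeout as far as I know).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, heavily simplified endpoint lifecycle; not Flink's RpcEndpoint API.
abstract class EndpointSketch {

    // Single-threaded executor standing in for the endpoint's "main thread"
    protected final ScheduledExecutorService mainThread = Executors.newSingleThreadScheduledExecutor();

    /** start() does not do the work itself; it schedules onStart() on the main thread. */
    final void start() {
        mainThread.execute(() -> {
            try {
                onStart();
            } catch (Exception e) {
                onFatalError(e);
            }
        });
    }

    final void stop() {
        mainThread.shutdownNow();
    }

    protected abstract void onStart() throws Exception;

    protected abstract void onFatalError(Throwable t);
}

// Hypothetical TaskExecutor-like endpoint with a registration timeout.
final class TaskExecutorSketch extends EndpointSketch {

    private final long registrationTimeoutMillis;
    private final Consumer<Throwable> fatalErrorHandler;
    private ScheduledFuture<?> registrationTimeout; // only touched on the main thread

    TaskExecutorSketch(long registrationTimeoutMillis, Consumer<Throwable> fatalErrorHandler) {
        this.registrationTimeoutMillis = registrationTimeoutMillis;
        this.fatalErrorHandler = fatalErrorHandler;
    }

    @Override
    protected void onStart() {
        // Services would be started here; afterwards the registration timeout is armed
        registrationTimeout = mainThread.schedule(
                () -> onFatalError(new IllegalStateException(
                        "Not registered at a ResourceManager within " + registrationTimeoutMillis + " ms")),
                registrationTimeoutMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Called when registration succeeds; cancels the pending timeout on the main thread. */
    void onRegistrationSuccess() {
        mainThread.execute(() -> {
            if (registrationTimeout != null) {
                registrationTimeout.cancel(false);
            }
        });
    }

    @Override
    protected void onFatalError(Throwable t) {
        fatalErrorHandler.accept(t);
    }
}
```

    If onRegistrationSuccess() runs before the delay expires, the scheduled task is cancelled; otherwise the fatal error handler fires, which is roughly how a TaskManager gives up when no ResourceManager accepts it in time.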
    

    The startTaskExecutorServices method, which starts the TaskExecutor services, is:

    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            // Start ResourceManager leader retrieval; leader changes are reported to ResourceManagerLeaderListener
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    
            // tell the task slot table who's responsible for the task slot actions
            // Start the task slot table; the SlotActions argument defines how a slot is freed (freeSlot) and how a slot timeout is handled (timeoutSlot)
            taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
    
            // start the job leader service
            // Start the job leader service
            // The job leader of a job is the JobManager that manages it
            // When a job's leader changes, JobLeaderListenerImpl is notified
            jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
    
            // Create the file cache
            fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }
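    resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()) follows a listener pattern: the retriever calls back whenever the ResourceManager leader changes. A minimal in-memory sketch of that pattern; the class and interface names are hypothetical, and a real retriever would be backed by an HA service such as ZooKeeper rather than a direct method call.

```java
import java.util.UUID;

// Hypothetical in-memory leader retrieval that mimics the listener callback style; not Flink's API.
final class LeaderRetrievalSketch {

    interface LeaderListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
    }

    private LeaderListener listener;
    private String currentAddress;
    private UUID currentSessionId;

    /** Registers the listener and immediately reports an already known leader, if any. */
    synchronized void start(LeaderListener listener) {
        this.listener = listener;
        if (currentAddress != null) {
            listener.notifyLeaderAddress(currentAddress, currentSessionId);
        }
    }

    /** Called by the (simulated) HA backend, e.g. after a new ResourceManager is elected. */
    synchronized void leaderChanged(String address, UUID sessionId) {
        currentAddress = address;
        currentSessionId = sessionId;
        if (listener != null) {
            listener.notifyLeaderAddress(address, sessionId);
        }
    }
}
```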
    

    That completes the TaskManager's creation and startup process.

    Appendix

    Mind map of the key services in TaskExecutor

    I plan to analyze what these services do in later posts.


    [Figure: TaskExecutor]


        Article link: https://www.haomeiwen.com/subject/krdzektx.html