How Flink Obtains Its Configuration

Author: AlienPaul | Published 2024-01-02 14:17

    Preface

    When Flink is used together with Hadoop, there are many ways for it to pick up configuration files, and the official documentation does not summarize them in one place. This article organizes these configuration-loading mechanisms in order to:

    1. Cover the various ways to specify Flink's Hadoop configuration.
    2. Offer a troubleshooting path for messy environments in which Flink fails to read the Hadoop configuration correctly.

    Locating the Flink conf Directory

    Flink searches for the conf directory (which holds the Flink configuration) in the following order:

    1. Check the FLINK_CONF_DIR environment variable.
    2. Check the ../conf directory relative to the working directory.
    3. Check the conf directory relative to the working directory.

    The code is in the getConfigurationDirectoryFromEnv method of CliFrontend:

    public static String getConfigurationDirectoryFromEnv() {
        // Get the conf path from the FLINK_CONF_DIR environment variable
        String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
    
        if (location != null) {
            if (new File(location).exists()) {
                return location;
            } else {
                throw new RuntimeException(
                        "The configuration directory '"
                                + location
                                + "', specified in the '"
                                + ConfigConstants.ENV_FLINK_CONF_DIR
                                + "' environment variable, does not exist.");
            }
        } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
        // Fall back to the ../conf directory if it exists
            location = CONFIG_DIRECTORY_FALLBACK_1;
        } else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
        // Fall back to the conf directory if it exists
            location = CONFIG_DIRECTORY_FALLBACK_2;
        } else {
            throw new RuntimeException(
                    "The configuration directory was not specified. "
                            + "Please specify the directory containing the configuration file through the '"
                            + ConfigConstants.ENV_FLINK_CONF_DIR
                            + "' environment variable.");
        }
        return location;
    }
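
    For example, you can point the CLI at a custom configuration directory before submitting jobs. A minimal sketch, where the directory path is illustrative:

    # Use a non-default Flink configuration directory (illustrative path)
    export FLINK_CONF_DIR=/opt/flink-conf-test
    # Subsequent CLI invocations read their configuration from that directory
    ./bin/flink list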
    

    Locating the log4j Configuration File

    The discoverLogConfigFile method of YarnLogConfigUtil searches the Flink configuration directory for log4j.properties and logback.xml. If both exist, log4j.properties takes precedence:

    private static Optional<File> discoverLogConfigFile(final String configurationDirectory) {
        Optional<File> logConfigFile = Optional.empty();
    
        // Look for the log4j.properties file in the Flink configuration directory
        final File log4jFile =
                new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
        if (log4jFile.exists()) {
            logConfigFile = Optional.of(log4jFile);
        }
    
        // Look for the logback.xml file in the Flink configuration directory
        final File logbackFile =
                new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
        if (logbackFile.exists()) {
            if (logConfigFile.isPresent()) {
            // If both configuration files exist, log a warning
                LOG.warn(
                        "The configuration directory ('"
                                + configurationDirectory
                                + "') already contains a LOG4J config file."
                                + "If you want to use logback, then please delete or rename the log configuration file.");
            } else {
                logConfigFile = Optional.of(logbackFile);
            }
        }
        return logConfigFile;
    }
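
    As the warning suggests, one way to make a Yarn deployment pick up logback.xml is to move log4j.properties out of the configuration directory. A sketch, assuming a standard $FLINK_HOME layout:

    # With log4j.properties gone, discoverLogConfigFile falls through to logback.xml
    mv "$FLINK_HOME/conf/log4j.properties" "$FLINK_HOME/conf/log4j.properties.bak"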
    

    Kerberos-Related Configuration

    In the static initialization code of KerberosUtils, the KRB5CCNAME environment variable can be used to specify the ticket cache path that Flink's Kerberos authentication uses:

    // Read the ticket cache path from the KRB5CCNAME environment variable
    String ticketCache = System.getenv("KRB5CCNAME");
    if (ticketCache != null) {
        if (IBM_JAVA) {
            System.setProperty("KRB5CCNAME", ticketCache);
        } else {
            kerberosCacheOptions.put("ticketCache", ticketCache);
        }
    }
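
    For example, you can acquire a ticket into a dedicated cache file and point Flink at it. A sketch where the cache path and principal are illustrative:

    # Acquire a ticket into a specific cache file
    kinit -c /tmp/krb5cc_flink flink-user@EXAMPLE.COM
    # Tell the Kerberos login code to use that ticket cache
    export KRB5CCNAME=/tmp/krb5cc_flink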
    

    The doLoginAndReturnUGI method of KerberosLoginProvider uses the KRB5PRINCIPAL environment variable to specify the principal. The code is shown below:

    public UserGroupInformation doLoginAndReturnUGI() throws IOException {
        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    
        if (principal != null) {
            LOG.info(
                    "Attempting to login to KDC using principal: {} keytab: {}", principal, keytab);
            UserGroupInformation ugi =
                    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
            LOG.info("Successfully logged into KDC");
            return ugi;
    } else if (!HadoopUserUtils.isProxyUser(currentUser)) {
        LOG.info("Attempting to load user's ticket cache");
        // The cache path comes from KRB5CCNAME; the principal comes from
        // KRB5PRINCIPAL, falling back to the current user's name
        final String ccache = System.getenv("KRB5CCNAME");
        final String user =
                Optional.ofNullable(System.getenv("KRB5PRINCIPAL"))
                        .orElse(currentUser.getUserName());
            UserGroupInformation ugi = UserGroupInformation.getUGIFromTicketCache(ccache, user);
            LOG.info("Loaded user's ticket cache successfully");
            return ugi;
        } else {
            throwProxyUserNotSupported();
            return currentUser;
        }
    }
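
    When no keytab is configured, the principal used to look up the ticket cache can therefore be overridden through KRB5PRINCIPAL. A sketch with illustrative values:

    # Without these, the default ticket cache and the current OS user's name are used
    export KRB5CCNAME=/tmp/krb5cc_flink
    export KRB5PRINCIPAL=flink-user@EXAMPLE.COM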
    

    Hadoop-Related Configuration

    The flow for reading Hadoop configuration files is in the getHadoopConfiguration method of HadoopUtils. A resource added later overrides those added before it, so the steps below run from lowest to highest priority:

    1. Read hdfs-default.xml and hdfs-site.xml from the classpath.
    2. Read from $HADOOP_HOME/conf and $HADOOP_HOME/etc/hadoop.
    3. Read the files referenced by the fs.hdfs.hdfsdefault, fs.hdfs.hdfssite, and fs.hdfs.hadoopconf entries in the Flink configuration. This approach is deprecated and not recommended.
    4. Read from the directory given by the HADOOP_CONF_DIR environment variable.
    5. Read entries prefixed with flink.hadoop. from the Flink configuration and treat them as Hadoop configuration entries. An example follows the code.

    The implementation:

    @SuppressWarnings("deprecation")
    public static Configuration getHadoopConfiguration(
            org.apache.flink.configuration.Configuration flinkConfiguration) {
    
        // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
        // from the classpath
        Configuration result = new HdfsConfiguration();
        boolean foundHadoopConfiguration = false;
    
        // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
        // the hdfs configuration.
        // The properties of a newly added resource will override the ones in previous
        // resources, so a configuration file with higher priority should be added later.
    
        // Approach 1: HADOOP_HOME environment variables
        String[] possibleHadoopConfPaths = new String[2];
    
        // Taken from the HADOOP_HOME environment variable:
        // $HADOOP_HOME/conf and $HADOOP_HOME/etc/hadoop
        final String hadoopHome = System.getenv("HADOOP_HOME");
        if (hadoopHome != null) {
            LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
            possibleHadoopConfPaths[0] = hadoopHome + "/conf";
            possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
        }
    
        for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
            if (possibleHadoopConfPath != null) {
                foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
            }
        }
    
        // Approach 2: Flink configuration (deprecated)
        // Read from the fs.hdfs.hdfsdefault entry in the Flink configuration
        final String hdfsDefaultPath =
                flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
        if (hdfsDefaultPath != null) {
            result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
            LOG.debug(
                    "Using hdfs-default configuration-file path from Flink config: {}",
                    hdfsDefaultPath);
            foundHadoopConfiguration = true;
        }
    
        // Read from the fs.hdfs.hdfssite entry in the Flink configuration
        final String hdfsSitePath =
                flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
        if (hdfsSitePath != null) {
            result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
            LOG.debug(
                    "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
            foundHadoopConfiguration = true;
        }
    
        // Read from the fs.hdfs.hadoopconf entry in the Flink configuration
        final String hadoopConfigPath =
                flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
        if (hadoopConfigPath != null) {
            LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
            foundHadoopConfiguration =
                    addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
        }
    
        // Approach 3: HADOOP_CONF_DIR environment variable
        // Read from the HADOOP_CONF_DIR environment variable
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir != null) {
            LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
            foundHadoopConfiguration =
                    addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
        }
    
        // Approach 4: Flink configuration
        // add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
        // Read entries prefixed with flink.hadoop. from the Flink configuration,
        // strip the prefix, and add the remainder as keys to the Hadoop configuration
        for (String key : flinkConfiguration.keySet()) {
            for (String prefix : FLINK_CONFIG_PREFIXES) {
                if (key.startsWith(prefix)) {
                    String newKey = key.substring(prefix.length());
                    String value = flinkConfiguration.getString(key, null);
                    result.set(newKey, value);
                    LOG.debug(
                            "Adding Flink config entry for {} as {}={} to Hadoop config",
                            key,
                            newKey,
                            value);
                    foundHadoopConfiguration = true;
                }
            }
        }
    
        // If none of the above approaches found a Hadoop configuration, log a warning
        if (!foundHadoopConfiguration) {
            LOG.warn(
                    "Could not find Hadoop configuration via any of the supported methods "
                            + "(Flink configuration, environment variables).");
        }
    
        return result;
    }
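
    As an example of step 5, an entry such as the following in flink-conf.yaml (the key and value are illustrative) is forwarded to the Hadoop configuration with the flink.hadoop. prefix stripped:

    # Becomes dfs.replication=2 in the resulting Hadoop configuration
    echo 'flink.hadoop.dfs.replication: 2' >> "$FLINK_HOME/conf/flink-conf.yaml"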
    

    Hadoop Classpath Configuration

    The code that builds Flink's Hadoop classpath is in config.sh:

    INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
    

    You can use export HADOOP_CLASSPATH=xxx to specify the Hadoop classpath for Flink, as shown below.
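
    A common choice, recommended in the Flink documentation, is to let the hadoop command report its own classpath:

    # Derive the Hadoop classpath from the local Hadoop installation
    export HADOOP_CLASSPATH=$(hadoop classpath)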

    This line also shows that Flink's classpath includes HADOOP_CONF_DIR and YARN_CONF_DIR, which correspond to the Hadoop and Yarn configuration directories. You can therefore use HADOOP_CONF_DIR and YARN_CONF_DIR to specify the paths of the Hadoop and Yarn configuration files respectively.

    Yarn-Related Configuration

    The YarnClusterDescriptor::isReadyForDeployment method checks HADOOP_CONF_DIR and YARN_CONF_DIR. If neither HADOOP_CONF_DIR nor YARN_CONF_DIR is set, it logs a warning:

    // check if required Hadoop environment variables are set. If not, warn user
    if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
        LOG.warn(
                "Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. "
                        + "The Flink YARN Client needs one of these to be set to properly load the Hadoop "
                        + "configuration for accessing YARN.");
    }
    

    Generally, the output of the hadoop classpath command already includes Hadoop's conf directory, so leaving HADOOP_CONF_DIR and YARN_CONF_DIR unset does not prevent a Flink job from accessing HDFS or being submitted to a Yarn cluster.

    The Yarn configuration is read as follows:

    1. Read yarn-site.xml and yarn-default.xml from the classpath. Because the HADOOP_CLASSPATH, HADOOP_CONF_DIR, and YARN_CONF_DIR environment variables can all modify Flink's Hadoop classpath, all three affect this step.
    2. Specify Yarn configuration entries through entries prefixed with flink.yarn. in the Flink configuration.

    The source code analysis follows.

    Before submitting a Yarn job, Flink reads the Hadoop and Yarn configuration files through the Utils::getYarnAndHadoopConfiguration method:

    public static YarnConfiguration getYarnAndHadoopConfiguration(
            org.apache.flink.configuration.Configuration flinkConfig) {
        final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
        // Load the Hadoop configuration, as analyzed above
        yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));
    
        return yarnConfig;
    }
    

    The getYarnConfiguration method of Utils reads the entries prefixed with flink.yarn. from the Flink configuration, strips the flink. prefix (so the key keeps its yarn. part), and puts them into the configuration. An example follows the code.

    public static YarnConfiguration getYarnConfiguration(
            org.apache.flink.configuration.Configuration flinkConfig) {
        // yarn-default.xml and yarn-site.xml are picked up from the classpath here
        final YarnConfiguration yarnConfig = new YarnConfiguration();
    
        for (String key : flinkConfig.keySet()) {
            for (String prefix : FLINK_CONFIG_PREFIXES) {
                if (key.startsWith(prefix)) {
                    String newKey = key.substring("flink.".length());
                    String value = flinkConfig.getString(key, null);
                    yarnConfig.set(newKey, value);
                    LOG.debug(
                            "Adding Flink config entry for {} as {}={} to Yarn config",
                            key,
                            newKey,
                            value);
                }
            }
        }
    
        return yarnConfig;
    }
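
    Note that only the flink. part of the prefix is stripped, so a key written as flink.yarn.<key> ends up as yarn.<key> in the YarnConfiguration. A sketch with an illustrative entry:

    # Becomes yarn.resourcemanager.am.max-attempts in the YarnConfiguration
    echo 'flink.yarn.resourcemanager.am.max-attempts: 4' >> "$FLINK_HOME/conf/flink-conf.yaml"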
    

    Appendix: Troubleshooting Missing Log Output

    Check in the following order:

    1. Check whether the $FLINK_HOME/conf/log4j*.properties configuration files are correct.

    2. Check whether the logging-related jars in $FLINK_HOME/lib/ are present and free of conflicts.

    3. Check the logging-related jars bundled into the job artifact: they should all be excluded so that the logging libraries shipped with the Flink framework are used instead. A quick check is sketched below.
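
    For the last two checks, listing the logging jars on each side can reveal conflicts. A sketch where the job jar name is illustrative:

    # Logging jars shipped in Flink's lib directory
    ls "$FLINK_HOME/lib" | grep -iE 'log4j|slf4j|logback'
    # Logging classes bundled into the job jar
    jar tf my-job.jar | grep -iE 'log4j|slf4j|logback'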
