美文网首页玩转大数据JavaFlink学习指南
Flink 使用之动态Kerberos认证

Flink 使用之动态Kerberos认证

作者: AlienPaul | 来源:发表于2023-12-20 10:23 被阅读0次

    Flink 使用介绍相关文档目录

    Flink 使用介绍相关文档目录

    背景

    Flink Kerberos认证的配置位于YAML配置文件中,带来的便利是用户不用反复去配置认证信息。但这样使用有局限性。用户如果需要频繁切换不同的用户提交任务,是否可以像spark-submit那样通过--principal--keytab参数来临时指定用户呢?答案是可以的。

    使用方式

    Flink 1.17版本支持使用-D 配置项的方式在提交任务时指定认证配置。例如:

    ./flink run-application -t yarn-application -D security.kerberos.login.keytab=/etc/security/keytabs/hdfs.headless.keytab -D security.kerberos.login.principal=hdfs@PAUL.COM ../examples/batch/WordCount.jar
    

    其中-D 配置项的内容和YAML配置文件中的参数名相同。

    需要注意的是,经本人验证Flink 1.13.2 和Flink 1.15.4版本存在问题,无法使用此方式。

    经查询社区,和修复此问题相关联的Issue为:

    • [FLINK-29435][client] SecurityConfiguration supports dynamic configuration
    • [FLINK-31321][Deployment/YARN] Yarn-session mode, securityConfiguration supports dynamic configuration

    如果需要在1.13.x和1.15.x版本中修复,可以将这两个commit cherrypick过去重新编译解决。

    除此之外还有一个备用方法:动态指定Kerberos Cache。该方法对于Flink 1.13.x和1.15.x可用。

    kinit hdfs@PAUL.COM -kt /etc/security/keytabs/hdfs.headless.keytab -c /opt/paul/krb_cache
    export KRB5CCNAME=/opt/paul/krb_cache && ./flink run -m yarn-cluster ../examples/batch/WordCount.jar
    

    kinit命令可以通过-c参数指定自定义的cache文件路径。在Flink使用的时候将其配置到KRB5CCNAME环境变量对应起来。不同用户使用不同的cache文件可避免相互影响。

    KRB5CCNAME环境变量仅在当前shell中生效,不会干扰其他的shell提交作业。

    源代码分析

    security config动态参数解析部分

    CliFrontend::mainInternal方法try块内容:

    final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
    CommandLine commandLine =
            cli.getCommandLine(
                    new Options(),
                    Arrays.copyOfRange(args, min(args.length, 1), args.length),
                    true);
    Configuration securityConfig = new Configuration(cli.configuration);
    // 获取命令行中-D key=value形式的动态参数,将其合并入securityConfig(Configuration类型)中
    DynamicPropertiesUtil.encodeDynamicProperties(commandLine, securityConfig);
    // 生成安全相关配置(Hadoop认证还是JAAS)
    SecurityUtils.install(new SecurityConfiguration(securityConfig));
    // 执行认证过程
    retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    

    上面生成安全相关配置和执行认证过程参见Flink 源码之安全认证

    动态指定Kerberos Cache

    相关源代码位于:KerberosUtils::static方法。相关代码片段为:

    // 获取系统环境变量KRB5CCNAME
    String ticketCache = System.getenv("KRB5CCNAME");
    if (ticketCache != null) {
        if (IBM_JAVA) {
            System.setProperty("KRB5CCNAME", ticketCache);
        } else {
            // 添加到kerberosCacheOptions中
            kerberosCacheOptions.put("ticketCache", ticketCache);
        }
    }
    

    相关文章

      网友评论

        本文标题:Flink 使用之动态Kerberos认证

        本文链接:https://www.haomeiwen.com/subject/gfvtndtx.html