美文网首页玩转大数据
Flink 使用之 ParameterTool

Flink 使用之 ParameterTool

作者: AlienPaul | 来源:发表于2022-02-08 09:45 被阅读0次

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

背景

运行Flink作业一个常见的需求是需要从命令行解析自定义参数。例如:

flink run -t yarn-per-job --detached /path/to/job.jar --input xxx --output xxx

Flink已经内置了参数解析工具ParameterTool。大家可能会问,Apache也提供了Commons CLI工具,为什么不使用这个?ParameterTool除了使用更为简便之外,还可以避免Jar依赖冲突问题。具体描述参见:Flink commons-cli no such Method 问题排查。因为Flink内部解析命令行参数使用到了Apache Commons CLI,如果用户作业也使用到Commons CLI,并且引入了差异较大的版本,很可能会出现这个异常。所以建议大家,Flink作业中解析自定义参数一定要使用ParameterTool,不要使用Commons CLI

ParameterTool

ParameterTool使用起来比Commons CLI简单许多。分为两个步骤,读取参数列表和获取参数值。

读取参数列表

ParameterTool支持从properties文件,命令行,map和系统变量读取参数列表。所有的读取方法都是static的。

  • ParameterTool.fromArgs(String[] args):从命令行读取。--xxx-x两种形式都支持。
  • ParameterTool.fromPropertiesFile:从properties文件读取,支持指定path,File对象或者是InputStream三种方式。
  • ParameterTool.fromMap(Map<String, String> map):从map读取参数列表。
  • ParameterTool.fromSystemProperties():从系统变量读取参数列表。

获取参数值

ParameterTool拥有getgetXXX方法,用于获取不同类型的参数值,其中get方法获取的参数值为String类型,其他类型的方法不再一一介绍,看方法名使用即可。这些getgetXXX方法还有一个可以指定默认值的重载方法,如果参数列表中不存在该参数,则返回默认值。

除此之外还有一些特殊的方法需要专门介绍。

  • getRequired(String key):获取一个String类型的参数值。和get不同的地方是要求这个参数值必须存在,否则会抛出RuntimeException
  • getUnrequestedParameters():获取所有未请求的参数的key。未请求的参数指的是参数列表中存在,但是没有调用getXXX方法读取过的参数
  • createPropertiesFile:将解析后的参数列表写入properties文件。
  • getConfiguration():转换参数列表为Flink的Configuration对象。
  • getProperties():获取参数填充过后的Properties对象。
  • getNumberOfParameters():获取参数的个数。
  • toMap():转换为Map类型。
  • mergeWith(ParameterTool other):与另一个ParameterTool对象的参数列表合并,返回合并后的ParameterTool

使用示例

import org.apache.flink.api.java.utils.ParameterTool
// ...
def main(args: Array[String]): Unit = {
    // 从命令行读取参数列表
    var params = ParameterTool.fromArgs(args)

    // 获取参数值,如果address参数不存在,返回null
    println(params.get("address"))
    // 指定默认值
    println(params.getInt("port", 80))
    println(params.getInt("p", 1))
}

在idea中调试从命令行传入参数的操作方法如下:
点击右上角Open 'Edit Run/Debug configurations' dialog,对话框左侧选择需要运行的项目main class。在右侧的Program arguments文本框点击右侧展开图标,输入命令行参数。如下图所示:

idea调试配置命令行传入参数方式示意图

ParameterTool是可序列化的,这意味着我们还可以将ParameterTool对象直接传递给TaskManager。除了直接在Stream API算子中调用JobManager中创建出的ParameterTool对象之外,还可以将其注册为全局作业配置参数:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

然后在算子中使用RichXxxFunction,从ExecutionConfig中读取注册过的参数,示例代码如下:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    ParameterTool parameters = (ParameterTool)
        getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    parameters.getRequired("input");
    // .. do more ..

参考文献

Handling Application Parameters | Apache Flink

相关文章

网友评论

    本文标题:Flink 使用之 ParameterTool

    本文链接:https://www.haomeiwen.com/subject/ndhbkrtx.html