Flink 使用介绍相关文档目录
背景
运行Flink作业一个常见的需求是需要从命令行解析自定义参数。例如:
flink run -t yarn-per-job --detached /path/to/job.jar --input xxx --output xxx
Flink已经内置了参数解析工具ParameterTool
。大家可能会问,Apache也提供了Commons CLI工具,为什么不使用这个?ParameterTool
除了使用更为简便之外,还可以避免Jar依赖冲突问题。具体描述参见:Flink commons-cli no such Method 问题排查。因为Flink内部解析命令行参数使用到了Apache Commons CLI,如果用户作业也使用到Commons CLI,并且引入了差异较大的版本,很可能会出现这个异常。所以建议大家,Flink作业中解析自定义参数一定要使用ParameterTool
,不要使用Commons CLI。
ParameterTool
ParameterTool
使用起来比Commons CLI简单许多。分为两个步骤,读取参数列表和获取参数值。
读取参数列表
ParameterTool
支持从properties文件,命令行,map和系统变量读取参数列表。所有的读取方法都是static的。
- ParameterTool.fromArgs(String[] args):从命令行读取。
--xxx
和-x
两种形式都支持。 - ParameterTool.fromPropertiesFile:从properties文件读取,支持指定path,
File
对象或者是InputStream
三种方式。 - ParameterTool.fromMap(Map<String, String> map):从map读取参数列表。
- ParameterTool.fromSystemProperties():从系统变量读取参数列表。
获取参数值
ParameterTool
拥有get
和getXXX
方法,用于获取不同类型的参数值,其中get
方法获取的参数值为String
类型,其他类型的方法不再一一介绍,看方法名使用即可。这些get
和getXXX
方法还有一个可以指定默认值的重载方法,如果参数列表中不存在该参数,则返回默认值。
除此之外还有一些特殊的方法需要专门介绍。
- getRequired(String key):获取一个String类型的参数值。和
get
不同的地方是要求这个参数值必须存在,否则会抛出RuntimeException
。 - getUnrequestedParameters():获取所有未请求的参数的key。未请求的参数指的是参数列表中存在,但是没有调用
getXXX
方法读取过的参数 - createPropertiesFile:将解析后的参数列表写入properties文件。
- getConfiguration():转换参数列表为Flink的
Configuration
对象。 - getProperties():获取参数填充过后的
Properties
对象。 - getNumberOfParameters():获取参数的个数。
- toMap():转换为
Map
类型。 - mergeWith(ParameterTool other):与另一个
ParameterTool
对象的参数列表合并,返回合并后的ParameterTool
。
使用示例
import org.apache.flink.api.java.utils.ParameterTool
// ...
def main(args: Array[String]): Unit = {
// 从命令行读取参数列表
var params = ParameterTool.fromArgs(args)
// 获取参数值,如果address参数不存在,返回null
println(params.get("address"))
// 指定默认值
println(params.getInt("port", 80))
println(params.getInt("p", 1))
}
在idea中调试从命令行传入参数的操作方法如下:
点击右上角Open 'Edit Run/Debug configurations' dialog
,对话框左侧选择需要运行的项目main class。在右侧的Program arguments文本框点击右侧展开图标,输入命令行参数。如下图所示:
ParameterTool
是可序列化的,这意味着我们还可以将ParameterTool
对象直接传递给TaskManager。除了直接在Stream API算子中调用JobManager中创建出的ParameterTool
对象之外,还可以将其注册为全局作业配置参数:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
然后在算子中使用RichXxxFunction
,从ExecutionConfig
中读取注册过的参数,示例代码如下:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
网友评论