美文网首页
flink function类传递参数(Passing Para

flink function类传递参数(Passing Para

作者: MinaLing | 来源:发表于2019-03-29 14:04 被阅读0次

在flink的function类中传递参数,对于flink datastream和dataset是不同的,对于dataset,可以通过类构造函数、withParameters(Configuration)、全局参数、广播变量等方法,详细参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/。对于datastream,常用的是类构造函数、ParameterTool,参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/best_practices.html
下面一一进行介绍。
一、类构造函数
这个方法就是在要传递参数的方法所在类中,增加待参数的构造方法。

public class TestFlatMap extends RichFlatMapFunction<IN, OUT> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    
    private String dc;
    
    public TestFlatMap (String dc) {
        // TODO Auto-generated constructor stub
        this.dc = dc;
    }

    @Override
    public void flatMap(IN value, Collector<OUT> out) throws Exception {
        // TODO Auto-generated method stub
        try {
//          System.out.println(value);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            System.out.println("flat map error, " + value);
        }
    }

}

二、ParameterTool
注册全局变量

ParameterTool parameters = ParameterTool.fromArgs(args);

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

在rich function中使用

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    ParameterTool parameters = (ParameterTool)
        getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    parameters.getRequired("input");
    // .. do more ..
      }
}

三、withParameters(Configuration)

public class TestMap extends RichMapFunction<IN, OUT> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    
    private String dc;
    
    @Override
    public void open(Configuration parameters) throws Exception {
        dc = parameters.getString("param", "");
    }

    @Override
    public void flatMap(IN value, Collector<OUT> out) throws Exception {
        // TODO Auto-generated method stub
        try {
//          System.out.println(value);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            System.out.println("flat map error, " + value);
        }
    }

}

外层引用

DataSet<Integer> toMap = env.fromElements(1, 2, 3);

Configuration config = new Configuration();
config.setInteger("param", "test");

toMap.map(new TestMap()).withParameters(config);

相关文章

网友评论

      本文标题:flink function类传递参数(Passing Para

      本文链接:https://www.haomeiwen.com/subject/skclbqtx.html