Flink returns(): Setting the Return Type

Author: 良人与我 | Published: 2019-04-24 10:42

Flink supports lambda expressions, but when a lambda involves generics, its type information has to be declared explicitly.

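import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkFirstDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello everyone", "how are you")
                // An anonymous class keeps its generic signature, so Flink can extract the output type.
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) {
                        // Split each line into words and emit them one by one.
                        for (String s : value.split(" ")) {
                            out.collect(s);
                        }
                    }
                })
                .print();
        System.out.println("hello");
    }
}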

Output:

hello
everyone
how
are
you
hello

Following the hint from IntelliJ IDEA, replace the anonymous inner class with a lambda expression.



The code then looks like this:

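import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FlinkFirstDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello everyone", "how are you")
                // The cast only helps the Java compiler; Flink still cannot see Collector's type argument.
                .flatMap((FlatMapFunction<String, String>) (value, out) -> {
                    for (String s : value.split(" ")) {
                        out.collect(s);
                    }
                })
                .print();
        System.out.println("hello");
    }
}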

Running it fails with the following error:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkFirstDemo.java:22)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
    at com.river.FlinkFirstDemo.main(FlinkFirstDemo.java:27)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.

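Because of type erasure, Flink cannot infer that the Collector in the lambda emits String elements.
The only way around it here is to state the return type explicitly with .returns(Types.STRING).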

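import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FlinkFirstDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello everyone", "how are you")
                .flatMap((FlatMapFunction<String, String>) (value, out) -> {
                    for (String s : value.split(" ")) {
                        out.collect(s);
                    }
                })
                // Type hint: tell Flink that this operator produces String elements.
                .returns(Types.STRING)
                .print();
        System.out.println("hello");
    }
}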

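That is a little annoying.

The official documentation explains it too: Flink supports lambda expressions, but whenever generics are involved the type information has to be declared explicitly.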

Java 8 introduced several new language features designed for faster and clearer coding. With the most important feature, the so-called “Lambda Expressions”, it opened the door to functional programming. Lambda expressions allow for implementing and passing functions in a straightforward way without having to declare additional (anonymous) classes.

Attention: Flink supports the usage of lambda expressions for all operators of the Java API, however, whenever a lambda expression uses Java generics you need to declare type information explicitly.
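
The difference between the anonymous class and the lambda can be seen with plain Java reflection, without Flink on the classpath. The sketch below is only an illustration of the underlying Java behavior, not Flink's actual type extraction logic. The class name ErasureDemo and its helper methods are made up for this example, and java.util.function.Function stands in for FlatMapFunction.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {

    // Returns true if the object's class records generic type arguments
    // for at least one of the interfaces it implements.
    public static boolean keepsTypeArguments(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Anonymous class: the compiler stores Function<String, Integer>
    // in the generated class file, so reflection can read it back.
    public static Function<String, Integer> anonymousLength() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    // Lambda: the class is generated at runtime and implements only
    // the raw Function interface, so the type arguments are not visible.
    public static Function<String, Integer> lambdaLength() {
        return s -> s.length();
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps type arguments: "
                + keepsTypeArguments(anonymousLength()));
        System.out.println("lambda keeps type arguments: "
                + keepsTypeArguments(lambdaLength()));
    }
}
```

On common JDKs the anonymous class reports its type arguments and the lambda does not. That gap is what the returns(...) hint fills in on the Flink side.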


Reference
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/java_lambdas.html
