美文网首页java
flink 对java8 lambdas表达式的支持

flink 对java8 lambdas表达式的支持

作者: java0110 | 来源:发表于2019-02-17 20:28 被阅读0次

    本文是对https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/java_lambdas.html#java-lambda-expressions的实践

    Map: 对于简单的 map(i -> i * i),Flink 可以自动推断出返回类型。当返回类型带有泛型参数(例如 Tuple2<Integer, Integer>)时,lambda 表达式无法提供足够的类型信息,此时需要通过 returns(...) 显式指定返回类型,或者改用实现了 MapFunction 的(匿名)类,或者返回一个继承自 Tuple2<Integer, Integer> 的子类。
    flatMap:flatMap 的 lambda 表达式通过 Collector<T> 输出结果,而 Collector 的泛型参数在编译后会被类型擦除,因此 Flink 无法自动推断返回类型,必须通过 returns(...)(例如 returns(Types.STRING))显式指定具体的返回值类型。

    package myflink.learn.lambda;
    /**
     * Created by:
     * date: 2019-02-17.
     */
    
    import lombok.ToString;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    // https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/java_lambdas.html#java-lambda-expressions
    /**
     * Demonstrates how Flink's DataSet API handles Java 8 lambda expressions and
     * what to do when the return type of a lambda cannot be extracted automatically.
     *
     * <p>Each pipeline in {@link #main(String[])} shows one technique:
     * <ul>
     *   <li>a plain lambda whose result type can be inferred,</li>
     *   <li>a {@code flatMap} lambda with an explicit {@code returns(...)} hint,</li>
     *   <li>a {@code map} lambda producing a generic {@link Tuple2} with an explicit hint,</li>
     *   <li>a named {@link MapFunction} class ({@link MyTuple2Mapper}),</li>
     *   <li>an anonymous {@link MapFunction} class,</li>
     *   <li>a non-generic tuple subclass ({@link DoubleTuple}).</li>
     * </ul>
     */
    @Slf4j
    @ToString
    public class LambdaDemo {

        /**
         * Runs every example pipeline in sequence; each one prints its result to stdout.
         *
         * @param args unused command-line arguments
         * @throws Exception if any of the Flink jobs fails
         */
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Simple case: the result type of i -> i * i can be inferred without a hint.
            env.fromElements(1, 2, 3).map(i -> i * i).print();

            DataSet<Integer> input = env.fromElements(1, 2, 3);

            // flatMap emits through Collector<String>; the lambda does not carry that
            // generic parameter, so the output type is declared explicitly via returns(...).
            input.flatMap((Integer number, Collector<String> out) -> {
                StringBuilder builder = new StringBuilder();
                for (int i = 0; i < number; i++) {
                    builder.append("a");
                    out.collect(builder.toString());
                }
            })
                    .returns(Types.STRING)
                    .print();

            // NOTE: without a type hint the following pipeline fails, because the lambda
            // carries no information about the fields of Tuple2:
            //
            //     env.fromElements(1, 2, 3)
            //             .map(i -> Tuple2.of(i, i))
            //             .print();
            //
            // Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
            //     The generic type parameters of 'Tuple2' are missing.
            // In many cases lambda methods don't provide enough information for automatic
            // type extraction when Java generics are involved. An easy workaround is to use
            // an (anonymous) class instead that implements the
            // 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the
            // type has to be specified explicitly using type information.

            // Workaround 1: keep the lambda and state the result type explicitly.
            env.fromElements(1, 2, 3)
                    .map(i -> Tuple2.of(i, i))
                    .returns(Types.TUPLE(Types.INT, Types.INT))
                    .print();

            // Workaround 2: use a named mapper class.
            env.fromElements(1, 2, 3)
                    .map(new MyTuple2Mapper())
                    .print();

            // Workaround 3: use an anonymous class.
            env.fromElements(1, 2, 3).map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> map(Integer value) throws Exception {
                    return Tuple2.of(value, value);
                }
            }).print();

            // Workaround 4: return a tuple subclass whose type arguments are fixed
            // in its class declaration.
            env.fromElements(1, 2, 3)
                    .map(i -> new DoubleTuple(i, i))
                    .print();
        }

        /**
         * Maps an integer to a pair holding that integer twice.
         *
         * <p>Declared as a static nested class so that it holds no reference to an
         * enclosing {@code LambdaDemo} instance.
         */
        public static class MyTuple2Mapper implements MapFunction<Integer, Tuple2<Integer, Integer>> {

            private static final long serialVersionUID = 1L;

            /**
             * @param value the input element
             * @return a tuple {@code (value, value)}
             */
            @Override
            public Tuple2<Integer, Integer> map(Integer value) {
                return Tuple2.of(value, value);
            }
        }

        /**
         * A {@code Tuple2<Integer, Integer>} subclass with its type arguments fixed in
         * the class declaration, so no {@code returns(...)} hint is needed when a lambda
         * returns it.
         */
        public static class DoubleTuple extends Tuple2<Integer, Integer> {

            /**
             * No-arg constructor. It is required because the tuple is instantiated
             * reflectively via {@code Class.newInstance()}; without it the job fails with:
             *
             * <pre>
             * Caused by: java.lang.NoSuchMethodException: myflink.learn.lambda.LambdaDemo$DoubleTuple.&lt;init&gt;()
             *     at java.lang.Class.getConstructor0(Class.java:3082)
             *     at java.lang.Class.newInstance(Class.java:412)
             * </pre>
             */
            public DoubleTuple() {
            }

            /**
             * Creates a tuple with both fields set.
             *
             * @param f0 value of the first field
             * @param f1 value of the second field
             */
            public DoubleTuple(int f0, int f1) {
                super(f0, f1);
            }
        }
    }
    
    
    

    相关文章

      网友评论

        本文标题:flink 对java8 lambdas表达式的支持

        本文链接:https://www.haomeiwen.com/subject/ohdneqtx.html