flink DataSet

Author: 程序男保姆 | Published 2020-08-23 00:59

    DataSet development overview

    DataSource (data sources)

    • readTextFile
     
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.util.Collector;

    import java.util.Arrays;

    public class ReadTextFileDemo {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // read from the local file system
    //        DataSource<String> dataSource = env.readTextFile("file:///Users/baidu/Desktop/input");
            // read from HDFS
            DataSource<String> dataSource = env.readTextFile("hdfs://master:9000/user/yuan/input/wc.count");
            dataSource.flatMap(new FlatMapFunction<String, S>() {
                @Override
                public void flatMap(String value, Collector<S> out) throws Exception {
                    // split the line into single characters and emit one S per character
                    Arrays.stream(value.split("")).forEach(o -> {
                        S s = new S();
                        s.setStr(o);
                        out.collect(s);
                    });
                }
            }).print();
    
        }
    
    
        public static class S {
            String str;
    
            public String getStr() {
                return str;
            }
    
            public void setStr(String str) {
                this.str = str;
            }
    
            @Override
            public String toString() {
                return "S{" +
                        "str='" + str + '\'' +
                        '}';
            }
        }
    }
    

    Counters (accumulators)

        public static void main(String[] args) throws Exception {
    
        // ExecutionEnvironment: the execution environment for batch programs
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<String> text = env.fromElements("Who's there?",
                    "I think I hear them. Stand, ho! Who's there?");
    
            text
                    .flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
    
                        IntCounter intCounter = new IntCounter(0);
    
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            // register the counter as an accumulator; otherwise, with parallelism > 1 the total is wrong
                            // (each parallel instance is its own thread, and threads do not share state)
                            getRuntimeContext().addAccumulator("c", intCounter);
                        }
    
                        @Override
                        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                            for (String s : value.split(" ")) {
                                intCounter.add(1);
                                out.collect(new Tuple2<String, Integer>(s, 1));
                            }
                        }
                    })
                // with parallelism n, n output files are written
                .setParallelism(4)
                .writeAsText("file:///Users/baidu/Desktop/222", FileSystem.WriteMode.OVERWRITE);
    
    
            JobExecutionResult execute = env.execute("abcd");
    
            Object c = execute.getAccumulatorResult("c");
    
            System.out.println("c = " + c);
    
        }
    
    

    Sink
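
      The counter example above already uses writeAsText. A minimal sketch of the common DataSet sinks (the output paths are placeholders):

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Integer, String>> data = env.fromElements(
                    new Tuple2<>(1, "a"),
                    new Tuple2<>(2, "b"));

            // print to stdout; print() triggers execution by itself
            data.print();

            // write as plain text / CSV; OVERWRITE replaces existing output
            data.writeAsText("file:///tmp/out-text", FileSystem.WriteMode.OVERWRITE);
            data.writeAsCsv("file:///tmp/out-csv", "\n", ",", FileSystem.WriteMode.OVERWRITE);

            // write sinks are lazy and need an explicit execute()
            env.execute("sink demo");
        }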

    Distributed cache
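
      A minimal sketch of the distributed cache, assuming a dictionary file at a placeholder HDFS path: the file is registered on the environment, and every parallel task reads its local copy in open():

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // register the file once; Flink ships it to every TaskManager
            // (the path and the name "dict" are placeholders)
            env.registerCachedFile("hdfs://master:9000/user/yuan/input/dict.txt", "dict");

            env.fromElements("a", "b", "c")
                    .map(new RichMapFunction<String, String>() {
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            // each parallel instance gets a local copy of the cached file
                            File dict = getRuntimeContext().getDistributedCache().getFile("dict");
                            System.out.println("cached file at: " + dict.getAbsolutePath());
                        }

                        @Override
                        public String map(String value) throws Exception {
                            return value;
                        }
                    })
                    .print();
        }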

    Transformation operators

    • Map
    public class MapDemo {
    
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<String> text = env.fromElements("1", "2", "3", "4", "5");
            // map: one-to-one transformation, converting each string to an integer
            text.map(new MapFunction<String, Integer>() {
                public Integer map(String value) throws Exception {
                    return Integer.valueOf(value);
                }
            }).print();
        }
    
    }
    
    • FlatMap
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<String> text = env.fromElements("我是一个大傻瓜", "我是一个小朋友");
    
            text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            // one-to-many: each input line is transformed into several output records
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    for (String s : value.split("")) {
                        out.collect(new Tuple2<String, Integer>(s, 1));
                    }
                }
            }).print();
        }
    
    • MapPartition
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
            data.add(new Tuple2<>(1, "demo1"));
            data.add(new Tuple2<>(1, "demo2"));
            data.add(new Tuple2<>(1, "demo3"));
            data.add(new Tuple2<>(2, "demo4"));
            data.add(new Tuple2<>(2, "demo5"));
            data.add(new Tuple2<>(2, "demo6"));
            data.add(new Tuple2<>(3, "demo7"));
            data.add(new Tuple2<>(3, "demo8"));
            data.add(new Tuple2<>(4, "demo9"));
            data.add(new Tuple2<>(4, "demo10"));
            data.add(new Tuple2<>(4, "demo11"));
            data.add(new Tuple2<>(4, "demo12"));
            data.add(new Tuple2<>(5, "demo13"));
            data.add(new Tuple2<>(5, "demo14"));
            data.add(new Tuple2<>(5, "demo15"));
            data.add(new Tuple2<>(5, "demo16"));
            data.add(new Tuple2<>(5, "demo17"));
            data.add(new Tuple2<>(6, "demo18"));
            data.add(new Tuple2<>(6, "demo19"));
            data.add(new Tuple2<>(6, "demo20"));
            data.add(new Tuple2<>(6, "demo21"));
    
    
            DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
    
        // range-partition on field 0: equal keys end up in the same partition
            collection
                    .partitionByRange(0)
                    .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                        @Override
                        public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                            values.forEach(o -> {
                                System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                            });
                            System.out.println();
                        }
                    })
                    .setParallelism(4)
                    .print();
            System.out.println("partitionByRange");
    
        // hash-partition on field 0
            collection
                    .partitionByHash(0)
                    .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                        @Override
                        public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                            values.forEach(o -> {
                                System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                            });
                            System.out.println();
                        }
                    })
                    .setParallelism(4)
                    .print();
            System.out.println("partitionByHash");
    
        // custom partitioner: decide the target partition from the key
            collection
                    .partitionCustom(new Partitioner<Integer>() {
                        @Override
                        public int partition(Integer key, int numPartitions) {
                            System.out.println("key=" + key + "  numPartitions=" + numPartitions);
                            return key % numPartitions;
                        }
                    }, 0)
                    .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                        @Override
                        public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                            values.forEach(o -> {
                                System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                            });
                            //System.out.println();
                        }
                    })
                    .setParallelism(4)
                    .print();
            System.out.println("partitionCustom");
    
        }
    
    
    • Filter
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<Integer> dataSource = env.fromElements(1, 2, 3, 4, 5);
    
            dataSource.filter(new FilterFunction<Integer>() {
                @Override
                public boolean filter(Integer value) throws Exception {
                    return value % 2 == 0;
                }
            }).print();
    
        }
    
    • Projection of Tuple DataSet
      The project transformation removes or reorders the fields of a tuple DataSet. The project(int...) method selects, by index, the tuple fields to keep and defines their order in the output tuple.
      project does not require a user-defined function body.
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
            data.add(new Tuple3<>(1, 2, 3));
            data.add(new Tuple3<>(4, 5, 6));
            data.add(new Tuple3<>(7, 8, 9));
    
            // converts Tuple3<Integer, Integer, Integer> into Tuple2<Integer, Integer> 
        // select the desired fields; their order may be rearranged
            env
                    .fromCollection(data)
                    .project(2, 0)
                    .print();
    
        }
    
    Result:
    (3,1)
    (6,4)
    (9,7)
    
    
    • Transformations on Grouped DataSet
      Reduce operations can work on grouped DataSets. The key used for grouping can be specified in several ways, as the sketch below shows:
    key expression: groupBy("key")
    key selector function: implements KeySelector
    one or more field position keys (tuple DataSets only): groupBy(0, 1)
    case class fields (case classes only): groupBy("a", "b")
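
      A minimal sketch of the grouping variants on a tuple DataSet (the data is made up for illustration; tuples expose their fields to key expressions as "f0", "f1", ...):

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSource<Tuple2<Integer, String>> tuples = env.fromElements(
                    new Tuple2<>(1, "a"), new Tuple2<>(1, "b"), new Tuple2<>(2, "c"));

            // one or more field position keys (tuple DataSets only)
            tuples.groupBy(0).first(1).print();
            tuples.groupBy(0, 1).first(1).print();

            // key expression
            tuples.groupBy("f0").first(1).print();

            // key selector function
            tuples.groupBy(new KeySelector<Tuple2<Integer, String>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, String> value) {
                    return value.f0;
                }
            }).first(1).print();
        }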
    
    • Reduce on Grouped DataSet
      A Reduce transformation applied to a grouped DataSet reduces each group to a single element using a user-defined reduce function.
      For each group of input elements, the reduce function successively combines pairs of elements into one element until only a single element per group remains.
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
            data.add(new Tuple2<>(1, "demo1"));
            data.add(new Tuple2<>(1, "demo2"));
            data.add(new Tuple2<>(1, "demo3"));
            data.add(new Tuple2<>(2, "demo4"));
            data.add(new Tuple2<>(2, "demo5"));
            data.add(new Tuple2<>(2, "demo6"));
            data.add(new Tuple2<>(3, "demo7"));
            data.add(new Tuple2<>(3, "demo8"));
            data.add(new Tuple2<>(4, "demo9"));
            data.add(new Tuple2<>(4, "demo10"));
            data.add(new Tuple2<>(4, "demo11"));
            data.add(new Tuple2<>(4, "demo12"));
            data.add(new Tuple2<>(5, "demo13"));
            data.add(new Tuple2<>(5, "demo14"));
            data.add(new Tuple2<>(5, "demo15"));
            data.add(new Tuple2<>(5, "demo16"));
            data.add(new Tuple2<>(5, "demo17"));
            data.add(new Tuple2<>(6, "demo18"));
            data.add(new Tuple2<>(6, "demo19"));
            data.add(new Tuple2<>(6, "demo20"));
            data.add(new Tuple2<>(6, "demo21"));
    
            DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
    
            collection.groupBy(0).reduce(new ReduceFunction<Tuple2<Integer, String>>() {
                @Override
                public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
                    return new Tuple2<>(value1.f0,value1.f1+" + "+value2.f1);
                }
            }).print();
    
    
        }
    
    
    • GroupReduce on Grouped DataSet
      A GroupReduce transformation applied to a grouped DataSet calls a user-defined group-reduce function for each group.
      The difference to Reduce is that the user-defined function gets the whole group at once: it is invoked with an Iterable over all elements of the group and can return an arbitrary number of result elements.
      public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
            data.add(new Tuple2<>(1, "demo1"));
            data.add(new Tuple2<>(1, "demo2"));
            data.add(new Tuple2<>(1, "demo3"));
            data.add(new Tuple2<>(2, "demo4"));
            data.add(new Tuple2<>(2, "demo5"));
            data.add(new Tuple2<>(2, "demo6"));
            data.add(new Tuple2<>(3, "demo7"));
            data.add(new Tuple2<>(3, "demo8"));
            data.add(new Tuple2<>(4, "demo9"));
            data.add(new Tuple2<>(4, "demo10"));
            data.add(new Tuple2<>(4, "demo11"));
            data.add(new Tuple2<>(4, "demo12"));
            data.add(new Tuple2<>(5, "demo13"));
            data.add(new Tuple2<>(5, "demo14"));
            data.add(new Tuple2<>(5, "demo15"));
            data.add(new Tuple2<>(5, "demo16"));
            data.add(new Tuple2<>(5, "demo17"));
            data.add(new Tuple2<>(6, "demo18"));
            data.add(new Tuple2<>(6, "demo19"));
            data.add(new Tuple2<>(6, "demo20"));
            data.add(new Tuple2<>(6, "demo21"));
    
            DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
    
            collection.groupBy(0)
                    .reduceGroup(new RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                        @Override
                        public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
    
                            values.forEach(o -> {
                                if (o.f0 % 2 == 0) {
                                    out.collect(o);
                                }
                            });
                        }
                    })
                    .print();
        }
    
    
    • GroupCombine on a Grouped DataSet (combinable group-reduce functions)
    
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
            data.add(new Tuple2<>(1, "demo1"));
            data.add(new Tuple2<>(1, "demo2"));
            data.add(new Tuple2<>(1, "demo3"));
            data.add(new Tuple2<>(2, "demo4"));
            data.add(new Tuple2<>(2, "demo5"));
            data.add(new Tuple2<>(2, "demo6"));
            data.add(new Tuple2<>(3, "demo7"));
            data.add(new Tuple2<>(3, "demo8"));
            data.add(new Tuple2<>(4, "demo9"));
            data.add(new Tuple2<>(4, "demo10"));
            data.add(new Tuple2<>(4, "demo11"));
            data.add(new Tuple2<>(4, "demo12"));
            data.add(new Tuple2<>(5, "demo13"));
            data.add(new Tuple2<>(5, "demo14"));
            data.add(new Tuple2<>(5, "demo15"));
            data.add(new Tuple2<>(5, "demo16"));
            data.add(new Tuple2<>(5, "demo17"));
            data.add(new Tuple2<>(6, "demo18"));
            data.add(new Tuple2<>(6, "demo19"));
            data.add(new Tuple2<>(6, "demo20"));
            data.add(new Tuple2<>(6, "demo21"));
    
            DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
    
            collection.groupBy(0)
    //                .reduceGroup(new RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
    //                    @Override
    //                    public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
    //
    //                        values.forEach(o -> {
    //                            if (o.f0 % 2 == 0) {
    //                                out.collect(o);
    //                            }
    //                        });
    //                    }
    //                })
                    .reduceGroup(new GroupReduce())
                    .print();
        }
    
        private static class GroupReduce implements GroupCombineFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>, GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
    
    /**
      * In contrast to a reduce function, a group-reduce function is not implicitly combinable.
      * To make a group-reduce function combinable, it must implement the GroupCombineFunction interface.
      *
      * Important: the generic input and output types of the GroupCombineFunction interface must be
      * equal to the generic input type of the GroupReduceFunction, as the following example shows:
      */
            // implements the GroupCombineFunction interface
            @Override
            public void combine(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                Tuple2<Integer, String> t = StreamSupport.stream(values.spliterator(), false).reduce((o1, o2) -> {
                    return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
                }).get();
                out.collect(t);
            }
    
            @Override
            public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                values.forEach(o->{
                    out.collect(o);
                });
            }
        }
    
    
    • Aggregate on Grouped Tuple DataSet
      Some aggregations are commonly used. The Aggregate transformation provides the following built-in aggregation functions:
      Sum,
      Min, and
      Max

    The Aggregate transformation can only be applied to tuple DataSets, and only field position keys are supported for grouping.

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<Tuple2<Integer, Integer>> data = new ArrayList<>();
            data.add(new Tuple2<>(1, 1));
            data.add(new Tuple2<>(1, 2));
            data.add(new Tuple2<>(1, 3));
            data.add(new Tuple2<>(2, 4));
            data.add(new Tuple2<>(2, 5));
            data.add(new Tuple2<>(2, 6));
            data.add(new Tuple2<>(3, 7));
            data.add(new Tuple2<>(3, 8));
            data.add(new Tuple2<>(4, 9));
            data.add(new Tuple2<>(4, 10));
            data.add(new Tuple2<>(4, 11));
            data.add(new Tuple2<>(4, 12));
            data.add(new Tuple2<>(5, 13));
            data.add(new Tuple2<>(5, 14));
            data.add(new Tuple2<>(5, 15));
            data.add(new Tuple2<>(5, 16));
            data.add(new Tuple2<>(5, 17));
            data.add(new Tuple2<>(6, 18));
            data.add(new Tuple2<>(6, 19));
            data.add(new Tuple2<>(6, 20));
            data.add(new Tuple2<>(6, 21));
    
            DataSource<Tuple2<Integer, Integer>> collection = env.fromCollection(data);
    
            collection.groupBy(0)
                    .aggregate(Aggregations.SUM,1)
                    .print();
        }
    
    
    • MinBy / MaxBy on Grouped Tuple DataSet
      The MinBy (MaxBy) transformation selects a single tuple for each group of tuples.
      The selected tuple is the one whose values in one or more specified fields are minimal (maximal). The fields used for comparison must be valid key fields, i.e. comparable.
      If several tuples share the minimum (maximum) field values, an arbitrary one of them is returned.
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<Tuple2<Integer, Integer>> data = new ArrayList<>();
            data.add(new Tuple2<>(1, 1));
            data.add(new Tuple2<>(1, 2));
            data.add(new Tuple2<>(1, 3));
            data.add(new Tuple2<>(2, 4));
            data.add(new Tuple2<>(2, 5));
            data.add(new Tuple2<>(2, 6));
            data.add(new Tuple2<>(3, 7));
            data.add(new Tuple2<>(3, 8));
            data.add(new Tuple2<>(4, 9));
            data.add(new Tuple2<>(4, 10));
            data.add(new Tuple2<>(4, 11));
            data.add(new Tuple2<>(4, 12));
            data.add(new Tuple2<>(5, 13));
            data.add(new Tuple2<>(5, 14));
            data.add(new Tuple2<>(5, 15));
            data.add(new Tuple2<>(5, 16));
            data.add(new Tuple2<>(5, 17));
            data.add(new Tuple2<>(6, 18));
            data.add(new Tuple2<>(6, 19));
            data.add(new Tuple2<>(6, 20));
            data.add(new Tuple2<>(6, 21));
    
            DataSource<Tuple2<Integer, Integer>> collection = env.fromCollection(data);
    
            collection.groupBy(0)
                    .maxBy(1)
                    .print();
        }
    
    
    • Reduce on full DataSet
      The Reduce transformation applies a user-defined reduce function to all elements of a DataSet. The reduce function then successively combines pairs of elements into one element until only a single element remains.
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
            data.add(new Tuple2<>(1, "demo1"));
            data.add(new Tuple2<>(1, "demo2"));
            data.add(new Tuple2<>(1, "demo3"));
            data.add(new Tuple2<>(2, "demo4"));
            data.add(new Tuple2<>(2, "demo5"));
            data.add(new Tuple2<>(2, "demo6"));
            data.add(new Tuple2<>(3, "demo7"));
            data.add(new Tuple2<>(3, "demo8"));
            data.add(new Tuple2<>(4, "demo9"));
            data.add(new Tuple2<>(4, "demo10"));
            data.add(new Tuple2<>(4, "demo11"));
            data.add(new Tuple2<>(4, "demo12"));
            data.add(new Tuple2<>(5, "demo13"));
            data.add(new Tuple2<>(5, "demo14"));
            data.add(new Tuple2<>(5, "demo15"));
            data.add(new Tuple2<>(5, "demo16"));
            data.add(new Tuple2<>(5, "demo17"));
            data.add(new Tuple2<>(6, "demo18"));
            data.add(new Tuple2<>(6, "demo19"));
            data.add(new Tuple2<>(6, "demo20"));
            data.add(new Tuple2<>(6, "demo21"));
    
            DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
    
            collection
                    .reduce(new ReduceFunction<Tuple2<Integer, String>>() {
                        @Override
                        public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
                            return new Tuple2<>(value1.f0 + value2.f0, value1.f1 + " + " + value2.f1);
                        }
                    }).print();
        }
    
    
    • GroupReduce on full DataSet
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
            data.add(new Tuple2<>(1, "demo1"));
            data.add(new Tuple2<>(1, "demo2"));
            data.add(new Tuple2<>(1, "demo3"));
            data.add(new Tuple2<>(2, "demo4"));
            data.add(new Tuple2<>(2, "demo5"));
            data.add(new Tuple2<>(2, "demo6"));
            data.add(new Tuple2<>(3, "demo7"));
            data.add(new Tuple2<>(3, "demo8"));
          
    
            DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);
    
        collection
                // no groupBy: the group-reduce function is called once with all elements
                .reduceGroup(new GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        int keySum = 0;
                        StringBuilder names = new StringBuilder();
                        for (Tuple2<Integer, String> v : values) {
                            keySum += v.f0;
                            if (names.length() > 0) {
                                names.append(" + ");
                            }
                            names.append(v.f1);
                        }
                        out.collect(new Tuple2<>(keySum, names.toString()));
                    }
                }).print();
    
    
        }
    
    
    • GroupCombine on a full DataSet
      A GroupCombine on a full DataSet works similar to the GroupCombine on a grouped DataSet.
      The data is partitioned on all nodes and then combined in a greedy fashion (i.e. only data fitting into memory is combined at once); a sketch:
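
      A minimal sketch using combineGroup on a full DataSet (data made up for illustration):

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // combineGroup is invoked per partition with as much data as fits in memory,
            // so it emits partial results, not one guaranteed global result
            env.fromElements(1, 2, 3, 4, 5)
                    .combineGroup(new GroupCombineFunction<Integer, Integer>() {
                        @Override
                        public void combine(Iterable<Integer> values, Collector<Integer> out) throws Exception {
                            int sum = 0;
                            for (Integer v : values) {
                                sum += v;
                            }
                            out.collect(sum); // one partial sum per combine call
                        }
                    })
                    .print();
        }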
    • Aggregate on full Tuple DataSet
      The Aggregate transformation can only be applied to tuple DataSets; a sketch:
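
      A minimal sketch (data made up for illustration):

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(
                    new Tuple2<>(1, 10),
                    new Tuple2<>(2, 20),
                    new Tuple2<>(3, 30))
                    // no groupBy: aggregate over the whole DataSet,
                    // SUM on field 1 combined with MAX on field 0
                    .aggregate(Aggregations.SUM, 1)
                    .and(Aggregations.MAX, 0)
                    .print(); // (3,60)
        }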

    • MinBy / MaxBy on full Tuple DataSet
      The MinBy (MaxBy) transformation selects a single tuple from a DataSet of tuples.
      The selected tuple is the one whose values in one or more specified fields are minimal (maximal).
      The fields used for comparison must be valid key fields, i.e. comparable.
      If several tuples share the minimum (maximum) field values, an arbitrary one of them is returned; a sketch:
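
      A minimal sketch (data made up for illustration):

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSource<Tuple2<Integer, Integer>> collection = env.fromElements(
                    new Tuple2<>(1, 1),
                    new Tuple2<>(2, 5),
                    new Tuple2<>(3, 3));

            // no groupBy: a single tuple is selected from the whole DataSet
            collection.maxBy(1).print(); // (2,5)
            collection.minBy(1).print(); // (1,1)
        }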

    • Distinct (deduplication)
      The Distinct transformation computes the DataSet of the distinct elements of the source DataSet; a sketch:
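
      A minimal sketch (data made up for illustration):

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(
                    new Tuple2<>(1, "a"),
                    new Tuple2<>(1, "a"),
                    new Tuple2<>(1, "b"),
                    new Tuple2<>(2, "a"))
                    // deduplicate on the full tuple; distinct(0) would deduplicate on field 0 only
                    .distinct()
                    .print();
        }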

    • Join / OuterJoin
      Join combines two DataSets by matching elements on equal keys and emits one result per matching pair. The outer-join variants (leftOuterJoin, rightOuterJoin, fullOuterJoin) also keep elements without a match, which is why the join function below checks for null inputs.

        public static void main(String[] args) throws Exception {
    
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<Tuple2<Integer, String>> stringDataSource1 =
                    env.fromElements(
                            new Tuple2(1, "小王"),
                            new Tuple2(2, "小里"),
                            new Tuple2(3, "小张"),
                            new Tuple2(4, "小四")
                    );
    
    
            DataSource<Tuple2<Integer, String>> stringDataSource2 =
                    env.fromElements(
                            new Tuple2(1, "北京"),
                            new Tuple2(2, "上海"),
                            new Tuple2(3, "成都"),
                            new Tuple2(5, "重庆")
                    );
    
    
            stringDataSource1
                    //.leftOuterJoin(stringDataSource2)
                    //.rightOuterJoin(stringDataSource2)
                    //.fullOuterJoin(stringDataSource2)
                    .join(stringDataSource2)
                    // key: field 0 of the first input
                    .where(0)
                    // key: field 0 of the second input
                    .equalTo(0)
                    .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                        @Override
                        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                            if (first == null) {
                                return new Tuple3<>(second.f0, "-", second.f1);
                            } else if (second == null) {
                                return new Tuple3<>(first.f0, first.f1, "-");
                            } else {
                                return new Tuple3<>(first.f0, first.f1, second.f1);
                            }
                        }
                    })
                    .print();
        }
    
    
    • Cross
      The Cross transformation combines two DataSets into one. It builds all pairwise combinations of the elements of both input DataSets, i.e. it builds the Cartesian product.
      Cross either calls a user-defined cross function on each pair of elements or emits Tuple2 pairs.
        public static void main(String[] args) throws Exception {
    
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<Tuple1<Integer>> stringDataSource1 =
                    env.fromElements(
                            new Tuple1(1),
                            new Tuple1(2)
                    );
    
            DataSource<Tuple1<String>> stringDataSource2 =
                    env.fromElements(
                            new Tuple1("北京"),
                            new Tuple1("上海"),
                            new Tuple1("重庆")
                    );
    
            stringDataSource1
                    .cross(stringDataSource2)
                    .with(new CrossFunction<Tuple1<Integer>, Tuple1<String>, Tuple2<Integer, String>>() {
                        @Override
                        public Tuple2<Integer, String> cross(Tuple1<Integer> val1, Tuple1<String> val2) throws Exception {
                            return new Tuple2(val1.f0, val2.f0);
                        }
                    }).print();
        }
    
    (1,北京)
    (1,上海)
    (1,重庆)
    (2,北京)
    (2,上海)
    (2,重庆)
    
    
    • CoGroup
      Unlike Join, which is pairwise (one output for every pair of key-equal elements), CoGroup is many-to-many: both inputs are grouped by key, and each pair of key-matching groups is processed together.
      The CoGroup transformation jointly processes groups of two DataSets. Both DataSets are grouped on a defined key, and groups of both DataSets that share the same key are handed together to a user-defined co-group function.
      If, for a specific key, only one DataSet has a group, the co-group function is called with this group and an empty group. The co-group function can separately iterate over the elements of both groups and return an arbitrary number of result elements.

    Similar to Reduce, GroupReduce, and Join, keys can be defined using the different key-selection methods.

        public static void main(String[] args) throws Exception {
    
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<Tuple2<Integer, String>> stringDataSource1 =
                    env.fromElements(
                            new Tuple2(1, "小王"),
                            new Tuple2(1, "小周"),
                            new Tuple2(2, "小里"),
                            new Tuple2(3, "小张"),
                            new Tuple2(4, "小四")
                    );
    
    
            DataSource<Tuple2<Integer, String>> stringDataSource2 =
                    env.fromElements(
                            new Tuple2(1, "北京"),
                            new Tuple2(1, "邯郸"),
                            new Tuple2(2, "上海"),
                            new Tuple2(3, "成都"),
                            new Tuple2(5, "重庆")
                    );
    
    
            stringDataSource1
                    .coGroup(stringDataSource2)
                    // key: field 0 of the first input
                    .where(0)
                    // key: field 0 of the second input
                    .equalTo(0)
                    .with(new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                        @Override
                        public void coGroup(Iterable<Tuple2<Integer, String>> first, Iterable<Tuple2<Integer, String>> second, Collector<Tuple2<Integer, String>> out) throws Exception {
                            Tuple2<Integer, String> t1 = StreamSupport.stream(first.spliterator(), false).reduce((o1, o2) -> {
                                return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
                            }).orElse(null);
                            Tuple2<Integer, String> t2 = StreamSupport.stream(second.spliterator(), false).reduce((o1, o2) -> {
                                return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
                            }).orElse(null);
                            if (t1 != null && t2 != null) {
                                out.collect(new Tuple2<>(t1.f0, t1.f1 + " + " + t2.f1));
                            }
                        }
                    })
                    .print();
        }
    (3,小张 + 成都)
    (1,小王 + 小周 + 北京 + 邯郸)
    (2,小里 + 上海)
    
    
    • Union
      Union merges two (or, by chaining, more) DataSets of the same type.
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<Tuple2<Integer, String>> stringDataSource1 =
                    env.fromElements(
                            new Tuple2(1, "小王"),
                            new Tuple2(1, "小周"),
                            new Tuple2(2, "小里"),
                            new Tuple2(3, "小张"),
                            new Tuple2(4, "小四")
                    );
    
    
            DataSource<Tuple2<Integer, String>> stringDataSource2 =
                    env.fromElements(
                            new Tuple2(1, "北京"),
                            new Tuple2(2, "上海"),
                            new Tuple2(3, "成都"),
                            new Tuple2(5, "重庆")
                    );
    
            stringDataSource1.union(stringDataSource2)
                    .print();
        }
    (1,小王)
    (4,小四)
    (1,北京)
    (1,小周)
    (2,上海)
    (2,小里)
    (3,小张)
    (3,成都)
    (5,重庆)
    
    • Rebalance
      Evenly rebalances the parallel partitions of a DataSet to eliminate data skew; a sketch follows.
      Important: this operation shuffles the entire DataSet over the network and can take significant time.
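
      A minimal sketch (data made up for illustration):

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                    // round-robin redistribution of all elements to even out skew
                    .rebalance()
                    .map(new MapFunction<Integer, Integer>() {
                        @Override
                        public Integer map(Integer value) throws Exception {
                            return value * 2;
                        }
                    })
                    .setParallelism(4)
                    .print();
        }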

    • Hash-Partition (see the MapPartition example above)
      Important: this operation shuffles the entire DataSet over the network and can take significant time.

    • Range-Partition
      Important: this operation requires an additional pass over the DataSet to compute the ranges, plus broadcasting of the range boundaries and shuffling of the entire DataSet over the network. This can take significant time.

    • Sort Partition
      Locally sorts all partitions of a DataSet on the specified fields in the specified order. Fields can be specified as field expressions or field positions.
      Partitions can be sorted on multiple fields by chaining sortPartition() calls; a sketch:
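
      A minimal sketch (data made up for illustration; Order is org.apache.flink.api.common.operators.Order):

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(
                    new Tuple2<>(2, "b"),
                    new Tuple2<>(1, "c"),
                    new Tuple2<>(1, "a"))
                    // locally sort every partition on field 0 ascending, then field 1 descending
                    .sortPartition(0, Order.ASCENDING)
                    .sortPartition(1, Order.DESCENDING)
                    .print();
        }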

    • First-n
      Returns the first n (arbitrary) elements of a DataSet. First-n can be applied to a regular DataSet, a grouped DataSet, or a grouped and sorted DataSet. Grouping keys can be specified as key selector functions or field position keys; a sketch:
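
      A minimal sketch (data made up for illustration):

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSource<Tuple2<Integer, String>> data = env.fromElements(
                    new Tuple2<>(1, "a"),
                    new Tuple2<>(1, "b"),
                    new Tuple2<>(2, "c"),
                    new Tuple2<>(2, "d"),
                    new Tuple2<>(2, "e"));

            // any 3 elements of the full DataSet
            data.first(3).print();

            // any 2 elements of every group
            data.groupBy(0).first(2).print();

            // the first 2 elements of every group, ordered by field 1 ascending
            data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
        }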
