美文网首页
flink DataSet

flink DataSet

作者: 程序男保姆 | 来源:发表于2020-08-23 00:59 被阅读0次

DataSet 开发概述

DataSource 数据来源

  • readTextFile
 
public class ReadTextFileDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        DataSource<String> dataSource = env.readTextFile("file:///Users/baidu/Desktop/input");
        DataSource<String> dataSource = env.readTextFile("hdfs://master:9000/user/yuan/input/wc.count");
        dataSource.flatMap(new FlatMapFunction<String, S>() {
            @Override
            public void flatMap(String value, Collector<S> out) throws Exception {
                Arrays.stream(value.split("")).forEach(o -> {
                    S s = new S();
                    s.setStr(o);
                    out.collect(s);
                });
            }
        }).print();

    }


    public static class S {
        String str;

        public String getStr() {
            return str;
        }

        public void setStr(String str) {
            this.str = str;
        }

        @Override
        public String toString() {
            return "S{" +
                    "str='" + str + '\'' +
                    '}';
        }
    }
}

计数器

    public static void main(String[] args) throws Exception {

        // ExecutionEnvironment 执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> text = env.fromElements("Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        text
                .flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {

                    IntCounter intCounter = new IntCounter(0);

                    @Override
                    public void open(Configuration parameters) throws Exception {
                         // 解决多并行度时 计数总数不正确(因为每个并行度就是一个线程 每个线程之前是不共享数据的)
                        getRuntimeContext().addAccumulator("c", intCounter);
                    }

                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String s : value.split(" ")) {
                            intCounter.add(1);
                            out.collect(new Tuple2<String, Integer>(s, 1));
                        }
                    }
                })
                // 设置几个并行度 就写几份文件
                .setParallelism(4)
                .writeAsText("file:///Users/baidu/Desktop/222", FileSystem.WriteMode.OVERWRITE);;


        JobExecutionResult execute = env.execute("abcd");

        Object c = execute.getAccumulatorResult("c");

        System.out.println("c = " + c);

    }

Sink

分布式缓存

Transformation算子

  • Map
public class MapDemo {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> text = env.fromElements("1", "2", "3", "4", "5");
        // map 一对一转换 字符串数值
        text.map(new MapFunction<String, Integer>() {
            public Integer map(String value) throws Exception {
                return Integer.valueOf(value);
            }
        }).print();
    }

}
  • FlatMap
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> text = env.fromElements("我是一个大傻瓜", "我是一个小朋友");

        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            // 一对多 一行内容转换了好几行
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String s : value.split("")) {
                    out.collect(new Tuple2<String, Integer>(s, 1));
                }
            }
        }).print();
    }
  • MapPartition
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
        data.add(new Tuple2<>(4, "demo9"));
        data.add(new Tuple2<>(4, "demo10"));
        data.add(new Tuple2<>(4, "demo11"));
        data.add(new Tuple2<>(4, "demo12"));
        data.add(new Tuple2<>(5, "demo13"));
        data.add(new Tuple2<>(5, "demo14"));
        data.add(new Tuple2<>(5, "demo15"));
        data.add(new Tuple2<>(5, "demo16"));
        data.add(new Tuple2<>(5, "demo17"));
        data.add(new Tuple2<>(6, "demo18"));
        data.add(new Tuple2<>(6, "demo19"));
        data.add(new Tuple2<>(6, "demo20"));
        data.add(new Tuple2<>(6, "demo21"));


        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        // 按照数字分区 相同的数据会分到一个分区里
        collection
                .partitionByRange(0)
                .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        values.forEach(o -> {
                            System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                        });
                        System.out.println();
                    }
                })
                .setParallelism(4)
                .print();
        System.out.println("partitionByRange");

        // 按照hash分区 
        collection
                .partitionByHash(0)
                .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        values.forEach(o -> {
                            System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                        });
                        System.out.println();
                    }
                })
                .setParallelism(4)
                .print();
        System.out.println("partitionByHash");

        // 按照自定义分区 
        collection
                .partitionCustom(new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        System.out.println("key=" + key + "  numPartitions=" + numPartitions);
                        return key % numPartitions;
                    }
                }, 0)
                .mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                        values.forEach(o -> {
                            System.out.println("线程id:" + Thread.currentThread().getId() + "," + o);
                        });
                        //System.out.println();
                    }
                })
                .setParallelism(4)
                .print();
        System.out.println("partitionCustom");

    }

  • Filter
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Integer> dataSource = env.fromElements(1, 2, 3, 4, 5);

        dataSource.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value % 2 == 0;
            }
        }).print();

    }
  • Projection of Tuple DataSet(元组数据集投影)
    project 转换将删除或移动元组数据集的元组字段。该 project(int…) 方法选择应由其索引保留的元组字段,并在输出元组中定义其顺序。
    project 不需要定义函数体
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>(1, 2, 3));
        data.add(new Tuple3<>(4, 5, 6));
        data.add(new Tuple3<>(7, 8, 9));

        // converts Tuple3<Integer, Integer, Integer> into Tuple2<Integer, Integer> 
        //选出合适的字段且可调换顺序
        env
                .fromCollection(data)
                .project(2, 0)
                .print();

    }

执行后的结果
(3,1)
(6,4)
(9,7)

  • Transformations on Grouped DataSet (分组数据集的转换)
    reduce 操作可以对分组的数据集进行操作。指定用于分组的密钥可以通过多种方式完成:
关键表达,groupBy("key")
键选择器功能,implements KeySelector
一个或多个字段位置键(仅限元组数据集),groupBy(0, 1)
案例类别字段(仅案例类别),groupBy("a", "b")
  • Reduce on Grouped DataSet(减少分组数据集)
    应用于分组数据集的 Reduce 转换使用用户定义的 reduce 函数将每个组简化为单个元素。
    对于每组输入元素,reduce 函数依次将成对的元素组合为一个元素,直到每组只剩下一个元素为止。
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
        data.add(new Tuple2<>(4, "demo9"));
        data.add(new Tuple2<>(4, "demo10"));
        data.add(new Tuple2<>(4, "demo11"));
        data.add(new Tuple2<>(4, "demo12"));
        data.add(new Tuple2<>(5, "demo13"));
        data.add(new Tuple2<>(5, "demo14"));
        data.add(new Tuple2<>(5, "demo15"));
        data.add(new Tuple2<>(5, "demo16"));
        data.add(new Tuple2<>(5, "demo17"));
        data.add(new Tuple2<>(6, "demo18"));
        data.add(new Tuple2<>(6, "demo19"));
        data.add(new Tuple2<>(6, "demo20"));
        data.add(new Tuple2<>(6, "demo21"));

        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        collection.groupBy(0).reduce(new ReduceFunction<Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
                return new Tuple2<>(value1.f0,value1.f1+" + "+value2.f1);
            }
        }).print();


    }

  • GroupReduce on Grouped DataSet(分组数据集上的 GroupReduce)
    应用于分组数据集的 GroupReduce 转换为每个组调用用户定义的 group-reduce 函数。
    此与 Reduce 的区别在于,用户定义的函数可一次获取整个组。该函数在组的所有元素上使用 Iterable 调用,并且可以返回任意数量的结果元素。
  public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
        data.add(new Tuple2<>(4, "demo9"));
        data.add(new Tuple2<>(4, "demo10"));
        data.add(new Tuple2<>(4, "demo11"));
        data.add(new Tuple2<>(4, "demo12"));
        data.add(new Tuple2<>(5, "demo13"));
        data.add(new Tuple2<>(5, "demo14"));
        data.add(new Tuple2<>(5, "demo15"));
        data.add(new Tuple2<>(5, "demo16"));
        data.add(new Tuple2<>(5, "demo17"));
        data.add(new Tuple2<>(6, "demo18"));
        data.add(new Tuple2<>(6, "demo19"));
        data.add(new Tuple2<>(6, "demo20"));
        data.add(new Tuple2<>(6, "demo21"));

        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        collection.groupBy(0)
                .reduceGroup(new RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {

                        values.forEach(o -> {
                            if (o.f0 % 2 == 0) {
                                out.collect(o);
                            }
                        });
                    }
                })
                .print();
    }

  • GroupCombine on a Grouped DataSet 可组合的 GroupReduce 函数

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
        data.add(new Tuple2<>(4, "demo9"));
        data.add(new Tuple2<>(4, "demo10"));
        data.add(new Tuple2<>(4, "demo11"));
        data.add(new Tuple2<>(4, "demo12"));
        data.add(new Tuple2<>(5, "demo13"));
        data.add(new Tuple2<>(5, "demo14"));
        data.add(new Tuple2<>(5, "demo15"));
        data.add(new Tuple2<>(5, "demo16"));
        data.add(new Tuple2<>(5, "demo17"));
        data.add(new Tuple2<>(6, "demo18"));
        data.add(new Tuple2<>(6, "demo19"));
        data.add(new Tuple2<>(6, "demo20"));
        data.add(new Tuple2<>(6, "demo21"));

        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        collection.groupBy(0)
//                .reduceGroup(new RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
//                    @Override
//                    public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
//
//                        values.forEach(o -> {
//                            if (o.f0 % 2 == 0) {
//                                out.collect(o);
//                            }
//                        });
//                    }
//                })
                .reduceGroup(new GroupReduce())
                .print();
    }

    private static class GroupReduce implements GroupCombineFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>, GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {

/**
  * 与 reduce 函数相比,group-reduce 函数不是可隐式组合的。
  * 为了使 group-reduce 函数可组合,它必须实现 GroupCombineFunction 接口。
  *
  * 要点:GroupCombineFunction 接口的通用输入和输出类型必须等于 GroupReduceFunction 的通用输入类型,
  * 如以下示例所示:
  */
        // 实现comblinefunction接口 
        @Override
        public void combine(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
            Tuple2<Integer, String> t = StreamSupport.stream(values.spliterator(), false).reduce((o1, o2) -> {
                return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
            }).get();
            out.collect(t);
        }

        @Override
        public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
            values.forEach(o->{
                out.collect(o);
            });
        }
    }

  • Aggregate on Grouped Tuple DataSet(在分组元组数据集聚合)
    有一些常用的聚合操作。聚合转换提供以下内置聚合功能:
    Sum
    Min, and
    Max

聚合转换只能应用于元组数据集,并且仅支持用于分组的字段位置键

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, 1));
        data.add(new Tuple2<>(1, 2));
        data.add(new Tuple2<>(1, 3));
        data.add(new Tuple2<>(2, 4));
        data.add(new Tuple2<>(2, 5));
        data.add(new Tuple2<>(2, 6));
        data.add(new Tuple2<>(3, 7));
        data.add(new Tuple2<>(3, 8));
        data.add(new Tuple2<>(4, 9));
        data.add(new Tuple2<>(4, 10));
        data.add(new Tuple2<>(4, 11));
        data.add(new Tuple2<>(4, 12));
        data.add(new Tuple2<>(5, 13));
        data.add(new Tuple2<>(5, 14));
        data.add(new Tuple2<>(5, 15));
        data.add(new Tuple2<>(5, 16));
        data.add(new Tuple2<>(5, 17));
        data.add(new Tuple2<>(6, 18));
        data.add(new Tuple2<>(6, 19));
        data.add(new Tuple2<>(6, 20));
        data.add(new Tuple2<>(6, 21));

        DataSource<Tuple2<Integer, Integer>> collection = env.fromCollection(data);

        collection.groupBy(0)
                .aggregate(Aggregations.SUM,1)
                .print();
    }

  • MinBy / MaxBy on Grouped Tuple DataSet(分组元组数据集上的 MinBy / MaxBy)
    MinBy(MaxBy)转换为每个元组组选择一个元组。
    选定的元组是其一个或多个指定字段的值最小(最大)的元组。用于比较的字段必须是有效的关键字段,即可比较的字段。
    如果多个元组具有最小(最大)字段值,则返回这些元组中的任意元组。
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, 1));
        data.add(new Tuple2<>(1, 2));
        data.add(new Tuple2<>(1, 3));
        data.add(new Tuple2<>(2, 4));
        data.add(new Tuple2<>(2, 5));
        data.add(new Tuple2<>(2, 6));
        data.add(new Tuple2<>(3, 7));
        data.add(new Tuple2<>(3, 8));
        data.add(new Tuple2<>(4, 9));
        data.add(new Tuple2<>(4, 10));
        data.add(new Tuple2<>(4, 11));
        data.add(new Tuple2<>(4, 12));
        data.add(new Tuple2<>(5, 13));
        data.add(new Tuple2<>(5, 14));
        data.add(new Tuple2<>(5, 15));
        data.add(new Tuple2<>(5, 16));
        data.add(new Tuple2<>(5, 17));
        data.add(new Tuple2<>(6, 18));
        data.add(new Tuple2<>(6, 19));
        data.add(new Tuple2<>(6, 20));
        data.add(new Tuple2<>(6, 21));

        DataSource<Tuple2<Integer, Integer>> collection = env.fromCollection(data);

        collection.groupBy(0)
                .maxBy(1)
                .print();
    }

  • Reduce on full DataSet减少完整的 DataSet)
    Reduce 转换将用户定义的 reduce 函数应用于 DataSet 的所有元素。reduce 函数随后将成对的元素组合为一个元素,直到仅剩下一个元素为止。
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
        data.add(new Tuple2<>(4, "demo9"));
        data.add(new Tuple2<>(4, "demo10"));
        data.add(new Tuple2<>(4, "demo11"));
        data.add(new Tuple2<>(4, "demo12"));
        data.add(new Tuple2<>(5, "demo13"));
        data.add(new Tuple2<>(5, "demo14"));
        data.add(new Tuple2<>(5, "demo15"));
        data.add(new Tuple2<>(5, "demo16"));
        data.add(new Tuple2<>(5, "demo17"));
        data.add(new Tuple2<>(6, "demo18"));
        data.add(new Tuple2<>(6, "demo19"));
        data.add(new Tuple2<>(6, "demo20"));
        data.add(new Tuple2<>(6, "demo21"));

        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        collection
                .reduce(new ReduceFunction<Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
                        return new Tuple2<>(value1.f0 + value2.f0, value1.f1 + " + " + value2.f1);
                    }
                }).print();
    }

  • GroupReduce on full DataSet
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "demo1"));
        data.add(new Tuple2<>(1, "demo2"));
        data.add(new Tuple2<>(1, "demo3"));
        data.add(new Tuple2<>(2, "demo4"));
        data.add(new Tuple2<>(2, "demo5"));
        data.add(new Tuple2<>(2, "demo6"));
        data.add(new Tuple2<>(3, "demo7"));
        data.add(new Tuple2<>(3, "demo8"));
      

        DataSource<Tuple2<Integer, String>> collection = env.fromCollection(data);

        collection
//                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
                        return new Tuple2<>(value1.f0 + value2.f0, value1.f1 + " + " + value2.f1);
                    }
                }).print();


    }

  • GroupCombine on a full DataSet(完整数据集上的 GroupCombine)
    完整数据集上的 GroupCombine 与分组数据集上的 GroupCombine 相似。
    数据在所有节点上进行分区,然后以贪婪的方式进行组合(即,仅将适合内存的数据进行一次组合)。
  • Aggregate on full Tuple DataSet(完整数据集上聚合)
    聚合转换只能应用于元组数据集。

  • MinBy / MaxBy on full Tuple DataSet(完整数据集上的 MinBy/MaxBy)
    MinBy(MaxBy)转换从元组的数据集中选择一个元组。
    选定的元组是其一个或多个指定字段的值最小(最大)的元组。
    用于比较的字段必须是有效的关键字段,即可比较的字段。
    如果多个元组具有最小(最大)字段值,则返回这些元组中的任意元组。

  • Distinct(去重)
    Distinct 转换计算源数据集的不同元素的数据集。

  • Join

  • OuterJoin 合并两个数据流 join with

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<Integer, String>> stringDataSource1 =
                env.fromElements(
                        new Tuple2(1, "小王"),
                        new Tuple2(2, "小里"),
                        new Tuple2(3, "小张"),
                        new Tuple2(4, "小四")
                );


        DataSource<Tuple2<Integer, String>> stringDataSource2 =
                env.fromElements(
                        new Tuple2(1, "北京"),
                        new Tuple2(2, "上海"),
                        new Tuple2(3, "成都"),
                        new Tuple2(5, "重庆")
                );


        stringDataSource1
                //.leftOuterJoin(stringDataSource2)
                //.rightOuterJoin(stringDataSource2)
                //.fullOuterJoin(stringDataSource2)
                .join(stringDataSource2)
                // 第一个数据 中 第一个字段
                .where(0)
                // 第二个数据 中 第一个字段
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (first == null) {
                            return new Tuple3<>(second.f0, "-", second.f1);
                        } else if (second == null) {
                            return new Tuple3<>(first.f0, first.f1, "-");
                        } else {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }
                })
                .print();
    }

  • Cross
    Cross 转换将两个数据集组合为一个数据集。它构建两个输入数据集的元素的所有成对组合,即构建笛卡尔积。
    Cross 转换要么在每对元素上调用用户定义的交叉函数,要么输出 Tuple2
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple1<Integer>> stringDataSource1 =
                env.fromElements(
                        new Tuple1(1),
                        new Tuple1(2)
                );

        DataSource<Tuple1<String>> stringDataSource2 =
                env.fromElements(
                        new Tuple1("北京"),
                        new Tuple1("上海"),
                        new Tuple1("重庆")
                );

        stringDataSource1
                .cross(stringDataSource2)
                .with(new CrossFunction<Tuple1<Integer>, Tuple1<String>, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> cross(Tuple1<Integer> val1, Tuple1<String> val2) throws Exception {
                        return new Tuple2(val1.f0, val2.f0);
                    }
                }).print();
    }

(1,北京)
(1,上海)
(1,重庆)
(2,北京)
(2,上海)
(2,重庆)

  • CoGroup 与join不同的是 join是一对一有多少相同的数据就产出多少条。 cogroup是多对多 会按照key分组
    CoGroup 转换共同处理两个数据集的组。两个数据集都分组在一个定义的键上,并且两个共享相同键的数据集的组一起交给用户定义的 co-group function。
    如果对于一个特定的键,只有一个 DataSet 有一个组,则使用该组和一个空组调用共同组功能。协同功能可以分别迭代两个组的元素并返回任意数量的结果元素。

与 Reduce,GroupReduce 和 Join 相似,可以使用不同的键选择方法来定义键。

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<Integer, String>> stringDataSource1 =
                env.fromElements(
                        new Tuple2(1, "小王"),
                        new Tuple2(1, "小周"),
                        new Tuple2(2, "小里"),
                        new Tuple2(3, "小张"),
                        new Tuple2(4, "小四")
                );


        DataSource<Tuple2<Integer, String>> stringDataSource2 =
                env.fromElements(
                        new Tuple2(1, "北京"),
                        new Tuple2(1, "邯郸"),
                        new Tuple2(2, "上海"),
                        new Tuple2(3, "成都"),
                        new Tuple2(5, "重庆")
                );


        stringDataSource1
                .coGroup(stringDataSource2)
                // 第一个数据 中 第一个字段
                .where(0)
                // 第二个数据 中 第一个字段
                .equalTo(0)
                .with(new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Integer, String>> first, Iterable<Tuple2<Integer, String>> second, Collector<Tuple2<Integer, String>> out) throws Exception {
                        Tuple2<Integer, String> t1 = StreamSupport.stream(first.spliterator(), false).reduce((o1, o2) -> {
                            return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
                        }).orElse(null);
                        Tuple2<Integer, String> t2 = StreamSupport.stream(second.spliterator(), false).reduce((o1, o2) -> {
                            return new Tuple2<>(o1.f0, o1.f1 + " + " + o2.f1);
                        }).orElse(null);
                        if (t1 != null && t2 != null) {
                            out.collect(new Tuple2<>(t1.f0, t1.f1 + " + " + t2.f1));
                        }
                    }
                })
                .print();
    }
(3,小张 + 成都)
(1,小王 + 小周 + 北京 + 邯郸)
(2,小里 + 上海)

  • Union 合并两个以上相同类型的数据集
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<Integer, String>> stringDataSource1 =
                env.fromElements(
                        new Tuple2(1, "小王"),
                        new Tuple2(1, "小周"),
                        new Tuple2(2, "小里"),
                        new Tuple2(3, "小张"),
                        new Tuple2(4, "小四")
                );


        DataSource<Tuple2<Integer, String>> stringDataSource2 =
                env.fromElements(
                        new Tuple2(1, "北京"),
                        new Tuple2(2, "上海"),
                        new Tuple2(3, "成都"),
                        new Tuple2(5, "重庆")
                );

        stringDataSource1.union(stringDataSource2)
                .print();
    }
(1,小王)
(4,小四)
(1,北京)
(1,小周)
(2,上海)
(2,小里)
(3,小张)
(3,小张)
(5,重庆)
  • Rebalance(重新平衡)
    均匀地重新平衡数据集的并行分区,以消除数据偏斜。
    重要:此操作会通过网络重新整理整个 DataSet。可能会花费大量时间。

  • Hash-Partition (哈希分区) 详见mapPartition
    重要:此操作会通过网络重新整理整个 DataSet。可能会花费大量时间。

  • Range-Partition(范围分区)
    重要:此操作需要在 DataSet 上额外传递一次以计算范围,通过网络对整个 DataSet 进行边界划分和改组。这会花费大量时间。

  • Sort Partition(分区排序)
    以指定顺序对指定字段上的 DataSet 的所有分区进行本地排序。可以将字段指定为字段表达式或字段位置。
    可以通过链接 sortPartition() 调用在多个字段上对分区进行排序。

  • First-n(前 n 个(任意)元素)
    返回数据集的前 n 个(任意)元素。First-n 可以应用于常规数据集,分组的数据集或分组排序的数据集。可以将分组键指定为键选择器功能或字段位置键。

相关文章

网友评论

      本文标题:flink DataSet

      本文链接:https://www.haomeiwen.com/subject/fkmbjktx.html