美文网首页
二、 DataSetAPI

二、 DataSetAPI

作者: 木戎 | 来源:发表于2019-05-20 16:37 被阅读0次

编程结构


public class SocketTextStreamWordCount {

    public static void main(String[] args) throws Exception {
        if (args.length != 2){
System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }
        String hostName = args[0];
        Integer port = Integer.parseInt(args[1]);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream(hostName, port);

        DataStream<Tuple2<String, Integer>> counts 
        text.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);
        counts.print();
        env.execute("Java WordCount from SocketTextStream Example");
    }
  • 获得一个execution environment,
  • 加载/创建初始数据,
  • 指定此数据的转换,
  • 指定放置计算结果的位置,
  • 触发程序执行

DataSet API


分类

  • Source: 数据源创建初始数据集,例如来自文件或Java集合
  • Transformation: 数据转换将一个或多个DataSet转换为新的DataSet
  • Sink: 将计算结果存储或返回

DataSet Source

基于文件

  • readTextFile(path)/ TextInputFormat 按行读取文件并将其作为字符串返回。
  • readTextFileWithValue(path)/ TextValueInputFormat 按行读取文件并将它们作为StringValues返回。StringValues是可变字符串。
  • readCsvFile(path)/ CsvInputFormat 解析逗号(或其他字符)分隔字段的文件。返回元组或POJO的DataSet。支持基本java类型及其Value对应作为字段类型。
  • readFileOfPrimitives(path, Class)/ PrimitiveInputFormat 解析新行(或其他字符序列)分隔的原始数据类型(如String或)的文件Integer。
  • readFileOfPrimitives(path, delimiter, Class)/ PrimitiveInputFormat 解析新行(或其他字符序列)分隔的原始数据类型的文件,例如String或Integer使用给定的分隔符。
  • readSequenceFile(Key, Value, path)/ SequenceFileInputFormat 创建一个JobConf并从类型为SequenceFileInputFormat,Key class和Value类的指定路径中读取文件,并将它们作为Tuple2 <Key,Value>返回。

基于集合

fromCollection(Collection) 从Java Java.util.Collection创建数据集。集合中的所有数据元必须属于同一类型。

fromCollection(Iterator, Class) 从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。

fromElements(T ...) 根据给定的对象序列创建数据集。所有对象必须属于同一类型。

fromParallelCollection(SplittableIterator, Class) 并行地从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。

generateSequence(from, to) 并行生成给定间隔中的数字序列。

通用方法

  • readFile(inputFormat, path)/ FileInputFormat- 接受文件输入格式。
  • createInput(inputFormat)/ InputFormat- 接受通用输入格式。

代码示例

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 从本地文件系统读
        DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

        // 读取HDFS文件
        DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

        // 读取CSV文件
        DataSet<Tuple3<Integer, String, Double>> csvInput1 = env.readCsvFile("hdfs:///the/CSV/file").types(Integer.class, String.class, Double.class);

        // 读取CSV文件中的部分
        DataSet<Tuple2<String, Double>> csvInput2 = env.readCsvFile("hdfs:///the/CSV/file").includeFields("10010").types(String.class, Double.class);


        // 读取CSV映射为一个java类
//    DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").pojoType(Person.class, "name", "age", "zipcode");

        // 读取一个指定位置序列化好的文件
//    DataSet<Tuple2<IntWritable, Text>> tuples =
//            env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

        // 从输入字符创建
        DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

        DataSource<Long> fromParallelCollection = env.fromParallelCollection(new NumberSequenceIterator(10, 20), Long.class);

        // 创建一个数字序列
        DataSet<Long> numbers = env.generateSequence(1, 10000000);

        // 从关系型数据库读取
//    DataSet<Tuple2<String, Integer> dbData =
//            env.createInput(JDBCInputFormat.buildJDBCInputFormat().setDrivername("org.apache.derby.jdbc.EmbeddedDriver").setDBUrl("jdbc:derby:memory:persons")
//                    .setQuery("select name, age from persons")
//                    .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
//                    .finish());

DataSet Transformation

详情参考官网:数据集转换

  • Map
    采用一个数据元并生成一个数据元。
data.map(new MapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});
  • FlatMap
    采用一个数据元并生成零个,一个或多个数据元。
data.flatMap(new FlatMapFunction<String, String>() {
  public void flatMap(String value, Collector<String> out) {
    for (String s : value.split(" ")) {
      out.collect(s);
    }
  }
});

相关文章

  • 二、 DataSetAPI

    编程结构 获得一个execution environment, 加载/创建初始数据, 指定此数据的转换, 指定放置...

  • tensorflow全新的数据读取方式,DatasetAPI

    tf.data 最佳实践摘要 具体理解参考:https://tensorflow.juejin.im/perfor...

  • Flink从入门到放弃(入门篇3)-DataSetAPI

    戳更多文章: 1-Flink入门 2-本地环境搭建&构建第一个Flink应用 3-DataSet API 4-Da...

  • 二(二)

    发什么神经 突然就很想花钱 一边心疼 一边毫不在乎的花 啧 莫名其妙

  • 二,二

    2017.9.11教师节后的周一,第一次走进教室,刚站到讲台两个小可爱送给我两束花,原谅我那时候人还没有认全没有记...

  • 二〇二〇

    本来这篇小结打算年初写的,但是想想后边还有复试就先放一放,结果复试结束后过了两个月才想起要写这篇小结... 时过境...

  • 二金二木二火二土

    今天看完了极简中国史,这本书看了半个多月,因为是八十年前写的书,不是白话文,所以看的特别累。不过从近代前辈的角度去...

  • 说二『似二非二的二』

    说实话原以为他最多似二,生活小节或许专门学着似二,中枢神经应该不至于非二,没想到最近越来越疯狂地绞尽脑汁地朝着二的...

  • 二胎(二)

    今天宝宝三十周了,还有十周你就要出来了,也许会提前,妈妈和家人都很期待。 到了孕晚期,睡觉是个问题,左睡右睡都不对...

  • 二小姐(二)

    我去了李家,那环境好,夫人老爷小姐都很和蔼,我正坐在由木头和瓷做的椅子上,正等待着女管家来接我,我人生地不熟...

网友评论

      本文标题:二、 DataSetAPI

      本文链接:https://www.haomeiwen.com/subject/wwybaqtx.html