编程结构
public class SocketTextStreamWordCount {
public static void main(String[] args) throws Exception {
if (args.length != 2){
System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
return;
}
String hostName = args[0];
Integer port = Integer.parseInt(args[1]);
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream(hostName, port);
DataStream<Tuple2<String, Integer>> counts
text.flatMap(new LineSplitter())
.keyBy(0)
.sum(1);
counts.print();
env.execute("Java WordCount from SocketTextStream Example");
}
- 获得一个execution environment,
- 加载/创建初始数据,
- 指定此数据的转换,
- 指定放置计算结果的位置,
- 触发程序执行
DataSet API
分类
- Source: 数据源创建初始数据集,例如来自文件或Java集合
- Transformation: 数据转换将一个或多个DataSet转换为新的DataSet
- Sink: 将计算结果存储或返回
DataSet Source
基于文件
-
readTextFile(path)/ TextInputFormat
按行读取文件并将其作为字符串返回。 -
readTextFileWithValue(path)/ TextValueInputFormat
按行读取文件并将它们作为StringValues返回。StringValues是可变字符串。 -
readCsvFile(path)/ CsvInputFormat
解析逗号(或其他字符)分隔字段的文件。返回元组或POJO的DataSet。支持基本java类型及其Value对应作为字段类型。 -
readFileOfPrimitives(path, Class)/ PrimitiveInputFormat
解析新行(或其他字符序列)分隔的原始数据类型(如String或)的文件Integer。 -
readFileOfPrimitives(path, delimiter, Class)/ PrimitiveInputFormat
解析新行(或其他字符序列)分隔的原始数据类型的文件,例如String或Integer使用给定的分隔符。 -
readSequenceFile(Key, Value, path)/ SequenceFileInputFormat
创建一个JobConf并从类型为SequenceFileInputFormat,Key class和Value类的指定路径中读取文件,并将它们作为Tuple2 <Key,Value>返回。
基于集合
fromCollection(Collection)
从Java Java.util.Collection创建数据集。集合中的所有数据元必须属于同一类型。
fromCollection(Iterator, Class)
从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。
fromElements(T ...)
根据给定的对象序列创建数据集。所有对象必须属于同一类型。
fromParallelCollection(SplittableIterator, Class)
并行地从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。
generateSequence(from, to)
并行生成给定间隔中的数字序列。
通用方法
- readFile(inputFormat, path)/ FileInputFormat- 接受文件输入格式。
- createInput(inputFormat)/ InputFormat- 接受通用输入格式。
代码示例
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 从本地文件系统读
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// 读取HDFS文件
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
// 读取CSV文件
DataSet<Tuple3<Integer, String, Double>> csvInput1 = env.readCsvFile("hdfs:///the/CSV/file").types(Integer.class, String.class, Double.class);
// 读取CSV文件中的部分
DataSet<Tuple2<String, Double>> csvInput2 = env.readCsvFile("hdfs:///the/CSV/file").includeFields("10010").types(String.class, Double.class);
// 读取CSV映射为一个java类
// DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").pojoType(Person.class, "name", "age", "zipcode");
// 读取一个指定位置序列化好的文件
// DataSet<Tuple2<IntWritable, Text>> tuples =
// env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
// 从输入字符创建
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
DataSource<Long> fromParallelCollection = env.fromParallelCollection(new NumberSequenceIterator(10, 20), Long.class);
// 创建一个数字序列
DataSet<Long> numbers = env.generateSequence(1, 10000000);
// 从关系型数据库读取
// DataSet<Tuple2<String, Integer> dbData =
// env.createInput(JDBCInputFormat.buildJDBCInputFormat().setDrivername("org.apache.derby.jdbc.EmbeddedDriver").setDBUrl("jdbc:derby:memory:persons")
// .setQuery("select name, age from persons")
// .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
// .finish());
DataSet Transformation
详情参考官网:数据集转换
- Map
采用一个数据元并生成一个数据元。
data.map(new MapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
- FlatMap
采用一个数据元并生成零个,一个或多个数据元。
data.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String value, Collector<String> out) {
for (String s : value.split(" ")) {
out.collect(s);
}
}
});
网友评论