1.获取execution environment
StreamExecutionEnvironment是所有流式Flink程序的基础,可以通过静态方法获取environment:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
2.加载/创建初始化数据
- 基于文件
readTextFile(path) readFile(fileInputFormat, path) readFile(fileInputFormat, path, watchType, interval, pathFilter)
- 基于socket
socketTextStream
- 基于collection
fromCollection(Seq) fromCollection(Iterator) fromElements(elements: _*) fromParallelCollection(SplittableIterator) generateSequence(from, to) addSource()
3.数据的转换
不同的算子,例如常用的几种数据流和算子(参考http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/)
Flink 中目前支持的主要几种流的类型,以及它们之间的转换关系4.指定放置计算结果的位置
writeAsText() / TextOutputFormat: 以字符串的形式逐行写入元素。字符串是通过调用每个元素的toString()方法获得的。
writeAsCsv(...) / CsvOutputFormat :将元组写入以逗号分隔的value文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
print() / printToErr() :在标准输出/标准错误流上print每个元素的toString()值。还可以选择在输出之前增加prefix(msg)来帮助区分不同的打印调用如果并行度大于1,输出还将加上生成输出的任务的标识符。
writeUsingOutputFormat() / FileOutputFormat: 方法和基类自定义文件输出,支持自定义对象到字节的转换。
writeToSocket: 根据SerializationSchema将元素写入Socket。
addSink: 调用自定义sink函数,Flink附带了到其他系统(如Apache Kafka)的连接器,这些连接器实现了sink函数。
5.触发程序执行
在local模式下执行程序
execute()
将程序达成jar运行在线上
./bin/flink run \
-m node21:8081 \
./examples/batch/WordCount.jar \(jar包所在位置)
--input hdfs:********** \
--output hdfs:********** \
网友评论