按三段论顺序
一、得到流式环境
StreamExecutionEnvironment.getExecutionEnvironment()
相应的子api
- createLocalEnvironmentWithWebUI( new Configuration())
- createLocalEnvironment(...) 本地环境
- createRemoteEnvironmen(<host>,<port>,String... jarFiles)
并行序优先级:
自有平行度配置数据高于父亲并行度的设置setParallelism(2);
二、获取源 - Data Source API
关联数据到应用程序 : StreamExecutionEnvironment.addSource(sourceFunction)
sourceFunction
可以自定义编写单例源、并行源或富并行源。
SourceFunction
、ParallelSourceFunction
、RichParallelSourceFunction
。
自定义sourceFunction
//只支持一个并行度的
env.addSource(new SourceFunction<T>(){
@Override
public void run(SourceContext sourceContext) throws Exception {
}
@Override
public void cancel() {
}
});
内置源---pre-implemments
基于文件
env.readTextFile(file)
.readTextFile(rootPath+"example/wordcount.txt");
- 更多文件读取函数
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo...):
基于socket
env.socketTextStream(<host>,<port>,",")
基于集合
- 集合
fromCollection(Collection) - DataStream来自Java.util.Collection.
fromCollection(Iterator, Class) - DataStream来自java.util.iterator.
fromElements(T ...) - 元素-对像和须是相同的数据类型
generateSequence(from, to) -- DataStream来自给定的序列化相同对象.
- 自定义集合
addSource(...)
- 可并行的数据流
fromParallelCollection(SplittableIterator, Class) - DataStream来自环境下的
fromParallelCollection(SplittableIterator<OUT> iterator,TypeInformation<OUT>)
- SplittableIterator:数据源包括在一个切开的迭代器中取中
- TypeInformation:
三、Bundled Connectors实现同第三方工具的集成
- Kafka(source/sink) - FlinkKafkaConsumer/FlinkKafkaProducer
ip:port、group.id、topic
- Cassandra(sink)
- Amazon Kinesis Stream(source/sink)
- ElasticSearch(sink)
- 文件系统或HDFS(流与批两种方式,sink)
- RabbmitMQ(source/sink)
- Apapche NiFi(source/sink)
- JDBC(sink)
- Google PubSub(source/sink)
- Twitter Streaming API(source)
...
四、数据转换 - DataStream Operators
数据转换: Transformation API
1、Map:DataStream -> DataStream
元素一一对应
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
2. FlatMap : DataStream至DataStream
一个进来,拆解成多个。比如通过,号分割成不同的元素
3. Filter : DataStream至DataStream
满足条件的拉出即可,数据类型不变。
4. KeyBy:DataStream --> KeyedStream
根据关键字进行数据操作
官网例子5. Reduce:KeyedStream --> DataStream
用于分组之后,做聚合操作、累计操作等
官网例子6. Aggregations::KeyedStream --> DataStream
聚合操作
官网例子
7. windwos: KeyedStream --> WindowedStream
把排序后的整流按所设定的时间拆解到不同的窗口中,统计分析基于窗口来做运算。
官网例子8. windwos ALL: DataStream --> AllWindowedStream
将整个流放到一个窗口里。
官网例子
9. windwos Apply:
WindowedStream--> DataStream 与AllWindowedStream -> DataStream
官网例子
10. windwos Reduce: WindowedStream--> DataStream
11. Aggregations on windwos : WindowedStream--> DataStream
官网例子12. Union: DataStream* 至DataStream
数据合并
官网例子
13. Windows Join: DataStream,DataStream 至DataStream
官网例子14. Interval Join:KeyedStream ,KeyedStream -> DataStream
和Windows Join类似,但有时间限制
官网例子
网友评论