这些算子可以针对KeyedStream的每一个支流做聚合。
- sum
- min
- max
- minBy
- maxBy
测试maxBy
package com.atguigu.apiTest
import org.apache.flink.streaming.api.scala._
object TranformTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//将全局并行度设置为1
env.setParallelism(1)
val streamFromFile: DataStream[String] = env.readTextFile("C:\\Users\\Administrator\\Desktop\\0311Flink\\flink\\src\\main\\resources\\hello.txt")
//1.基本转换算子和简单聚合算子
val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {
val dataArray: Array[String] = data.split(",")
//包装成样例类
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
//keyBy()
val keyByStream: DataStream[SensorReading] = dataStream.keyBy("id").maxBy("temperature")
keyByStream.print()
env.execute()
}
}
文件hello.txt为:
sensor_1, 1547718199, 35.80018327300259
sensor_6, 1547718201, 15.402984393403084
sensor_7, 1547718202, 6.720945201171228
sensor_10, 1547718205, 38.101067604893444
sensor_7, 1547718202, 6
输出:
SensorReading(sensor_1,1547718199,35.80018327300259)
SensorReading(sensor_6,1547718201,15.402984393403084)
SensorReading(sensor_7,1547718202,6.720945201171228)
SensorReading(sensor_10,1547718205,38.101067604893444)
SensorReading(sensor_7,1547718202,6.720945201171228)
网友评论