美文网首页
滚动聚合算子(Rolling Aggregation)

滚动聚合算子(Rolling Aggregation)

作者: yayooo | 来源:发表于2019-08-25 15:42 被阅读0次

    这些算子可以针对KeyedStream的每一个支流做聚合。

    • sum
    • min
    • max
    • minBy
    • maxBy

    测试maxBy

    package com.atguigu.apiTest
    
    import org.apache.flink.streaming.api.scala._
    
    object TranformTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        //将全局并行度设置为1
        env.setParallelism(1)
    
        val streamFromFile: DataStream[String] = env.readTextFile("C:\\Users\\Administrator\\Desktop\\0311Flink\\flink\\src\\main\\resources\\hello.txt")
    
        //1.基本转换算子和简单聚合算子
        val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {
          val dataArray: Array[String] = data.split(",")
          //包装成样例类
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    
        })
    
        //keyBy()
       val keyByStream: DataStream[SensorReading] = dataStream.keyBy("id").maxBy("temperature")
    
        keyByStream.print()
    
    
        env.execute()
    
      }
    
    }
    
    

    文件hello.txt为:

    sensor_1, 1547718199, 35.80018327300259
    sensor_6, 1547718201, 15.402984393403084
    sensor_7, 1547718202, 6.720945201171228
    sensor_10, 1547718205, 38.101067604893444
    sensor_7, 1547718202, 6
    

    输出:

    SensorReading(sensor_1,1547718199,35.80018327300259)
    SensorReading(sensor_6,1547718201,15.402984393403084)
    SensorReading(sensor_7,1547718202,6.720945201171228)
    SensorReading(sensor_10,1547718205,38.101067604893444)
    SensorReading(sensor_7,1547718202,6.720945201171228)
    
    

    相关文章

      网友评论

          本文标题:滚动聚合算子(Rolling Aggregation)

          本文链接:https://www.haomeiwen.com/subject/epxxectx.html