reduce

Author: yayooo | Published: 2019-08-25 15:42

KeyedStream → DataStream
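An aggregation on a keyed (grouped) data stream: it combines the current element with the result of the previous aggregation to produce a new value. The returned stream contains the result of every aggregation step, not only the final result of the last one.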
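To make the "one output per input element" behavior concrete, here is a minimal plain-Scala sketch with no Flink dependency. It keeps the last aggregate per key in a map and emits the updated aggregate for every incoming element, which is the rolling behavior described above. The names `RollingReduceSketch` and `rollingReduce` are hypothetical and are not part of the Flink API. The sketch is single-threaded, so it does not model Flink's parallel subtasks (the `2>`, `3>` prefixes in the printed output).

```scala
// Plain-Scala sketch (hypothetical, not Flink API) of rolling, per-key reduce semantics.
object RollingReduceSketch {

  // Emits one result per input element: the updated aggregate for that element's key.
  def rollingReduce[T, K](input: Seq[T])(key: T => K)(f: (T, T) => T): Seq[T] = {
    val state = scala.collection.mutable.Map.empty[K, T] // last aggregate per key
    input.map { elem =>
      val k = key(elem)
      val next = state.get(k) match {
        case Some(acc) => f(acc, elem) // acc: result aggregated so far, elem: new element
        case None      => elem         // first element of a key is emitted as-is
      }
      state(k) = next
      next
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(("a", 1), ("b", 10), ("a", 2), ("a", 3), ("b", 20))
    // Running sum per key
    val sums = rollingReduce(events)(_._1)((x, y) => (x._1, x._2 + y._2))
    sums.foreach(println)
  }
}
```

Running `main` prints `(a,1)`, `(b,10)`, `(a,3)`, `(a,6)`, `(b,30)`: five inputs give five outputs, and each key's running total is independent of the other key.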

package com.atguigu.apiTest

import org.apache.flink.streaming.api.scala._

object TestReduce {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val stream2: DataStream[String] = env.readTextFile("C:\\Users\\Administrator\\Desktop\\0311Flink\\flink\\src\\main\\resources\\sensor")

    //stream2.print()

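    // Parse each line "id, timestamp, temperature" into a SensorReading (keyBy is applied once, below)
    val stream3: DataStream[SensorReading] = stream2.map(data => {
      val dataArray: Array[String] = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })

    println("*******")
    //stream3.print()

    // For each sensor, emit the latest temperature + 10, with a timestamp equal to the previous aggregate's timestamp + 1.
    // In (x, y) => SensorReading(...), x is the result aggregated so far and y is the newly arrived element.
    // The first reading of each id has nothing to combine with, so it is emitted unchanged.
    val stream4: DataStream[SensorReading] = stream3.keyBy("id").reduce((x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10))

    stream4.print()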



    env.execute()

  }

}

// Case class for a sensor reading
case class SensorReading(id:String, timestamp:Long, temperature: Double)

Output:

2> SensorReading(sensor_10,1547718206,10.1)
3> SensorReading(sensor_1,1547718200,11.1)
3> SensorReading(sensor_1,1547718201,45.8)
3> SensorReading(sensor_6,1547718201,15.4)
2> SensorReading(sensor_10,1547718207,48.1)
4> SensorReading(sensor_7,1547718202,6.7)
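For a single key, the stream produced by this reduce is the same as Scala's `scanLeft` seeded with that key's first reading. The sketch below applies the article's combine function to three hypothetical readings of one sensor (illustrative values, not the article's input file). `SensorReduceScan` and `rollingForOneKey` are hypothetical names. It shows that the first reading passes through without the +10, and that the emitted timestamp counts up from the first reading's timestamp regardless of the incoming timestamps.

```scala
// Sketch (hypothetical names): the per-key output of the article's reduce, computed with scanLeft.
object SensorReduceScan {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Same combine logic as the article: keep id, previous timestamp + 1, newest temperature + 10
  def combine(x: SensorReading, y: SensorReading): SensorReading =
    SensorReading(x.id, x.timestamp + 1, y.temperature + 10)

  // For ONE key, reduce emits: r1, combine(r1, r2), combine(combine(r1, r2), r3), ...
  def rollingForOneKey(readings: List[SensorReading]): List[SensorReading] = readings match {
    case head :: tail => tail.scanLeft(head)(combine)
    case Nil          => Nil
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical readings for a single sensor
    val readings = List(
      SensorReading("sensor_1", 100L, 20.0),
      SensorReading("sensor_1", 150L, 21.5),
      SensorReading("sensor_1", 200L, 23.0))
    rollingForOneKey(readings).foreach(println)
  }
}
```

This prints `SensorReading(sensor_1,100,20.0)`, `SensorReading(sensor_1,101,31.5)` and `SensorReading(sensor_1,102,33.0)`: the input timestamps 150 and 200 never appear because `x.timestamp + 1` always builds on the previous aggregate.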


Article link: https://www.haomeiwen.com/subject/iyqcectx.html