Multi-Stream Transformation Operators: Connect and CoMap

Author: yayooo | Published 2019-08-25 16:28
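    connect
    DataStream, DataStream → ConnectedStreams
    Wraps two streams in a single ConnectedStreams in form only: internally each stream keeps its own data and element type unchanged, and the two streams remain independent of each other.

    CoMap/CoFlatMap
    ConnectedStreams → DataStream
    Applied to a ConnectedStreams, these behave like map and flatMap, but take one function per input stream, so each of the two streams is processed by its own map or flatMap function. Both functions must produce the same output type.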
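To make the description above concrete, here is a minimal plain-Scala model of the idea, with no Flink dependency. It is only a sketch of the semantics, not how Flink implements them: a connected stream is modeled as a sequence of `Either` values (`Left` for an element of the first stream, `Right` for an element of the second), and the hypothetical helpers `coMap` and `coFlatMap` are names invented here for illustration.

```scala
// Plain-Scala model of connect + CoMap/CoFlatMap (illustrative only, not Flink code).
// Left  = element that arrived on the first stream
// Right = element that arrived on the second stream
object CoMapModel {
  // One function per input, one shared result type R,
  // mirroring the shape of ConnectedStreams.map(fun1, fun2).
  def coMap[A, B, R](connected: Seq[Either[A, B]])(fun1: A => R, fun2: B => R): Seq[R] =
    connected.map(_.fold(fun1, fun2))

  // Same idea for flatMap: each function may emit zero or more results.
  def coFlatMap[A, B, R](connected: Seq[Either[A, B]])(fun1: A => Seq[R], fun2: B => Seq[R]): Seq[R] =
    connected.flatMap(_.fold(fun1, fun2))

  def main(args: Array[String]): Unit = {
    // Two logically separate inputs, interleaved in arrival order.
    val connected: Seq[Either[(String, Double), String]] =
      Seq(Left(("sensor_1", 35.8)), Right("sensor_6"), Left(("sensor_10", 38.1)))

    val mapped: Seq[String] = coMap(connected)(
      w => s"${w._1} warning at ${w._2}", // applied to first-stream elements only
      id => s"$id healthy"                // applied to second-stream elements only
    )
    mapped.foreach(println)
  }
}
```

The point of the model is that the two inputs never have to share an element type; only the results of the two functions do.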
    package com.atguigu.apiTest

    import org.apache.flink.streaming.api.scala._

    object TestSplitSelect {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // Read raw lines of the form "id, timestamp, temperature"
        val dataStream: DataStream[String] = env.readTextFile("C:\\Users\\Administrator\\Desktop\\0311Flink\\flink\\src\\main\\resources\\sensor")

        // Parse each line into a SensorReading
        val dataStream2: DataStream[SensorReading] = dataStream.map(data => {
          val dataArray: Array[String] = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        })

        // Tag each reading as "high" or "low" by temperature
        // (split/select is deprecated in newer Flink versions in favor of side outputs)
        val splitStream: SplitStream[SensorReading] = dataStream2.split(sensorData => {
          if (sensorData.temperature > 30) Seq("high") else Seq("low")
        })

        val high = splitStream.select("high")
        val low = splitStream.select("low")
        val all = splitStream.select("high", "low")

        // Connect two streams with different element types
        val warning: DataStream[(String, Double)] = high.map(data => (data.id, data.temperature))
        val connectedStream: ConnectedStreams[(String, Double), SensorReading] = warning.connect(low)

        // Signature: def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R)
        // Pass two functions, one per input stream. Here they return a Tuple3 and a Tuple2,
        // so the common result type is inferred as Product with Serializable.
        val coMapDataStream: DataStream[Product with Serializable] = connectedStream.map(
          warningData => (warningData._1, warningData._2, "warning"),
          lowData => (lowData.id, "healthy")
        )
        coMapDataStream.print()

        env.execute()
      }
    }

    // Case class for a sensor reading
    case class SensorReading(id: String, timestamp: Long, temperature: Double)


    Output (the `n>` prefix is the index of the parallel subtask that printed the record):

    4> (sensor_1,healthy)
    2> (sensor_10,38.1,warning)
    2> (sensor_7,healthy)
    3> (sensor_10,healthy)
    1> (sensor_1,35.8,warning)
    1> (sensor_6,healthy)
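In the example above, the two functions passed to `map` return a Tuple3 and a Tuple2, which is why the result is typed `DataStream[Product with Serializable]`. One possible way to keep a more precise element type is to have both branches return a shared type. The sketch below assumes a hypothetical sealed trait `SensorStatus` with subclasses `Warning` and `Healthy`, and a helper object `StatusMappers`; none of these names come from the original article. The mapping functions are plain Scala, so they can be checked without Flink.

```scala
// Hypothetical common result type for both branches (not part of the original example).
sealed trait SensorStatus { def id: String }
final case class Warning(id: String, temperature: Double) extends SensorStatus
final case class Healthy(id: String) extends SensorStatus

// Same shape as the SensorReading case class in the example above.
final case class SensorReading(id: String, timestamp: Long, temperature: Double)

object StatusMappers {
  // Branch for the first input: (id, temperature) pairs from the "high" stream.
  def fromWarning(w: (String, Double)): SensorStatus = Warning(w._1, w._2)

  // Branch for the second input: readings from the "low" stream.
  def fromLow(r: SensorReading): SensorStatus = Healthy(r.id)

  // Assumed usage with Flink on the classpath (not compiled here):
  //   val statuses: DataStream[SensorStatus] =
  //     connectedStream.map(fromWarning _, fromLow _)
}
```

Downstream operators can then pattern match on `Warning` and `Healthy` instead of inspecting tuples of different arity.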
    
    

