美文网首页
Flink自定义source

Flink自定义source

作者: yayooo | 来源:发表于2019-08-23 00:01 被阅读0次
    package com.atguigu.apiTest
    
    
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    
    import scala.util.Random
    
    object SourceTestUDSource {
      def main(args: Array[String]): Unit = {
    
        //自定义source
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    
        val stream4: DataStream[SensorReading] = env.addSource(new SensorSource())
    
        stream4.print("stream4").setParallelism(1)
    
        env.execute()
      }
    }
    
    
    /**
      * //传感器读数样例类
      * case class SensorReading(id:String, timestamp:Long, temperature: Double)
      */
    class SensorSource() extends SourceFunction[SensorReading] {
    
      //定义一个flag,表示数据源是否正常运行
      var running: Boolean = true
      //关闭数据源的生成
      override def cancel(): Unit = {
        running=false
      }
    
      //正常生成数据
      override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
        //初始化一个随机数生成器
        val rand = new Random()
    
        //持续生成
        //初始化定义一组数据
        var curTemp = 1.to(10).map(
          i => ("sensor_" + i, 60 + rand.nextGaussian()* 20 )
        )
    
        while (running) {
          //更新前一次数据的值
          curTemp = curTemp.map(
            t => (t._1, t._2 + rand.nextGaussian())
          )
          //获取当前时间戳
          val curTime: Long = System.currentTimeMillis()
          curTemp.foreach(
            //使用sourceContext上下文输出
            t=> sourceContext.collect(SensorReading(t._1,curTime,t._2))
          )
          Thread.sleep(500)
        }
    
        running=true
      }
    }
    

    相关文章

      网友评论

          本文标题:Flink自定义source

          本文链接:https://www.haomeiwen.com/subject/qfhasctx.html