package com.atguigu.apiTest
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import scala.util.Random
/**
 * Demo job for a user-defined source: attaches a [[SensorSource]] to the
 * streaming environment and prints every reading it emits.
 */
object SourceTestUDSource {

  /**
   * Builds the pipeline (custom source -> print sink) and runs it.
   *
   * @param args command-line arguments; not read by this job
   */
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    // User-defined source: readings come from SensorSource rather than a built-in connector.
    val readings: DataStream[SensorReading] = environment.addSource(new SensorSource())

    // Print sink labelled "stream4", with its parallelism set to 1.
    val printSink = readings.print("stream4")
    printSink.setParallelism(1)

    environment.execute()
  }
}
/**
 * Custom Flink source that emits simulated temperature readings for ten sensors
 * (`sensor_1` .. `sensor_10`) roughly every 500 ms until it is cancelled.
 *
 * Elements are `SensorReading` instances, the sensor-reading case class. It is not
 * declared in this block; it is expected to be declared elsewhere with the shape:
 * {{{
 * case class SensorReading(id: String, timestamp: Long, temperature: Double)
 * }}}
 */
class SensorSource() extends SourceFunction[SensorReading] {

  /**
   * Flag telling the generation loop in `run` whether the source should keep producing.
   *
   * Marked `@volatile` because `cancel()` writes it while `run` is reading it in a loop;
   * under Flink's `SourceFunction` contract `cancel()` is invoked from a different thread,
   * so without the annotation the loop is not guaranteed to observe the update.
   */
  @volatile var running: Boolean = true

  /** Stops data generation by clearing the `running` flag checked by the loop in `run`. */
  override def cancel(): Unit = {
    running = false
  }

  /**
   * Generates readings until `running` becomes false.
   *
   * Every iteration nudges each sensor's temperature by a Gaussian step, stamps all
   * ten readings with the same wall-clock time, emits them, and then sleeps 500 ms.
   *
   * @param sourceContext Flink context used to emit each `SensorReading`
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Random number generator used for both the initial values and the per-step drift.
    val rand = new Random()

    // Initial state: one (id, temperature) pair per sensor, starting at 60 plus a
    // Gaussian offset scaled by 20.
    var curTemp = (1 to 10).map(i => (s"sensor_$i", 60 + rand.nextGaussian() * 20))

    while (running) {
      // Random-walk update: previous temperature plus a standard Gaussian step.
      curTemp = curTemp.map { case (id, temp) => (id, temp + rand.nextGaussian()) }

      // One timestamp shared by every reading emitted in this iteration.
      val curTime: Long = System.currentTimeMillis()

      // Emit through the source context.
      curTemp.foreach { case (id, temp) =>
        sourceContext.collect(SensorReading(id, curTime, temp))
      }

      Thread.sleep(500)
    }

    // Re-arm the flag once the loop has exited, so a later run() call on this
    // instance would enter the loop again.
    running = true
  }
}
// Reader comments (web-page footer text left over from extraction; not part of the program)