CEP了,复杂了,但可以理解。这个书上的代码好像有问题,要稍后研究下~
代码
package org.bbk.flink
import java.util
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.commons.lang3.time.FastDateFormat
/*
Data format (sensorMac,deviceMac,temperature,dampness,pressure,date):
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,38,0.52,1.1,2020-03-02 12:20:32
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,47,0.48,1.1,2020-03-02 12:20:35
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,50,0.51,1.1,2020-03-02 12:20:38
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,31,0.50,1.1,2020-03-02 12:20:39
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,52,0.46,1.1,2020-03-02 12:20:41
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,53,0.47,1.1,2020-03-02 12:20:43
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,55,0.48,1.1,2020-03-02 12:20:45
*/
/**
 * One sensor reading, with every field kept as the raw text received.
 *
 * @param sensorMac   MAC address of the sensor; the stream is keyed by this value
 * @param deviceMac   MAC address of the device
 * @param temperature temperature reading as text; the alarm pattern converts it with `toInt`
 * @param dampness    dampness reading as text
 * @param pressure    pressure reading as text
 * @param date        event time as text, parsed elsewhere with the format `yyyy-MM-dd HH:mm:ss`
 */
case class DeviceDetail(
  sensorMac: String,
  deviceMac: String,
  temperature: String,
  dampness: String,
  pressure: String,
  date: String
)
/**
 * Alarm record emitted for a matched sequence of readings.
 *
 * @param sensorMac   MAC address of the sensor that raised the alarm
 * @param deviceMac   MAC address of the device
 * @param temperature temperature reading (as text) carried by the alarm
 */
case class AlarmDevice(
  sensorMac: String,
  deviceMac: String,
  temperature: String
)
/**
 * Flink CEP demo: reads comma-separated sensor readings from a text socket and prints an
 * [[AlarmDevice]] whenever one sensor reports a temperature of at least 40 three times
 * within three minutes.
 *
 * Each input line must contain six comma-separated fields in the order
 * sensorMac, deviceMac, temperature, dampness, pressure, date, where date uses the format
 * `yyyy-MM-dd HH:mm:ss`. A line with fewer fields fails with an
 * `ArrayIndexOutOfBoundsException`, and a non-numeric temperature fails with a
 * `NumberFormatException` when the pattern conditions are evaluated.
 */
object Demo {
  // The Scala CEP entry point (CEP.pattern) and the lambda-based `where` conditions work on the
  // Scala Pattern wrapper, not on the Java org.apache.flink.cep.pattern.Pattern imported at file
  // level. The rename keeps both visible without a name clash.
  import org.apache.flink.cep.scala.pattern.{Pattern => ScalaPattern}

  // Parser for the date field of each reading.
  private val format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  // socketTextStream expects a bare host name or IP address; a URL scheme such as "http://"
  // makes the host unresolvable.
  private val SocketHost = "192.168.1.111"
  private val SocketPort = 9999

  // Minimum temperature that counts towards an alarm.
  private val AlarmTemperature = 40

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val envStream = StreamExecutionEnvironment.getExecutionEnvironment
    envStream.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    envStream.setParallelism(1)

    // Brings the implicit TypeInformation instances required by map/keyBy/select into scope.
    import org.apache.flink.api.scala._

    val sourceStream: DataStream[String] = envStream.socketTextStream(SocketHost, SocketPort)

    // Parse each line, use the reading's own date as event time, and partition by sensor.
    val deviceStream: KeyedStream[DeviceDetail, String] = sourceStream
      .map { line =>
        val fields: Array[String] = line.split(",")
        DeviceDetail(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5))
      }
      .assignAscendingTimestamps(detail => format.parse(detail.date).getTime)
      .keyBy(detail => detail.sensorMac)

    // Pattern.begin[DeviceDetail] yields Pattern[DeviceDetail, DeviceDetail]; the second type
    // parameter is the event subtype being matched, not the type of the select result.
    val pattern: ScalaPattern[DeviceDetail, DeviceDetail] = ScalaPattern
      .begin[DeviceDetail]("start")
      .where(detail => detail.temperature.toInt >= AlarmTemperature)
      .followedByAny("follow")
      .where(detail => detail.temperature.toInt >= AlarmTemperature)
      .followedByAny("third")
      .where(detail => detail.temperature.toInt >= AlarmTemperature)
      .within(Time.minutes(3))

    val patternResult: PatternStream[DeviceDetail] = CEP.pattern(deviceStream, pattern)
    patternResult.select(new MyPatternResultFunction).print()

    envStream.execute("startTemperature")
  }
}
/**
 * Converts one complete pattern match into an [[AlarmDevice]].
 *
 * For each of the stages "start", "follow" and "third" the first matched event is taken and
 * printed to standard output; the returned alarm is built from the "third" stage's event.
 */
class MyPatternResultFunction extends PatternSelectFunction[DeviceDetail, AlarmDevice] {

  /**
   * @param pattern matched events, keyed by the stage name they were captured under
   * @return an alarm carrying the sensor MAC, device MAC and temperature of the "third" event
   */
  override def select(pattern: util.Map[String, util.List[DeviceDetail]]): AlarmDevice = {
    // First event captured under the given stage name.
    def firstEvent(stage: String): DeviceDetail = pattern.get(stage).iterator().next()

    val first = firstEvent("start")
    val second = firstEvent("follow")
    val last = firstEvent("third")

    println(s"第一条数据:$first")
    println(s"第二条数据:$second")
    println(s"第三条数据:$last")

    AlarmDevice(last.sensorMac, last.deviceMac, last.temperature)
  }
}
网友评论