美文网首页
flink应用CEP之高温预警场景应用

flink应用CEP之高温预警场景应用

作者: 万州客 | 来源:发表于2022-05-19 08:49 被阅读0次

这次用到了 CEP，逻辑比之前复杂一些，但仍然可以理解。书上的这段代码好像有问题，需要稍后再研究一下。

代码

package org.bbk.flink

import java.util
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.commons.lang3.time.FastDateFormat

/*
Input data format: one record per line, six comma-separated columns
sensorMac,deviceMac,temperature,dampness,pressure,date
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,38,0.52,1.1,2020-03-02 12:20:32
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,47,0.48,1.1,2020-03-02 12:20:35
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,50,0.51,1.1,2020-03-02 12:20:38
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,31,0.50,1.1,2020-03-02 12:20:39
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,52,0.46,1.1,2020-03-02 12:20:41
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,53,0.47,1.1,2020-03-02 12:20:43
00-34-5E-5F-89-A4,00-01-6C-06-A6-29,55,0.48,1.1,2020-03-02 12:20:45
 */

/**
 * One device reading, built from a comma-separated input line.
 *
 * `Demo.main` fills the fields positionally from columns 0 to 5 of the split
 * line, so every value is kept as the raw text of its column.
 *
 * @param sensorMac   column 0; also used as the key of the keyed stream
 * @param deviceMac   column 1
 * @param temperature column 2; converted with `toInt` by the CEP conditions
 * @param dampness    column 3
 * @param pressure    column 4
 * @param date        column 5; parsed with the `yyyy-MM-dd HH:mm:ss` format
 *                    to obtain the event timestamp
 */
case class DeviceDetail(
  sensorMac: String,
  deviceMac: String,
  temperature: String,
  dampness: String,
  pressure: String,
  date: String
)
/**
 * Alarm record emitted for a matched pattern.
 *
 * `MyPatternResultFunction.select` copies all three fields from the event
 * bound to the last pattern stage.
 *
 * @param sensorMac   sensor MAC of the reading that completed the match
 * @param deviceMac   device MAC of that reading
 * @param temperature temperature text of that reading
 */
case class AlarmDevice(
  sensorMac: String,
  deviceMac: String,
  temperature: String
)

/**
 * Flink CEP demo: reads device readings from a text socket and prints an
 * [[AlarmDevice]] for every match of three readings from the same sensor with
 * a temperature of at least 40 inside a three-minute window.
 */
object Demo {
  // Parser for the date column of each input line (e.g. "2020-03-02 12:20:32").
  private val format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  /**
   * Builds the streaming job and runs it.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val envStream = StreamExecutionEnvironment.getExecutionEnvironment
    envStream.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    envStream.setParallelism(1)

    import org.apache.flink.api.scala._
    // The Scala CEP API (CEP.pattern and the lambda-based `where`) works on the
    // Scala Pattern wrapper, not on the Java org.apache.flink.cep.pattern.Pattern
    // imported at the top of the file. Import it under a distinct name so the
    // two cannot be confused.
    import org.apache.flink.cep.scala.pattern.{Pattern => ScalaPattern}

    // socketTextStream expects a bare host name or IP address; a URL scheme
    // such as "http://" is not part of a socket address and cannot be resolved.
    val sourceStream: DataStream[String] = envStream.socketTextStream("192.168.1.111", 9999)

    // Parse each line into a DeviceDetail, take the event time from its date
    // column and partition the stream by sensor.
    val deviceStream: KeyedStream[DeviceDetail, String] = sourceStream.map(x => {
      val strings: Array[String] = x.split(",")
      DeviceDetail(strings(0), strings(1), strings(2), strings(3), strings(4), strings(5))
    }).assignAscendingTimestamps(x => {
      format.parse(x.date).getTime
    }).keyBy(x => x.sensorMac)

    // Every stage accepts a DeviceDetail, so the pattern type is
    // Pattern[DeviceDetail, DeviceDetail]; the AlarmDevice is only produced
    // later by the select function.
    // followedByAny is relaxed contiguity: other readings may occur between
    // the stages and one reading can take part in several matches.
    val pattern: ScalaPattern[DeviceDetail, DeviceDetail] = ScalaPattern.begin[DeviceDetail]("start")
      .where(x => x.temperature.toInt >= 40)
      .followedByAny("follow")
      .where(x => x.temperature.toInt >= 40)
      .followedByAny("third")
      .where(x => x.temperature.toInt >= 40)
      .within(Time.minutes(3))

    val patternResult: PatternStream[DeviceDetail] = CEP.pattern(deviceStream, pattern)
    patternResult.select(new MyPatternResultFunction).print()

    envStream.execute("startTemperature")
  }
}

/**
 * Converts one complete pattern match into an [[AlarmDevice]].
 *
 * The match is looked up by the stage names "start", "follow" and "third";
 * the first event of each stage is printed and the alarm is built from the
 * event of the "third" stage.
 */
class MyPatternResultFunction extends PatternSelectFunction[DeviceDetail, AlarmDevice] {
  /**
   * @param pattern matched events, keyed by pattern stage name
   * @return an alarm carrying the sensor MAC, device MAC and temperature of
   *         the first event bound to the "third" stage
   */
  override def select(pattern: util.Map[String, util.List[DeviceDetail]]): AlarmDevice = {
    // First event bound to the given pattern stage.
    def firstOf(stage: String): DeviceDetail = pattern.get(stage).iterator().next()

    val first = firstOf("start")
    val second = firstOf("follow")
    val last = firstOf("third")

    // Log the three matched events in stage order.
    Seq(
      "第一条数据:" -> first,
      "第二条数据:" -> second,
      "第三条数据:" -> last
    ).foreach { case (label, detail) => println(label + detail) }

    AlarmDevice(last.sensorMac, last.deviceMac, last.temperature)
  }
}

相关文章

网友评论

      本文标题:flink应用CEP之高温预警场景应用

      本文链接:https://www.haomeiwen.com/subject/ejhrurtx.html