字面意思为,匹配的事件重复的次数越多越好。文字描述不够直观,我们直接上代码。
package ceptest
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
object TestSkipStrategyextends App {
val env = StreamExecutionEnvironment.createLocalEnvironment()
// Create a DataStream from a list of elements
val myInts: DataStream[String] =env.fromElements("a", "b", "c1", "c2", "c3", "d", "c4", "e", "c5", "d")
val pattern2 = Pattern.begin[String]("start", AfterMatchSkipStrategy.skipPastLastEvent()).where(_.startsWith("c")).oneOrMore.greedy
.followedBy("middle").where(_.startsWith("d"))
val patternStream = CEP.pattern(myInts, pattern2)
patternStream.select(patternSelectFun => {
val start = patternSelectFun.get("start")
val middle = patternSelectFun.get("middle")
println(start.mkString("->") +"->" + middle.mkString("->"))
})
env.execute("test cep")
}
上面代码的输出结果为:
结果输出可以看到,只把最长的匹配结果进行了输出。怎么做到这一点呢!!!
AfterMatchSkipStrategy.skipPastLastEvent()和greedy结合使用,完美解决。
结果输出中,第二个匹配事件的输出也是很值得注意的。当我们没有设定连续性时,默认是relaxed contiguity。官网说明如下:
For looping patterns (e.g. oneOrMore() and times()) the default is relaxed contiguity
网友评论