Flink的复杂事件处理(complex event processing, CEP)库能够在无界数据流中通过匹配定义好的事件模式来发现一系列事件之间的关联规律,从而有效支持趋势分析、风险监控、欺诈检测等业务场景。
在源码注释中,可以得知它是基于《Efficient Pattern Matching over Event Streams》这篇论文的思想实现的。该论文提出了一种在事件流上进行高效模式匹配的方法,即带匹配缓存的非确定有限状态机,又称为NFA自动机,非确定指的是每个状态对应的下一状态是不确定的
。
NFA包含了该次模式匹配的各个状态和状态间转换的表达式
Flink-cep 三种状态迁移边
,在yarn中也有有限状态机组件
image.png
- 先介绍两个概念:
1.状态:state1/state2/state3/endstat都是状态的名称。
2.转换:take/ignore/proceed都是转换的名称。 - 在这NFA匹配规则里,本质上是一个状态转换的过程。三种转换的含义如下所示:
1.Take: 表示事件匹配成功,将当前状态更新到新状态,并前进到“下一个”状态;
2.Procceed: 当事件来到的时候,当前状态不发生变化,在状态转换图中事件直接“前进”到下一个目标状态;
3.Ignore: 当事件来到的时候,如果匹配不成功,忽略当前事件,当前状态不发生任何变化。
//定义模式:在1秒之内连续登陆失败两次
val loginFailPattern = Pattern.begin[LoginEvent]("start")
.where(_.`type` == "c")
.followedBy("middle")
.where(_.`type` == "a")
.optional();
//.within(Time.seconds(2))
下面我们来分析下以上伪代码:
Event(0, "a");
Event(1, "c");
Event(2, "b");
Event(3, "b");
Event(4, "a");
Event(5, "d");
1、当第一条消息event0来的时候,由于start状态只有Take状态迁移边,这时event0匹配失败,消息被丢失,start状态不发生任何变化;
2、当第二条消息event1来的时候,匹配成功,这时用event1更新start当前状态,并且进入下一个状态,既mid状态。而这时我们发现mid状态存在Proceed状态迁移边,意味着事件来临时,可以直接进入下一个状态
,这里就是endstat
状态(endstate是在编译的时候自动加上的
),说明匹配结束,存在第一个匹配结果[event1];
3、当第三条消息event2来临时,由于之前我们已经进入了mid状态,所以nfa会让我们先匹配mid的条件,匹配失败,由于mid状态存在Ingore状态迁移边,所以当前mid状态不发生变化,event2继续往回匹配start的条件,匹配失败
,这时event2被丢弃;
4、同样的event3也不会影响nfa的所有状态,被丢弃。
5、当第五条消息event4来临时,匹配mid的条件成功,更新当前mid状态,并且进入“下一个状态”,那就是endstat状态,说明匹配结束,存在第二个匹配结果[event1, event4]。
Flink-CEP 共享版本匹配缓存SharedBuffer,解决缓存膨胀问题
a b+ c
"a", "b1", "d1", "b2", "d2", "b3" "c"将具有以下结果:
Strict Contiguity 严格连接: {a b3 c}–"d1"后"b1"因"b1"被丢弃,同样发生在"b2"因为"d2"。
宽松的连续性:{a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}-"d"被忽略。
非确定性松弛邻接:{a b1 c}, {a b1 b2 c}, {a b1 b3 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}
- 注意{a b1 b3 c},这是"b"'s之间松弛邻接的结果。
以上匹配过程中,每个状态需要缓冲堆栈来保存匹配成功的事件,我们把各个状态的对应缓冲堆栈集称之为缓冲区,由于上述例子有多少种输出,理论上我们需要创建多少个独立的缓冲区,太浪费内存
image.png
其中图a、b、c是原始的R1、R2、R3缓存,图d则是整合在一起的共享版本缓存。它会将所有序列的前向指- 针附加上一个版本号(采用杜威十进制法,点号分隔),并且遵循以下两个规则:
1、迁移到下一个状态时,版本号增加一位,如a[1]状态的版本号是1(为了符合习惯写作1.0),a[i]状态的版本号是1.0、1.1,b状态的版本号是1.0.0、1.1.0……以此类推;
2、当序列发生分裂时,处于当前状态的版本号位加1。例如e3事件产生了2.0版本,e6事件产生了1.1版本。
依照这种规则,就可以根据前向指针上版本号的递增规律和前缀来回溯出正确的序列了。Flink CEP中将此缓存设计为SharedBuffer类,但是版本的设计有些不同。
计算状态
对于每一个序列,NFA自动机还需要维护一些最基础的状态数据,以方便执行状态转移和匹配逻辑,论文中将其称为computation state
,即计算状态。基础的计算状态结构如下图所示,包含以下数据项:
当前的版本号;
当前的状态;
指向匹配缓存中最近一个事件的指针;
整个序列的起始时间;
其他必要的上下文数据存储。
感谢
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/cep/#individual-patterns
https://www.cxyzjd.com/article/weixin_37766087/105874539
https://cloud.tencent.com/developer/article/1705503
网友评论