什么是FlinkCEP
FlinkCEP(Complex event processing for Flink) 是在Flink实现的复杂事件处理库. 它可以让你在无界流或有界流中检测出特定的数据,有机会掌握数据中重要的那部分。
是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤
、关联
、聚合
等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。
- 目标:从有序的简单事件流中发现一些高阶特征
- 输入:一个或多个由简单事件构成的事件流
- 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
- 输出:满足规则的复杂事件
总结:FlinkCEP 用于编写规则,实现对流数据进行处理,最终获取我们想要的结果。
Flink CEP应用场景
-
风险控制
对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。 -
策略营销
用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。 -
运维监控
灵活配置多指标、多依赖来实现更复杂的监控模式。
Flink CEP入门案例
- 导入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.13.0</version>
</dependency>
- 给定一个流(必须指定水印)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
// 读取数据封装成javaBean
SingleOutputStreamOperator<Sensor> returns = source.flatMap((FlatMapFunction<String, Sensor>) (value, out) -> {
out.collect(new Sensor(value.split(",")));
}).returns(Types.POJO(Sensor.class));
// 添加水印
SingleOutputStreamOperator<Sensor> operator = returns.assignTimestampsAndWatermarks(
// 最大乱序程度
WatermarkStrategy.<Sensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((SerializableTimestampAssigner<Sensor>) (element, recordTimestamp) -> element.getTs() * 1000)
);
为了方便,将数据封装成了对象
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Sensor {
private Integer id;
private Long ts;
private Long vc;
public Sensor(String[] args){
this.id=Integer.valueOf(args[0]);
this.ts=Long.valueOf(args[1]);
this.vc=Long.valueOf(args[2]);
}
}
- 定义规则
// 定义模式
Pattern<Sensor, Sensor> beginPattern = Pattern.<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
});
Pattern 别导错了
import org.apache.flink.cep.pattern.Pattern;
begin :cep 都是从 begin开始。
<X> Pattern<X, X> begin(final String name)
每个模式都必须有一个唯一的名称
,稍后您可以使用该名称来标识匹配的事件。
模式名称不能包含字符":"。
begin("begin")
- 将CEP 应用到流中
PatternStream<Sensor> pattern = CEP.pattern(watermarks, beginPattern);
SingleOutputStreamOperator<String> select = pattern.select((PatternSelectFunction<Sensor, String>)
// 从"begin" 名称中,获取模式里的数据
pattern1 -> pattern1.get("begin").toString());
- 返回结果
select.print();
env.execute();
- sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
- 程序
@Test
public void test01() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.readTextFile("input/sensor.txt");
//DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
// 读取数据封装成javaBean
SingleOutputStreamOperator<Sensor> flatMapStream = source.flatMap((FlatMapFunction<String, Sensor>) (value, out) -> {
out.collect(new Sensor(value.split(",")));
}).returns(Types.POJO(Sensor.class));
// 添加水印
SingleOutputStreamOperator<Sensor> watermarks = flatMapStream.assignTimestampsAndWatermarks(
// 最大乱序程度
WatermarkStrategy.<Sensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((SerializableTimestampAssigner<Sensor>) (element, recordTimestamp) -> element.getTs() * 1000)
);
// 定义模式
Pattern<Sensor, Sensor> beginPattern = Pattern.
<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
});
// CEP
PatternStream<Sensor> pattern = CEP.pattern(watermarks, beginPattern);
SingleOutputStreamOperator<String> select = pattern.select((PatternSelectFunction<Sensor, String>)
pattern1 -> pattern1.get("begin").toString());
select.print();
env.execute();
}
- 运行结果,都是
sensor_1
的数据结果。
11> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
10> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
12> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
模式API
单个模式
单个模式可以是单例模式或者循环模式.
单例模式
单例模式只接受一个事件
. 默认情况模式都是单例模式.
前面的例子就是一个单例模式
循环模式
循环模式可以接受多个事件
.
单例模式配合上量词
就是循环模式.(非常类似我们熟悉的正则表达式)
- times(n)
// 定义模式
Pattern<Sensor, Sensor> beginPattern = Pattern.
<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
}).times(2);
用于循环匹配,必须满足N条数。
匹配规则:设 time(2)
原始数据
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
首先匹配到 sensor_1,1,10 ,因为必须要为2,所以就会一直等着,直到遇到另一个sensor_1
的数据
(sensor_1,1,10 | sensor_1,2,20)
sensor_1,1,10 匹配到之后,就不再匹配了,sensor_1,2,20继续往后匹配,直到遇到sensor_1,4,40
(sensor_1,2,20 | sensor_1,4,40)
sensor_1,4,40 同样等着,直到遇到下一个sensor_1
(sensor_1,4,40 | sensor_1,6,60)
因为流式处理,所以
sensor_1
与sensor_1
可能并不是有序的,中间可能相差好几条数据,但是匹配规则,就是满足了
time(n)
的退出,否则一直等着。若程序结束都没有遇到,就自动释放(不输出,因为不满足time(n)
)。
运行结果
9> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
11> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
10> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
-
timesOrMore(n)
匹配规则和times(n)
类似,timesOrMore(n)
匹配规则则是n
条及n
条以上。
原始数据
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
匹配规则:设 timesOrMore(2)
第一次进来的是sensor_1,1,10
,不满足timesOrMore
的条件,所以等着。
第二次:进来sensor_1,2,20
,满足timesOrMore
的条件,记录起来。sensor_1,2,20
需要等待下一次sensor_1
的数据
(sensor_1,1,10 | sensor_1,2,20)
第三次:sensor_1,4,40
,sensor_1,1,10
满足, sensor_1,2,20
也满足
(sensor_1,1,10 | sensor_1,2,20 | ensor_1,4,40)
(sensor_1,2,20 | ensor_1,4,40)
第四次:sensor_1,6,60
,sensor_1,1,10
满足, sensor_1,2,20
满足,ensor_1,4,40 满足
(sensor_1,1,10 | sensor_1,2,20 | ensor_1,4,40 | sensor_1,6,60)
(sensor_1,2,20 | ensor_1,4,40 | sensor_1,6,60)
(ensor_1,4,40 | sensor_1,6,60)
最终输出结果
多并行度,输出顺序不一致,但是整体结果和上面推断是一致的。
10> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
15> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
14> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
12> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
11> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
13> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
-
.times(n,m)
或许你已经发现了timesOrMore
的弊端,若数据中有10000条sensor_1
,那么最后一个结果绝对有一万条数据。这样的结果,很有可能造成OOM。所以有必要的可以限定的以下次数,比如使用times(n,m)
,它表示一个范围,从n
开始到m
结束(包含m
),比如 我们只要 2条,3条,4条的数据,那么就可以写成times(2,4)
原始数据
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
匹配规则:设 times(2,3)
第一次:sensor_1,1,10
,不满足,等着
第二次:sensor_1,2,20
,sensor_1,1,10
满足,sensor_1,2,20
等着
(sensor_1,1,10,sensor_1,2,20)
第三次: sensor_1,4,40
,sensor_1,1,10
满足,sensor_1,2,20
满足,sensor_1,4,40
等着
(sensor_1,1,10,sensor_1,2,20,sensor_1,4,40)
(sensor_1,2,20,sensor_1,4,40)
第四次,sensor_1,6,60
, 因为上限为3,sensor_1,1,10
不再往下执行,sensor_1,2,20
满足,sensor_1,4,40
满足,sensor_1,6,60
等着,直到遇到sensor_1
(sensor_1,2,20,sensor_1,4,40,sensor_1,6,60)
(sensor_1,4,40,sensor_1,6,60)
最终输出
上限是3,所以列表长度最大为3。
1> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
3> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
16> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
15> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
2> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
times(n,m) 与 timesOrMore(n) 的区别就是,timesOrMore(n) 没有上限。
-
oneOrMore()
作用就是一次或多次,等价于timesOrMore(1)
条件
对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,例如前面用到的where就是一种条件
- 迭代条件
这是最普遍的条件类型
。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.<WaterSensor>begin("start")
.where(new IterativeCondition<WaterSensor>() {
@Override
public boolean filter(WaterSensor value, Context<WaterSensor> ctx) throws Exception {
return "sensor_1".equals(value.getId());
}
});
- 简单条件
这种类型的条件扩展了前面提到的IterativeCondition类,它决定是否接受一个事件只取决于事件自身的属性。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.<WaterSensor>begin("start")
.where(new SimpleCondition<WaterSensor>() {
@Override
public boolean filter(WaterSensor value) throws Exception {
System.out.println(value);
return "sensor_1".equals(value.getId());
}
});
- 组合条件
把多个条件结合起来使用. 这适用于任何条件,你可以通过依次调用where()来组合条件。 最终的结果是每个单一条件的结果的逻辑AND。
如果想使用OR来组合条件,你可以像下面这样使用or()
方法。
匹配 sensor_1
和 sensor_7
的数据
原始数据
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
程序
// 定义模式
Pattern<Sensor, Sensor> beginPattern = Pattern.
<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
}).or(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_7".equals(value.getId());
}
});
运行结果
4> [Sensor(id=sensor_7, ts=7, vc=30)]
1> [Sensor(id=sensor_1, ts=2, vc=20)]
2> [Sensor(id=sensor_1, ts=4, vc=40)]
16> [Sensor(id=sensor_1, ts=1, vc=10)]
3> [Sensor(id=sensor_1, ts=6, vc=60)]
若是要用and
条件呢?CEP 并没有and()
,但是指定多个where()
,作用和and
一样。
// 定义模式
Pattern<Sensor, Sensor> beginPattern = Pattern.
<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
}).where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return value.getTs()<4;
}
});
运行结果
14> [Sensor(id=sensor_1, ts=1, vc=10)]
15> [Sensor(id=sensor_1, ts=2, vc=20)]
- 停止条件(until)
如果使用循环模式
(oneOrMore, timesOrMore), 可以指定一个停止条件, 否则有可能会内存吃不消.
意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。
原始数据
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
当程序遇到sensor_2
时就停止
// 定义模式
Pattern<Sensor, Sensor> beginPattern = Pattern.
<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
}).timesOrMore(2).until(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_2".equals(value.getId());
}
});
运行结果
8> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
异常:until 只能用在循环条件。
org.apache.flink.cep.pattern.MalformedPatternException: The until condition is only applicable to looping states.
组合模式
把多个单个模式组合在一起就是组合模式. 组合模式由一个初始化模式(.begin(...))开头
- 原始数据
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
-
严格连续(严格紧邻)
比如sensor_1
与sensor_1
之间必须相邻的比如:sensor_1,1,10
和sensor_1,2,20
,sensor_1,2,20
和sensor_1,4,40
就不行,之间还相隔着sensor_2,3,30
。
如何实现?使用next()
,就表示严格紧邻
// 定义模式
Pattern<Sensor, Sensor> beginPattern = Pattern
.<Sensor>begin("start")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
})
.next("next")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
});
输出结果,只有sensor_1,1,10
满足
12> [Sensor(id=sensor_1, ts=1, vc=10)]
严格模式还有一个很重要特性,就是能够解决乱序问题
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,3,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
试问:此时运行sensor_1,2,20
是否满足?是否算得上与sensor_1,3,40
为相邻?
12> [Sensor(id=sensor_1, ts=1, vc=10)]
13> [Sensor(id=sensor_1, ts=2, vc=20)]
虽然再流输入的顺序上,
sensor_1,3,40
是靠在sensor_2,4,30
后的,但是从时间上来说,sensor_1,2,20
和sensor_1,3,40
是有序的。内部上是有一个类似排序机制,但是在实时上,可能就不好说了,这里为了测试,用得离线的方式,所以是没有问题的。
-
松散连续
忽略
匹配的事件之间
的不匹配的事件。
原始数据
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
followedBy("by"):*松散连续
// 定义模式
Pattern<Sensor, Sensor> beginPattern = Pattern
.<Sensor>begin("start")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
})
.followedBy("by")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
});
输出结果
9> [Sensor(id=sensor_1, ts=1, vc=10)]
10> [Sensor(id=sensor_1, ts=2, vc=20)]
11> [Sensor(id=sensor_1, ts=4, vc=40)]
-
非确定的松散连续
更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配
当且仅当数据为a,c,b,b时,对于followedBy模式而言命中的为{a,b},对于followedByAny而言会有两次命中{a,b},{a,b}
原始数据
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
// 定义模式
Pattern<Sensor, Sensor> beginPattern = Pattern
.<Sensor>begin("start")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
})
//.next("next")
.followedByAny("by")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
});
输出结果
6> [Sensor(id=sensor_1, ts=1, vc=10)]
8> [Sensor(id=sensor_1, ts=4, vc=40)]
4> [Sensor(id=sensor_1, ts=1, vc=10)]
3> [Sensor(id=sensor_1, ts=1, vc=10)]
5> [Sensor(id=sensor_1, ts=2, vc=20)]
7> [Sensor(id=sensor_1, ts=2, vc=20)]
网友评论