美文网首页Flink
对 Flink CEP 的理解

对 Flink CEP 的理解

作者: 丹之 | 来源:发表于2018-10-29 22:30 被阅读593次

    Flink CEP 是 Flink 的复杂处理库。它允许用户快速检测无尽数据流中的复杂模式。不过 Flink CEP 仅可用于通过 DataStream API处理。 参考细说Flink CEP,我们知道Flink 的每个模式包含多个状态,模式匹配的过程就是状态转换的过程,每个状态(state)可以理解成由Pattern构成,为了从当前的状态转换成下一个状态,用户可以在Pattern上指定条件,用于状态的过滤和转换。
    实际上Flink CEP 首先需要用户创建定义一个个pattern,然后通过链表将由前后逻辑关系的pattern串在一起,构成模式匹配的逻辑表达。然后需要用户利用NFACompiler,将模式进行分拆,创建出NFA(非确定有限自动机)对象,NFA包含了该次模式匹配的各个状态和状态间转换的表达式。整个示意图就像如下:

    上图中的三个pattern通过编译生成了NFA,NFA包含了四个状态,其中endstate是在编译的时候自动加上的,来作为终止状态。状态间转换是通过箭头表示的状态迁移边(StateTransition)来实现的,我们注意到state2做状态迁移的时候存在三条边(take,proceed,ingore),为什么有的状态只有一条边?有的状态有两条边?有的状态上有三条边?我们这里先埋个伏笔,之后我们会做解释。

    Flink-cep 实例讲解

    Flink-cep 三种状态迁移边

    在文章开始的时候,我们谈到了每个状态迁移会涉及到三类状态迁移边,分别是Take、Proceed、Ingore。

    • Take: 表示事件匹配成功,将当前状态更新到新状态,并前进到“下一个”状态;
    • Procceed: 当事件来到的时候,当前状态不发生变化,在状态转换图中事件直接“前进”到下一个目标状态;
    • IGNORE: 当事件来到的时候,如果匹配不成功,忽略当前事件,当前状态不发生任何变化。
    /构建链接patterns Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c");
                }
            }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a");
                }
            }).optional(); //创建nfa NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); 
    

    构建的nfa的状态示意图如下所示:


    Flink-cep 共享缓存SharedBuffer

    Flink-cep 模式匹配过程

    Flink CEP做模式匹配,是依赖模式流(PatternStream)来实现的,一个PatternStream对象表示模式检测到的序列所对应的流。为了使用PatternStream,我们首先要构建它,为此Flink提供了一个名为CEP的帮助类,它定义了一个pattern静态方法,将数据流和pattern作为形参传入。

    DataStream<String> inputStream = ... Pattern<String, ?> pattern = ...
    PatternStream<String> patternStream = CEP.pattern(inputStream, pattern); 
    

    然后我们会在PatternStream对象上调用select或flatSelect来获取某个模式下匹配到的事件来实现我们的业务逻辑。一般select的实现如下:

    public  SingleOutputStreamOperator select(final PatternSelectFunction patternSelectFunction, TypeInformation outTypeInfo) { //创建DataStream SingleOutputStreamOperator<Map<String, List>> patternStream =
                    CEPOperatorUtils.createPatternStream(inputStream, pattern); return patternStream.map( new PatternSelectMapper<>(
                    patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
                .returns(outTypeInfo);
        } 
    

    在select里头主要还是通过CEPOperatorUtils.createPatternStream创建DataStream,在对createPatternStream进行分析,首先通过

     final NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false); 
    
    

    创建NFA工厂,然后将nfaFactory作为参数,创建CEP Operator对象。构建NAF工厂的过程,其实就是将pattern间的逻辑关系,转换成具体的各个状态,并用状态迁移边去描述状态间的迁移关系。具体的实现如下:

    public static  NFAFactory compileFactory(
        Pattern pattern,
        TypeSerializer inputTypeSerializer,
        boolean timeoutHandling) { if (pattern == null) { //如果模式为null,返回一个NFA工厂的实现,且不传递任何状态,意味着将创建一个空的NFA对象 return new NFAFactoryImpl(inputTypeSerializer, 0, 
                Collections.>emptyList(), timeoutHandling);
        } else { //构建一个Map来存储所有生成的状态 Map<String, State> states = new HashMap<>();
            long windowTime;
    
            Pattern succeedingPattern;
            State succeedingState;
            Pattern currentPattern = pattern; //构建最终态,并加入到Map中,这里将会从Pattern的尾部向头部进行遍历,所以构建的第一个状态是尾部的最终态 State currentState = new State<>(currentPattern.getName(), State.StateType.Final);
            states.put(currentPattern.getName(), currentState); //提取当前Pattern对象的窗口时间 windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() 
                : 0L; //不断向前遍历(不包含第一个Pattern对象) while (currentPattern.getPrevious() != null) { //相关变量交换 succeedingPattern = currentPattern;
                succeedingState = currentState;
                currentPattern = currentPattern.getPrevious(); //获得窗口时间 Time currentWindowTime = currentPattern.getWindowTime(); //如果当前Pattern的窗口时间比其之前Pattern的窗口时间小,则将之前Pattern的窗口时间更新为新的窗口时间 if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
                    windowTime = currentWindowTime.toMilliseconds();
                } //获取或构建状态 if (states.containsKey(currentPattern.getName())) {
                    currentState = states.get(currentPattern.getName());
                } else {
                    currentState = new State<>(currentPattern.getName(), State.StateType.Normal);
                    states.put(currentState.getName(), currentState);
                } //为当前状态设置跟后一个状态之间的转换(边),注意状态转换是“TAKE”,这里同时传入了Pattern所注入的条件 currentState.addStateTransition(new StateTransition(
                    StateTransitionAction.TAKE,
                    succeedingState,
                    (FilterFunction) succeedingPattern.getFilterFunction())); //如果后一个模式是非紧邻模式,则为当前状态构建自循环的“IGNORE”转换 if (succeedingPattern instanceof FollowedByPattern) {
                    currentState.addStateTransition(new StateTransition(
                        StateTransitionAction.IGNORE,
                        currentState, null ));
                }
            }
    
            final State beginningState; //获取或构建起始状态 if (states.containsKey(BEGINNING_STATE_NAME)) {
                beginningState = states.get(BEGINNING_STATE_NAME);
            } else {
                beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
                states.put(BEGINNING_STATE_NAME, beginningState);
            } //添加状态转换(起始状态,只能通过“TAKE”向下一状态转换) beginningState.addStateTransition(new StateTransition(
                StateTransitionAction.TAKE,
                currentState,
                (FilterFunction) currentPattern.getFilterFunction()
            )); //以所有的状态构建NFAFactoryImpl对象,它将用来创建NFA对象 return new NFAFactoryImpl(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
        }
    } 
    

    在CEP Operator的processElement是用来处理消息的,这里头一般会调用NFA对象的process方法,逐个处理事件,所以真正处理事件的逻辑被封装在NAF对象里头。另外需要注意的是,如果是普通数据流,其并行度被设置为1,也就是整个数据流没办法分区以并行执行,而是作为一个全局数据流参与模式匹配。这一点其实不难想象,因为我们在分析模式时,其有事件选择策略(严格紧邻还是非严格紧邻),也就是说事件前后顺序是模式的一部分,那么这时候如果普通事件流再分区执行,将会打破这种顺序,从而导致匹配失效。Process方法主要借助ComputationState对象,来记录计算过的事件的状态,具体的剖析我们可以参考Flink-CEP之NFA,这里就不再赘述,这部分实现逻辑分析有点过时,但是可以帮助更好的理解。

    来源:
    http://blog.chinaunix.net/uid-29038263-id-5765739.html

    相关文章

      网友评论

        本文标题:对 Flink CEP 的理解

        本文链接:https://www.haomeiwen.com/subject/rboitqtx.html