美文网首页风控
利用Flink实现超大规模用户行为分析

利用Flink实现超大规模用户行为分析

作者: 尼小摩 | 来源:发表于2018-06-29 17:43 被阅读2592次

    Flink 作为底层的流处理框架。主要出于以下几点原因:

    第一,Flink 是一个纯流式系统,吞吐量实际测试可达 100K EPS。而不像某些框架是用 mini batch 的模式来达到所谓的流式处理的;
    第二,面对不同的用户数据格式,我们必须支持多种数据源,这一点上 Flink 内置的对多种数据源的支持(CSV,Kafka,Hbase,Text,Socket 数据等)也为用户数据的接入提供了便利;
    第三,Flink 强大的窗口机制(包括翻转窗口,滑动窗口,session 窗口,全窗口以及允许用户自定义窗口)可以满足复杂的业务逻辑,使得用户可以编写复杂的业务规则;
    第四,Flink 内置的 RocksDB 数据存储格式使其数据处速度快且资源消耗少,在 Checkpoint 上起到了至关重要的作用;
    第五,Flink 对算子(operator)的高可控性,使得用户可以灵活添加删除或更改算子行为。这一点对于动态部署有着至关重要的意义。

    规则引擎方面我们有两个选择:

    Flink 原生 CEP 组件和 Drools 规则引擎。那么两者各有什么优势和劣势呢?首先我们看一下 Flink CEP。当前稳定的 Flink1.3 版本的 CEP 是一套极具通用性、易于使用的实时流式事件处理方案。作为 Flink 的原生组件,省去了第三方库与 Flink 配合使用时可能会导致的各种问题。但其功能现阶段看来还比较基础,不能表达复杂的业务场景,同时它不能够做到动态更新(这是一个痛点)。具体如何解决我们稍后会看到。

    什么是 Drools?

    Drools 是一套基于 JVM 的,实现了 RETE 算法的规则引擎。它可以将多变的规则从硬编码中解放出来,以规则脚本的形式存在。右边图中显示的是一个典型的 Drools 规则的定义方式。可以看到,其语义与 Java 非常类似。既可以导入既有的 Java POJO(图中 Person 类),也可以在规则文件中直接定义类(EventA)。when 语句中是具体的判断条件,then 语句中是满足判断条件之后所做的操作。操作可以是任意的,不仅限于对满足条件的那个对象进行操作。比如你可以在 then 里调用某个 Java 类的方法,或者调用某个全局变量。总之,可以在 Drools 规则文件中 import Java 类,然后对其进行操作。

    Drools 有些什么优缺点呢?

    它最大优势在于语法规则简单,类似 Java,编写门槛不高、能够无缝化与 Java 集成,且用户可以对 Drools 规则进行动态配置。但这套方案也存在着自己的不足之处:例如其内置聚合功能速度缓慢,不适合我们自身或者客户使用场景下的大量聚合操作任务。另外,其内置事件序列处理机制也需要消耗大量内存资源。

    下面我们来看一个具体的例子。

    可以看到我们这里有一条检测 VPN 可疑行为的规则。规则当中包含三条判断条件。
    第一条 metric 用来判断一小时能登录失败的次数。
    第二条演示的是用户与设备之间的实体关系,表达式 expression == “[vpn.user, vpn.device]”说明了这一点。
    第三条演示的是在序列算法下异常值大于 50 的行为。
    最后会将满足条件的三个行为收集起来发送给下游的模块。下游模块可以是另一个算子,或者是持久化结果的 DB。

    有了 Flink 作为流计算引擎,有了 Drools 作为规则引擎,那么我们如何将两者结合放到一个系统里发挥作用呢。我们需要做的是将源数据输入到 Flink 生成所谓的事件流,同时将 Drools 规则文本读取到 Flink 生成所谓的规则流。而 Flink 中提供了一个 CoFlatMapFunction 可以将两个流结合起来进行分析。在这个 function 里我们所要做的就是将在 Flink 里结合机器学习算法计算出来的结果与 Drools 规则进行匹配。

    但事实上,这个方案在实际运行当中会有一些性能上的问题。这些问题主要表现在长周期行为的分析上。比如,机器学习算法需要对长周期行为(数据往往跨越三个月)进行计算,得出异常值。那么这种情况下我们需要维护算法生成的长周期行为的状态。具体方法可以是直接保存在 Drools Engine 中,或者将其保存在外部 DB 中,再或者可以利用 Flink 的 stateful operator 来维护状态。但现有情况下,每种方法都多多少少会有一些问题。接下来我们看看具体问题都有哪些。

    需要保存过往窗口的状态,作为中间结果送入 Drools 规则引擎进行计算。Flink 内置的窗口机制在窗口结束时会清除窗口状态。 Flink内置的RocksDB存储结构在窗口清理时会自动删除数据。 Flink产生的长周期聚合结果被送入 Drool 规则引擎进行匹配的时候往往会消耗大量内存。可以看到,主要的痛点就在于中间结果的维护和资源消耗的问题。面对这些问题我们可以尝试以下的做法。

    首先想到的是用 redis,memcached 之类的 KV store 来保存中间结果。但实际测试结果表明,它们的性能赶不上 Flink 的速度。所以在追求高吞吐量的情况下,此方法行不通。其次,可以通过修改 Flink RockDB backend 的源码来解决窗口清理时自动删除数据的问题。同时为了保证过期数据不挤压,需要引入“TTL”(time to live)属性,是的 rocksdb 在超时的时候自动删除过期数据。内存问题主要是由 Drools 引擎引起的。因为每一条事件与规则匹配都会生成一个 Fact,默认情况下 fact 无论是否匹配,Drools 都不会立刻删除它。你必须手动的删除它。但当事件数量过大或者规则数量过大时,即使你手动删除没有匹配的 fact,可能也会出现某一时间段大量 fact 存在于内存中的情况。所以可行的办法是设定阈值来控制内存中允许同时存在的 fact 的数量,同时清理失效的 fact。或者也可以尽量保持规则简单化。复杂的聚合规则交给 Flink 去做。

    可以看到,以上方案所产生的性能问题主要在于 Drools。其实除了以上的方案,我们还有一个 Plan B。Flink1.4 Snapshot 版本增加了一些新功能。利用这些新功能,我们可以直接使用 Flink CEP 并做到动态更新。这些功能主要包括:新版本加入了对算子粒度的操作。我们可以 checkpoint 某一个算子的状态。同时 Flink CEP 中新增了 pattern group 的概念。可以将多个规则 pattern 归为同一个 group。这样增加了规则的表达能力。利用这些功能,我们重新设计了一个系统来实现规则的动态更新。下面我们来看一下新设计的工作流程。

    简单来讲,整个工作流程就是用户更新规则,新规则被翻译成 Java 源码,然后编译并打包成可执行 jar,这个时候系统将触发 Flink 的 Savepoint,保存当前 operator 的状态,然后 cancel 当前运行的 Flink Job,然后把新生成的 jar 发布到 Flink 上去,同时读取最新的 operator 状态,恢复整个系统的运行。值得提出的一点是,根据规则文件里规则的数量和复杂度。我们可以划分规则生成多个 jar 发布到 Flink 上。这样单个 job 的负载就不至于过高。这种动态生成规则代码的方式扩展性和并发性更出色,不存在单一大负载算子。缺陷在于从 Savepoint 到整个流程恢复会有数秒延迟。

    相关文章

      网友评论

        本文标题:利用Flink实现超大规模用户行为分析

        本文链接:https://www.haomeiwen.com/subject/fnnayftx.html