作者|陈宇飞
编辑|Emily
AI前线出品| ID:ai-front
Google 在 2013 年发了一篇非常重要的 paper,来教大家 Google 是如何在 stream processing (事件流处理)方面做到 exactly once semantic 的,叫 MillWheel。这个实现并不是最早做到 exactly once 的(可能 trident 会稍微早一点),但是这里面通过 low watermark 和 per key storage 这两个概念来做绝对是创新。
为了做到 exactly once semantic,我们得先介绍几个概念
1、Persistent storage:在储存事件流处理完的结果之时,得有一个系统可以永久储存这些处理完的结果,因为很多处理完的结果是需要被再次调用的
2、Low watermark:事件流是从世界各地发往 Google 的服务器的,所以事件流在各个数据中心之间的传输是有延时的。 所以 MillWheel 需要你提供一个时间区间,所有的数据应该都在这个时间区间里面到达你的事件流处理器
3、Duplicate prevention:每一行来到的重复的数据都会被删除
每一行的 MillWheel 数据可以被理解成三个值:key,value 和 timestamp。在这里 lower watermark 是根据每个发送过来的时间戳计算的
数据处理作为一个整体可以根据用户提供的 DAG(无回路图)来决定这些事件流的内容被如何处理,这样用户可以叠加各种各样的计算方式。比如我在一个不知名高逼格问答网站上面点击了一个广告,这个点击可以根据广告 id,广告主 id,用户 id 来做聚合,但是处理逻辑是分开的,但是都可以在 MillWheel 的框架下面执行。
一个特别傻的用 MillWheel 做的某问答网站收钱系统具体 MillWheel 框架给每一行数据提供的保证是:每一行处理的数据都会根据每一个 key 做一个 checkpoint,而且每一行数据只提供一次。我们接下来看一看具体 MillWheel 是怎么做到这个保证的,以及我们可以如何利用这个保证。
既然我们要根据每一个 key 来做 checkpointing,那么每一行数据都得有一个可以把 key 从数据里面读取出来的逻辑。Google 内部有很多 deserialization protocol,比如 protobuf,会被用来做 key 的读取。这里 key 要看具体你需要处理的业务逻辑是什么,假设是一个广告系统的话广告主的 ID 或者点击广告的用户的 ID 都是一个合理的 key。
在提供一个 key 之后,MillWheel 还会提供一个 per key persistent storage,让你来更好处理你的业务逻辑。比如我需要给广告主提供的是每个广告每五分钟多少次点击,但是用户我不想每一个用户都存那么多东西,那可能每一个用户的点击我只要存 hyperloglog 就可以了,只要看他最近有没有很多点击来判断他是不是机器人,这个点击是否有效。
当然,并不是每一个广告的点击都会被送到系统里面:在获取 key 的时候我们也会获取这一行数据的时间戳,这个时间戳会被用来计算 low watermark。low watermark 的定义是现在已经到达 MillWheel 但是还没有被处理的数据的时间戳,但是这个时间戳不会超过一个用户定义的上线、不能比当下的时间晚太多。所以只要超过用户定义的时间的时间范围的数据,就是迟到的数据,迟到的数据的状态是不会被存到内存里面的。这个设计的厉害之处在于,如果数据处理一直很快,且所有的消息都没有迟到,那么 low watermark 会很接近现实的时间。如果数据出现迟到,再迟也不会超越用户设定的上限。
因为这里有一个 low watermark 的概念,那么我们就得确保有一个服务可以计算 low watermark。MillWheel 的设计是每一个事件流处理器会回报自己的最老还没处理的数据的时间戳,然后 Injector 会从每一个处理器收集最迟的时间戳。要收集最迟的时间戳因为每一个处理器的 watermark 应该都是一样且应该是最保守的。
每次处理每一行的数据的时候,只要过了 low watermark,MillWheel 需要做下面这些事情:
1、检查这个数据是不是重复了
2、处理用户提供的逻辑
3、所有的 state 存到数据库里
4、告诉发送数据的服务器已经处理完毕
5、服务器发送后面的数据发送给处理器
这里发送端是会给每一行 unique ID 的,然后接收端根据这个 unique ID 去做 dedup。有的时候为了优化速度,可以一次性从服务器拿很多行数据一起处理。
这里还有几个比较复杂的状况我们需要考虑。比如输出数据也是需要 checkpoint 的,不然的话有可能在同一个时间区间输出两个截然不同的数据,因为之前聚合结束的 state 没有被存下来。通过 checkpoint 输出,整个数据处理直接变成了一个 idempotent 的服务。当然本身有些数据处理就是 idempotent 的,那么这个时候可以省略 dedup,或者先 broadcast 给下游这个输出再 checkpoint。
这里还有一个需要注意的地方是每一个 key 必须只有一个 writer,而且每个 key 的 state 在储存的时候必须是 atomic 的,不然是没有办法保证每一个 key 的 state 是 consistent 的。
MillWheel 里面大概所有的流程论文后面主要讨论的是在 deploy 了之后效果如何以及一些 edge case,我这里摘几个比较有意思的
low watermark的计算是有延时的。整体来说再快的数据处理可能还是有两秒左右的延迟 延迟不会因为增加了机器的数量就减少,因为出现的慢的服务器可能性更大最后我再讲两句为什呢这个问题很重要。在一个互联网广告公司里面,各种各样的事件是要根据数据流来收钱的。Google 要拿着广告的点击去跟其他公司收钱,YouTube 要拿着它的 video view 去跟其他公司收钱。在这些情况下,收对钱是一件非常重要的事情:你要是钱收少了,公司遭受损失;你要是钱收多了,你的数据会跟第三方做审核的公司数据出现出入,会出现非常严重的商誉问题。所以这个数据流处理在这方面不能多不能少,最好每一行只处理一次只收一次钱。
这时候你可能想退一步,说为了处理收钱这个问题,我能不能直接每天或者每小时做一次 dedup,然后再回复给用户说你的广告拿到了多少点击,我要收你多少钱。这里还有一个问题是在很多情况下收钱这件事情是实时汇报给广告商的,因为广告商最好是是有能力随时看到自己的广告到底效果如何,然后可以选择增加或者减少预算。甚至在某些特定的情况下(比如 Superbowl 或者黑色星期五),广告商其实是本着“我今天就是要花这么多钱,哪个平台上面撒出去我是不管的”,那这个时候实时的 reporting 就变得尤其的重要。
最近讲的 system 的 paper 有点多,但是我书单还是没有读完的。等系统的书单读完了一定给各位写点 ML 相关的论文。
阅读原文链接:
https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf
本文系作者陈宇飞原创文章,已经授权 AI 前线公众号转发传播。
-全文完-
关注人工智能的落地实践,与企业一起探寻 AI 的边界,AICon 全球人工智能技术大会火热售票中,8 折倒计时一周抢票,详情点击:
《深入浅出TensorFlow》迷你书现已发布,关注公众号“AI前线”,ID:ai-front,回复关键字:TF,获取下载链接!
网友评论