美文网首页
Naiad: A Timely Dataflow System

Naiad: A Timely Dataflow System

作者: 西部小笼包 | 来源:发表于2019-12-16 19:48 被阅读0次

    难度:五星
    (论文地址)[http://nil.csail.mit.edu/6.824/2018/papers/naiad.pdf]

    这篇论文十分难啃,应该是除了RAFT之外,目前我读到最难啃的一篇。网上讨论的资料也不多。我这篇解读也是从一个HIGH LEVEL的视角去介绍我的理解。

    这个论文,我的阅读经验是,反复看,不懂的就去网上搜搜。然后再回来看
    没错这个就是上一章SPARK讲到的迭代。哈哈

    为什么要学习naiad

    首先他是SPARK的补充,他解决SPARK不擅长解决的问题,增量计算和流处理。
    其次,他设计优雅,性能很好,目前开源。

    回忆一下,SPARK在迭代计算的场景下提高了性能。相同的DATA被多次使用。但是如果输入的数据发生了改变怎么办?
    这个可能会发生,比如爬虫更新一个网页的新链接。或者应用追加一行记录在日志文件尾。
    SPARK针对这个情况不得不重头开始。比如重跑所有的PAGE RANK的迭代。即使只有一个LINK改变了。

    增量处理

    输入节点: 输入可以是一个文件,一组EVENT(WEB 请求,TWEETS),或者是插入一个图的记录。
    固定的数据流
    长久的有状态的节点:这个有点像被缓存的RDD,但是时可变的。状态会在新纪录到达这个VERTEX时改变。

    当然迭代还可以产生环。
    在SPARK里是没有环的,它的血统图是基于(DAG,有向无环图),迭代会增加新的RDD,但是没有RDD会依赖自己的孩子。
    为什么有环是好的呢? 比如PAGE RANK 可以把RANK 的更新送回开头去计算。

    所以NAIAD有了如下的结构


    image.png

    但是有环了之后会有一个问题。就是顺序的问题。比如一个延迟的PAGE RANK的RANK的更新可能会导致下一次的迭代还在用旧的RANK。
    所以我们要考虑如何避免时光旅行的更新。
    我们要保证2点。

    1. 节点不会看到过去迭代的更新,当他已经离开了那个迭代。
    2. 节点不会发出的来自未来迭代的更新

    解决上述这个问题的方案,就是用一个多维的时间戳。

    消息时间戳t可定义为下面2个元素的元组:

    e∈N​:Epoch号
    c =⟨c1,c2,…,ck⟩∈Nk​:Loop counter向量,每个维度对应对应循环上下文的循环次数,可用于区分循环上下文,且可跟踪循环进度

    image.png

    只有在环里会更新消息时间戳。一个环都有3类节点,环的入口,环的出口,已经环的反馈(指向前面的边)入口的时候,会增加一个向量维度,出口的时候会减少一个向量维度。反馈,会在最后一个向量维度上+1。

    当然除了这3类节点,还有整个DATA FLOW图的,输入顶点,和输出顶点。

    • 输入顶点:接收外部生产者的消息
      • 输入的消息会标记epoch,外部生产者可通知顶点该epoch不会有消息进来,且当所有epoch消息都不会进来时可以“关闭”顶点
    • 输出顶点:将消息输出给外部的消费者
      • 输出的消息也会标记epoch,顶点可通知外部消费者该epoch的消息不会出来,且当所有epoch消息都不会出来时可以“关闭”顶点

    除了上述5种顶点,其他的顶点都是实际的运算的顶点,运算的顶点会实现2个回调,和调用2个方法。这个我们之后会介绍。

    基于上述的知识我们已经可以回答下面这个问题


    image.png
    epoch 1:
        (a, 2): goes around the loop three times (value 4 -> 8 -> 16)
          A: (1, []), I: (1, [0]), ... F: (1, [1]), ... F: (1, [2]), ... E: (1, [])
        (b, 6): goes around the loop only once (value 6 -> 12)
          A: (1, []), I: (1, [0]), ... F: (1, [1]), ... E: (1, [])
    epoch 2:
        (a, 5): dropped at A due to DISTINCT.
    

    timestamp间有一个偏序,
    (1, [5, 7]) < (1, [5, 8]) < (1, [6, 2]) < (2, [1, 1])

    (1, [5, 7]) < (1, [5])

    对顶层的LOOP,我们无法比较, 如
    (1, lA [5]) 和 (1, lB [2])

    有了上述的排序,我们就可以决定什么时间点是可以释放他们的。下游节点会得到一个一只的输出顺序。

    高层API和底层API

    在上层API里,程序员可以基于LINQ (SQL -LIKE)或者 BSP(Pregel-ish)的方式取构建应用,这些上层API被以LIBARAY的形式提供出来在timely dataflow的基础上
    底层API 就是要求开发者显示的去处理每个TIMESTAMP。

    每个顶点v实现下面2个回调:

    • v.OnRecv(edge, msg, timestamp):收到来自edge的消息
    • v.OnNotify(timestamp):没有更多<= timestamp的消息到来

    回调可被自定义(重写)

    回调上下文中,可能会调用下面2个系统提供的方法:

    • this.SendBy(edge, msg, timestamp):向一条边发送消息
    • this.NotifyAt(timestamp):在timestamp时候,进行调用通知

    调用关系是:

    • u.SendBy(e, m, t) -> v.OnRecv(e, m, t)eu->v的边
    • v.NotifyAt(t) -> v.OnNotify(t)
    • 每个顶点的回调都会被排队,但必须保证:
      • t' <= t时,v.OnNotify(t)发生在v.OnRecv(e, m, t')后,因为前者是作为t之前所有消息都已收到,不会再来的信号。重写的回调也得满足这个要求。

    上述的API的设计其实是为了达到2个目标:
    第一个,逐步的释放某一时间(epoch)下的记录,结束的时候给与通知。
    或者第二个,缓存这个时间下的所有记录,然后确保这个时间下不再有记录了,发送通知。

    我们来看一个具体的例子
    在这例子里,我们要输出所有的DISTINCT的MSG,以及每一个MSG的出现次数
    前者我们可以用上述第一个目标,后者我们可以基于第二个目标。
    那么所有收到消息的VERTEX,会去做判断。如果是个新的时间,会拉一道阀门(当底层组件可以确保这个时间的所有消息发光了,就会触发这个阀门(OnNoitfy),实现第二个目标的任务)
    (当然也可以是实时的去发,就用send 和onRecv就可以实现第一个目标的效果)


    image.png

    如何实现上述的阀门

    协议去找到何时发送通知给VERTEX
    直觉就是,我们可以发送通知只要不再有可能让先前的节点产生更早的记录。
    论文里先是给出了单线程的实现方法。
    也就是基于pointstamp

    任何时刻,未来消息的时间戳受到当前未处理事件(消息和通知请求)以及图结构的限制:

    • 消息只会沿边传输,且时间戳仅会被2.1.3.小节的三种顶点修改。

    • 由于事件的发送不能产生时间回溯,因此可以计算事件产生的消息时间戳的下界。将这种算法应用到未处理事件上,则可判断顶点通知是否正确

    这里定义Pointstamp,对应每个事件:是由时间+位置(边或者顶点)

    • 对于v.SendBy(e, m, t),对应的是(t, e)
    • 对于v.NotifyAt(t),对应的是(t, v)

    我们可以把point map这2个属性构成一个2维平面。X轴是位置,Y轴是TIMESTAMP

    那么在时间上,我们有一个剪头的关系,代表could-result-in,比如在上述的例子里,(1,【2】)在B 可以C-R-I (1,【3】)也可以C-R-I (1,【】)
    所以对于(1,【】)这个时间戳,他不应该被发出通知,知道前面的所有1的时间离开LOOP的CONTEXT,然后移除ACTIVE POINTSTMAP,来确保没有更多的消息了。
    为了达到监控没有活着的ACTIVE POINTSTAMP,我们设置了一个叫OC的变量,(occurency count,未完成的事件发生个数),一开始为1,当被之后的VERTEX(OnNotify)还是EDGE(OnRecv)收到了之后,都会-1. 当然发出的时候(send, notifyat)也会加1

    当OC为0,我们可以确保这个POINTSTAMP已经结束了它的使命,可是我们不能保证当前这个VERTEX,不会再有别的POINTSTAMP过来,什么时候可以保证。
    也就是不会再有前面的节点指向当前这个节点。
    为了跟踪上述的CASE,我们要再开一个变量叫PC(Precursor count, could-result-in顺序下,前面的pointstamp个数)
    一旦一个节点从POINTSTAMP集合里移除了,我们就可以把他指向的之后的POINTSTAMP,都递减。如果这个顶点上所有的PC为0了,那么该顶点就可以触发栅栏。调度器可以将任何通知发给他。

    再来回顾一下

    调度器维护一组活跃的pointstamp,每个元素包含2个计数器:

    • Occurrence countOC):未完成的事件发生个数
    • Precursor countPC):could-result-in顺序下,前面的pointstamp个数

    当顶点产生和撤销事件时,OC根据下面更新

    image.png
    • v.SendBy(e,m,t)前,v.NotifyAt(t)前:OC[(t,e/v)] += 1
    • v.OnRecv(e,m,t)后,v.OnNotify(t)后:OC[(t,e/v)] -= 1

    当pointstamp活跃时,PC根据下面初始化

    • 置为已有could-result-in的活跃pointstamp个数
    • 同时,增加当前pointstamp之后的pointstamp PC

    当pointstamp不活跃时:

    • OC值为0,移除活跃pointstamp集合

    • 递减之后的pointstamp PC

      PC值为0,则该pointstamp为frontier,调度器可将任何通知发给它

    系统初始化时,在下面位置初始化一个pointstamp:

    • 位置为每个输入顶点
    • 时间戳为:第一个epoch,以及全为0的loop count
    • OC为1,PC为0

    当输入节点的输入完毕时:

    • 若epoch e完毕,则创建e+1的pointstamp,删除原有的pointstamp
    • 通知下游epoch e的消息已经输入完毕
    • 若输入节点关闭时,删除当前位置的所有pointstamp,允许输入到下游的事件最终可从计算中消失

    有了单机的版本,我们如何扩展到分布式

    分布式版本

    image.png

    架构中包含一组Worker线程:
    这组线程管理一个timely dataflow顶点的分区
    Worker间会通信(本地使用共享内存,远程使用TCP协议)
    参与分布式的进度跟踪协议,以协调通知的发送

    最简单的方案是基于单线程的版本,我们搞一个全局的协调者。然后把所有事件的转移都告诉这个协调者,在协调者上RUN我们的单机版算法。再回过来通知分布式的节点就可以。这个方法的缺点是慢,因为我们每次交互需要协调者的ACK。

    有没有一种方法,可以直接在本地算frontier的呢?
    如果要在本地算,那么当一个顶点的OC更新了,要通知后面的节点PC的更新,这些消息都要经过网络。所以本地的frontier一定会落后全局的frontier,因为可能网络堵塞,那些更新还没到。 不过没关系,这样我们只会更慢的去触发栅栏,这是安全的。但是随之而来的问题是,我们要广播海量的更新通知。

    为了解决这个问题,论文里给了2个优化手段。第一个是缓存通知,不是立刻更新,当OC变化的时候,我们可以缓存下来,只有当OC为0的时候,POINTSTAMP MAP为0的时候,再把消息通知出去。

    另外一个优化手段,就是我们以逻辑视图的维度去更新而不是物理视图。一个逻辑视图展开可以由很多个WORKER,那么我们可以合并这些WORKER的更新,这样MESSAGE会少很多。所以在这个CASE里,我们以逻辑映射的顶点和边来维护映射的POINTMAP

    容错

    方法1, 写一个全局的同步阻塞的CHECK POINTS,那么恢复的时候就可以从最后一个CHECK POINT开始,重新计算。
    这个方法的确定是会暂停时间来打CHECK POINTS。

    方法2, 在发送消息前LOG所有的消息去磁盘
    这样我们可以不用CHECK POINT,恢复可以从任何点开始,只要把LOG从磁盘上REPLAY下。但是每个步骤会拖慢原来的性能,要付出代价假设没有失败的时候。

    Q: 为什么不能像SPARK那样去恢复呢?
    A:因为这里的更新的都是细粒度的,我们没法用SPARK的方式。
    这里有篇论文提出了更好的方案, https://arxiv.org/abs/1503.08877

    性能

    image.png

    NAIAD 在迭代的批处理上比DryadLINQ 快10倍 (PR,SCC,WCC)
    在图处理上, 比 PowerGraph 快10倍 (PR on TWITTER)
    在迭代的机器学习上,比Vowpal Wabbit 快40倍(logistic regression)

    相关文章

      网友评论

          本文标题:Naiad: A Timely Dataflow System

          本文链接:https://www.haomeiwen.com/subject/lzulnctx.html