美文网首页
Pregel-大规模图数据处理系统

Pregel-大规模图数据处理系统

作者: 一天天就知道睡觉 | 来源:发表于2018-11-29 21:55 被阅读0次

    这篇文章主要是对Pregel论文的翻译,还可能夹杂一点自己的思考。需要注意的是,本文只是在看Pregel论文的时的记录并不是全文翻译,所以你应该仅以此作参考。文章中Vertex被翻译为顶点,节点(Pregel说法为Worker)则表示分布式系统中的执行机。


    1. 介绍

    现在很多都要求对大规模图数据的处理,诸如网页图和社交网络图,那么如何高效地在图上做计算就变成了一个大的挑战,在此背景下,Pregel出世。图上的算法常见的如最短路径、PageRank等变种算法。在图处理中还有一些其他的挑战如图的最小切分问题,关联组件问题(译者注:应该是指分布式环境下节点通信的花销问题。)

    图上的算法经常面临的问题就是内存访问有很差的局部性,每个顶点所做的工作又相当之少。在分布式环境下,局部性会更加恶化,每个计算节点面临更高的宕机风险。尽管现在大图无处不在而且商用价值很高,但我们发现市面上没有任何可扩展的通用的分布式系统可运行任何图计算算法。(译者注:google发布这篇论文已经是2010的事了,前面这句话对于现在已经不适用了。)

    Pregel是一个可扩展、具有容错并且提供了可以灵活表示各种图算法的AP的分布式系统I。Pregel受BSP模型的启发,也有超级步的概念,每个超级步内部对每个顶点执行用户定义的函数。关于BSP模型,这里不做介绍,请自行了解。通过每个顶点的出边发送其消息到邻接顶点。

    BSP模型的大同步概念使得我们无需花费精力去考虑每一个超级步内顶点的执行顺序问题,对于其上的算法实现也能更容易推倒其语义(semantics)。正因为如此,Pregel也无需考虑加锁和数据竞争等问题。在设计原则上,Pregel和异步模型比也应当具有竞争力。 

    2. 计算模型

    Pregel的图数据:

        1.Pregel的输入是有向图,每个顶点有一个唯一的string类型的标识符和用户定义的可更改的值;

        2.每条边连接其源顶点和目标顶点,每条边有用户定义的可更改的值;

    Pregel的计算模型包含

        1.图的输入;

        2.当图的初始化完成后,则是一系列由同步点划分的超级步,直至算法停止;

        3.输出;

    在每个超级步内,那些顶点并行执行相同的由用户定义的函数,而这些函数代表给定算法的逻辑拓扑。一个顶点可以更改其值和其出边的值,可以接收上一个超级步发送给它的消息,也可以发送消息给其他顶点(会在下一个超级步被接收),甚至可以转换图的拓扑。在此模型中,边并非是最被关心的(first-class citizens),边没有相关的算法。

    在每个超级步内,活跃状态的顶点都会参与计算;

    顶点在一个超级步中voting to halt使其自己变为非活跃状态,除非有外力激活,否则非活跃顶点不会再有任何动作。在后面的超级步中也不会处理这个顶点,除非顶点收到消息。如果顶点被消息重新激活,那么它需要显示地再次让自己变为非活跃态;

    算法的结束条件是每个顶点都voting to halt,从活跃状态变为非活跃状态,并且没有消息再需传送;

    下图是一个简单的例子,说明Pregel框架中顶点的状态机变迁。

    此例子中,每个顶点的起始状态都为活跃状态,然后在超级步0中每个顶点执行相同的操作:通过出边向其邻接顶点发送消息。消息中携带的是自己的顶点值。为了便于说明,将四个顶点从左到右命名为a、b、c、d。

    然后在超级步1中,b顶点接收到来自a和c的消息。经过比较,b顶点自身的值最大,那么b顶点不会做改变,于是b顶点自己主动voting to halt让自己变为非活跃态,下图以阴影表示。c顶点接收到d顶点的消息,无更改,则让自己变为非活跃状态。在超级步1内,这两个变为非活跃态的顶点不会再发送消息。知道最后所有顶点都变为非活跃态,算法结束。

    图源自Pregel论文

    Pregel选用BSP模型即消息传递模型而不是远程读(remote reads)或者其他共享内存的方法(译者注:这里的意思是Pregel为什么用消息推送而不是去拉取消息模型),基于以下两个原因:

        1.消息推送模型足够高效地代表图模型,没有远程读的必要;

        2.消息推送模型性能更好

    在一个集群中,远程读延时太高。在消息推送中,我们可以用分批次的异步发送消息而解决这个问题。

    在Pregel中,顶点和其边都存在一个机器上,那么网络传输的内容就仅仅是消息,而不会再有顶点的状态或者边的信息等等。Pregel用BSP模型让这一切变得不再复杂。

    3. C++ API

    图源自Pregel论文

    上面这幅图就是系统提供的API。这是一个模板类,编写Pregel程序时需要引入此作为基类。用户需要覆盖其Compute()函数,定义好的Vertex方法允许Compute()获取其顶点和边的值,发送消息。可以获取邻接顶点的值通过GetValue(),可以通过MultableValue()更改其值。通过迭代器访问和修改边的值。类模板的参数都是可定制的,比如用PB。

    3.1 消息传递

    每个顶点通过传送消息直接和其他顶点通信,消息包含消息内容和目标顶点。消息的格式用户自定义。

    每个顶点可以发送任意数量的消息。每个顶点通过迭代器访问其接收的所有消息,在此,保证消息不会重复但不保证消息的顺序。顶点可以通过邻接顶点而获知其非邻接点的信息(图遍历)。

    3.2 聚合器

    这个Combiners是为了减少开销而提出的。当某个顶点需要给其他机器上的顶点发送消息,而消息的内容仅仅是一个整形值,并且这个算法只关心消息的和。所以为了减小开销,可以将消息整合之后再发出去。

    这个Combiners不是系统系统的,因为系统并不清楚算法具体的执行,所以这个是由用户自己定义的。系统只是提供了Combiner()的基类,用户可以继承此基类。要注意的是系统提供的Combiner并不保证这个集合的顺序等,所以在实现的时候,能集合的消息一定要是可交换顺序的。

    3.3  放大器

    Pregel的Aggregator可以用于全局通信,监管和数据。

    Pregel定义了一系列的Aggregator,比如sum、max、min等,可以用于统计。实际用于诸如统计每个顶点的出边数量。

    Pregel还可以用于全局协同。比如一个超级步中Compute()的一个分支被执行直到and操作符确定所有顶点都满足某个条件,然后另外的分支继续执行直到算法结束。

    用户要想实现自己的Aggregator则需要继承Aggregator基类,然后实现所有输入数值怎么被归结到一个数值。Aggregators也必须满足可交换性质。

    默认的Aggregator都是只能接收同一个超级步的数值,你也可以定义接收所有超级步数值的Aggregator。

    3.4 拓扑转换

    有的图算法需要对图的拓扑做转化,比如聚类算法会将一个划分用一个顶点代替。此部分略过。

    3.5 输入和输出

    图数据组织的的形式是多样的,比如txt,关系数据集合,Bigtable的数据。Pregel的数据的输出都可以是任意的,可以将不同组织形式的数据转换为Pregel的数据组织形式,也可以将Pregel产生的输出转换为不同的数据组织形式。Pregel提供了Reader和Writer的基类,如用户想实现其他数据组织的读写,需自己继承实现。

    4. 实现

    Pregel是为了Google的集群框架为设计,每个集群包含上千台普通PC机,集群之间可通信但在物理上是分离的。应用产生的粗腰持久化数据是保存在GFS或者BigTable上,临时数据则保存在本地磁盘。

    4.1 基本框架

    Pregel将图分区,每个分区保存一部分顶点和顶点所有的出边。图的划分仅仅根据顶点的Id,我们可以根据顶点的Id就知道顶点被划分到哪台机器上,也就是对顶点Id进行哈希,默认哈希函数为 Id mod N,N为分片值(译者注:G* graph Database也是这么干的)。当然具体的哈希也可以由用户自定义。

    不考虑容错的话,一个Pregel应用程序的执行包含以下几个步骤:

        1.一个集群的所有机器都运行同一个应用程序,其中有一台机器充当master,负责任务协调。其他Worker机器根据系统提供的名字服务找到Master的地址进行注册。

        2.图数据被划分为多少个分区,每个Worker机器保存多少个分区都是都master控制的,这个控制权也提供给了用户。一台Worker保存多于一份的分区数据会有更好的并行性和负载均衡,通常来讲会提升性能。每个Worker负责记录分区的状态,执行Compute(),接收和发送消息。每个Worker都有一份全局图数据的划分情况表。

        3.master将用户输入的数据划分到集群里。输入数据被当作记录的集合,每个记录包含任意数量的顶点和边。对输入数据的划分仅仅根据文件的边界,而和图本身的属性无关。数据加载完成之后,所有顶点处于活跃状态。

        4.Worker的超级步的执行由master驱动。对于每个分区,每个Worker用一个线程循环遍历其所有活跃顶点。每个Worker负责执行Compute(),结束之后向master报告其还有多少活跃顶点,直至所有顶点变为非活跃态。

        5. 在计算停止之后,master可能会让所有Worker保存其分区数据。

    4.2 容错

    Pregel的容错是用检查点实现的。每一个超级步之前都要先持久化其当前状态,包括顶点,边和消息等。master分别保存aggregator的值。

    Worker机器的失效由master保持的心跳来检测。当Worker失效之后,master会将其分区数据重新分配到其他可用的机器上。然后新Worker机器加载数据之后根据检查点和保存的状态数据重新执行恢复到上次失效的位置。不用说,检查点技术开销是很大的,检查点持久化状态数据的频率需要综合考虑存储开销和恢复的开销。

    Pregel实现一种叫有限恢复(confined recovery)的概念以提升其恢复的延时和花销。在数据载入和每个超级步中都会保存其出边的消息,然后依据这些恢复到Worker失效前的上一个状态。这个办法通过仅执行分区的数据来进行恢复而节省资源并且降低延时。保存出边消息会带来花销,但一般来讲这部分的I/O不会成为瓶颈。

    4.3 Worker实现

    在内存存一个map表,以顶点Id为键,值为顶点值和其出边信息和状态标志位。然后遍历顶点执行Compute(),有一个接收的消息的迭代器和一个所有出边的迭代器。

    4.4 Master实现

    Master主要负责协调各个Worker,每个Worker在注册时会被分配唯一的标识符。Master会保留活跃Worker的状态,包含了其标识符,地址信息,和它被分配的分区。Master这部分的数据体量跟分区数量成比例而不是跟图的顶点和边的数量成比例。

    Master给所有活跃的Worker发送相同的请求,然后等着Worker的返回。Master的所有操作如输入、输出、计算、保存、恢复都是在Barriers 同步点这个地方停下。如果这个同步点成功master再开始下一个阶段。在computation barrier中,这个同步点就是每个超级步之间的那个同步点。

    4.5 放大器

    每个Worker都有一个Aggregator实例的集合,每个以类型和实例名唯一标识。当一个Worker在图的分区上执行超级步时,Worker收集所有传给Aggregator的值然后产生一个值在本地。一个Aggregator部地简化分区上的顶点。最后在一个超级步结束时Worker生成一个执行树来简化之前部分简化的Aggregator生成全局的值然后发送给Master。Master在下一个超级步开始时将这个全局值发送给Worker。用一个树状的简化模式而不是流水线时为了提高并行性。

    5.应用

    Pregel论文中跑了很多的算法,这里只翻译PageRank算法。

    图源自Pregel论文

    实现的PageRankVertex类继承自基类Vertex,并实现Compute()接口每个Worker都执行类的Compute()。需要注意的是超级步0负责发送消息,这相当于是初始化步骤,所以会看到代码直接判断是否处于超级步1及以上。

    具体实现是:

        1.如果大于超级步0则读取消息迭代器里面所有的消息,求总和然后更改顶点值。

        2.如果超级步小于30,则读取所有出边,然后向所有出边发送消息,否则变为非活跃态。

    最后由Master判断是否所有顶点都变为非活跃态且没有消息被传送,进而结束算法。

    谢谢大家,只是做了一点微小的贡献。

    相关文章

      网友评论

          本文标题:Pregel-大规模图数据处理系统

          本文链接:https://www.haomeiwen.com/subject/pvnwqqtx.html