美文网首页
深入了解Flink Gelly图计算模型之Gather-Sum-

深入了解Flink Gelly图计算模型之Gather-Sum-

作者: 老羊_肖恩 | 来源:发表于2022-01-07 08:22 被阅读0次

      Flink提供了三种通用的基于迭代的图计算模型的实现(Flink-Gelly:Iterative Graph Processing),分别是:Vertex-Centric, Scatter-Gather和Gather-Sum-Apply,在之前的文章深入了解Flink Gelly图计算之—Vertex-Centric模型深入了解Flink Gelly图计算模型之Vertex-Centric中我们详细的介绍了Vertex-Centric模型和Scatter-Gather模型的实现原理和使用方法,接下来我们将详细的介绍一下Gather-Sum-Apply模型的原理和实现过程,并以简单的示例来展示Gather-Sum-Apply模型的使用方法。

    Gather-Sum-Apply模型概述

      Gather-Sum-Apply模型的计算过程也是基于同步迭代,每个迭代的轮次称之为Superstep。每个Superstep由以下三个阶段组成:
    Gather: 在每个轮次的迭代中,每个顶点并行的在其邻接顶点点和边上执行udf,并产生一个中间结果。
    Sum: 每个顶点将其在Gather阶段产生的一系列中间结果应用udf进行聚合,生成一个单独的值。
    Apply: 每个顶点根据Sum阶段产生的新结果只和当前顶点的属性值应用udf对当前顶点进行更新操作。
      我们同样以单源点最短路径为例,简单介绍GSA每个阶段的具体执行逻辑。假设我们存在如下一个有向图,我们需要计算顶点1到其余三个顶点2,3,4的最短路径。首先我们需要对每个顶点的值进行初始化,其中顶点1的值(值记在顶点上的方框里)初始化为0,其余顶点的值默认初始化为正无穷。接下来以下图为例,详细介绍三个阶段的主要功能,在第一轮迭代中:
      (1)Gather:顶点1没有邻居节点(考虑是有向图,没有边指向顶点1),因此不用计算中间距离值。顶点2的邻居为1,中间值结果为:\small 2-2;顶点3的邻居为1,中间值结果为:\small 3-3;顶点4的邻居为1,2,3,中间值结果为:\small 4-4, 4-∞, 4-∞
      (2)Sum:上个阶段执行完成后,顶点2的中间值结果为:\small 2-2;顶点3的中间值结果为:\small 3-3;顶点4的中间值结果为:\small 4-4, 4-∞, 4-∞。该阶段执行聚合操作,挑选出最小的中间值为新值,因此,聚合的结果为:\small 2-2,3-3,4-4
      (3)Apply:根据Sum聚合的结果,将每个顶点的当前值按照取最小进行更新,则该轮Apply之后,每个顶点的值为\small 1:0, 2:2, 3:3,4:4
    第一轮迭代完成后,在第二轮迭代中,顶点4的Gather阶段中间值为变成\small 4-4, 4-3, 4-5,Sum阶段的聚合结果为3,最终的Apply阶段后,每个顶点的值为\small 1:0, 2:2, 3:3,4:3。然后在下一轮迭代中,没有任何顶点回在Apply阶段更新当前值,因此整个GSA模型迭代结束,最终结果为\small 1:0, 2:2, 3:3,4:3

    Gather-Sum-Apply

    Gather-Sum-Apply模型使用

      上面我们大致介绍了GSA模型三个阶段的基本执行过程。GSA模型在实际使用的时候也非常简单,只需要用户根据实际需求实现三个阶段的UDF即可,即GatherFunctionSumFunctionApplyFunction。下面我们给出了一段示例代码,简单展示了#Gather-Sum-Apply模型的使用方法。

    package com.quan.graph;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Edge;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.graph.gsa.ApplyFunction;
    import org.apache.flink.graph.gsa.GatherFunction;
    import org.apache.flink.graph.gsa.Neighbor;
    import org.apache.flink.graph.gsa.SumFunction;
    
    import java.util.LinkedList;
    import java.util.List;
    
    public class GSA_SSSP {
    
        // Set 1 as the source.
        public static Integer srcId = 1;
    
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            List<Edge<Integer, Integer>> edgesList = new LinkedList<>();
            edgesList.add(new Edge<Integer, Integer>(1, 2, 12));
            edgesList.add(new Edge<Integer, Integer>(1, 6, 3));
            edgesList.add(new Edge<Integer, Integer>(1, 7, 14));
            edgesList.add(new Edge<Integer, Integer>(2, 6, 7));
            edgesList.add(new Edge<Integer, Integer>(2, 3, 10));
            edgesList.add(new Edge<Integer, Integer>(3, 4, 3));
            edgesList.add(new Edge<Integer, Integer>(3, 5, 5));
            edgesList.add(new Edge<Integer, Integer>(3, 6, 4));
            edgesList.add(new Edge<Integer, Integer>(4, 5, 4));
            edgesList.add(new Edge<Integer, Integer>(5, 6, 2));
            edgesList.add(new Edge<Integer, Integer>(5, 7, 8));
            edgesList.add(new Edge<Integer, Integer>(6, 7, 9));
    
            DataSet<Edge<Integer, Integer>> edges = env.fromCollection(edgesList);
    
            // Read the input data and create a graph.
            Graph<Integer, Integer, Integer> graph = Graph.fromDataSet(edges, new InitVertices(), env);
    
            // Convert the graph to undirected.
            Graph<Integer, Integer, Integer> undirected_graph = graph.getUndirected();
    
            // Define the maximum number of iterations.
            int maxIterations = 10;
    
            // Execute the vertex-centric iteration.
            Graph<Integer, Integer, Integer> result = undirected_graph
                    .runGatherSumApplyIteration(
                            new CalculateDistances(),
                            new ChooseMinDistance(),
                            new UpdateDistance(),
                            maxIterations);
    
            // Extract the vertices as the result.
            DataSet<Vertex<Integer, Integer>> singleSourceShortestPaths = result.getVertices();
    
            // Print the result.
            singleSourceShortestPaths.print();
        }
    
        // - - -  UDFs - - - //
        // Gather: Calculate candidate distances from neighbors and edges of each vertex.
        private static final class CalculateDistances extends GatherFunction<Integer, Integer, Integer> {
    
            public Integer gather(Neighbor<Integer, Integer> neighbor) {
                return (neighbor.getNeighborValue() < Integer.MAX_VALUE) ?
                        neighbor.getNeighborValue() + neighbor.getEdgeValue() : Integer.MAX_VALUE;
            }
        }
    
        // Sum: Choose the min distance from candidate distances which calculate by GatherFunction
        private static final class ChooseMinDistance extends SumFunction<Integer, Integer, Integer> {
    
            public Integer sum(Integer newValue, Integer currentValue) {
                return Math.min(newValue, currentValue);
            }
        }
    
        // Apply: Update distance for each node when newDistance smaller than oldDistance
        private static final class UpdateDistance extends ApplyFunction<Integer, Integer, Integer> {
    
            public void apply(Integer newDistance, Integer oldDistance) {
                if (newDistance < oldDistance) {
                    setResult(newDistance);
                }
            }
        }
    
        @SuppressWarnings("serial")
        private static final class InitVertices implements MapFunction<Integer, Integer> {
            // Init all vertex with the max value expected the srd vertex.
            // Init the src vertex with 0.
            public Integer map(Integer id) {
                return id.equals(srcId) ? 0 : Integer.MAX_VALUE;
            }
        }
    }
    

    我们使用的数据集也是前面两篇文章中用到的数据即,即下图所示的图。


    示例图

    上图在GSA模型上的执行结果如下图所示


    执行结果

    Flink Gelly三种图计算模型的对比

      Flink官网给出了三种图计算模型的对比,通过对比我们可以得出以下结论:
    1. Vertex-Centric模型和Scatter-Gather模型中每个顶点可以与其他任意顶点进行通信,而Gather-Sum-Apply模型仅能与邻居顶点进行通信。
    2. Scatter-Gather模型和Gather-Sum-Apply模型根据节点的状态确定是否与其他顶点进行消息通信。
    3. Vertex-Centric模型可以自定义任意的更新逻辑和更新方法,Scatter-Gather模型的更新逻辑依赖于接收到的消息,而Gather-Sum-Apply模型的更新逻辑完全依赖于邻居顶点和邻接边的值。

    Flink Gelly图计算模型对比
    1. Flink-Gelly:Iterative Graph Processing
    2. 深入了解Flink Gelly图计算模型之Scatter-Gather
    3. 深入了解Flink Gelly图计算模型之Vertex-Centric

    相关文章

      网友评论

          本文标题:深入了解Flink Gelly图计算模型之Gather-Sum-

          本文链接:https://www.haomeiwen.com/subject/mwpkcrtx.html