美文网首页
深入了解Flink Gelly图计算模型之Scatter-Gat

深入了解Flink Gelly图计算模型之Scatter-Gat

作者: 老羊_肖恩 | 来源:发表于2022-01-06 17:38 被阅读0次

  Flink提供了三种通用的基于迭代的图计算模型的实现(Flink-Gelly:Iterative Graph Processing),分别是:Vertex-Centric, Scatter-Gather和Gather-Sum-Apply,在上篇文章深入了解Flink Gelly图计算之—Vertex-Centric模型中我们详细的介绍了Vertex-Centric模型的实现原理和使用方法,接下来我们将详细的介绍一下Scatter-Gather模型的原理和实现过程,并以简单的示例来展示Scatter-Gather的使用。

Scatter-Gather模型概述

  Scatter-Gather模型也被称之为"signal/collect"模型,其核心思想和Vertex-Centric模型一样,从图中每个顶点的角度表达计算。其计算过程以同步迭代地方式进行,每次迭代过程称之为一个Superstep。在每个Superstep中,顶点为其他顶点生成消息,并根据接收到的消息更新其顶点的值。要在Flink Gelly中使用Scatter-Gather模型,用户只需要定义顶点在每个Superstep中的行为:
  Scatter: 当前顶点生成一个将要发送给其他顶点的消息。
  Gather: 当前顶点使用接收到的消息更新其当前顶点的值。

Scatter-Gather model

Scatter-Gather模型使用

  Flink Gelly 提供了Scatter-Gather的实现,用户只需要分别实现对应于Scatter和Gather阶段的两个方法即可。第一个方法是ScatterFunction,允许一个顶点向其他顶点发送消息,目标顶点会在发送消息的同一个Superstep中接收到发送的消息。第二个方法是GatherFunction,该方法定义了一个顶点如何根据接收到的消息更新当前顶点的值。将这两个方法的实现和最大迭代次数传递给Flink的runScatterGatherIteration方法即可运行Scatter-Gather模型,该方法会在输入图上运行Scatter-Gather模型,并返回一个更新了顶点值的新图。假设我们在下面的图上使用Vertex-Centric模型计算单源最短路径,并假设设顶点1为源点。在每个Superstep中,每个顶点将当前顶点值与连接邻接顶点的边上的值之和作为候选距离消息发送给它的所有邻居。每个顶点在接收到候选距离消息时,计算出最小的距离,如果发现了更小距离的路径,则更新当前顶点的值,并将当前顶点标记为活跃顶点。如果一个顶点在之前的Superstep中没有改变其顶点的值,那么在当前轮次的Superstep中,该顶点不会再对外发送候选消息。下面的代码详细地展示了如何使用Scatter-Gather模型解决单源点最短路径问题。

package com.quan.graph;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;

import java.util.LinkedList;
import java.util.List;

public class SG_SSSP {

    // Set 1 as the source.
    public static Integer srcId = 1;

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Edge<Integer, Integer>> edgesList = new LinkedList<>();
        edgesList.add(new Edge<Integer, Integer>(1, 2, 12));
        edgesList.add(new Edge<Integer, Integer>(1, 6, 3));
        edgesList.add(new Edge<Integer, Integer>(1, 7, 14));
        edgesList.add(new Edge<Integer, Integer>(2, 6, 7));
        edgesList.add(new Edge<Integer, Integer>(2, 3, 10));
        edgesList.add(new Edge<Integer, Integer>(3, 4, 3));
        edgesList.add(new Edge<Integer, Integer>(3, 5, 5));
        edgesList.add(new Edge<Integer, Integer>(3, 6, 4));
        edgesList.add(new Edge<Integer, Integer>(4, 5, 4));
        edgesList.add(new Edge<Integer, Integer>(5, 6, 2));
        edgesList.add(new Edge<Integer, Integer>(5, 7, 8));
        edgesList.add(new Edge<Integer, Integer>(6, 7, 9));

        DataSet<Edge<Integer, Integer>> edges = env.fromCollection(edgesList);

        // Read the input data and create a graph.
        Graph<Integer, Integer, Integer> graph = Graph.fromDataSet(edges, new InitVertices(), env);

        // Convert the graph to undirected.
        Graph<Integer, Integer, Integer> undirected_graph = graph.getUndirected();

        // Define the maximum number of iterations.
        int maxIterations = 10;

        // Execute the vertex-centric iteration.
        Graph<Integer, Integer, Integer> result = undirected_graph.runScatterGatherIteration(
                new DistanceMessenger(), new MinMessageGather(), maxIterations);

        // Extract the vertices as the result.
        DataSet<Vertex<Integer, Integer>> singleSourceShortestPaths = result.getVertices();

        // Print the result.
        singleSourceShortestPaths.print();
    }

    // scatter: messaging
    @SuppressWarnings("serial")
        public static final class DistanceMessenger extends ScatterFunction<Integer, Integer, Integer, Integer> {

        @Override
        public void sendMessages(Vertex<Integer, Integer> vertex) throws Exception {
            if (vertex.getValue() < Integer.MAX_VALUE){
                for (Edge<Integer, Integer> e : getEdges()) {
                    sendMessageTo(e.getTarget(), vertex.getValue() + e.getValue());
                }
            }
        }
    }

    // gather: vertex update
    @SuppressWarnings("serial")
    public static final class MinMessageGather extends GatherFunction<Integer, Integer, Integer> {

        @Override
        public void updateVertex(Vertex<Integer, Integer> vertex, MessageIterator<Integer> inMessages) throws Exception {
            Integer minDistance = Integer.MAX_VALUE;
            for (Integer msg : inMessages) {
                minDistance = Math.min(msg, minDistance);
            }
            if (minDistance < vertex.getValue()){
                setNewVertexValue(minDistance);
            }
        }
    }

    @SuppressWarnings("serial")
    private static final class InitVertices implements MapFunction<Integer, Integer> {
        // Init all vertex with the max value expected the srd vertex.
        // Init the src vertex with 0.
        public Integer map(Integer id) {
            return id.equals(srcId) ? 0 : Integer.MAX_VALUE;
        }
    }
}
示例图

  这里我们用一个上图所示的无向图为例,在上图求解单源点最短路径,其中顶点1为源点,在上图中执行上述Scatter-Gather模型的结果如下图所示:

执行结果

Scatter-Gather模型参数配置

  我们可以时使用ScatterGatherConfiguration对象来配置Scatter-Gather模型,目前可以对Scatter-Gather模型进行如下配置:
  Name: 可以使用setName() 方法为Scatter-Gather模型设置一个名称,该名称会在日志和消息中显示。
  Parallelism: 可以使用 setParallelism()方法来设置每个轮次迭代的并行度,默认为1。

  Solution set in unmanaged memory: 可以使用setSolutionSetUnmanagedMemory() 方法来指定结果集是否保存在托管内存中,默认情况下结果集是运行在托管内存中。

  Aggregators: 可以使用registerAggregator()方法来为每个迭代注册聚合函数,注册的聚合函数可以在ScatterFunction方法和GatherFunction方法中访问。

  Broadcast Variables: 可以分别使用addBroadcastSetForUpdateFunction()方法和addBroadcastSetForUpdateFunction方法为ScatterFunctionGatherFunction方法添加广播变量(Broadcast Variables)。

  Number of Vertices: 为了控制每次迭代中访问的总顶点数,可以使用setOptNumVertices() 方法来设置。可以使用 getNumberOfVertices() 方法设置更新顶点方法和发送消息方法的总点数。如果该选项没有设置,该方法默认返回-1。

  Degrees: Accessing the in/out degree for a vertex within an iteration. This property can be set using the setOptDegrees() method. The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex using the getInDegree() and getOutDegree() methods. If the degrees option is not set in the configuration, these methods will return -1.

  Messaging Direction: 默认情况下,Flink Gelly中的图是有向图,因此一个顶点只往其出度的方向发送消息,并根据其入度方向接收消息来更新顶点状态。因此可以通过设置Messaging Direction来改变这一默认情况。Messaging Direction分为EdgeDirection.IN, EdgeDirection.OUTEdgeDirection.ALL,可以通过方法setDirection()` method进行设置。

// configure the iteration
ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();

// set the iteration name
parameters.setName("Gelly Iteration");

// set the parallelism
parameters.setParallelism(16);

// register an aggregator
parameters.registerAggregator("sumAggregator", new LongSumAggregator());

// run the scatter-gather iteration, also passing the configuration parameters
Graph<Long, Double, Double> result =
            graph.runScatterGatherIteration(
            new Messenger(), new VertexUpdater(), maxIterations, parameters);

// set the number of vertices option to true
parameters.setOptNumVertices(true);

// set the degree option to true
parameters.setOptDegrees(true);

// run the scatter-gather iteration, also passing the configuration parameters
Graph<Long, Double, Double> result =
            graph.runScatterGatherIteration(
            new Messenger(), new VertexUpdater(), maxIterations, parameters);
相关阅读
  1. 深入了解Flink Gelly图计算模型之Vertex-Centric
  2. 深入了解Flink Gelly图计算模型之Gather-Sum-Apply
  3. Flink-Gelly:Iterative Graph Processing

相关文章

网友评论

      本文标题:深入了解Flink Gelly图计算模型之Scatter-Gat

      本文链接:https://www.haomeiwen.com/subject/ripkcrtx.html