7. Graph Initialization

2018-12-20

    The text and figures in this article come from a 2014 blog post by @玄畅 and have been adjusted for Spark 1.6.0.

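    1. Overview

    Creating a Graph object essentially means creating an EdgeRDD and a VertexRDD and computing the triplets RDD. As described earlier, building a graph requires at least the edge information; vertex properties can optionally be supplied along with it.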

    2. Construction methods

    2.1 fromEdges()

    Builds a graph from edges. The input is an RDD of Edge objects.

      /**
       * Construct a graph from a collection of edges.
       *
       * @param edges the RDD containing the set of edges in the graph
       * @param defaultValue the default vertex attribute to use for each vertex
       * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
       * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
       * @return a graph with edge attributes described by `edges` and vertices
       *         given by all vertices in `edges` with value `defaultValue`
       */
      def fromEdges[VD: ClassTag, ED: ClassTag](
          edges: RDD[Edge[ED]],
          defaultValue: VD,
          edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
          vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
        GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
      }

    2.2 fromEdgeTuples()

    Builds a graph from a different representation of edges: pairs that relate a source vertex to a target vertex. The input is an RDD whose elements are (source vertex, target vertex) tuples.

      /**
       * Construct a graph from a collection of edges encoded as vertex id pairs.
       *
       * @param rawEdges a collection of edges in (src, dst) form
       * @param defaultValue the vertex attributes with which to create vertices referenced by the edges
       * @param uniqueEdges if multiple identical edges are found they are combined and the edge
       * attribute is set to the sum.  Otherwise duplicate edges are treated as separate. To enable
       * `uniqueEdges`, a [[PartitionStrategy]] must be provided.
       * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
       * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
       * @return a graph with edge attributes containing either the count of duplicate edges or 1
       * (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
       */
      def fromEdgeTuples[VD: ClassTag](
          rawEdges: RDD[(VertexId, VertexId)],
          defaultValue: VD,
          uniqueEdges: Option[PartitionStrategy] = None,
          edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
          vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
      {
        // Every (src, dst) pair becomes an edge with attribute 1
        val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
        val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
        uniqueEdges match {
          // Merge identical edges by summing their attributes
          case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
          case None => graph
        }
      }
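
To make the effect of `uniqueEdges` concrete, here is a minimal plain-Scala sketch (no Spark needed) of what the two branches of the `match` compute on a local collection. `SimpleEdge` and `buildEdges` are hypothetical names invented for this illustration; they are not part of GraphX. In GraphX itself, `groupEdges` only merges edges that sit in the same partition, which is why a `PartitionStrategy` has to be supplied first.

```scala
// Hypothetical stand-in for GraphX's Edge[Int]; not the real class.
final case class SimpleEdge(srcId: Long, dstId: Long, attr: Int)

// Mimics the logic of fromEdgeTuples on a local Seq:
// every (src, dst) pair becomes an edge with attr = 1, and when
// `unique` is true, identical edges are merged by summing their attrs.
def buildEdges(rawEdges: Seq[(Long, Long)], unique: Boolean): Seq[SimpleEdge] = {
  val edges = rawEdges.map { case (src, dst) => SimpleEdge(src, dst, 1) }
  if (unique) {
    edges
      .groupBy(e => (e.srcId, e.dstId))
      .map { case ((src, dst), group) => SimpleEdge(src, dst, group.map(_.attr).sum) }
      .toSeq
      .sortBy(e => (e.srcId, e.dstId)) // sorted only to make the result deterministic
  } else {
    edges
  }
}

val raw = Seq((1L, 2L), (1L, 2L), (2L, 3L))
val merged = buildEdges(raw, unique = true)    // the duplicate (1, 2) collapses into one edge
val separate = buildEdges(raw, unique = false) // duplicates are kept as separate edges
```

With `unique = true` the edge attribute ends up holding the number of duplicates, which matches the `@return` description in the doc comment.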


    3. How GraphImpl builds the graph


    3.1 Building the EdgeRDD


    • Load the data from a file and convert each line into a tuple of the form (srcId, dstId)
    val rawEdgesRdd: RDD[(Long, Long)] =
        sc.textFile(input).filter(s => s != "0,0").repartition(partitionNum).map {
          case line =>
            val ss = line.split(",")
            val src = ss(0).toLong
            val dst = ss(1).toLong
            // Normalize each pair so that the smaller ID always comes first
            if (src < dst)
              (src, dst)
            else
              (dst, src)
        }
    • Graph.fromEdgeTuples(rawEdgesRdd)

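    Each source record is a pair of vertex IDs separated by a comma. Every record is mapped to an Edge(srcId, dstId, attr) object, with attr defaulting to 1. This turns the source data into an RDD[Edge[ED]], as in the following code: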

    val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
    • The RDD[Edge[ED]] is then converted into an EdgeRDDImpl[ED, VD]


    val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
    def apply[VD: ClassTag, ED: ClassTag](
          edges: RDD[Edge[ED]],
          defaultVertexAttr: VD,
          edgeStorageLevel: StorageLevel,
          vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
        fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
    }

      Before apply calls fromEdgeRDD, it calls EdgeRDD.fromEdges(edges) to convert the RDD[Edge[ED]] into an EdgeRDDImpl[ED, VD].

    def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = {
        val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
          // Collect all edges of this partition into a builder, then build one EdgePartition
          val builder = new EdgePartitionBuilder[ED, VD]
          iter.foreach { e =>
            builder.add(e.srcId, e.dstId, e.attr)
          }
          Iterator((pid, builder.toEdgePartition))
        }
        EdgeRDD.fromEdgePartitions(edgePartitions)
    }


    def toEdgePartition: EdgePartition[ED, VD] = {
        val edgeArray = edges.trim().array
        new Sorter(Edge.edgeArraySortDataFormat[ED])
          .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
        val localSrcIds = new Array[Int](edgeArray.size)
        val localDstIds = new Array[Int](edgeArray.size)
        val data = new Array[ED](edgeArray.size)
        val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
        val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
        val local2global = new PrimitiveVector[VertexId]
        var vertexAttrs = Array.empty[VD]
        if (edgeArray.length > 0) {
          index.update(edgeArray(0).srcId, 0)
          var currSrcId: VertexId = edgeArray(0).srcId
          var currLocalId = -1
          var i = 0
          while (i < edgeArray.size) {
            val srcId = edgeArray(i).srcId
            val dstId = edgeArray(i).dstId
            localSrcIds(i) = global2local.changeValue(srcId,
              { currLocalId += 1; local2global += srcId; currLocalId }, identity)
            localDstIds(i) = global2local.changeValue(dstId,
              { currLocalId += 1; local2global += dstId; currLocalId }, identity)
            data(i) = edgeArray(i).attr
            if (srcId != currSrcId) {
              currSrcId = srcId
              index.update(currSrcId, i)
            }
            i += 1
          }
          vertexAttrs = new Array[VD](currLocalId + 1)
        }
        new EdgePartition(
          localSrcIds, localDstIds, data, index, global2local, local2global.trim().array, vertexAttrs,
          None)
    }
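    • The first step of toEdgePartition is to sort the edges (Edge.lexicographicOrdering orders them by srcId, then by dstId).


    • The second step of toEdgePartition is to fill localSrcIds, localDstIds, data, index, global2local, local2global, and vertexAttrs.


      global2local is a simple, fast hash map with non-negative keys (a GraphXPrimitiveKeyOpenHashMap). It stores the mapping from VertexId to local index, and it contains an entry for every srcId and dstId in the current partition.


      One srcId can correspond to several dstIds. After sorting by srcId, the same srcId therefore appears on several consecutive rows, as in the "index desc" part of the figure above. index records, for each distinct srcId, the array position at which that srcId first appears.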



    // Look up the VertexId from a local index
    localSrcIds/localDstIds -> index -> local2global -> VertexId
    // Look up the local index from a VertexId, then fetch the attribute
    VertexId -> global2local -> index -> data -> attr object
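
The two steps above can be sketched in plain Scala on a small local collection. This is a simplified illustration under stated assumptions, not the GraphX implementation: `LocalEdgeIndex` and `buildLocalIndex` are hypothetical names, ordinary Scala collections stand in for `GraphXPrimitiveKeyOpenHashMap` and `PrimitiveVector`, and edge attributes are fixed to `Int`.

```scala
import scala.collection.mutable

// Hypothetical container for the structures built in this sketch.
final case class LocalEdgeIndex(
    localSrcIds: Array[Int],       // local index of each edge's source vertex
    localDstIds: Array[Int],       // local index of each edge's destination vertex
    data: Array[Int],              // edge attributes, in sorted edge order
    index: Map[Long, Int],         // srcId -> position of its first edge
    global2local: Map[Long, Int],  // VertexId -> local index
    local2global: Array[Long])     // local index -> VertexId

// edges are (srcId, dstId, attr) triples
def buildLocalIndex(edges: Seq[(Long, Long, Int)]): LocalEdgeIndex = {
  // Step 1: sort the edges by (srcId, dstId)
  val sorted = edges.sortBy(e => (e._1, e._2)).toArray

  val global2local = mutable.LinkedHashMap.empty[Long, Int]
  val local2global = mutable.ArrayBuffer.empty[Long]
  val index = mutable.LinkedHashMap.empty[Long, Int]

  // Assign the next free local index the first time a vertex is seen.
  def localId(vid: Long): Int =
    global2local.getOrElseUpdate(vid, { local2global += vid; local2global.size - 1 })

  // Step 2: fill the arrays in a single pass over the sorted edges
  val localSrcIds = new Array[Int](sorted.length)
  val localDstIds = new Array[Int](sorted.length)
  val data = new Array[Int](sorted.length)
  var i = 0
  while (i < sorted.length) {
    val (srcId, dstId, attr) = sorted(i)
    localSrcIds(i) = localId(srcId)
    localDstIds(i) = localId(dstId)
    data(i) = attr
    // Remember only the first position at which each srcId occurs
    if (!index.contains(srcId)) index(srcId) = i
    i += 1
  }
  LocalEdgeIndex(localSrcIds, localDstIds, data, index.toMap,
    global2local.toMap, local2global.toArray)
}

val idx = buildLocalIndex(Seq((3L, 1L, 30), (1L, 2L, 10), (1L, 3L, 20)))
// Local index -> VertexId for the destination of the second sorted edge
val dstOfSecondEdge = idx.local2global(idx.localDstIds(1))
```

Storing small `Int` local indexes per edge instead of full `Long` vertex IDs is what makes the per-edge arrays compact; the two maps translate between the local and global views when needed.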

    3.2 Building the VertexRDD

    private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
          edges: EdgeRDDImpl[ED, VD],
          defaultVertexAttr: VD,
          edgeStorageLevel: StorageLevel,
          vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
        val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache()
        val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr)
        fromExistingRDDs(vertices, edgesCached)
    }


    def fromEdges[VD: ClassTag](
          edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
        // 1. Create the routing tables
        val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
        // 2. Build the vertexPartitions partition objects from the routing tables
        val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
          val routingTable =
            if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
          Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
        }, preservesPartitioning = true)
        // 3. Create the VertexRDDImpl object
        new VertexRDDImpl(vertexPartitions)
    }


    • Creating the routing table


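    private[graphx] def createRoutingTables(
          edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
        // Convert the data in each edge partition into RoutingTableMessage values
        val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
          Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
        // ... the remainder of the method is shown further below
    }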


    def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
        : Iterator[RoutingTableMessage] = {
        // For each vertex ID, record in the low 2 bits whether it occurs as src and/or dst
        val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
        edgePartition.iterator.foreach { e =>
          map.changeValue(e.srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
          map.changeValue(e.dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
        }
        map.iterator.map { vidAndPosition =>
          val vid = vidAndPosition._1
          val position = vidAndPosition._2
          toMessage(vid, pid, position)
        }
    }
    private def toMessage(vid: VertexId, pid: PartitionID, position: Byte): RoutingTableMessage = {
        val positionUpper2 = position << 30
        val pidLower30 = pid & 0x3FFFFFFF
        (vid, positionUpper2 | pidLower30)
    }

      From the code we can see that the upper 2 bits of the 32-bit Int hold the position flag (01: the vertex occurs as a srcId, 10: it occurs as a dstId, 11: it occurs as both), and the lower 30 bits hold the edge partition ID. Packing both values into a single Int saves memory.
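
The bit layout can be tried out in isolation. The following is a small sketch of the same packing scheme together with the matching unpacking step; the names `pack`, `unpackPid`, and `unpackPosition` are hypothetical helpers written for this illustration and are not the GraphX function names.

```scala
// Position flags, using the same values as the snippet above.
val SrcFlag = 0x1 // the vertex occurs as a source in the edge partition
val DstFlag = 0x2 // the vertex occurs as a destination in the edge partition

// Pack a 2-bit position flag and a 30-bit partition ID into one Int.
def pack(pid: Int, position: Int): Int =
  (position << 30) | (pid & 0x3FFFFFFF)

// Recover the partition ID from the lower 30 bits.
def unpackPid(packed: Int): Int = packed & 0x3FFFFFFF

// Recover the position flag from the upper 2 bits. The unsigned shift >>>
// fills with zeros, so the result is always in the range 0..3.
def unpackPosition(packed: Int): Int = packed >>> 30

// A vertex that is both a source and a destination in edge partition 5
val packed = pack(pid = 5, position = SrcFlag | DstFlag)
val isSrc = (unpackPosition(packed) & SrcFlag) != 0
val isDst = (unpackPosition(packed) & DstFlag) != 0
```

One consequence of this layout is that only 30 bits remain for the partition ID, so the scheme assumes fewer than 2^30 edge partitions.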

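    • Building the partition objects from the routing table
    private[graphx] def createRoutingTables(
          edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
        // ... vid2pid is computed as shown earlier
        val numEdgePartitions = edges.partitions.size
        // Repartition the messages by vertex ID, then build one RoutingTablePartition per vertex partition
        vid2pid.partitionBy(vertexPartitioner).mapPartitions(
          iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
          preservesPartitioning = true)
    }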


     def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
        : RoutingTablePartition = {
        val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
        val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
        val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
        for (msg <- iter) {
          val vid = vidFromMessage(msg)
          val pid = pidFromMessage(msg)
          val position = positionFromMessage(msg)
          pid2vid(pid) += vid
          srcFlags(pid) += (position & 0x1) != 0
          dstFlags(pid) += (position & 0x2) != 0
        }
        new RoutingTablePartition(pid2vid.zipWithIndex.map {
          case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
        })
     }

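    This method reads the RoutingTableMessage values and repackages the vid, the edge partition pid, and the isSrcId/isDstId flags into three data structures: pid2vid, srcFlags, and dstFlags. Together they describe how the vertices of the current vertex partition are distributed over the edge partitions.
    Picture it this way: after repartitioning, the vertices in a new partition may come from different edge partitions. For a vertex to find its edges, it first has to determine the edge partition number pid and then, within that edge partition, whether it occurs as a srcId or a dstId. That is enough to locate the edges.
    For each edge partition, the new partition stores a record of the form (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid))). The flags are converted with toBitSet to save space.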
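
The regrouping performed by fromMsgs can be illustrated with a small plain-Scala sketch. It is an approximation under stated assumptions: `RoutingMsg`, `RoutingEntry`, and `buildRoutingTable` are hypothetical names, the message is modelled as an already unpacked (vid, pid, position) triple, and immutable `Vector`s stand in for `PrimitiveVector` and `BitSet`.

```scala
// Hypothetical, already-unpacked routing message.
final case class RoutingMsg(vid: Long, pid: Int, position: Int)

// One entry per edge partition: the vertex IDs that occur there, plus
// parallel flags saying whether each one occurs as a source / destination.
final case class RoutingEntry(
    vids: Vector[Long],
    isSrc: Vector[Boolean],
    isDst: Vector[Boolean])

def buildRoutingTable(numEdgePartitions: Int, msgs: Seq[RoutingMsg]): Vector[RoutingEntry] = {
  val empty = RoutingEntry(Vector.empty, Vector.empty, Vector.empty)
  // Start with one empty entry per edge partition and append each message
  // to the entry of the edge partition it refers to.
  msgs.foldLeft(Vector.fill(numEdgePartitions)(empty)) {
    case (table, RoutingMsg(vid, pid, position)) =>
      val e = table(pid)
      table.updated(pid, RoutingEntry(
        e.vids :+ vid,
        e.isSrc :+ ((position & 0x1) != 0),
        e.isDst :+ ((position & 0x2) != 0)))
  }
}

// Vertex 1 is a source in edge partition 0 and both source and destination
// in edge partition 1; vertex 2 is a destination in edge partition 0.
val table = buildRoutingTable(2, Seq(
  RoutingMsg(1L, 0, 0x1),
  RoutingMsg(2L, 0, 0x2),
  RoutingMsg(1L, 1, 0x3)))
```

Reading `table(pid)` then answers the question from the paragraph above: which vertices of this vertex partition occur in edge partition `pid`, and in which role.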


    def apply[VD: ClassTag](
          iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
          mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
        val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
        // Merge the given vertices, combining duplicates with mergeFunc
        iter.foreach { pair =>
          map.setMerge(pair._1, pair._2, mergeFunc)
        }
        // Fill in the default value for vertices that have no attribute yet
        routingTable.iterator.foreach { vid =>
          map.changeValue(vid, defaultVal, identity)
        }
        new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
    }
    class ShippableVertexPartition[VD: ClassTag](
    val index: VertexIdToIndexMap,
    val values: Array[VD],
    val mask: BitSet,
    val routingTable: RoutingTablePartition)
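
The merge-then-fill logic of this apply method can be sketched with ordinary Scala collections. `buildVertexAttrs` is a hypothetical helper written for this illustration, and a mutable `Map` stands in for `GraphXPrimitiveKeyOpenHashMap`; it is meant as a rough model of the two loops, not as the GraphX code.

```scala
import scala.collection.mutable

// Merge the explicitly supplied vertex attributes, then give every vertex
// that the routing table mentions but that has no attribute yet the default.
def buildVertexAttrs[VD](
    supplied: Seq[(Long, VD)],
    routedVids: Seq[Long],
    defaultVal: VD,
    mergeFunc: (VD, VD) => VD): Map[Long, VD] = {
  val attrs = mutable.Map.empty[Long, VD]
  // Step 1: merge duplicate vertex IDs with mergeFunc
  supplied.foreach { case (vid, attr) =>
    attrs(vid) = attrs.get(vid).map(old => mergeFunc(old, attr)).getOrElse(attr)
  }
  // Step 2: fill in defaultVal where no attribute exists; existing values are kept
  routedVids.foreach { vid => attrs.getOrElseUpdate(vid, defaultVal) }
  attrs.toMap
}

val attrs = buildVertexAttrs[Int](
  supplied = Seq(1L -> 10, 1L -> 5),
  routedVids = Seq(1L, 2L, 3L),
  defaultVal = 0,
  mergeFunc = _ + _)

// When the graph is built from edges only, nothing is supplied,
// so every routed vertex receives the default value.
val fromEdgesOnly = buildVertexAttrs[Int](Seq.empty, Seq(1L, 2L), 0, _ + _)
```

The second call mirrors VertexRDD.fromEdges above, which passes Iterator.empty as the vertex iterator.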


    3.3 Creating the Graph object


    With the EdgeRDD and VertexRDD built above, calling new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]])) produces the Graph object.

    class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
        var edges: EdgeRDDImpl[ED, VD],
        var hasSrcId: Boolean = false,
        var hasDstId: Boolean = false) 



