Hello GraphX

Author: Woople | Published 2018-06-12 10:50

This article walks through a simple example to explain some basic concepts and common operations in Spark GraphX.

Example

First, add the GraphX dependency to the pom; note that the _2.10 suffix on the artifactId is the Scala version and must match the Scala version your project is built with:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.10</artifactId>
    <version>1.6.3</version>
</dependency>

The complete example code:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object HelloGraphX {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Hello GraphX")
    val sc = new SparkContext(conf)

    // Create an RDD for the vertices
    val users: RDD[(VertexId, (String, String))] =
      sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
        (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
    // Create an RDD for edges
    val relationships: RDD[Edge[String]] =
      sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), Edge(5L, 0L, "colleague")))
    // Define a default user in case there are relationships with missing users
    val defaultUser = ("John Doe", "Missing")
    // Build the initial Graph
    val graph = Graph(users, relationships, defaultUser)
    // Notice that there is a user 0 (for which we have no information) connected to user 5 (franklin).
    graph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))
    // Remove missing vertices as well as the edges connected to them
    val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
    // The valid subgraph drops user 0, disconnecting it from user 5 (franklin)
    validGraph.vertices.collect.foreach(println(_))
    validGraph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))

    println("Count all users which are prof:"+validGraph.vertices.filter { case (id, (name, pos)) => pos == "prof" }.count)

    println("Count all the edges where src < dst:"+graph.edges.filter(e => e.srcId < e.dstId).count)

    sc.stop()
  }
}

Finally, the results are written to the driver's log:

rxin is the collab of jgonzal
franklin is the advisor of rxin
istoica is the colleague of franklin
franklin is the colleague of John Doe
franklin is the pi of jgonzal

......

istoica is the colleague of franklin
rxin is the collab of jgonzal
franklin is the advisor of rxin
franklin is the pi of jgonzal

......

Count all users which are prof:2

......

Count all the edges where src < dst:3

Walkthrough

  1. The first step of the example builds a graph, using the simplest way to construct one: the Graph constructor, which takes the vertex RDD and the edge RDD as its arguments. The resulting relationship graph is shown below:

[Figure: property graph of the users (vertices) and their relationships (edges)]
Because the edge list contains Edge(5L, 0L, "colleague") but vertex 0 is never defined, the third argument of the Graph constructor kicks in when the graph is built: vertex 0 gets the default attribute ("John Doe", "Missing"), as the quick check below confirms.
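
A minimal sketch, continuing from the sample above (these lines are not part of the original code):

// Not in the original sample: look up vertex 0 and print its attribute.
// Expected output: (0,(John Doe,Missing))
graph.vertices.filter { case (id, _) => id == 0L }.collect.foreach(println)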

  2. Filter out the vertices whose attribute is "Missing" to get a new graph, then traverse it with the code below (a sketch of subgraph's edge predicate follows the snippet):
    validGraph.vertices.collect.foreach(println(_))
    validGraph.triplets.map(
      triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
    ).collect.foreach(println(_))
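
Besides the vertex predicate vpred used above, subgraph also accepts an edge predicate epred over triplets. A minimal sketch continuing from the sample (not in the original code):

// Not in the original sample: keep only "colleague" edges; all vertices remain.
val colleagueGraph = graph.subgraph(epred = triplet => triplet.attr == "colleague")
colleagueGraph.edges.collect.foreach(println)
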
  3. graph.vertices and graph.edges split a graph into a vertex view and an edge view, whose return types are VertexRDD and EdgeRDD respectively, so ordinary RDD methods can be used to further filter the vertices or edges. Only users 5 and 2 have the attribute "prof", hence Count all users which are prof:2; likewise, there are 3 edges in the edge table whose srcId is smaller than their dstId.
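
The graph also exposes degree views, which are VertexRDDs as well; a minimal sketch continuing from the sample (not in the original code):

// Not in the original sample: inDegrees maps each vertex id to its number of incoming edges.
graph.inDegrees.collect.foreach { case (id, deg) => println(s"vertex $id has in-degree $deg") }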

Summary

  1. Through a simple example, we showed how to use the GraphX API to build a graph and perform basic operations on it.
  2. The test environment was based on HDP-2.6.0.3; the example follows http://spark.apache.org/docs/1.6.3/graphx-programming-guide.html
  3. In production, the graph data is more likely to come from external files, which means the graph has to be loaded as shown below; the complete code for this case is in the appendix:
    // Load my user data and parse into tuples of user id and attribute list
    val users = (sc.textFile("/tmp/users.txt")
      .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

    // Parse the edge data, one whitespace-separated "srcId dstId" pair per line
    val followerGraph = GraphLoader.edgeListFile(sc, "/tmp/followers.txt")

Appendix

import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object GraphXExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GraphX Example")
    val sc = new SparkContext(conf)

    // Load my user data and parse into tuples of user id and attribute list
    val users = (sc.textFile("/tmp/users.txt")
      .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

    // Parse the edge data, one whitespace-separated "srcId dstId" pair per line
    // (GraphLoader.edgeListFile sets every edge attribute to 1)
    val followerGraph = GraphLoader.edgeListFile(sc, "/tmp/followers.txt")

    // Attach the user attributes
    val graph = followerGraph.outerJoinVertices(users) {
      case (uid, deg, Some(attrList)) => attrList
      // Some users may not have attributes so we set them as empty
      case (uid, deg, None) => Array.empty[String]
    }

    // Restrict the graph to users with usernames and names
    val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)

    // Run PageRank until per-vertex scores converge to within a tolerance of 0.001
    val pagerankGraph = subgraph.pageRank(0.001)

    // Get the attributes of the top pagerank users
    val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
      case (uid, attrList, Some(pr)) => (pr, attrList.toList)
      case (uid, attrList, None) => (0.0, attrList.toList)
    }

    println(s"The result is :${userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n")}")

    sc.stop()
  }
}

The example uses two data files:
users.txt

1,BarackObama,Barack Obama
2,ladygaga,Goddess of Love
3,jeresig,John Resig
4,justinbieber,Justin Bieber
6,matei_zaharia,Matei Zaharia
7,odersky,Martin Odersky
8,anonsys

followers.txt

2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7

Place these two data files under /tmp on HDFS. Note that user 8 (anonsys) has only one attribute, so the attr.size == 2 predicate filters it out of the subgraph. The results are written to the driver's log:

The result is :(1,(1.453834747463902,List(BarackObama, Barack Obama)))
(2,(1.3857595353443166,List(ladygaga, Goddess of Love)))
(7,(1.2892158818481694,List(odersky, Martin Odersky)))
(3,(0.9936187772892124,List(jeresig, John Resig)))
(6,(0.697916749785472,List(matei_zaharia, Matei Zaharia)))
