本文通过GraphX的aggregateMessages方法计算社交网络中某个人的所有粉丝的平均年龄
算法过程
- 图1中假设2,3,4这三个人同时关注了1
- 图2中通过aggregateMessages的自定义Map函数,将每个关注者的年龄发送给被关注者
- 图3中通过aggregateMessages的自定义Reduce函数,对发送过来的数据进行累加
- 图4最终用年龄总和除以关注者的人数,得到关注者的平均年龄,这样被关注者1就增加了一个新的属性,粉丝的平均年龄
算法实现
数据准备
users.txt
1,Andy,40
2,Gigi,25
3,Maggie,20
4,Annie,18
5,Amanda,24
6,Matei,30
7,Martin,35
followers.txt
2 1
3 1
4 1
7 3
6 2
5 2
实现
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, GraphLoader, VertexRDD}
object AggregateMessagesTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Aggregate Messages Test").setMaster("local[1]")
val sc = new SparkContext(conf)
val users = (sc.textFile("/tmp/users.txt")
.map(line => line.split(",")).map( row => (row(0).toLong, (row(1), row(2).toLong))))
val followerGraph = GraphLoader.edgeListFile(sc, "/tmp/followers.txt")
// 添加用户属性
val graph = followerGraph.outerJoinVertices(users) {
(_, _, attr) => attr.get
}
// 计算粉丝的数量以及年龄和
val followers: VertexRDD[(Int, Long)] = graph.aggregateMessages[(Int, Long)](
triplet => { // Map Function
// 发送顶点数和年龄到被关注者
triplet.sendToDst((1, triplet.srcAttr._2))
},
// 累计粉丝数量和年龄
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// 计算平均年龄
val avgAgeOfFollowers: VertexRDD[Double] =
followers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
// 显示结果
avgAgeOfFollowers.collect.foreach(println(_))
}
}
输出结果
(1,21.0)
(3,35.0)
(2,27.0)
可以看到顶点1的粉丝平均年龄为21,和算法模拟过程的结果一致
网友评论