美文网首页
6. Spark数据分区示例:PageRank

6. Spark数据分区示例:PageRank

作者: 泊牧 | 来源:发表于2018-03-23 10:25 被阅读111次

    1. 算法简介

    PageRank 是执行多次连接的一个迭代算法,因此它是RDD 分区操作的一个很好的用例。算法会维护两个数据集:

    • 一个由(pageID, linkList) 的元素组成,包含每个页面的相邻页面的列表;
    • 另一个由(pageID, rank) 元素组成,包含每个页面的当前排序值。

    它按如下步骤进行计算:

    1. 将每个页面的排序值初始化为1.0。
    2. 在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p) 的贡献值。
    3. 将每个页面的排序值设为0.15 + 0.85 * contributionsReceived。

    最后两步会重复几个循环,在此过程中,算法会逐渐收敛于每个页面的实际PageRank 值。在实际操作中,收敛通常需要大约10 轮迭代。

    2. 数据模拟

    假设一个由4个页面组成的小团体:A,B,C,D。相邻页面如下所示:
    A:B C
    B:A C
    C:A B D
    D:C

    4. 测试代码

    // Scala版PageRank
    import org.apache.spark.HashPartitioner  
    // 假设相邻页面列表以Spark objectFile的形式存储
    val links = sc.parallelize(List(
      ("A",List("B","C")),
      ("B",List("A","C")),
      ("C",List("A","B","D")),
      ("D",List("C"))
    )).partitionBy(new HashPartitioner(100))
      .persist()  
    // 将每个页面的排序值初始化为1.0;由于使用mapValues,生成的RDD
    // 的分区方式会和"links"的一样
    var ranks = links.mapValues(v => 1.0)
    // 运行10轮PageRank迭代
    for(i <- 0 until 10) {
      val contributions = links.join(ranks).flatMap {
        case (pageId, (links, rank)) =>
          links.map(dest => (dest, rank / links.size))
      }
      ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
    }
    // 写出最终排名
    ranks.sortByKey().collect().foreach(println)
    
    运行结果

    5. 运行过程分析

    初始的linksRDD和ranksRDD如下所示:

    //linksRDD:
    Array[(String, List[String])] = Array(
      (A,List(B, C)), 
      (B,List(A, C)), 
      (C,List(A, B, D)), 
      (D,List(C)) )
    //ranksRDD:
    Array[(String, Double)] = Array((A,1.0), (B,1.0), (C,1.0), (D,1.0))
    

    首次迭代后的contributionsRDD和ranksRDD如下所示:

    //contributionsRDD:
    Array[(String, Double)] = Array(
     (A,0.5), (A,0.3333333333333333), 
     (B,0.5), (B,0.3333333333333333), 
     (C,0.5), (C,0.5), (C,1.0), (D,0.3333333333333333))
    //ranksRDD:
    Array[(String, Double)] = Array(
     (A,0.8583333333333333),
     (B,0.8583333333333333), 
     (C,1.8499999999999999), 
     (D,0.43333333333333335) )
    

    验证数据:
    第1次迭代:

    PR(A)=0.15 + 0.85 * (1/2 + 1/3) = 0.858333
    PR(B)=0.15 + 0.85 * (1/2 + 1/3) = 0.858333
    PR(C)=0.15 + 0.85 * (1/2 + 1/2 + 1/1) = 1.85
    PR(D)=0.15 + 0.85 * (1/3) = 0.433333
    

    第2次迭代:

    PR(A)=0.15 + 0.85 * (0.858333/2 + 1.85/3) = 1.038958191100
    PR(B)=0.15 + 0.85 * (0.858333/2 + 1.85/3) = 1.038958191100
    PR(C)=0.15 + 0.85 * (0.858333/2 + 0.858333/2 + 0.433333/1) = 1.247916100000
    PR(D)=0.15 + 0.85 * (1.85/3) = 0.67416667
    

    第3次迭代:

    PR(A)=0.15 + 0.85 * (1.038958191100/2 + 1.247916100000/3) = 0.945133459550833333
    PR(B)=0.15 + 0.85 * (1.038958191100/2 + 1.247916100000/3) = 0.945133459550833333
    PR(C)=0.15 + 0.85 * (1.038958191100/2 + 1.038958191100/2 + 0.67416667/1) = 1.606156131935
    PR(D)=0.15 + 0.85 * (1.247916100000/3) = 0.503576228333333333
    

    首先对当前的ranksRDD和静态的linksRDD 进行一次join() 操作,来获取每个页面ID对应的相邻页面列表和当前的排序值,然后使用flatMap创建出“contributions”来记录每个页面对各相邻页面的贡献。然后再把这些贡献值按照页面ID(根据获得共享的页面)分别累加起来,把该页面的排序值设为0.15 + 0.85 * contributionsReceived。

    虽然代码本身很简单,这个示例程序还是做了不少事情来确保RDD 以比较高效的方式进行分区,以最小化通信开销:

    1. 请注意,linksRDD 在每次迭代中都会和ranks 发生连接操作。由于links 是一个静态数据集,所以我们在程序一开始的时候就对它进行了分区操作,这样就不需要把它通过网络进行数据混洗了。实际上,linksRDD 的字节数一般来说也会比ranks 大很多,毕竟它包含每个页面的相邻页面列表(由页面ID 组成),而不仅仅是一个Double 值,因此这一优化相比PageRank 的原始实现(例如普通的MapReduce)节约了相当可观的网络通信开销。
    2. 出于同样的原因,我们调用links 的persist() 方法,将它保留在内存中以供每次迭代使用。
    3. 当我们第一次创建ranks 时,我们使用mapValues() 而不是map() 来保留父RDD(links)的分区方式,这样对它进行的第一次连接操作就会开销很小。
    4. 在循环体中,我们在reduceByKey() 后使用mapValues();因为reduceByKey() 的结果已经是哈希分区的了,这样一来,下一次循环中将映射操作的结果再次与links 进行连接操作时就会更加高效。

    注意:为了最大化分区相关优化的潜在作用,你应该在无需改变元素的键时尽量使用mapValues() 或flatMapValues()。

    相关文章

      网友评论

          本文标题:6. Spark数据分区示例:PageRank

          本文链接:https://www.haomeiwen.com/subject/epmtcftx.html