
Data Algorithms: Hadoop/Spark Big Data Processing, Chapter 9

Author: _Kantin | Published 2018-07-08 11:19

This chapter covers a recommendation engine: a friend-recommendation problem. From each host user and their friend list we build one-to-one (user, friend) mappings, group them by user, and then recommend new friends based on mutual friends, as illustrated by the worked example below.


This chapter presents two implementations:

  1. Spark with Java
  2. Spark with Scala
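Before diving into the code, a tiny worked example (my own, not from the original text) helps. Both implementations read records of the form <person><TAB><friend1>,<friend2>,... Given this input:

    1   2,3
    2   1,3
    3   1,2,4
    4   3

users 1 and 4 are not yet friends but share the mutual friend 3, so each is recommended to the other (and likewise 2 and 4), while user 3 already knows everyone and gets nothing. In the Java version's output format this prints as, for example, "debug4 key=1 value=4 (1: [3])," where 4 is the recommended friend, 1 is the number of mutual friends, and [3] lists those mutual friends (ordering may vary, since a HashMap is used).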

++Implementation 1: Spark with Java++

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import scala.Tuple2;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;

public class SparkFriendRecommendation {

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
       System.err.println("Usage: SparkFriendRecommendation <input-path>");
       System.exit(1);
    }  
    final String friendsInputPath = args[0];
    
    // create the first RDD from input
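    // (SparkUtil is a helper class from the book's utility package)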
    JavaSparkContext ctx = SparkUtil.createJavaSparkContext("SparkFriendRecommendation");
    JavaRDD<String> records = ctx.textFile(friendsInputPath, 1);

    // debug1: print the raw input records
    List<String> debug1 = records.collect();
    for (String t : debug1) {
      System.out.println("debug1 record="+t);
    }

    // flatMapToPair
    //    <K2,V2> JavaPairRDD<K2,V2> flatMapToPair(PairFlatMapFunction<T,K2,V2> f)
    //    Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

    // PairFlatMapFunction<T, K, V>
    // T => Iterable<Tuple2<K, V>>
    JavaPairRDD<Long, Tuple2<Long,Long>> pairs =
          //                                      T       K     V
          records.flatMapToPair(new PairFlatMapFunction<String, Long, Tuple2<Long,Long>>() {
      @Override
      public Iterator<Tuple2<Long,Tuple2<Long,Long>>> call(String record) {
         // record=<person><TAB><friend1><,><friend2><,><friend3><,>... 
         String[] tokens = record.split("\t");
         // extract the person (host user) ID
         long person = Long.parseLong(tokens[0]);
         // extract the friends list
         String friendsAsString = tokens[1];
         // split the friends list on commas
         String[] friendsTokenized = friendsAsString.split(",");
         
         List<Long> friends = new ArrayList<Long>();         
         List<Tuple2<Long,Tuple2<Long, Long>>> mapperOutput =
             new ArrayList<Tuple2<Long,Tuple2<Long, Long>>>();         
         for (String friendAsString : friendsTokenized) {
            long toUser = Long.parseLong(friendAsString);
            friends.add(toUser);
            // the person's own friends are direct friends; mark them with -1
            Tuple2<Long,Long> directFriend = T2(toUser, -1L);
            mapperOutput.add(T2(person, directFriend));
         }

         for (int i = 0; i < friends.size(); i++) {
            for (int j = i + 1; j < friends.size(); j++) {
                // emit both orderings of each friend pair; the value's second element is the mutual friend (the host person)
                // possible friend 1
                Tuple2<Long,Long> possibleFriend1 = T2(friends.get(j), person);
                mapperOutput.add(T2(friends.get(i), possibleFriend1));
                // possible friend 2
                Tuple2<Long,Long> possibleFriend2 = T2(friends.get(i), person);
                mapperOutput.add(T2(friends.get(j), possibleFriend2));
            }
         } 
                 
         return mapperOutput.iterator();
      }
    });
      /**
       * sample output:  debug2 key=1 value=(2,-1)
       *                 debug2 key=1 value=(3,-1)   ... direct friends (marked -1)
       *                 debug2 key=2 value=(3,1)
       *                 debug2 key=3 value=(2,1)    ... candidate friends with their mutual friend
       */
    List<Tuple2<Long,Tuple2<Long,Long>>> debug2 = pairs.collect();
    for (Tuple2<Long,Tuple2<Long,Long>> t2 : debug2) {
      System.out.println("debug2 key="+t2._1+"\t value="+t2._2);
    }

    // group the candidate/direct-friend pairs by user ID
    JavaPairRDD<Long, Iterable<Tuple2<Long, Long>>> grouped = pairs.groupByKey();
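    // note: groupByKey() requires all values for a key to fit in memory;
    // combineByKey()/reduceByKey() scale better (see the note in the Scala
    // version below)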

    // debug3
    List<Tuple2<Long, Iterable<Tuple2<Long, Long>>>> debug3 = grouped.collect();
    for (Tuple2<Long, Iterable<Tuple2<Long, Long>>> t2 : debug3) {
      System.out.println("debug3 key="+t2._1+"\t value="+t2._2);
    }
    
    // For each user, aggregate each candidate's mutual friends and build the recommendation string
    // mapValues[U](f: (V) => U): JavaPairRDD[K, U]
    // Pass each value in the key-value pair RDD through a map function without changing the keys;
    // this also retains the original RDD's partitioning.
    JavaPairRDD<Long, String> recommendations =
        grouped.mapValues(new Function< Iterable<Tuple2<Long, Long>>, // input
                                        String                        // final output
                                      >() {
      @Override
      public String call(Iterable<Tuple2<Long, Long>> values) {
      
        // mutualFriends.key = the recommended friend 
        // mutualFriends.value = the list of mutual friends
        final Map<Long, List<Long>> mutualFriends = new HashMap<Long, List<Long>>();
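        // a null value marks toUser as an already-direct friend, so it will
        // never be recommended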
        for (Tuple2<Long, Long> t2 : values) {
            // the candidate friend
            final Long toUser = t2._1;
            // the mutual friend (or -1)
            final Long mutualFriend = t2._2;
            // a value of -1 means toUser is already a direct friend
            final boolean alreadyFriend = (mutualFriend == -1);

            if (mutualFriends.containsKey(toUser)) {
                if (alreadyFriend) {
                     mutualFriends.put(toUser, null);
                } 
                else if (mutualFriends.get(toUser) != null) {
                        mutualFriends.get(toUser).add(mutualFriend);
                }
            } 
            else {
                if (alreadyFriend) {
                     mutualFriends.put(toUser, null);
                } 
                else {
                     // start a new list of mutual friends for this candidate
                     List<Long> list1 = new ArrayList<Long>(Arrays.asList(mutualFriend));
                     mutualFriends.put(toUser, list1);
                }
            }
        }
        // format mutualFriends into the final recommendation string
        return buildRecommendations(mutualFriends);
      }
    });


    // debug4
    List<Tuple2<Long,String>> debug4 = recommendations.collect();
    for (Tuple2<Long,String> t2 : debug4) {
      System.out.println("debug4 key="+t2._1+ "\t value="+t2._2);
    }

    // done
    ctx.close();
    System.exit(0);
  }

  // buildRecommendations formats each recommended friend with the count and list of mutual friends
  static String buildRecommendations(Map<Long, List<Long>> mutualFriends) {
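     // e.g. "4 (1: [3])," = recommend user 4, who has 1 mutual friend: user 3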
     StringBuilder recommendations = new StringBuilder();
     for (Map.Entry<Long, List<Long>> entry : mutualFriends.entrySet()) {
        if (entry.getValue() == null) {
           continue;
        }
        recommendations.append(entry.getKey());
        recommendations.append(" (");
        recommendations.append(entry.getValue().size());
        recommendations.append(": ");         
        recommendations.append(entry.getValue());
        recommendations.append("),");
     }
     return recommendations.toString();
  }
     
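  // note: buildSortedTuple below is not referenced in this listing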
  static Tuple2<Long,Long> buildSortedTuple(long a, long b) {
     if (a < b) {
        return new Tuple2<Long, Long>(a,b);
     }
     else {
        return new Tuple2<Long, Long>(b,a);
     }
  }

  static Tuple2<Long,Long> T2(long a, long b) {
     return new Tuple2<Long,Long>(a, b);
  }

  static Tuple2<Long,Tuple2<Long,Long>> T2(long a, Tuple2<Long,Long> b) {
     return new Tuple2<Long,Tuple2<Long,Long>>(a, b);
  }
}


++Implementation 2: Spark with Scala++

import org.apache.spark.{SparkConf, SparkContext}

object FriendRecommendation {

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println("Usage: FriendRecommendation <input-path> <output-path>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("FriendRecommendation")
    val sc = new SparkContext(sparkConf)

    val input = args(0)
    val output = args(1)

    val records = sc.textFile(input)

    val pairs = records.flatMap(line => {
      val tokens = line.split("\\s")
      // extract the person (host user) ID
      val person = tokens(0).toLong
      // extract the friends list
      val friends = tokens(1).split(",").map(_.toLong).toList
      // direct friends are all marked with -1
      val mapperOutput = friends.map(directFriend => (person, (directFriend, -1.toLong)))
      /**
        * at this point the pairs look like:
        * key=1 value=(2,-1)
        * key=1 value=(3,-1)   ... direct friends (marked -1)
        * key=2 value=(3,1)
        * key=3 value=(2,1)    ... candidate friends with their mutual friend
        */

      val result = for {
        fi <- friends
        fj <- friends
        possibleFriend1 = (fj, person)
        possibleFriend2 = (fi, person)
        if (fi != fj)
      } yield {
        (fi, possibleFriend1) :: (fj, possibleFriend2) :: List()
      }
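      // note: unlike the Java version's i < j loop, this comprehension visits
      // each unordered pair twice, so every candidate is emitted twice; the
      // duplicates are dropped later by the contains() check when collecting
      // mutual friends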
      mapperOutput ::: result.flatten
    })
    
    //
    // note that groupByKey() provides an expensive solution 
    // [you must have enough memory/RAM to hold all values for 
    // a given key -- otherwise you might get OOM error], but
    // combineByKey() and reduceByKey() will give a better 
    // scale-out performance (a sketch of the reduceByKey() variant follows
    // this listing)
    //  
    val grouped = pairs.groupByKey()

    val result = grouped.mapValues(values => {
      val mutualFriends = collection.mutable.HashMap.empty[Long, List[Long]]
      values.foreach(t2 => {
        // the candidate friend
        val toUser = t2._1
        // the mutual friend (or -1)
        val mutualFriend = t2._2
        // -1 means toUser is already a direct friend
        val alreadyFriend = (mutualFriend == -1)
        if (mutualFriends.contains(toUser)) {
          if (alreadyFriend) {
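            // an empty list marks toUser as an already-direct friend (the Java version uses null)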
            mutualFriends.put(toUser, List.empty)
          } else if (mutualFriends.get(toUser).exists(list => list.nonEmpty && !list.contains(mutualFriend))) {
            mutualFriends.put(toUser, mutualFriend :: mutualFriends(toUser))
          }
        } else {
          if (alreadyFriend) {
            mutualFriends.put(toUser, List.empty)
          } else {
            mutualFriends.put(toUser, List(mutualFriend))
          }
        }
      })
      mutualFriends.filter(_._2.nonEmpty).toMap
    })

    result.saveAsTextFile(output)

    //
    // formatting and printing it to console for debugging purposes...
    // 
    result.foreach(f => {
      val friends = if (f._2.isEmpty) "" else {
        val items = f._2.map(tuple => (tuple._1, "(" + tuple._2.size + ": " + tuple._2.mkString("[", ",", "]") + ")")).map(g => "" + g._1 + " " + g._2)
        items.toList.mkString(",")
      }
      println(s"${f._1}: ${friends}")
    })

    // done
    sc.stop()
  }
}
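
As the comment before groupByKey() says, that call needs all of a key's values in memory at once. Below is a minimal sketch of the reduceByKey() variant it alludes to; the sketch is my own, not from the original text, and assumes the same pairs RDD built above. Each value becomes a one-entry map whose value is None for a direct friend or Some(set) of mutual friends, and the maps are merged pairwise:

val perPair = pairs.mapValues { case (toUser, mutualFriend) =>
  // None marks an already-direct friend; Some(set) accumulates mutual friends
  // (a Set also absorbs the duplicate pairs emitted by the for-comprehension)
  Map(toUser -> (if (mutualFriend == -1L) None else Some(Set(mutualFriend))))
}

val reduced = perPair.reduceByKey { (m1, m2) =>
  (m1.keySet ++ m2.keySet).map { toUser =>
    val merged = (m1.get(toUser), m2.get(toUser)) match {
      // when both sides know toUser, a None (direct friend) absorbs everything
      case (Some(a), Some(b)) => for (x <- a; y <- b) yield x ++ y
      case (Some(a), None)    => a
      case (None, Some(b))    => b
      case _                  => None
    }
    toUser -> merged
  }.toMap
}

// keep only genuine candidates, dropping the direct-friend (None) markers
val recommendations = reduced.mapValues(_.collect { case (u, Some(mutual)) => u -> mutual })

Because values are merged two at a time, no single key ever has to materialize its full value list, which is what makes reduceByKey() the more scalable choice.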
