本章介绍推荐引擎(好友推荐)
本章讨论推荐引擎中的好友推荐问题。思路是:对每个用户(host),先为其每位直接好友输出一条映射,并用-1标记"已是好友";再把host的好友两两配对,输出"以host为共同好友的候选推荐";然后按用户分组,排除已是直接好友的候选,剩下的候选用户及其共同好友列表就是推荐结果。
本章共有两种实现方式:
- 基于Spark Java API实现
- 基于Spark Scala API实现
++基于Spark Java API实现++
/**
 * Friend recommendation with the Spark Java API.
 *
 * <p>Input lines look like {@code <person><TAB><friend1>,<friend2>,...}. Each line emits
 * direct-friend markers {@code (person, (friend, -1))} and, for every pair of the person's
 * friends, candidate pairs {@code (friend_i, (friend_j, person))}. These are grouped by user
 * and turned into recommendation strings, which are printed as "debug4" lines.
 *
 * <p>The debugN blocks call {@code collect()}, which pulls the whole RDD to the driver; they
 * are only suitable for small test inputs.
 *
 * @param args args[0] is the input path
 * @throws Exception as declared; malformed ids surface as NumberFormatException from
 *         {@code Long.parseLong} inside the Spark tasks
 */
public static void main(String[] args) throws Exception {
    if (args.length < 1) {
        System.err.println("Usage: SparkFriendRecommendation <input-path>");
        System.exit(1);
    }
    final String friendsInputPath = args[0];
    // create the first RDD from input (minPartitions = 1)
    JavaSparkContext ctx = SparkUtil.createJavaSparkContext("SparkFriendRecommendation");
    JavaRDD<String> records = ctx.textFile(friendsInputPath, 1);

    // debug1: raw input records
    List<String> debug1 = records.collect();
    for (String t : debug1) {
        System.out.println("debug1 record="+t);
    }

    // flatMapToPair: apply the function to every record, then flatten the results.
    //   (person, (friend, -1))          -- person and friend are already direct friends
    //   (friend_i, (friend_j, person))  -- friend_j is a candidate for friend_i, mutual friend = person
    //   (friend_j, (friend_i, person))  -- and the symmetric candidate
    JavaPairRDD<Long, Tuple2<Long,Long>> pairs =
        records.flatMapToPair(new PairFlatMapFunction<String, Long, Tuple2<Long,Long>>() {
            @Override
            public Iterator<Tuple2<Long,Tuple2<Long,Long>>> call(String record) {
                List<Tuple2<Long,Tuple2<Long, Long>>> mapperOutput =
                    new ArrayList<Tuple2<Long,Tuple2<Long, Long>>>();
                // blank lines (e.g. a trailing newline) carry no data
                if (record.trim().isEmpty()) {
                    return mapperOutput.iterator();
                }
                // record=<person><TAB><friend1><,><friend2><,><friend3><,>...
                String[] tokens = record.split("\t");
                long person = Long.parseLong(tokens[0].trim());
                // "<person>" or "<person><TAB>" without a friend list: nothing to emit
                // (previously this threw ArrayIndexOutOfBoundsException on tokens[1])
                if (tokens.length < 2) {
                    return mapperOutput.iterator();
                }
                List<Long> friends = new ArrayList<Long>();
                for (String friendAsString : tokens[1].split(",")) {
                    String id = friendAsString.trim();
                    if (id.isEmpty()) {
                        continue; // tolerate empty entries such as "1,,2"
                    }
                    long toUser = Long.parseLong(id);
                    friends.add(toUser);
                    // -1 marks toUser as an existing direct friend of person
                    mapperOutput.add(T2(person, T2(toUser, -1L)));
                }
                // each unordered pair (i < j) of person's friends are candidates for each
                // other, with person as their mutual friend
                for (int i = 0; i < friends.size(); i++) {
                    for (int j = i + 1; j < friends.size(); j++) {
                        mapperOutput.add(T2(friends.get(i), T2(friends.get(j), person)));
                        mapperOutput.add(T2(friends.get(j), T2(friends.get(i), person)));
                    }
                }
                return mapperOutput.iterator();
            }
        });

    // debug2: prints lines such as
    //   key=1 value=(2,-1)   -- direct friends
    //   key=2 value=(3,1)    -- candidates with their mutual friend
    List<Tuple2<Long,Tuple2<Long,Long>>> debug2 = pairs.collect();
    for (Tuple2<Long,Tuple2<Long,Long>> t2 : debug2) {
        System.out.println("debug2 key="+t2._1+"\t value="+t2._2);
    }

    // group all (candidate, mutualFriend) pairs by user
    JavaPairRDD<Long, Iterable<Tuple2<Long, Long>>> grouped = pairs.groupByKey();
    // debug3
    List<Tuple2<Long, Iterable<Tuple2<Long, Long>>>> debug3 = grouped.collect();
    for (Tuple2<Long, Iterable<Tuple2<Long, Long>>> t2 : debug3) {
        System.out.println("debug3 key="+t2._1+"\t value="+t2._2);
    }

    // mapValues keeps the keys (and the parent RDD's partitioning) and turns each user's
    // grouped pairs into a recommendation string
    JavaPairRDD<Long, String> recommendations =
        grouped.mapValues(new Function<Iterable<Tuple2<Long, Long>>, String>() {
            @Override
            public String call(Iterable<Tuple2<Long, Long>> values) {
                // key = candidate user; value = list of mutual friends, or null when the
                // candidate is already a direct friend and must never be recommended
                final Map<Long, List<Long>> mutualFriends = new HashMap<Long, List<Long>>();
                for (Tuple2<Long, Long> t2 : values) {
                    final Long toUser = t2._1;
                    final Long mutualFriend = t2._2;
                    if (mutualFriend == -1L) {
                        // the direct-friend marker overrides anything collected so far
                        mutualFriends.put(toUser, null);
                    } else if (!mutualFriends.containsKey(toUser)) {
                        mutualFriends.put(toUser, new ArrayList<Long>(Arrays.asList(mutualFriend)));
                    } else if (mutualFriends.get(toUser) != null) {
                        mutualFriends.get(toUser).add(mutualFriend);
                    }
                }
                return buildRecommendations(mutualFriends);
            }
        });

    // debug4: final recommendations per user
    List<Tuple2<Long,String>> debug4 = recommendations.collect();
    for (Tuple2<Long,String> t2 : debug4) {
        System.out.println("debug4 key="+t2._1+ "\t value="+t2._2);
    }
    // done
    ctx.close();
    System.exit(0);
}
/**
 * Formats one user's recommendations as a single string.
 *
 * <p>Each entry with a non-null value becomes {@code "<candidate> (<count>: <mutualFriends>),"},
 * for example {@code "2 (2: [1, 3]),"}. Entries mapped to null (already direct friends) are
 * skipped. Every emitted entry, including the last, ends with a comma. Entries appear in the
 * map's iteration order.
 *
 * @param mutualFriends candidate user -> mutual friends, or null for users already befriended
 * @return the concatenated entries, or "" when nothing is recommendable
 */
static String buildRecommendations(Map<Long, List<Long>> mutualFriends) {
    final StringBuilder out = new StringBuilder();
    for (Map.Entry<Long, List<Long>> candidate : mutualFriends.entrySet()) {
        final List<Long> shared = candidate.getValue();
        if (shared != null) {
            out.append(candidate.getKey())
               .append(" (")
               .append(shared.size())
               .append(": ")
               .append(shared)
               .append("),");
        }
    }
    return out.toString();
}
/**
 * Returns the two ids as a boxed pair with the smaller id first, i.e. (min(a, b), max(a, b)).
 * Presumably meant as an order-independent pair key; it is not referenced by the code shown.
 */
static Tuple2<Long,Long> buildSortedTuple(long a, long b) {
    final long low = Math.min(a, b);
    final long high = Math.max(a, b);
    return new Tuple2<Long, Long>(low, high);
}
/** Shorthand for building a boxed (long, long) pair. */
static Tuple2<Long,Long> T2(long a, long b) {
    final Tuple2<Long,Long> pair = new Tuple2<Long,Long>(a, b);
    return pair;
}
/** Shorthand for building a (long, (long, long)) pair, e.g. (user, (candidate, mutualFriend)). */
static Tuple2<Long,Tuple2<Long,Long>> T2(long a, Tuple2<Long,Long> b) {
    final Tuple2<Long,Tuple2<Long,Long>> nested = new Tuple2<Long,Tuple2<Long,Long>>(a, b);
    return nested;
}
++基于Spark Scala API实现++
/**
 * Friend recommendation with the Spark Scala API.
 *
 * Input lines look like `<person><whitespace><friend1>,<friend2>,...`. For every user the
 * job writes a `Map(candidate -> mutualFriends)` to `<output-path>` and also prints it on the
 * driver as `user: candidate (count: [m1,m2]),...`.
 *
 * @param args args(0) = input path, args(1) = output path
 */
def main(args: Array[String]): Unit = {
  if (args.length < 2) {
    println("Usage: FriendRecommendation <input-path> <output-path>")
    sys.exit(1)
  }
  val sparkConf = new SparkConf().setAppName("FriendRecommendation")
  val sc = new SparkContext(sparkConf)
  val input = args(0)
  val output = args(1)
  val records = sc.textFile(input)

  // For every line emit:
  //   (person, (friend, -1))                    : person and friend are already direct friends
  //   (fi, (fj, person)) and (fj, (fi, person)) : for each unordered pair of person's friends,
  //                                               with person as their mutual friend
  val pairs = records
    .filter(_.trim.nonEmpty) // blank lines carry no data
    .flatMap { line =>
      // "\\s+" tolerates runs of spaces/tabs between the person and the friend list
      val tokens = line.trim.split("\\s+")
      val person = tokens(0).toLong
      // a person listed without friends contributes nothing (previously crashed on tokens(1))
      val friends: List[Long] =
        if (tokens.length < 2) Nil
        else tokens(1).split(",").iterator.map(_.trim).filter(_.nonEmpty).map(_.toLong).toList
      val directFriends = friends.map(friend => (person, (friend, -1L)))
      // tails visits each unordered pair (fi, fj) once; the previous nested generator visited
      // every ordered pair AND yielded both directions, emitting each candidate twice
      val candidates = friends.tails.flatMap {
        case fi :: rest =>
          rest.filter(_ != fi).flatMap(fj => List((fi, (fj, person)), (fj, (fi, person))))
        case Nil => Nil
      }.toList
      directFriends ::: candidates
    }

  // groupByKey() must hold all values of a key in memory (risk of OOM for huge neighbourhoods);
  // combineByKey()/reduceByKey() scale out better.
  val grouped = pairs.groupByKey()
  val result = grouped.mapValues { values =>
    // a -1 marker means toUser is already a direct friend and must never be recommended
    val alreadyFriends = values.collect { case (toUser, m) if m == -1L => toUser }.toSet
    values
      .filter { case (toUser, mutual) => mutual != -1L && !alreadyFriends.contains(toUser) }
      .groupBy(_._1)
      .map { case (toUser, candidatePairs) => toUser -> candidatePairs.map(_._2).toList.distinct }
  }
  result.saveAsTextFile(output)

  // Print on the driver: RDD.foreach runs on the executors, so its println output would end up
  // in executor logs. toLocalIterator streams one partition at a time to the driver.
  result.toLocalIterator.foreach { case (user, recommendations) =>
    val friends = recommendations.map { case (candidate, mutual) =>
      s"$candidate (${mutual.size}: ${mutual.mkString("[", ",", "]")})"
    }.mkString(",")
    println(s"$user: $friends")
  }
  // done
  sc.stop()
}
网友评论