package com.silverbox.spark;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import com.google.common.base.Optional;
import scala.Tuple2;
/**
 * Merge two lists the way an SQL join would.
 * @author Administrator
 */
public class Join4Listmap implements Serializable {
/**
 * Mimic an SQL join: merge two lists of maps on a common key.
 */
public static void main(String[] args) {
try {
SparkConf conf = new SparkConf().setAppName("Join4Listmap").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Map<String,Object>> listm = new ArrayList<>();
Map<String,Object> m=new HashMap<>();
Map<String,Object> m3=new HashMap<>();
Map<String,Object> m4=new HashMap<>();
m.put("class", 10);
m.put("score", 1);
m3.put("class", 20);
m3.put("score", 2);
m4.put("class", 30);
m4.put("score", 3);
listm.add(m);
listm.add(m3);
listm.add(m4);
List<Map<String,Object>> listm2 = new ArrayList<>();
Map<String,Object> m21=new HashMap<>();
Map<String,Object> m22=new HashMap<>();
Map<String,Object> m23=new HashMap<>();
Map<String,Object> m24=new HashMap<>();
m21.put("class", 10);
m21.put("ad", "x11111111");
m22.put("class", 20);
m22.put("ad", "x22222222");
m23.put("class", 30);
m23.put("ad", "x33333333");
m24.put("class", 30);
m24.put("ad", "x1212121");
listm2.add(m21);
listm2.add(m22);
listm2.add(m23);
listm2.add(m24);
JavaRDD<Map<String, Object>> lines = sc.parallelize(listm);
JavaRDD<Map<String, Object>> lines2 = sc.parallelize(listm2);
// Initialize each record with a count field
lines= lines.map(new Function<Map<String, Object>,Map<String, Object>>(){
@Override
public Map<String, Object> call(Map<String, Object> m) throws Exception {
m.put("count", 1);
return m;
}
});
JavaPairRDD<String, Map<String,Object>> pairs = lines.mapToPair(new PairFunction<Map<String,Object>, String, Map<String,Object>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Map<String,Object>> call(Map<String,Object> line) throws Exception {
return new Tuple2<String, Map<String,Object>>(line.get("class").toString(), line);
}
});
JavaPairRDD<String, Map<String,Object>> pairs2 = lines2.mapToPair(new PairFunction<Map<String,Object>, String, Map<String,Object>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Map<String,Object>> call(Map<String,Object> line) throws Exception {
return new Tuple2<String, Map<String,Object>>(line.get("class").toString(), line);
}
});
JavaPairRDD<String, Tuple2<Map<String, Object>, Optional<Map<String, Object>>>> pairs3 = pairs.leftOuterJoin(pairs2);
pairs3.foreach(x->{
System.out.println("--------------pairs3------------");
System.out.println(x._1());
System.out.println(x._2());
});
System.out.println("------------end--pairs3------------");
/* Output:
--------------pairs3------------
20
({score=2, count=1, class=20},Optional.of({ad=x22222222, class=20}))
--------------pairs3------------
30
({score=3, count=1, class=30},Optional.of({ad=x33333333, class=30}))
--------------pairs3------------
30
({score=3, count=1, class=30},Optional.of({ad=x1212121, class=30}))
--------------pairs3------------
10
({score=1, count=1, class=10},Optional.of({ad=x11111111, class=10}))*/
// Merge the left map and the Optional right map inside each Tuple2 into a single map
JavaRDD<Map<String, Object>> mparis =
pairs3.map(new Function<Tuple2<String,Tuple2<Map<String,Object>,Optional<Map<String,Object>>>>,Map<String, Object>>(){
@Override
public Map<String, Object> call(
Tuple2<String, Tuple2<Map<String, Object>, Optional<Map<String, Object>>>> rdd) throws Exception {
Tuple2<Map<String, Object>, Optional<Map<String, Object>>> rd2 = rdd._2();
Map<String, Object> mrd = rd2._1();
Map<String, Object> opmd = rd2._2().orNull();
if(opmd!=null){
for(Entry<String, Object> en:opmd.entrySet()){
mrd.put(en.getKey(), en.getValue());
}
}else{
mrd.put("none", 1);
}
//System.out.println("-----map----mrd=====-----"+mrd);
return mrd;
}
});
mparis.foreach(x->{
System.out.println("------mparis.foreach------------"+x);//正确
});
/* Output (correct):
------mparis.foreach------------{score=2, ad=x22222222, count=1, class=20}
------mparis.foreach------------{score=3, ad=x33333333, count=1, class=30}
------mparis.foreach------------{score=3, ad=x1212121, count=1, class=30}
------mparis.foreach------------{score=1, ad=x11111111, count=1, class=10}*/
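// foreach prints each merged row the moment it is produced, before any later row mutates
// the same underlying Map, which is why the output above looks right.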
// collect, however, materializes every row first. The two class=30 join outputs share the
// same left-side Map instance, so the in-place merge above lets the last match overwrite
// the earlier one; the copied version further below avoids this.
List<Map<String, Object>> joinList = mparis.collect();// looks wrong: rows 2 and 3 come out identical
System.out.println("---------joinList----------------------------");
joinList.forEach(x->{
System.out.println("----------------joinList forEach===== "+x);
});
System.out.println("---------joinList--end--------------------------");
System.out.println("-------collect--joinList--==--------------------------"+joinList);
/* Output (wrong): the ad values in rows 2 and 3 are identical because the shared map was overwritten:
----------------joinList forEach===== {score=2, ad=x22222222, count=1, class=20}
----------------joinList forEach===== {score=3, ad=x1212121, count=1, class=30}
----------------joinList forEach===== {score=3, ad=x1212121, count=1, class=30}
----------------joinList forEach===== {score=1, ad=x11111111, count=1, class=10}*/
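// Illustrative check (the variable name first30 is only for demonstration): the two class=30
// rows are expected to be the very same Map instance, which is what makes the overwrite possible.
Map<String, Object> first30 = null;
for (Map<String, Object> row : joinList) {
	if (Integer.valueOf(30).equals(row.get("class"))) {
		if (first30 == null) {
			first30 = row;
		} else {
			System.out.println("class=30 rows share one instance? " + (first30 == row));
		}
	}
}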
/**
 * Work around the problem by copying each map into a fresh HashMap, so every collected row
 * owns its own instance (a shallow copy is enough here because the values are immutable).
 */
JavaRDD<Map<String, Object>> copaympairs = mparis.map(new Function<Map<String, Object>, Map<String, Object>> (){
@Override
public Map<String, Object> call(Map<String, Object> m) throws Exception {
Map<String, Object> cm=new HashMap<>();
cm.putAll(m);
return cm;
}
});
List<Map<String, Object>> copyCollect = copaympairs.collect();
// The following prints the correct result
copyCollect.forEach(x->{
System.out.println("----------------copyCollect forEach===== "+x);
});
/* Output (correct): rows 2 and 3 keep their distinct ad values; nothing is overwritten:
----------------copyCollect forEach===== {score=2, ad=x22222222, count=1, class=20}
----------------copyCollect forEach===== {score=3, ad=x33333333, count=1, class=30}
----------------copyCollect forEach===== {score=3, ad=x1212121, count=1, class=30}
----------------copyCollect forEach===== {score=1, ad=x11111111, count=1, class=10}*/
/* Conclusion: the directly collected result is wrong not because collect is buggy, but because
the join reuses one left-side Map for every matching right-side record and the merge mutates it
in place. Collecting per-row copies (or avoiding in-place mutation altogether) gives the correct result. */
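// Alternative sketch (illustrative; the name mergedPairs does not appear above): build a
// brand-new HashMap inside the merge step itself, so no shared instance is ever mutated and
// the extra copy pass becomes unnecessary.
JavaRDD<Map<String, Object>> mergedPairs =
	pairs3.map(new Function<Tuple2<String, Tuple2<Map<String, Object>, Optional<Map<String, Object>>>>, Map<String, Object>>() {
		private static final long serialVersionUID = 1L;
		@Override
		public Map<String, Object> call(
				Tuple2<String, Tuple2<Map<String, Object>, Optional<Map<String, Object>>>> rdd) throws Exception {
			Map<String, Object> merged = new HashMap<>(rdd._2()._1()); // start from a copy of the left map
			Map<String, Object> right = rdd._2()._2().orNull();
			if (right != null) {
				merged.putAll(right);   // overlay the right-side columns
			} else {
				merged.put("none", 1);  // mark left rows that had no match
			}
			return merged;
		}
	});
System.out.println("--------mergedPairs.collect()-------- " + mergedPairs.collect());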
sc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}