Spark + Java: a collect() "bug" problem

Author: 舒尔诚 | Published 2019-01-28 11:59 | Source: https://www.haomeiwen.com/subject/prcajqtx.html

    package com.silverbox.spark;

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;

    import com.google.common.base.Optional;

    import scala.Tuple2;

    /**
     * Merges two lists of maps, imitating a SQL join.
     *
     * @author Administrator
     */
    public class Join4Listmap implements Serializable {

    /**
     * Imitates a SQL join to merge the two lists.
     */
    public static void main(String[] args) {

        try {
            SparkConf conf = new SparkConf().setAppName("Grouptop3").setMaster("local");
    
            JavaSparkContext sc = new JavaSparkContext(conf);

            // build the left-side list: class -> score
            List<Map<String,Object>> listm = new ArrayList<>();
            Map<String,Object> m = new HashMap<>();
            Map<String,Object> m3 = new HashMap<>();
            Map<String,Object> m4 = new HashMap<>();

            m.put("class", 10);
            m.put("score", 1);

            m3.put("class", 20);
            m3.put("score", 2);

            m4.put("class", 30);
            m4.put("score", 3);

            listm.add(m);
            listm.add(m3);
            listm.add(m4);
    
            
            
            // build the right-side list: class -> ad (note: class 30 appears twice)
            List<Map<String,Object>> listm2 = new ArrayList<>();
            Map<String,Object> m21 = new HashMap<>();
            Map<String,Object> m22 = new HashMap<>();
            Map<String,Object> m23 = new HashMap<>();
            Map<String,Object> m24 = new HashMap<>();

            m21.put("class", 10);
            m21.put("ad", "x11111111");

            m22.put("class", 20);
            m22.put("ad", "x22222222");

            m23.put("class", 30);
            m23.put("ad", "x33333333");

            m24.put("class", 30);
            m24.put("ad", "x1212121");

            listm2.add(m21);
            listm2.add(m22);
            listm2.add(m23);
            listm2.add(m24);
            
            JavaRDD<Map<String, Object>> lines = sc.parallelize(listm);

            JavaRDD<Map<String, Object>> lines2 = sc.parallelize(listm2);

            // add a count field to each map
            lines = lines.map(new Function<Map<String, Object>, Map<String, Object>>() {

                @Override
                public Map<String, Object> call(Map<String, Object> m) throws Exception {
                    m.put("count", 1);
                    return m;
                }

            });
            
            
             
            
            // key each record by its "class" value (pairs2 below does the same for the right side)
            JavaPairRDD<String, Map<String,Object>> pairs = lines.mapToPair(new PairFunction<Map<String,Object>, String, Map<String,Object>>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Map<String,Object>> call(Map<String,Object> line) throws Exception {
                    
                    return new Tuple2<String, Map<String,Object>>(line.get("class").toString(), line);
                }
            });
             
            
            JavaPairRDD<String, Map<String,Object>> pairs2 = lines2.mapToPair(new PairFunction<Map<String,Object>, String, Map<String,Object>>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Map<String,Object>> call(Map<String,Object> line) throws Exception {
                    
                    return new Tuple2<String, Map<String,Object>>(line.get("class").toString(), line);
                }
            });
            
             
            // left outer join on class; the duplicate key 30 produces two output tuples,
            // and both of them reference the same left-side map object
            JavaPairRDD<String, Tuple2<Map<String, Object>, Optional<Map<String, Object>>>> pairs3 = pairs.leftOuterJoin(pairs2);
             
            pairs3.foreach(x->{
                System.out.println("--------------pairs3------------");
                System.out.println(x._1());
                System.out.println(x._2());
    
            });
            System.out.println("------------end--pairs3------------"); 
             
            
            /* Printed output:
            --------------pairs3------------
            20
            ({score=2, count=1, class=20},Optional.of({ad=x22222222, class=20}))
            --------------pairs3------------
            30
            ({score=3, count=1, class=30},Optional.of({ad=x33333333, class=30}))
            --------------pairs3------------
            30
            ({score=3, count=1, class=30},Optional.of({ad=x1212121, class=30}))
            --------------pairs3------------
            10
            ({score=1, count=1, class=10},Optional.of({ad=x11111111, class=10}))*/
            
            
            
            
            // merge the map and the Optional inside each Tuple2 into a single map
            JavaRDD<Map<String, Object>> mparis = 
                    pairs3.map(new Function<Tuple2<String,Tuple2<Map<String,Object>,Optional<Map<String,Object>>>>,Map<String, Object>>(){
    
                @Override
                public Map<String, Object> call(
                        Tuple2<String, Tuple2<Map<String, Object>, Optional<Map<String, Object>>>> rdd) throws Exception {
                    
                    Tuple2<Map<String, Object>, Optional<Map<String, Object>>> rd2 = rdd._2();
                    Map<String, Object> mrd = rd2._1();           // the left-side map itself, mutated in place below
                    Map<String, Object> opmd = rd2._2().orNull(); // the right-side map, or null if unmatched
                    if(opmd!=null){
                        for(Entry<String, Object> en:opmd.entrySet()){
                            mrd.put(en.getKey(), en.getValue());
                        }
                    }else{
                        mrd.put("none", 1);
                    }
                    
                    //System.out.println("-----map----mrd=====-----"+mrd);
                    return mrd;
                }
                
            });
            
            mparis.foreach(x->{
                System.out.println("------mparis.foreach------------"+x);//正确
            });
            
            /* Printed output (correct):
            ------mparis.foreach------------{score=2, ad=x22222222, count=1, class=20}
            ------mparis.foreach------------{score=3, ad=x33333333, count=1, class=30}
            ------mparis.foreach------------{score=3, ad=x1212121, count=1, class=30}
            ------mparis.foreach------------{score=1, ad=x11111111, count=1, class=10}*/
            
            
            
            
            
            // collect() appears to have a serious bug here: the collected values come back wrong;
            // use the deep-copied version further below instead

            List<Map<String, Object>> joinList = mparis.collect(); // incorrect????
            System.out.println("---------joinList----------------------------");
            joinList.forEach(x->{
                System.out.println("----------------joinList  forEach===== "+x);
            });
            System.out.println("---------joinList--end--------------------------");
            System.out.println("-------collect--joinList--==--------------------------"+joinList);
            
            
        /*  Printed output (wrong):
            the ad values in rows 2 and 3 are identical -- one has been overwritten
            ----------------joinList  forEach===== {score=2, ad=x22222222, count=1, class=20}
            ----------------joinList  forEach===== {score=3, ad=x1212121, count=1, class=30}
            ----------------joinList  forEach===== {score=3, ad=x1212121, count=1, class=30}
            ----------------joinList  forEach===== {score=1, ad=x11111111, count=1, class=10}*/
            
            
            
            /**
             * Work around the collect() problem by copying each map before collecting
             */
            JavaRDD<Map<String, Object>> copaympairs = mparis.map(new Function<Map<String, Object>, Map<String, Object>> (){
    
                @Override
                public Map<String, Object> call(Map<String, Object> m) throws Exception {
                    Map<String, Object> cm=new HashMap<>();
                    cm.putAll(m);
                    return cm;
                }
                
            });
            
            List<Map<String, Object>> copyCollect = copaympairs.collect();
            // the following prints the correct result
            copyCollect.forEach(x->{
                System.out.println("----------------copyCollect  forEach===== "+x);
            });
            
            /* Printed output (correct):
               the ad values in rows 2 and 3 differ; nothing was overwritten
            ----------------copyCollect  forEach===== {score=2, ad=x22222222, count=1, class=20}
            ----------------copyCollect  forEach===== {score=3, ad=x33333333, count=1, class=30}
            ----------------copyCollect  forEach===== {score=3, ad=x1212121, count=1, class=30}
            ----------------copyCollect  forEach===== {score=1, ad=x11111111, count=1, class=10}*/
            
            /* Conclusion: the results gathered by collect() here look wrong; collecting copies of the maps works around the problem */
            
            sc.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        
        
    }
    

    }
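Why does the copy help? This is my reading of the behavior, which the post itself does not spell out: the merge function mutates the left-side map in place, and when a join key matches several right-side records (class 30 here), every output tuple for that key references the same left-side map object. foreach prints each record as soon as it is computed, before the next match overwrites the shared map, so its output looks right. collect() materializes all records first; rows 2 and 3 alias the same map, so both show the last write (ad=x1212121). Seen this way it is shared mutable state rather than a collect() defect, and copying each map breaks the aliasing. A minimal sketch, with no Spark at all, that reproduces the symptom (the class and variable names here are illustrative only):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SharedMapAliasing {
        public static void main(String[] args) {
            Map<String, Object> left = new HashMap<>(); // stands in for the class-30 left-side map
            left.put("class", 30);
            left.put("score", 3);

            // the join emits one output row per matching right-side record,
            // and every row for the same key reuses the same left-side map object
            List<Map<String, Object>> collected = new ArrayList<>();

            left.put("ad", "x33333333");                      // merge step for the first match
            System.out.println("row just computed: " + left); // ad=x33333333, what foreach sees
            collected.add(left);

            left.put("ad", "x1212121");                       // merge step for the second match
            System.out.println("row just computed: " + left); // ad=x1212121, what foreach sees
            collected.add(left);

            // both rows alias the same map, so both now show the last write,
            // exactly like the joinList output above
            System.out.println("collected: " + collected);    // ad=x1212121 in both rows

            // copying each map before adding it (new HashMap<>(left)) breaks the
            // aliasing, which is what the copaympairs step in the article does
        }
    }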

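An alternative worth noting, again not something the original post proposes: build a fresh map inside the merge function instead of mutating the left-side map, and the extra copy pass before collect() becomes unnecessary. A sketch of that variant of the call method, using the same imports and types as the article's code:

    // drop-in replacement for the call method of the merge Function in the pairs3.map step
    @Override
    public Map<String, Object> call(
            Tuple2<String, Tuple2<Map<String, Object>, Optional<Map<String, Object>>>> rdd) throws Exception {
        Map<String, Object> merged = new HashMap<>();       // a fresh map per output row
        merged.putAll(rdd._2()._1());                       // copy the left-side fields
        Map<String, Object> right = rdd._2()._2().orNull(); // right-side map, or null if unmatched
        if (right != null) {
            merged.putAll(right);                           // overlay the right-side fields
        } else {
            merged.put("none", 1);                          // unmatched left-side row
        }
        return merged;                                      // no shared object is ever mutated
    }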