The cogroup and join operators in Spark

Author: stone_zhu | Published: 2019-06-26 18:02

    The cogroup operator is used rarely, while join is used often. Both relate two RDDs by their keys. The code below shows how they differ, starting with the two RDDs used as input:

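    // Imports needed by the snippets in this article
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    import java.util.Arrays;
    import java.util.List;

    // The statements below belong inside a method such as main
    SparkConf conf = new SparkConf()
            .setAppName("co")
            .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // First data set: the key "hello" appears three times
    List<Tuple2<String, Integer>> words1 = Arrays.asList(
            new Tuple2<>("hello", 3),
            new Tuple2<>("hello", 2),
            new Tuple2<>("world", 7),
            new Tuple2<>("hello", 12),
            new Tuple2<>("you", 9)
    );

    // Second data set: the key "hello" appears twice
    List<Tuple2<String, Integer>> words2 = Arrays.asList(
            new Tuple2<>("hello", 21),
            new Tuple2<>("world", 24),
            new Tuple2<>("hello", 25),
            new Tuple2<>("you", 28)
    );

    JavaPairRDD<String, Integer> words1RDD = sc.parallelizePairs(words1);
    JavaPairRDD<String, Integer> words2RDD = sc.parallelizePairs(words2);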
    
    

    Both words1RDD and words2RDD contain duplicate keys. Now compare the results of applying cogroup and join to them, starting with cogroup:

    int count = 1;

    // cogroup returns one element per key, with the values from each RDD in separate Iterables
    JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupRDD = words1RDD.cogroup(words2RDD);
    List<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>> cogroupResult = cogroupRDD.collect();
    for (Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> t : cogroupResult) {
        String word = t._1;
        Iterable<Integer> word1Counts = t._2._1;  // values for this key from words1RDD
        Iterable<Integer> word2Counts = t._2._2;  // values for this key from words2RDD

        StringBuilder countInfo = new StringBuilder();
        for (Integer c1 : word1Counts) {
            countInfo.append(c1).append("(words1RDD),");
        }

        for (Integer c2 : word2Counts) {
            countInfo.append(c2).append("(words2RDD),");
        }

        System.out.println(String.format("Element %s: %s -> %s", count, word, countInfo));

        count++;
    }


    Output:

    Element 1: you -> 9(words1RDD),28(words2RDD),
    Element 2: hello -> 3(words1RDD),2(words1RDD),12(words1RDD),21(words2RDD),25(words2RDD),
    Element 3: world -> 7(words1RDD),24(words2RDD),
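
To make the grouping concrete, here is a minimal plain-Java sketch that reproduces what cogroup computes for the same data, without Spark. The class name CogroupSketch and its helpers are hypothetical names chosen for illustration, and the sketch only mirrors the result, not how Spark distributes the work. Keys come out in insertion order here, which differs from the order Spark printed above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of cogroup semantics; no Spark involved.
class CogroupSketch {

    // Hypothetical helper: builds one (key, value) pair.
    static Map.Entry<String, Integer> entry(String key, int value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Same data as words1 in the article.
    static List<Map.Entry<String, Integer>> words1() {
        return Arrays.asList(entry("hello", 3), entry("hello", 2),
                entry("world", 7), entry("hello", 12), entry("you", 9));
    }

    // Same data as words2 in the article.
    static List<Map.Entry<String, Integer>> words2() {
        return Arrays.asList(entry("hello", 21), entry("world", 24),
                entry("hello", 25), entry("you", 28));
    }

    // For every key in either input, collect the left values into list 0
    // and the right values into list 1.
    static Map<String, List<List<Integer>>> cogroup(
            List<Map.Entry<String, Integer>> left,
            List<Map.Entry<String, Integer>> right) {
        Map<String, List<List<Integer>>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : left) {
            grouped.computeIfAbsent(e.getKey(), k -> emptyPair()).get(0).add(e.getValue());
        }
        for (Map.Entry<String, Integer> e : right) {
            grouped.computeIfAbsent(e.getKey(), k -> emptyPair()).get(1).add(e.getValue());
        }
        return grouped;
    }

    // A fresh pair of empty lists: index 0 for the left side, 1 for the right.
    private static List<List<Integer>> emptyPair() {
        List<List<Integer>> pair = new ArrayList<>();
        pair.add(new ArrayList<>());
        pair.add(new ArrayList<>());
        return pair;
    }

    public static void main(String[] args) {
        // Prints {hello=[[3, 2, 12], [21, 25]], world=[[7], [24]], you=[[9], [28]]}
        System.out.println(cogroup(words1(), words2()));
    }
}
```

Each key appears exactly once in the result, however many times it occurs in the inputs, which matches the three elements printed by the Spark code.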
    

    Now for join:

    // Reset the counter so that numbering restarts at 1, as in the output below
    count = 1;

    // join returns one element for each pair of values that share a key
    JavaPairRDD<String, Tuple2<Integer, Integer>> joinedRDD = words1RDD.join(words2RDD);
    List<Tuple2<String, Tuple2<Integer, Integer>>> joinedResult = joinedRDD.collect();
    for (Tuple2<String, Tuple2<Integer, Integer>> t : joinedResult) {
        System.out.println(String.format("Element %s: %s -> %s(words1RDD),%s(words2RDD)", count, t._1, t._2._1, t._2._2));
        count++;
    }


    Output:

    Element 1: you -> 9(words1RDD),28(words2RDD)
    Element 2: hello -> 3(words1RDD),21(words2RDD)
    Element 3: hello -> 3(words1RDD),25(words2RDD)
    Element 4: hello -> 2(words1RDD),21(words2RDD)
    Element 5: hello -> 2(words1RDD),25(words2RDD)
    Element 6: hello -> 12(words1RDD),21(words2RDD)
    Element 7: hello -> 12(words1RDD),25(words2RDD)
    Element 8: world -> 7(words1RDD),24(words2RDD)
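
The eight elements follow from simple arithmetic: for each key present in both RDDs, join emits one element per combination of a left value and a right value. The sketch below checks that count in plain Java. JoinSizeSketch and its methods are hypothetical names used only for illustration, and only the keys of the article's data are needed.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java arithmetic check of how many elements an inner join produces per key.
class JoinSizeSketch {

    // Counts how often each key occurs.
    static Map<String, Integer> countKeys(List<String> keys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    // For each key present on both sides: leftCount * rightCount elements.
    static Map<String, Integer> joinSizes(List<String> leftKeys, List<String> rightKeys) {
        Map<String, Integer> left = countKeys(leftKeys);
        Map<String, Integer> right = countKeys(rightKeys);
        Map<String, Integer> sizes = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : left.entrySet()) {
            Integer rightCount = right.get(e.getKey());
            if (rightCount != null) {
                sizes.put(e.getKey(), e.getValue() * rightCount);
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Keys of words1 and words2 from the article, in their original order
        List<String> words1Keys = Arrays.asList("hello", "hello", "world", "hello", "you");
        List<String> words2Keys = Arrays.asList("hello", "world", "hello", "you");

        Map<String, Integer> sizes = joinSizes(words1Keys, words2Keys);
        int total = sizes.values().stream().mapToInt(Integer::intValue).sum();

        // Prints {hello=6, world=1, you=1} total=8
        System.out.println(sizes + " total=" + total);
    }
}
```

The key hello accounts for 3 × 2 = 6 of the eight elements; world and you contribute one each.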
    

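    cogroup groups the values that share a key: it returns one element per key, holding all of that key's values from each RDD as a pair of Iterables. join does not group: it returns one element for every pairing of a value from words1RDD with a value from words2RDD under the same key, which is why hello (3 values in words1RDD, 2 in words2RDD) yields 3 × 2 = 6 elements.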
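
The relationship between the two results can be sketched in plain Java: expanding each cogroup element into every left/right combination gives the join rows. The class CogroupToJoinSketch and the extra key "spark" are hypothetical and not part of the article's data. The extra key illustrates a related point: a key found in only one RDD still appears in cogroup's result with an empty Iterable on the other side, but it does not appear in the result of join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration: join rows are the per-key combinations of cogroup's two value lists.
class CogroupToJoinSketch {

    // Expands cogroup-shaped data (key -> [left values, right values]) into join-shaped rows.
    static List<String> flatten(Map<String, List<List<Integer>>> cogrouped) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, List<List<Integer>>> e : cogrouped.entrySet()) {
            List<Integer> leftValues = e.getValue().get(0);
            List<Integer> rightValues = e.getValue().get(1);
            for (Integer l : leftValues) {
                for (Integer r : rightValues) {
                    rows.add(e.getKey() + " -> " + l + "(words1RDD)," + r + "(words2RDD)");
                }
            }
        }
        return rows;
    }

    // The cogroup result from the article, in the order it was printed,
    // plus one hypothetical key that exists only on the left side.
    static Map<String, List<List<Integer>>> sample() {
        Map<String, List<List<Integer>>> m = new LinkedHashMap<>();
        m.put("you", Arrays.asList(Arrays.asList(9), Arrays.asList(28)));
        m.put("hello", Arrays.asList(Arrays.asList(3, 2, 12), Arrays.asList(21, 25)));
        m.put("world", Arrays.asList(Arrays.asList(7), Arrays.asList(24)));
        // Hypothetical: "spark" has no values on the right side, so it yields no join rows
        m.put("spark", Arrays.asList(Arrays.asList(1), Collections.<Integer>emptyList()));
        return m;
    }

    public static void main(String[] args) {
        // Prints the same eight rows as the join output above, without the numbering
        for (String row : flatten(sample())) {
            System.out.println(row);
        }
    }
}
```

Because an empty list on either side leaves nothing to combine, the hypothetical spark key drops out, and the remaining rows match the join output shown earlier.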

    Link to this article: https://www.haomeiwen.com/subject/trrhcctx.html