
Common RDD Operators

Author: 杨赟快跑 | Published 2019-06-26 15:53

    1. Common Transformation Operators

    Operator        Description
    map             Passes each element of the RDD through a user-defined function to produce a new element; the new elements form a new RDD.
    filter          Evaluates each element with a predicate: elements for which it returns true are kept, elements returning false are dropped.
    flatMap         Like map, but each element can map to one or more new elements.
    groupByKey      Groups elements by key; each key maps to an Iterable<value>.
    reduceByKey     Applies a reduce operation to the multiple values that share the same key.
    sortByKey       Sorts elements by key, ascending by default; pass false for descending, or supply a custom comparator (see the sketch after this table).
    join            Joins two RDDs of <key,value> pairs, much like a table join in MySQL; returns <key,(value1,value2)>.
    cogroup         Like join, but the tuple for each key holds two Iterables, i.e. <key,(Iterable<value1>,Iterable<value2>)>.
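
    The examples in section 2 below exercise each of these operators. The custom-comparator variant of sortByKey is the one case not shown there, so here is a minimal Scala sketch (not from the original article, object name illustrative): in the Scala API, sortByKey resolves an implicit Ordering for the key type, so placing your own Ordering in scope changes the sort criterion; the Java API instead offers a sortByKey overload that takes a Comparator.

    import org.apache.spark.{SparkConf, SparkContext}

    object SortByKeyCustomOrdering {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("sortByKeyCustomOrdering").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        // a local implicit Ordering[String] is picked up by sortByKey; here keys sort by length
        implicit val byLength: Ordering[String] = Ordering.by(_.length)
        val wordsRDD = sc.parallelize(Array(("banana", 1), ("fig", 2), ("apple", 3)), 1)
        wordsRDD.sortByKey().foreach(println) // prints (fig,2), (apple,3), (banana,1)
        sc.stop()
      }
    }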

    2. Hands-on Examples

    Java version

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;
    import java.util.*;
    
    public class TransformationStudy {
    
        public static void main(String[] args) {
            //map();
            //filter();
            //flatMap();
            //groupByKey();
            //reduceByKey();
            //sortByKey();
            //join();
            cogroup();
        }
    
        private static void map(){
            SparkConf conf = new SparkConf()
                    .setAppName("map")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            sc.setLogLevel("warn");
            JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
            JavaRDD<Integer> multiNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
                @Override
                public Integer call(Integer integer) throws Exception {
                    return integer * 2;
                }
            });
            multiNumberRDD.foreach(new VoidFunction<Integer>() {
                @Override
                public void call(Integer integer) throws Exception {
                    System.out.println(integer);
                }
            });
            sc.close();
        }
    
    
        private static void filter(){
            SparkConf conf = new SparkConf()
                    .setAppName("filter")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
    
            JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) throws Exception {
                    // even numbers have a lowest bit of 0
                    return (integer & 1) == 0;
                }
            });
    
            evenNumberRDD.foreach(new VoidFunction<Integer>() {
                @Override
                public void call(Integer integer) throws Exception {
                    System.out.println(integer);
                }
            });
    
            sc.close();
        }
    
    
        private static void flatMap(){
            SparkConf conf = new SparkConf()
                    .setAppName("flatMap")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            List<String> list = Arrays.asList("hello you", "hello me", "hello world");
    
            JavaRDD<String> lines = sc.parallelize(list);
    
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                    // split the line on spaces and return the words as an iterator
                    return Arrays.asList(s.split(" ")).iterator();
                }
            });
            words.foreach(new VoidFunction<String>() {
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
    
            sc.close();
        }
    
        private static void groupByKey(){
            SparkConf conf = new SparkConf()
                    .setAppName("groupByKey")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            List<Tuple2<String,Integer>> list = Arrays.asList(
                    new Tuple2<>("class1",90),
                    new Tuple2<>("class2",80),
                    new Tuple2<>("class1",75),
                    new Tuple2<>("class2",65)
            );
    
            JavaPairRDD<String,Integer> scoresRDD = sc.parallelizePairs(list);
    
            JavaPairRDD<String,Iterable<Integer>> groupedScoresRDD = scoresRDD.groupByKey();
    
    
            groupedScoresRDD.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                @Override
                public void call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
                    System.out.println("class:" + stringIterableTuple2._1);
                    for (Integer a_2 : stringIterableTuple2._2) {
                        System.out.println(a_2);
                    }
                    System.out.println("================================");
                }
            });
    
            sc.close();
        }
    
    
        private static void reduceByKey(){
            SparkConf conf = new SparkConf()
                    .setAppName("reduceByKey")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            List<Tuple2<String,Integer>> list = Arrays.asList(
                    new Tuple2<>("class1",90),
                    new Tuple2<>("class2",80),
                    new Tuple2<>("class1",75),
                    new Tuple2<>("class2",65)
            );
    
            // attach a count of 1 to each score so reduceByKey can build (sum, count) pairs per key
            JavaPairRDD<String,Tuple2<Integer,Integer>> scoresRDD = sc.parallelizePairs(list).mapValues(new Function<Integer, Tuple2<Integer,Integer>>() {
                @Override
                public Tuple2<Integer,Integer> call(Integer integer) throws Exception {
                    return new Tuple2<>(integer, 1);
                }
            });
    
            JavaPairRDD<String,Tuple2<Integer,Integer>> reducedScoresRDD = scoresRDD.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) throws Exception {
                    // add the score totals and the counts component-wise
                    return new Tuple2<>(t1._1 + t2._1, t1._2 + t2._2);
                }
            });
    
            reducedScoresRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Integer>>>() {
                @Override
                public void call(Tuple2<String, Tuple2<Integer, Integer>> stringTuple2Tuple2) throws Exception {
                    System.out.println(stringTuple2Tuple2._1 + ":" + stringTuple2Tuple2._2._1.doubleValue() / stringTuple2Tuple2._2._2);
                }
            });
    
            sc.close();
        }
    
    
        private static void sortByKey(){
            SparkConf conf = new SparkConf()
                    .setAppName("sortByKey")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            List<Tuple2<Integer,String>> list = Arrays.asList(
                    new Tuple2<>(75,"leo"),
                    new Tuple2<>(50,"tom"),
                    new Tuple2<>(100,"marray"),
                    new Tuple2<>(86,"jack")
            );
    
            JavaPairRDD<Integer,String> scoresRDD = sc.parallelizePairs(list);
    
            JavaPairRDD<Integer,String> sortedScoresRDD = scoresRDD.sortByKey(false);
    
            sortedScoresRDD.foreach(x->{
                System.out.println(x._2+": "+x._1);
            });
    
            sc.close();
        }
    
        private static void join(){
            SparkConf conf = new SparkConf()
                    .setAppName("join")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            List<Tuple2<Integer,String>> students = Arrays.asList(
                    new Tuple2<>(1,"leo"),
                    new Tuple2<>(2,"tom"),
                    new Tuple2<>(3,"marray"),
                    new Tuple2<>(4,"jack")
            );
    
            List<Tuple2<Integer,Integer>> scores = Arrays.asList(
                    new Tuple2<>(1,75),
                    new Tuple2<>(2,50),
                    new Tuple2<>(3,100),
                    new Tuple2<>(4,86)
            );
    
            JavaPairRDD<Integer,String> studentsRDD = sc.parallelizePairs(students);
            JavaPairRDD<Integer,Integer> scoresRDD = sc.parallelizePairs(scores);
    
            JavaPairRDD<Integer,Tuple2<String,Integer>> joinedRDD = studentsRDD.join(scoresRDD).sortByKey();
    
            joinedRDD.foreach(x->{
                System.out.println("students id : " + x._1);
                System.out.println("students name : " + x._2._1);
                System.out.println("students score : " + x._2._2);
                System.out.println("==================================");
            });
    
            sc.close();
        }
    
    
        private static void cogroup(){
            SparkConf conf = new SparkConf()
                    .setAppName("cogroup")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            // every student id appears twice on each side, so cogroup gathers two names and two scores per key
            List<Tuple2<Integer,String>> students = Arrays.asList(
                    new Tuple2<>(1,"leo1"),
                    new Tuple2<>(2,"tom1"),
                    new Tuple2<>(3,"marray1"),
                    new Tuple2<>(4,"jack1"),
                    new Tuple2<>(1,"leo2"),
                    new Tuple2<>(2,"tom2"),
                    new Tuple2<>(3,"marray2"),
                    new Tuple2<>(4,"jack2")
    
            );
    
    
            List<Tuple2<Integer,Integer>> scores = Arrays.asList(
                    new Tuple2<>(1,75),
                    new Tuple2<>(2,50),
                    new Tuple2<>(3,100),
                    new Tuple2<>(4,86),
                    new Tuple2<>(1,67),
                    new Tuple2<>(2,61),
                    new Tuple2<>(3,98),
                    new Tuple2<>(4,78)
            );
    
            JavaPairRDD<Integer,String> studentsRDD = sc.parallelizePairs(students);
            JavaPairRDD<Integer,Integer> scoresRDD = sc.parallelizePairs(scores);
    
            JavaPairRDD<Integer,Tuple2<Iterable<String>,Iterable<Integer>>> cogroupRDD = studentsRDD.cogroup(scoresRDD).sortByKey();
    
            cogroupRDD.foreach(x->{
                System.out.println("students id : " + x._1);
                System.out.println("students name : " + x._2._1);
                System.out.println("students score : " + x._2._2);
                System.out.println("==================================");
            });
    
            sc.close();
        }
    }
    

    Scala version

    import org.apache.spark.{SparkConf, SparkContext}
    
    object TransformationStudy {
    
      def main(args: Array[String]): Unit = {
        cogroup()
      }
    
      private def map(): Unit ={
        val conf = new SparkConf().setAppName("map").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        val list = Array(1,2,3,4,5,6)
        val mapRDD = sc.parallelize(list,1)
        mapRDD.map(x=>2*x).foreach(x=>println(x))
        sc.stop()
      }
    
      private def filter(): Unit ={
        val conf = new SparkConf().setAppName("filter").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        val list = Array(1,2,3,4,5,6,7,8,9,10)
        val listRDD = sc.parallelize(list,1)
        listRDD.filter(x => (x & 1) == 0).foreach(x => println(x))
        sc.stop()
      }
    
      private def flatMap(): Unit ={
        val conf = new SparkConf().setAppName("flatMap").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        val list = Array("hello me", "hello you", "hello world")
        val listRDD = sc.parallelize(list,1)
        listRDD.flatMap(x=>x.split(" ")).foreach(x=>println(x))
        sc.stop()
      }
    
      private def groupByKey(): Unit ={
        val conf = new SparkConf().setAppName("groupByKey").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        val list = Array(("class1", 80),("class2",76),("class1",90),("class2",93))
        val listRDD = sc.parallelize(list,1)
        listRDD.groupByKey().foreach(x=>{
          println(x)
        })
        sc.stop()
      }
    
      private def reduceByKey(): Unit ={
        val conf = new SparkConf().setAppName("reduceByKey").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        val list = Array(("class1", 80),("class2",76),("class1",90),("class2",93))
        val listRDD = sc.parallelize(list,1)
        // pair each score with a 1 so reduceByKey can accumulate (total, count) per class
        val mappedListRDD = listRDD.map(x => (x._1, (x._2, 1)))
        mappedListRDD.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).map(x => (x._1, x._2._1.toDouble / x._2._2)).foreach(println)
        sc.stop()
      }
    
      private def sortByKey(): Unit ={
        val conf = new SparkConf().setAppName("sortByKey").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        val list = Array((75, "leo"), (50, "tom"), (100, "marry"), (86, "jack"))
        val listRDD = sc.parallelize(list,1)
        listRDD.sortByKey(ascending = false).foreach(println)
        sc.stop()
      }
    
      private def join(): Unit ={
        val conf = new SparkConf().setAppName("join").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        val students = Array((1, "leo"), (2, "tom"), (3, "marry"), (4, "jack"))
        val scores = Array((1, 75), (2, 50), (3, 100), (4, 86))
        val studentsRDD = sc.parallelize(students,1)
        val scoresRDD = sc.parallelize(scores,1)
        studentsRDD.join(scoresRDD).sortByKey().foreach(println)
        sc.stop()
      }
    
      private def cogroup(): Unit ={
        val conf = new SparkConf().setAppName("cogroup").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        val students = Array((1, "leo"), (2, "tom"), (3, "marry"), (4, "jack"))
        val scores = Array((1, 75), (2, 50), (3, 100), (4, 86),(1, 33), (2, 45), (3, 99), (4, 67))
        val studentsRDD = sc.parallelize(students,1)
        val scoresRDD = sc.parallelize(scores,1)
        studentsRDD.cogroup(scoresRDD).sortByKey().foreach(println)
        sc.stop()
      }
    }
    

    3. Common Action Operators

    Operator        Description
    reduce          Aggregates all elements of the RDD: the first element is combined with the second, the result with the third, that result with the fourth, and so on.
    collect         Brings all elements of the RDD back to the local client; for very large datasets this can congest the network.
    count           Returns the total number of elements in the RDD.
    take(n)         Returns the first n elements of the RDD.
    saveAsTextFile  Saves the RDD's elements to text files, calling toString on each element. It writes as many files as there are tasks; to produce a single file, call coalesce(1,true).saveAsTextFile() on the RDD, so Spark runs only one task for the save and emits one file. Alternatively, call repartition(1), which is simply a wrapper around coalesce with the second argument defaulting to true (see the sketch after this table).
    countByKey      Counts the number of values associated with each key.
    foreach         Iterates over each element of the RDD. Unlike collect, foreach traverses the data on the worker nodes; compared with shipping the data back to the client first, this performs better (see the sketch after the ActionStudy example below).
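
    The single-file trick described for saveAsTextFile above, as a minimal Scala sketch (not from the original article; the output path ./single-output and object name are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object SaveSingleFile {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("saveSingleFile").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        // 3 partitions would normally write 3 part files
        val numberRDD = sc.parallelize(1 to 10, 3)
        // coalesce to one partition (with shuffle) so a single task writes a single file
        numberRDD.coalesce(1, shuffle = true).saveAsTextFile("./single-output")
        // repartition(1) is equivalent: it is coalesce(1, shuffle = true) under the hood
        sc.stop()
      }
    }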

    4. Hands-on Examples

    Java version

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import scala.Tuple2;
    
    import java.util.*;
    
    public class ActionStudy {
    
        public static void main(String[] args) {
            //reduce();
            //collect();
            //count();
            //take();
            //saveAsTextFile();
            countByKey();
        }
    
        private static void reduce(){
            SparkConf conf = new SparkConf()
                    .setAppName("reduce")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10),1);
    
            Integer sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });
    
            System.out.println(sum);
    
            sc.close();
        }
    
    
        private static void collect(){
            SparkConf conf = new SparkConf()
                    .setAppName("collect")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
    
            List<Integer> numbers = numberRDD.collect();
    
            for (Integer e : numbers){
                System.out.println(e);
            }
    
            sc.close();
        }
    
    
        private static void count(){
            SparkConf conf = new SparkConf()
                    .setAppName("count")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
    
            long count = numberRDD.count();
    
            System.out.println(count);
    
            sc.close();
        }
    
    
        private static void take(){
            SparkConf conf = new SparkConf()
                    .setAppName("take")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
    
            List<Integer> numbers = numberRDD.take(3);
    
            for (Integer e : numbers){
                System.out.println(e);
            }
    
            sc.close();
        }
    
    
    
    
        private static void saveAsTextFile(){
            SparkConf conf = new SparkConf()
                    .setAppName("saveAsTextFile")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10),3);
    
            numberRDD.saveAsTextFile("./output");
    
            sc.close();
        }
    
    
        private static void countByKey(){
            SparkConf conf = new SparkConf()
                    .setAppName("countByKey")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            sc.setLogLevel("warn");
    
            List<Tuple2<Integer,String>> list = Arrays.asList(
                    new Tuple2<>(75,"leo"),
                    new Tuple2<>(50,"tom"),
                    new Tuple2<>(100,"marray"),
                    new Tuple2<>(86,"jack"),
                    new Tuple2<>(75,"leo"),
                    new Tuple2<>(50,"tom"),
                    new Tuple2<>(100,"marray"),
                    new Tuple2<>(86,"jack")
            );
    
            JavaPairRDD<Integer,String> scoresRDD = sc.parallelizePairs(list);
    
            Map<Integer,Long> countMap = scoresRDD.countByKey();
    
            System.out.println(countMap);
    
            sc.close();
        }
    }
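
    The Action table also lists foreach, which ActionStudy does not demonstrate on its own; here is a minimal Scala sketch of the contrast with collect (not from the original article, object name illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object ForeachStudy {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("foreach").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")
        val numberRDD = sc.parallelize(1 to 10)
        // foreach runs the function on the executors; nothing is shipped back to the driver
        numberRDD.foreach(println)
        // collect first pulls every element back to the driver, then iterates locally
        numberRDD.collect().foreach(println)
        sc.stop()
      }
    }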
    
