Two Ways to Convert a JavaRDD to a Dataset

Author: Phoebe_Liu | Published 2019-04-10 17:56

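    Converting a JavaRDD to a Dataset<Row>, approach 1:

    Use an entity class (a JavaBean) as the schema definition and let Spark infer the schema from it through reflection to convert the JavaRDD into a Dataset<Row>.

    The Student.java entity class: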

    import java.io.Serializable;
    
    @SuppressWarnings("serial")
    public class Student implements Serializable {
        private String sid;
        private String sname;
        private int sage;
    
        public String getSid() {
            return sid;
        }
    
        public void setSid(String sid) {
            this.sid = sid;
        }
    
        public String getSname() {
            return sname;
        }
    
        public void setSname(String sname) {
            this.sname = sname;
        }
    
        public int getSage() {
            return sage;
        }
    
        public void setSage(int sage) {
            this.sage = sage;
        }
    
        @Override
        public String toString() {
            return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
        }
    }
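To make the reflection step less opaque, here is a rough plain-Java sketch of how column names and types can be read off a bean's getters with java.beans.Introspector. It only illustrates the idea and is not Spark's actual code; BeanSchemaSketch, describe, and the compact nested Student copy are hypothetical names introduced for this example.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Map;
import java.util.TreeMap;

final class BeanSchemaSketch {
    /** Compact, hypothetical copy of the Student bean so the sketch is self-contained. */
    public static class Student {
        private String sid;
        private String sname;
        private int sage;
        public String getSid() { return sid; }
        public void setSid(String sid) { this.sid = sid; }
        public String getSname() { return sname; }
        public void setSname(String sname) { this.sname = sname; }
        public int getSage() { return sage; }
        public void setSage(int sage) { this.sage = sage; }
    }

    /** Maps each readable bean property to its Java type, sorted by property name. */
    static Map<String, Class<?>> describe(Class<?> beanClass) {
        try {
            // Stopping at Object excludes the "class" property that getClass() would add.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            Map<String, Class<?>> columns = new TreeMap<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null) { // only properties with a getter
                    columns.put(pd.getName(), pd.getPropertyType());
                }
            }
            return columns;
        } catch (IntrospectionException e) {
            throw new IllegalArgumentException("Not an introspectable bean: " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        // Prints the property-name-to-type map for the sample bean.
        System.out.println(describe(Student.class));
    }
}
```

In this sketch the properties come back sorted by name (sage, sid, sname) rather than in declaration order. Spark's bean-based inference appears to order columns the same way, which would explain why the code that follows selects "sid", "sname", "sage" explicitly before writing.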
    

    Implementation 1

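    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
    // Each input line is expected to look like "sid,sname,sage".
    JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

    // Parse every line into a Student bean.
    JavaRDD<Student> studentRDD = source.map(new Function<String, Student>() {
        @Override
        public Student call(String line) throws Exception {
            String[] parts = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.parseInt(parts[2]));
            return stu;
        }
    });

    // Spark derives the schema from Student's getters via reflection.
    Dataset<Row> df = spark.createDataFrame(studentRDD, Student.class);
    // select(...) fixes the column order before the result is appended as Parquet.
    df.select("sid", "sname", "sage").coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");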
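The map function above assumes that every line has exactly three comma-separated fields and a numeric age; a malformed line would throw inside the Spark task. Below is a minimal plain-Java sketch of a more defensive parser, assuming the sid,sname,sage layout implied by the code. StudentLineParser and Parsed are hypothetical names, and Parsed merely stands in for the Student bean.

```java
import java.util.Optional;

final class StudentLineParser {
    /** Hypothetical value holder standing in for the Student bean. */
    static final class Parsed {
        final String sid;
        final String sname;
        final int sage;

        Parsed(String sid, String sname, int sage) {
            this.sid = sid;
            this.sname = sname;
            this.sage = sage;
        }
    }

    /** Returns the parsed record, or Optional.empty() when the line is malformed. */
    static Optional<Parsed> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // Limit -1 keeps trailing empty fields, so "1001,Tom," still yields three parts.
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            return Optional.empty();
        }
        try {
            int sage = Integer.parseInt(parts[2].trim());
            return Optional.of(new Parsed(parts[0].trim(), parts[1].trim(), sage));
        } catch (NumberFormatException e) {
            return Optional.empty(); // age is missing or not an integer
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1001,Tom,18").isPresent());
        System.out.println(parse("1001,Tom,abc").isPresent());
    }
}
```

One way to use such a helper would be to drop the lines for which parse returns an empty Optional before building the beans, so that a single bad record does not fail the whole job.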
    

    Implementation 2 (approach 2: build a JavaRDD<Row> and define the schema explicitly with StructType)

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
    JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

    // Parse every line into a generic Row; the values follow the schema's field order.
    JavaRDD<Row> rowRDD = source.map(new Function<String, Row>() {
        @Override
        public Row call(String line) throws Exception {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);
            return RowFactory.create(sid, sname, sage);
        }
    });

    // Describe each column explicitly: name, data type, nullable.
    List<StructField> fields = new ArrayList<>();
    fields.add(DataTypes.createStructField("sid", DataTypes.StringType, true));
    fields.add(DataTypes.createStructField("sname", DataTypes.StringType, true));
    fields.add(DataTypes.createStructField("sage", DataTypes.IntegerType, true));
    StructType schema = DataTypes.createStructType(fields);

    Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
    df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
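With an explicit schema, the values passed to RowFactory.create are matched to the fields by position, so their order and types have to agree with the StructType. The following plain-Java analogy (no Spark classes involved) sketches that positional check; RowSchemaCheck and conforms are hypothetical names, and the map of Java classes merely stands in for the schema.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RowSchemaCheck {
    /** True when the values line up, position by position, with the expected field types. */
    static boolean conforms(List<?> values, Map<String, Class<?>> schema) {
        if (values.size() != schema.size()) {
            return false;
        }
        int i = 0;
        for (Class<?> type : schema.values()) {
            Object value = values.get(i++);
            // null is accepted because every field in the example schema is nullable.
            if (value != null && !type.isInstance(value)) {
                return false;
            }
        }
        return true;
    }

    /** Stand-in for the sid/sname/sage schema; LinkedHashMap preserves field order. */
    static Map<String, Class<?>> studentSchema() {
        Map<String, Class<?>> schema = new LinkedHashMap<>();
        schema.put("sid", String.class);
        schema.put("sname", String.class);
        schema.put("sage", Integer.class);
        return schema;
    }

    public static void main(String[] args) {
        System.out.println(conforms(Arrays.asList("1001", "Tom", 18), studentSchema()));
        System.out.println(conforms(Arrays.asList("1001", 18, "Tom"), studentSchema()));
    }
}
```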
    

    I have found three ways to create a ClassTag from Java. The third can be used anywhere a ClassTag is required and takes no arguments.

    1: ClassManifestFactory.classType(String.class)
    2: ClassTag$.MODULE$.apply(String.class)
    3: JavaSparkContext$.MODULE$.fakeClassTag()
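For background on why these Scala APIs ask for a ClassTag at all: it carries an element's runtime class, which generic code otherwise loses to type erasure. The closest plain-Java counterpart is passing a Class<T> token. The sketch below shows that idea with the JDK only; RuntimeClassToken and newArray are hypothetical names, and no Scala or Spark classes are used.

```java
import java.lang.reflect.Array;

final class RuntimeClassToken {
    /**
     * Creates a T[] of the given length. This works only because the Class<T>
     * token tells the runtime which element type to allocate.
     * Intended for reference types; a primitive token such as int.class would
     * produce an int[] and fail the cast.
     */
    @SuppressWarnings("unchecked")
    static <T> T[] newArray(Class<T> elementType, int length) {
        return (T[]) Array.newInstance(elementType, length);
    }

    public static void main(String[] args) {
        String[] names = newArray(String.class, 2);
        names[0] = "abc";
        // The array's runtime component type is String, not Object.
        System.out.println(names.getClass().getComponentType());
    }
}
```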

    1. List to Seq:

    List<String> tmpList = new ArrayList<>();
    tmpList.add("abc");
    Seq<String> tmpSeq = JavaConverters.asScalaIteratorConverter(tmpList.iterator()).asScala().toSeq();

    2. Seq to List:

    // JavaConversions is deprecated since Scala 2.12; JavaConverters is the explicit alternative.
    List<String> tmpList = JavaConverters.seqAsJavaListConverter(tmpSeq).asJava();
