美文网首页
Spark 学习笔记(三)-数据读存-JSON

Spark 学习笔记(三)-数据读存-JSON

作者: vision_zhang | 来源:发表于2018-04-26 16:46 被阅读0次

    JSON是一种半结构化的数据格式,最简单的读取方式是将数据作为文本文件读取,然后使用JSON解析器来对RDD的值进行映射操作。

    • 读取JSON:将数据作为文本文件读取,这种方法在所有的编程语言中都可以使用。方法假设文件中每一行都是一条JSON记录。如果数据跨行了,就需要读取整个文件,然后对文件进行解析。
      可以使用mapPartitions()来重用解析器,这个对每一个分区进行操作。
      Java中使用Jackson进行JSON操作。
      Java中通常将每条记录读取到一个代表其结构的类中(即一个普通的JavaBean)。
    例子数据
    {"name":"上海滩","singer":"叶丽仪","album":"香港电视剧主题歌","path":"mp3/shanghaitan.mp3"}
    {"name":"一生何求","singer":"陈百强","album":"香港电视剧主题歌","path":"mp3/shanghaitan.mp3"}
    {"name":"红日","singer":"李克勤","album":"怀旧专辑","path":"mp3/shanghaitan.mp3"}
    {"name":"爱如潮水","singer":"张信哲","album":"怀旧专辑","path":"mp3/airucaoshun.mp3"}
    {"name":"红茶馆","singer":"陈惠嫻","album":"怀旧专辑","path":"mp3/redteabar.mp3"}
    
    package spark_Function;
    
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Iterator;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.codehaus.jackson.map.ObjectMapper;
    
    
    
    
    /**
     * Reads a JSON-lines file as plain text and parses each record into an
     * Mp3Info bean via mapPartitions (one Jackson parser per partition),
     * then prints every parsed record.
     */
    public class json {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("MyMp3");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            try {
                // Each line of the input file is assumed to be one complete JSON record.
                JavaRDD<String> input = jsc.textFile("G:/sparkRS/JSON.json");
                JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson());
                result.foreach(x -> System.out.println(x));
            } finally {
                // Release the SparkContext even if the job throws.
                jsc.close();
            }
        }

    }
    
    
    
    /**
     * Parses each partition's JSON lines into Mp3Info beans, reusing a single
     * Jackson ObjectMapper per partition. Malformed records are skipped
     * rather than failing the job.
     */
    class ParseJson implements FlatMapFunction<Iterator<String>, Mp3Info> {

        private static final long serialVersionUID = 8603650874403773926L;

        @Override
        public Iterator<Mp3Info> call(Iterator<String> lines) throws Exception {
            ArrayList<Mp3Info> mp3 = new ArrayList<Mp3Info>();
            // Create the mapper once per partition, not once per record.
            ObjectMapper mapper = new ObjectMapper();
            while (lines.hasNext()) {
                String line = lines.next();
                try {
                    mp3.add(mapper.readValue(line, Mp3Info.class));
                } catch (Exception e) {
                    // Deliberately skip malformed records, but report them instead
                    // of swallowing silently; for large jobs a Spark accumulator
                    // is the better way to count such failures.
                    System.err.println("Skipping malformed JSON record: " + line);
                }
            }
            return mp3.iterator();
        }

    }
    
    
    
    
    /**
     * Serializable JavaBean mirroring one JSON song record: name, album,
     * file path, and singer. Used as the Jackson binding target.
     */
    class Mp3Info implements Serializable {

        private static final long serialVersionUID = -3811808269846588364L;

        private String name;
        private String album;
        private String path;
        private String singer;

        public String getName() {
            return this.name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getAlbum() {
            return this.album;
        }

        public void setAlbum(String album) {
            this.album = album;
        }

        public String getPath() {
            return this.path;
        }

        public void setPath(String path) {
            this.path = path;
        }

        public String getSinger() {
            return this.singer;
        }

        public void setSinger(String singer) {
            this.singer = singer;
        }

        /** Debug-friendly rendering; format matches the original bean exactly. */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("Mp3Info [name=");
            sb.append(name)
              .append(", album=").append(album)
              .append(", path=").append(path)
              .append(", singer=").append(singer)
              .append("]");
            return sb.toString();
        }

    }
    

    ObjectMapper类是Jackson库的核心类,提供了在Java对象与JSON结构之间相互转换(序列化与反序列化)的方法。

    处理格式不正确的记录可能会引起很严重的错误,尤其是对于像JSON这样的半结构化数据来说。对于大规模的数据集来说格式错误很常见,所以如果选择跳过格式错误的数据,应该使用累加器来跟踪错误。

    • 保存JSON
      写出JSON一般不需要考虑格式错误的数据,并且也知道要写出的数据类型,
      读是将字符串RDD转化为解析好的JSON数据
      写由结构化的数据组成的RDD转为字符串RDD,然后使用文本文件API写出去
    package spark_Function;
    
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Iterator;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.codehaus.jackson.map.ObjectMapper;
    
    
    
    
    /**
     * Reads song records from a JSON-lines file, keeps only records whose
     * album is "怀旧专辑", prints them, and writes the filtered records back
     * out as JSON text files.
     */
    public class json {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("MyMp3");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            try {
                JavaRDD<String> input = jsc.textFile("G:/sparkRS/JSON.json");
                // Parse per partition, then keep only the target album.
                JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson())
                                               .filter(x -> x.getAlbum().equals("怀旧专辑"));
                // Serialize each bean back to a JSON string, one mapper per partition.
                JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
                result.foreach(x -> System.out.println(x));
                formatted.saveAsTextFile("G:/sparkRS/wjson");
            } finally {
                // Release the SparkContext even if the job throws.
                jsc.close();
            }
        }

    }
    
    class WriteJson implements FlatMapFunction<Iterator<Mp3Info>, String> {
        /**
         * 
         */
        private static final long serialVersionUID = -6590868830029412793L;
    
        public Iterator<String> call(Iterator<Mp3Info> song) throws Exception {
            ArrayList<String> text = new ArrayList<String>();
            ObjectMapper mapper = new ObjectMapper();
            while (song.hasNext()) {
                Mp3Info person = song.next();
                text.add(mapper.writeValueAsString(person));
            }
            return text.iterator();
        }
    }
    
    
    
    /**
     * Serializable JavaBean for one song record (name, album, path, singer);
     * serves as both the Jackson read target and write source.
     */
    class Mp3Info implements Serializable {

        private static final long serialVersionUID = -3811808269846588364L;

        private String name;
        private String album;
        private String path;
        private String singer;

        public String getName() {
            return this.name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getAlbum() {
            return this.album;
        }

        public void setAlbum(String album) {
            this.album = album;
        }

        public String getPath() {
            return this.path;
        }

        public void setPath(String path) {
            this.path = path;
        }

        public String getSinger() {
            return this.singer;
        }

        public void setSinger(String singer) {
            this.singer = singer;
        }

        /** Debug-friendly rendering; format matches the original bean exactly. */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("Mp3Info [name=");
            sb.append(name)
              .append(", album=").append(album)
              .append(", path=").append(path)
              .append(", singer=").append(singer)
              .append("]");
            return sb.toString();
        }

    }
    

    相关文章

      网友评论

          本文标题:Spark 学习笔记(三)-数据读存-JSON

          本文链接:https://www.haomeiwen.com/subject/wvyplftx.html