美文网首页
SparkCore之文件类数据读取与保存

SparkCore之文件类数据读取与保存

作者: 大数据小同学 | 来源:发表于2020-08-10 08:05 被阅读0次

    Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
    文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;
    文件系统分为:本地文件系统、HDFS、HBASE以及数据库。

    Text文件

    1. 数据读取:textFile(String)
    scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt")
    hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24
    
    1. 数据保存: saveAsTextFile(String)
    scala> hdfsFile.saveAsTextFile("/fruitOut")
    

    Json文件

    如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。
    注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。

    1. 导入解析json所需的包
    scala> import scala.util.parsing.json.JSON
    
    1. 上传json文件到HDFS
    [liujh@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /
    
    1. 读取文件
    scala> val json = sc.textFile("/people.json")
    json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24
    
    1. 解析json数据
    scala> val result  = json.map(JSON.parseFull)
    result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27
    
    1. 打印
    scala> result.collect
    res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))
    

    Sequence文件

    SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile keyClass, valueClass
    注意:SequenceFile文件只针对PairRDD

    1. 创建一个RDD
    scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
    
    1. 将RDD保存为Sequence文件
    scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
    
    1. 查看该文件
    [liujh@hadoop102 seqFile]$ pwd
    /opt/module/spark/seqFile
    [liujh@hadoop102 seqFile]$ ll
    总用量 8
    -rw-r--r-- 1 liujh liujh 108 10月  9 10:29 part-00000
    -rw-r--r-- 1 liujh liujh 124 10月  9 10:29 part-00001
    -rw-r--r-- 1 liujh liujh 0 10月  9 10:29 _SUCCESS
    [liujh@hadoop102 seqFile]$ cat part-00000
    SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritableط
    
    1. 读取Sequence文件
    scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
    seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24
    
    1. 打印读取后的Sequence文件
    scala> seq.collect
    res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
    

    对象文件

    对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFilek,v 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。

    1. 创建一个RDD
    scala> val rdd = sc.parallelize(Array(1,2,3,4))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
    
    1. 将RDD保存为Object文件
    scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
    
    1. 查看该文件
    [liujh@hadoop102 objectFile]$ pwd
    /opt/module/spark/objectFile
    [liujh@hadoop102 objectFile]$ ll
    总用量 8
    -rw-r--r-- 1 liujh liujh 142 10月  9 10:37 part-00000
    -rw-r--r-- 1 liujh liujh 142 10月  9 10:37 part-00001
    -rw-r--r-- 1 liujh liujh 0 10月  9 10:37 _SUCCESS
    [liujh@hadoop102 objectFile]$ cat part-00000 
    SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableW@`l
    
    1. 读取Object文件
    scala> val objFile = sc.objectFile[Int]("file:///opt/module/spark/objectFile")
    objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24
    
    1. 打印读取后的Sequence文件
    scala> objFile.collect
    res19: Array[Int] = Array(1, 2, 3, 4)
    
    关注微信公众号
    简书:https://www.jianshu.com/u/0278602aea1d
    CSDN:https://blog.csdn.net/u012387141

    相关文章

      网友评论

          本文标题:SparkCore之文件类数据读取与保存

          本文链接:https://www.haomeiwen.com/subject/sewjyhtx.html