美文网首页
数据读取与保存

数据读取与保存

作者: 万事万物 | 来源:发表于2021-07-17 09:31 被阅读0次

    摘要

    Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

    文件格式分为:

    • Text文件
    • Json文件
    • Csv文件
    • Sequence文件
    • Object文件;

    文件系统分为:

    • 本地文件系统
    • HDFS以及数据库。

    文件类数据读取与保存

    Text文件
    基本语法:

    • 数据读取:textFile(String)
    • 数据保存:saveAsTextFile(String)

    案例演示:经典的worldCount程序,并将程序计算结果写入到本地文件中

      @Test
      def textTest(): Unit ={
        // 创建sc
        val conf =new SparkConf().setMaster("local[4]").setAppName("test")
        val sc=new SparkContext(conf)
    
        // 读取文件
        val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)
    
        // 数据扁平化,
        val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))
    
        // 映射
        val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))
    
        // 计算单词个数
        val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 + v2)
    
    
        //将数据写入目录中,该目录不能存在
        rdd4.saveAsTextFile("file:///C:/Users/123456/Desktop/worldCount_0001")
    
        // 关闭资源;养成良好编码习惯
        sc.stop()
    
      }
    

    worldCount_0001 是一个目录,并且不能存在

    程序结果 就像跑了一个MR,将数据按照分区存入不同的目录中。

    Sequence文件
    SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。

    案例演示:
    保存Sequence文件

     @Test
      def sequenceWriteTest(): Unit ={
        // 创建sc
        val conf =new SparkConf().setMaster("local[4]").setAppName("test")
        val sc=new SparkContext(conf)
    
        // 读取文件
        val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)
    
        // 数据扁平化,
        val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))
    
        // 映射
        val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))
    
        // 计算单词个数
        val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 + v2)
    
    
        //将数据写入目录中,该目录不能存在
        rdd4.saveAsSequenceFile("file:///C:/Users/123456/Desktop/worldCount_0003")
    
        // 关闭资源;养成良好编码习惯
        sc.stop()
    
      }
    

    读取Sequence文件

      @Test
      def sequenceReadTest(): Unit ={
        // 创建sc
        val conf =new SparkConf().setMaster("local[4]").setAppName("test")
        val sc=new SparkContext(conf)
    
        // 读取文件
        val rdd1: RDD[(String, Int)] =sc.sequenceFile[String,Int](path = "file:///C:/Users/123456/Desktop/worldCount_0003",minPartitions = 4)
        //打印
        rdd1.foreach(e=>{
          println(e)
        })
    
        // 关闭资源;养成良好编码习惯
        sc.stop()
      }
    
    

    打印结果

    (python,1)
    (shell,4)
    (wahaha,1)
    (java,5)
    (hello,2)
    

    注意:
    sc.sequenceFile[String,Int] 需要指定返回参数类型 。

    Object对象文件
    对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

    案例演示
    将数据保存成Object文件

      @Test
      def ObjectWriteTest(): Unit ={
        // 创建sc
        val conf =new SparkConf().setMaster("local[4]").setAppName("test")
        val sc=new SparkContext(conf)
    
        // 读取文件
        val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)
    
        // 数据扁平化,
        val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))
    
        // 映射
        val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))
    
        // 计算单词个数
        val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 + v2)
    
    
        //将数据写入目录中,该目录不能存在
        rdd4.saveAsObjectFile("file:///C:/Users/123456/Desktop/worldCount_0002")
    
        // 关闭资源;养成良好编码习惯
        sc.stop()
    
      }
    

    读取 Object 文件

      @Test
      def ObjectReadTest(): Unit ={
        // 创建sc
        val conf =new SparkConf().setMaster("local[4]").setAppName("test")
        val sc=new SparkContext(conf)
    
        // 读取对象文件
        // sc.objectFile[(String,Int)] 需要指定数据类型,写入进去的是一个元组,读取的时候应该也元组的形式返回
        val rdd1=sc.objectFile[(String,Int)](path = "file:///C:/Users/123456/Desktop/worldCount_0002",minPartitions = 4)
    
        //打印
        rdd1.foreach(e=>{
          println(e)
        })
    
        // 关闭资源;养成良好编码习惯
        sc.stop()
      }
    

    结果

    (python,1)
    (wahaha,1)
    (shell,4)
    (hello,2)
    (java,5)
    

    注意:
    sc.objectFile[(String,Int)] 必须指定数据类型

    文件系统类数据读取与保存
    Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口。如TextInputFormat,新旧两个版本所引用分别是org.apache.hadoop.mapred.InputFormat、org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)

    相关文章

      网友评论

          本文标题:数据读取与保存

          本文链接:https://www.haomeiwen.com/subject/tgymultx.html