Spark(RDD,CSV)创建DataFrame方式

作者: Tim在路上 | 来源:发表于2019-03-11 09:33 被阅读1次

    spark将RDD转换为DataFrame

    1. 方法一(不推荐)

    spark将csv转换为DataFrame,可以先文件读取为RDD,然后再进行map操作,对每一行进行分割。
    再将schema和rdd分割后的Rows回填,sparkSession创建的dataFrame

     val spark = SparkSession
          .builder()
          .appName("sparkdf")
          .master("local[1]")
          .getOrCreate()
    
           //设置spark的上下文sparkContext
          val sc = spark.sparkContext
          val fileRDD = sc.textFile("/home/hadoop/Downloads/filesmall2.csv")
          //val rdd = fileRDD.filter(line => line.split("\t").length != 30)
          val df = spark.createDataFrame(fileRDD.map(line=>HttpSchema.parseLog(line)),HttpSchema.struct)
          df.show(3)
    

    这里的RDD是通过读取文件创建的所以也可以看做是将RDD转换为DataFrame

    object HttpSchema {
    
      def parseLog(x:String): Row = {
    
        var fields = x.split("\t")
    
        val _id = fields(0)
        val srcIp = fields(1)
        val srcPort = fields(2)
        //这种方法比较麻烦的地方是row里面的字段名要和struct中的字段对应上
        RowFactory.create(_id,srcIp,srcPort)
      }
    
      //设置schema描述
      val struct = StructType(
        Array( StructField("_id",StringType),
          StructField("srcIp",StringType),
          StructField("srcPort",StringType),
        )
      )
    }
    

    这也是这种方法不推荐使用的地方,因为返回的Row中的字段名要与schema中的字段名要一致,当字段多于22个这个需要集成一个

    1. 方法二
      //使用隐式转换的方式来进行转换
    val spark = SparkSession
          .builder()
          .appName("sparkdf")
          .master("local[1]")
          .getOrCreate()
          
          //使用隐式转换必须导入这个才可以使用只有import spark.implicits._之后,RDD才有toDF、toDS功能
    
    
          import spark.implicits._
          
           //设置spark的上下文sparkContext
          val sc = spark.sparkContext
    
          val fileRDD = sc.textFile("/home/hadoop/Downloads/filesmall2.csv")
    
          case class HttpClass(id:String,srcIp:String,srcPort:Int)
    
          val df = fileRDD.map(_.split("\t")).map(line=>HttpClass(line(0),line(1),line(2).toInt)).toDF()
    

    当然也可以不创建类对象

    rdd.map{x=>val par=x.split(",");(par(0),par(1).toInt)}.toDF("name","age")
    

    dataFrame转换为RDD只需要将collect就好,df.collect RDD[row]类型,就可以按row取出

    spark读取csv转化为DataFrame

    1. 方法一
     val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
    
        val sc = new SparkContext(conf)
        println("spark version: " + sc.version)
    
        val spark = new SQLContext(sc)
    
        import spark.implicits._
    
        val df = spark.read.format("com.databricks.spark.csv")
          .option("header", "false")
          .option("inferSchema", "false") //是否自动推到内容的类型
          .option("delimiter",",")  //分隔符,默认为 ,
          .load("/home/hadoop/Downloads/Salary_Data.csv")
        df.show()
        
        
        //进行写数据
        data.repartition(1).write.format("com.databricks.spark.csv")
          .option("header", "false")//在csv第一行有属性"true",没有就是"false"
          .option("delimiter",",")//默认以","分割
          .save(outpath)
         sparkContext.stop()
    

    sparkContext.sql()操作完成后直接返回的是DataFrame
    当然可以间接采用将csv直接转换为RDD然后再将RDD转换为DataFrame

    1. 方法二
    // 读取数据并分割每个样本点的属性值 形成一个Array[String]类型的RDD 
    val rdd = sc.textFile("file:///home/xuqm/ML_Data/input/synthetic_control.data").map(_.split("\\s+")) 
    // 将rdd转换成LabeledPoint类型的RDD 
    val LabeledPointRdd = rdd.map(x=>LabeledPoint(0,Vectors.dense(x.map(_.toDouble))))
    // 转成DataFrame并只取"features"列 
    val data = spark.createDataFrame(LabeledPointRdd).select("features")
    

    相关文章

      网友评论

        本文标题:Spark(RDD,CSV)创建DataFrame方式

        本文链接:https://www.haomeiwen.com/subject/xdqvpqtx.html