美文网首页大数据Spark大数据,机器学习,人工智能
旧版spark(1.6版本) 将rdd动态转为dataframe

旧版spark(1.6版本) 将rdd动态转为dataframe

作者: 董可伦 | 来源:发表于2018-06-28 13:56 被阅读1次

    转载请务必注明原创地址为:https://dongkelun.com/2018/05/11/rdd2df/

    前言

    旧版本spark不能直接读取csv转为df,没有spark.read.option("header", "true").csv这么简单的方法直接将第一行作为df的列名,只能现将数据读取为rdd,然后通过map和todf方法转为df,如果csv的列数很多的话用如Array((1,2..))即Arrar(元组)创建的话很麻烦,本文解决如何用旧版spark读取多列txt文件转为df

    1、新版

    为了直观明白本文的目的,先看一下新版spark如何实现

    1.1 数据

    data.csv,如图:


    image

    1.2 代码

    新版代码较简单,直接通过spark.read.option("header", "true").csv(data_path)即可实现!

    package com.dkl.leanring.spark.sql
    
    import org.apache.spark.sql.SparkSession
    
    object Txt2Df {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("Txt2Df").master("local").getOrCreate()
        val data_path = "files/data.csv"
        val df = spark.read.option("header", "true").csv(data_path)
        df.show()
      }
    }
    

    1.3 结果

    +----+----+----+----+----+
    |col1|col2|col3|col4|col5|
    +----+----+----+----+----+
    |  11|  12|  13|  14|  15|
    |  21|  22|  23|  24|  25|
    |  31|  32|  33|  34|  35|
    |  41|  42|  43|  44|  45|
    +----+----+----+----+----+
    

    2、旧版

    2.1 数据

    data.txt

    col1,col2,col3,col4,col5
    11,12,13,14,15
    21,22,23,24,25
    31,32,33,34,35
    41,42,43,44,45
    

    其中列数可任意指定

    2.2 代码

    package com.dkl.leanring.spark.sql
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    object Rdd2Df {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Rdd2Df").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        val data_path = "files/data.txt"
        val data = sc.textFile(data_path)
        val arr = data.collect()
        //arr1为除去第一行即列名的数据
        val arr1 = arr.slice(1, arr.length)
        val rdd = sc.parallelize(arr1)
        //列名
        val schema = StructType(arr(0).split(",").map(fieldName => StructField(fieldName, StringType, true)))
        val rowRDD = rdd.map(_.split(",")).map(p => Row(p: _*))
        sqlContext.createDataFrame(rowRDD, schema).show()
    
      }
    }
    

    2.3 结果

    +----+----+----+----+----+
    |col1|col2|col3|col4|col5|
    +----+----+----+----+----+
    |  11|  12|  13|  14|  15|
    |  21|  22|  23|  24|  25|
    |  31|  32|  33|  34|  35|
    |  41|  42|  43|  44|  45|
    +----+----+----+----+----+
    

    根据结果看,符合逾期的效果!

    相关文章

      网友评论

        本文标题:旧版spark(1.6版本) 将rdd动态转为dataframe

        本文链接:https://www.haomeiwen.com/subject/izygyftx.html