美文网首页我爱编程
【译】MLXTEND之StackingCVRegressor

【译】MLXTEND之StackingCVRegressor

作者: wong小尧 | 来源:发表于2018-04-02 14:17 被阅读0次

    www.DataCamp.com 中有很多数据科学家的cheat sheet,可以放在手边参考,大部分情况就够用了,以下是个人整理的一些详细的例子。

    spark中通常使用rdd,但是这样代码可读性差,目前rdd的很多方法已经不再更新了。dataframe大部分使用Spark SQL操作,速度会比rdd的方法更快,dataset是dataframe的子集,大部分api是互通的,目前主流是在使用Spark SQL。

    Spark SQL概述

    1. SQL只是Spark SQL的一个功能而已
    2. 可以访问hive、json、parquet等文件的数据
    3. Spark SQL 提供了SQL、Dataframe和Dataset的API

    DataFrames具有如下特点:

    • Ability to scale from kilobytes of data on a single laptop to petabytes on a large cluster(支持单机KB级到集群PB级的数据处理)
    • Support for a wide array of data formats and storage systems(支持多种数据格式和存储系统,如图所示)
    • State-of-the-art optimization and code generation through the Spark SQL Catalyst optimizer(通过Spark SQL Catalyst优化器可以进行高效的代码生成和优化)
    • Seamless integration with all big data tooling and infrastructure via Spark(能够无缝集成所有的大数据处理工具)
    • APIs for Python, Java, Scala, and R (in development via SparkR)

    生成一个DF进行测试

     import sparkSession.implicits._
     var test_df = Seq((1,Array("1.0")),(2,Array("2.0")),(3,Array("3.0"))).toDF("imei","feature")    //可以直接将schema写在toDF里面。
    

    rdd转化为DF

    在Spark SQL中SQLContext是创建DataFrames和执行SQL的入口,在spark-1.5.2中已经内置了一个sqlContext
    1.在本地创建一个文件,有三列,分别是id、name、age,用空格分隔,然后上传到hdfs上
    hdfs dfs -put person.txt /
    2.在spark shell执行下面命令,读取数据,将每一行的数据使用列分隔符分割
    val lineRDD = sc.textFile("hdfs://node1:9000/person.txt").map(_.split(" "))
    3.定义case class(相当于表的schema)
    case class Person(id:Int, name:String, age:Int)
    4.将RDD和case class关联
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    5.将RDD转换成DataFrame
    val personDF = personRDD.toDF
    6.对DataFrame进行处理
    personDF.show
    

    1.选出若干列(字段)

    1.1 选出一列(字段)

    imei_features_df.col("id")
    org.apache.spark.sql.Column
    (col返回为Column,select 返回为dataframe,df(col_name)为Column类型)

    imei_features_df.select((imei_features_df.col("value") + 10).as("value2")).show//修改了把一列的值加十,再修改列名
    org.apache.spark.sql.DataFrame

    imei_features_df.select("value").show

    imei_features_df.selectExpr("value") 选出名字为value的一列。
    查看两列(两个字段)
    import org.apache.spark.sql.functions
    imei_features_df.select(functions.col("name"),functions.col("value")).show
    df.select(df("name"),df("value")+1).show()
    df.select("name","value").show() select操作,典型的弱类型,untyped操作
    不能写成 df.select("name","value"+1).show()这样。
    df.select($"name", $"value" + 1).show() // 使用表达式,scala的语法,要用$符号作为前缀
    另外selectExpr:也可以对指定字段进行特殊处理:
    df.selectExpr("id", "col2 as time", "round(col3)").show(false)

    用apply:获取指定字段(只能一个,返回为Column)

         val id1 = df.apply("id")
         val id2 = df("id")
    

    1.2 选出两列,把其中一列加1

    imei_features_df.select(col("name"),col("value") + 1).show

    imei_features_df.select(imei_features_df("name"),imei_features_df("value") + 1).show

    1.3 获取指定字段统计信息

    stat方法可以用于计算指定字段或指定字段之间的统计信息,比如方差,协方差等。这个方法返回一个DataFramesStatFunctions类型对象。
    df.stat.freqItems(Seq ("c1","c2")).show()
      下面代码演示根据c4字段,统计c1字段值出现频率在30%以上的内容。在df中字段c1的内容为"a, b, a, c, d, b"。其中a和b出现的频率为2 / 6,大于0.3,(注意:这个方法算出来的结果经常出错)
    df.stat.freqItems(Seq ("c1") , 0.3).show()

    2.选取若干行记录

    2.1 first, head, take, takeAsList:获取若干行记录

    (1)first获取第一行
    (2)head获取第一行,head(n: Int)获取前n行记录
    (3)take(n: Int)获取前n行数据
    (4)takeAsList(n: Int)获取前n行数据,并以List的形式展现
      以Row或者Array[Row]的形式返回一行或多行数据。first和head功能相同。
      take和takeAsList方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生OutOfMemoryError

    2.2 limit

    limit方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take与head不同的是,limit方法不是Action操作。
    df.limit(3).show

    3.删除操作

    3.1.删除指定字段(列),保留其他字段

    df.drop("id")
    df.drop(df("id"))
    

    3.2.取出前n行记录,得到一个新的dataframe。take和head是Action操作,limit则不是。

    df.limit(n)

    3.3.删除空值

    df.na.drop() 删除带有空值的行

    4.排序

    生成数据

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder().appName("DfSortDesc").master("local").getOrCreate()
    val data = Array((7, 2, 3), (1, 8, 6), (4, 5, 9))
    val df = spark.createDataFrame(data).toDF("col1", "col2", "col3")
    

    4.1 orderBy和sort: 按指定字段排序,默认为升序,用法相同

    //升序排列,只对数字类型和日期类型生效
    df.orderBy("col2").show()
    //降序排列,只对数字类型和日期类型生效
    df.orderBy( - df("col2")).show(false)//加"-"表示降序排列
    df.orderBy(df("col2").desc).show
    df.orderBy(desc("col2")).show
    df.orderBy(-col("col2")).show
    df.orderBy(col("col2").desc).show
    //sort降序,sort用法和orderBy相同,可以直接替换
    df.sort(desc("col2")).show
    //一列升序,一列降序,同时调整两列
    df.sort(desc("col2"),asc("col1")).show
    

    4.2 sortWithinPartitions

    和sort类似,但是是使用Partition来对其他字段排序

    5. 去重

    (1)distinct:返回一个不包含重复记录的Dataframe(整体去重),结果和dropDuplicates不传入指定字段的结果相同。
    返回当前不重复的row(行)记录df.distinct()
    (2)dropDuplicates:根据指定字段去重
    根据指定字段去重。
    df.dropDuplicates(Seq("col_name"))

    6.聚合(groupBy&agg)

    6.1.group by

    6.1.1 groupBy: 根据字段进行group by操作

    groupBy方法有两种调用方式,可以传入String类型的字段名,也可传入Column类型的对象:

    df.groupBy("col1")
    df.groupBy(df("col"))
    
    //分组计数(各个年龄的人出现了多少次)
    df.groupBy("age").count().show()
    

    6.1.2 cube和rollup:group by的扩展

    功能和SQL中的group by cube/rollup类似

    6.1.3 GroupedData对象

    该方法得到的是GroupedData类型对象,在GroupedData的API中提供了group by之后的操作,比如,
    max(colNames: String)方法,数字类型字段最大值
    min(colNames: String
    )方法,数字类型字段的最小值
    mean(colNames: String)方法,数字类型字段的平均值
    sum(colNames: String
    )方法,数字类型字段的和
    count()方法,分组中元素个数
    agg方法,对指定字段进行聚合

    6.2 agg (aggregate)

    聚合操作调用的是agg方法,该方法有多种调用方式。通常和groupBy方法配合使用。
    //输入col,输出dataframe,对id字段求最大,对col2求和。
    agg(expers:column*)
    df.agg("id" -> "max", "col2" -> "sum")
    df.agg(max("age"),avg("salary"))
    df.groupBy().agg(max("age"), avg("salary"))

    agg(exprs: Map[String, String]) 返回dataframe类型 ,同数学计算求值 map类型的
    df.agg(Map("age" -> "max", "salary" -> "avg"))
    df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
    agg(aggExpr: (String, String), aggExprs: (String, String)*) 返回dataframe类型 ,同数学计算求值
    df.agg(Map("age" -> "max", "salary" -> "avg"))
    df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))

    6.3 给一个日常使用的例子

    假设df是一个关于ctr预估的矩阵,存在物料的各个特征字段和点击字段,点击字段值为[0,1],代表点与不点。
    随便生成个数据,再转成dataframe。
    val data = Array(("1", "2", "3", "4", 0), ("1", "7", "8", "9", 1),("2", "7", "8", "9", 1),("2", "7", "8", "9", 0),("1", "7", "8", "9", 1),("3", "7", "8", "9", 1))

    df.groupBy("feature_col1").agg( mean("click"), count("click"))
    
    这个表就能看出点击和特征(feature_col1)的关系,可以判断出此特征的重要性。
    这个特征为3的时候,点击都是1。
    特征为2的时候,点击平均值是0.67
    特征为1的时候,点击平均值是0.5
    如果count(click)的样本足够多的情况下,以上平均值就能反映点击和这个特征的关系。
    即:此特征为3和2的时候点击的概率比较大,为1的时候跟点击似乎无关。
    +----+------------------+-----------+
    |feature_col1|         avg(click)|count(click)|
    +----+------------------+-----------+
    |   3|               1.0|          1|
    |   1|0.6666666666666666|          3|
    |   2|               0.5|          2|
    +----+------------------+-----------+
    

    7. 数据的分割、拼接(上下合并)和采样,join&union&randomSplit&sample


    (1)笛卡尔积
    DF1.crossJoin(DF2)
    (2)一个字段形式
    需要两个DataFrame中有相同的一个列名,默认是"inner"
    DF1.join(DF2, "id")
    (3)多个字段形式 ,多个字段时最好指定join的类型,有:“full”,”outer”,”full_outer”,”fullouter”代表全连接,“left”,”left_outer”或者”leftouter”代表左连接,“right”,”right_outer”及“rightouter”代表右连接,还有" leftsemi(左半连接)"等等。
    val df = df1.join(df2, Seq("key1","key2")) // 基于两个公共字段key1和key的等值连接
    DF1.join(DF2, Seq("id", "name"),"inner") // 多列
    val df = df1.join(df2, df1("key1") === df2("key1"), "outer")
    DF1.join(DF2, DF1("app_id") === DF2("item_id"), "left")

    (4)当两个字段名字不一样时候
    DF1.join(DF2 , DF1("id" ) === DF2( "ID"),"inner")

    (5)
    如果出现类似这样的报错:Reference 'ID' is ambiguous
    说明表里有两个名字为“ID”列,这个ID列就是join的连接字段,操作的时候会出现歧义。
    即使连接字段名称不相同,比如一个"id"一个"ID",表中两个字段为id和ID
    仍然会报上面的错误
    DF1.join(DF2 , DF1("id" ) === DF2( "ID"),"inner")这样也会报错
    正确做法是合并两个代表ID的字段为一个,改写成
    df2.join(df, Seq("ID"),"left")

    8.1 union

    DF1.union(DF2)
    DF1.unionAll(DF2) //unionAll以及弃用,新版本一般使用union

    8.2 randomSplit

    例如:
    val Array(train, test) = DF.randomSplit(Array(0.8,0.2))

    8 交叉:

    获取两个dataframe共有的纪录
    df.intersect(df.limit(1)).show
    获取一个dataframe中有另一个daraframe中没有的纪录
    df.except(df.limit(1)).show

    9.生成一个dataframe&&重命名Dataframe中指定字段名

    import sqlContext.implicits._
    val df = Seq(
      (12, "First Value", java.sql.Date.valueOf("2010-01-01")),
      (234, "Second Value", java.sql.Date.valueOf("2010-02-01")),
    (224, "Second Value", java.sql.Date.valueOf("2010-02-01"))
    ).toDF("int_column", "string_column", "date_column")
    

    指定字段名不存在则不进行操作,id字段名改为ID
    df.withColumnRenamed("id","ID")

    10. 读取数据用“|”切分,转成rdd,再把rdd(Array(s0,s1,s2))转化为dataframe

    例如: 123123|闪电球|label_2

      case class firstlevel(id: String, name: String, result: String)
    ## “|”需要两个转义字符
      val textFile_DF = sc.textFile("/user/result")
              .map(v => v.toString.split("\\|"))
              .map{case Array(s0, s1, s2) => firstlevel(s0, s1, s2)}.toDF
    

    11. spark sql和DataFrame中使用UDF(User Defined Function),用户自定义函数

    11.1.测试数据

    构造测试数据-(name,age)
    val userData = Array(("Leo",16),("Marry",21),("Jack",14),("Tom",18))

    创建测试df

    val user_df = spark.createDataFrame(userData).toDF("name","age")
    user_df.show(false)
    

    注册user表
    user_df.createOrReplaceTempView("user")

    11.2.Spark.sql中的用法(hive 中的udf,Spark Sql的dataframe中无效)

    11.2.1 通过匿名函数注册UDF
    下面的UDF的功能是计算某列的长度,该列的类型为String
    11.2.1.1 注册
    spark.udf.register("strLen",(str:String) => str.length())

    11.2.2 通过实名函数注册UDF
    实名函数注册有点不同,要在后面加 " _"(空格+下划线)
    定义一个实名函数

     def isAdult(age: Int) = {
      if (age < 18) {
        false
      } else {
        true
      }
    }
    

    11.2.2.1 注册
    spark.udf.register("isAdult", isAdult _)
    11.2.2.2 使用
    spark.sql("select name,strLen(name) as name_len from user").show

    11.3.DataFrame中的udf用法
    DataFrame的udf方法虽然和Spark.sql的名字一样,但是属于不同的类,它在org.apache.spark.sql.functions里
    11.3.1 注册

    import org.apache.spark.sql.functions.udf
    //注册自定义函数(通过匿名函数)
    val strLen = udf((str: String) => str.length())
    //注册自定义函数(通过实名函数)
    val udf_isAdult = udf(isAdult _)
    

    11.3.2 使用
    可通过withColumn和select使用
    给user表添加两列。

    //通过withColumn添加列
    user_df.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show
    //通过select添加列
    user_df.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show
    

    12.添加一列

    添加一列有很多方法:

    12.1. withColumn

    通常使用withColumn,他会先判断DataFrame里有没有这个列名,如果有的话就会替换掉原来的列,没有的话就用调用select方法增加一列,所以如果我们的需求是增加一列的话,但这个函数只能对原有列做处理生成新列。两者实现的功能一样,且最终都是调用select方法,但是withColumn会提前做一些判断处理,所以withColumn的性能不如select好。
    df.withColumn("Column Name", value)
    df.withColumn("feature", set_feature(imei_label_feature_df("feature")))

    12.2 使用udf

    可以用udf写自定义函数新增列

    import org.apache.spark.sql.functions.udf
    // 新建一个dataFrame
    val tempDataFrame = spark.createDataFrame(Seq(
      ("a", "asf"),
      ("b", "2143"),
      ("c", "rfds")
    )).toDF("id", "content")
    // 自定义udf的函数
    val code = (arg: String) => {
          if (arg.getClass.getName == "java.lang.String") 1 else 0
        }
    
    val addCol = udf(code)
    // 增加一列
    val addColDataframe = tempDataFrame.withColumn("addcol", addCol(tempDataFrame("id")))
    addColDataframe.show(10, false)
    

    12.3 添加常量列时,需要使用lit

    例如:添加所有行的值均为0的一列
    import org.apache.spark.sql.functions.{col, concat_ws, udf,lit}
    df.withColumn("Column Name", lit(0))
    例如:添加所有行的值均为字符串"imei"的一列
    df.withColumn("Column Name", lit("imei"))

    Spark 2.2引入了typedLit来支持Seq、list等等

    import org.apache.spark.sql.functions.typedLit
    
    df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
    df.withColumn("some_struct", typedLit(("foo", 1, .0.3)))
    df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))
    

    另外可以使用alias来改变列名,类似sql中的as

    df.withColumn(
        "some_struct",
        struct(lit("foo").alias("x"), lit(1).alias("y"), lit(0.3).alias("z"))
     )
    

    可以使用cast函数来改变类型

    df.withColumn(
        "some_struct", 
        struct(lit("foo"), lit(1), lit(0.3)).cast("struct<x: string, y: integer, z: double>")
     )
    

    12.3 添加值为null的一列

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    val df = spark.createDataFrame(List(
      (1.2, 1),
      (3.1, 2)))
      .toDF("col1", "col2")
    val udf_null = udf((s: Any) => null) //有时候这个会报错
    //scala.MatchError: Null (of class scala.reflect.internal.Types$ClassNoArgsTypeRef)
    //出现这个错误时,是因为scala不知道要给null转换成何种类型
    //可以强制给一个类型
    //val udf_null = udf((s: Any) => null.asInstanceOf[String])
    
    val df_res = df.withColumn("col_name", udf_null(col("col1")).cast(types. StringType))
    df_res.show
    

    12.4 一般需要把null的列转换成需要的新列

    比如已经新增了一个全为null的列,我们需要把这列转化成uuid的随机数列

        val code: (String => String) = (arg: String) => { if (arg == null) UUID.randomUUID().toString else "error"}
        val add_uuid_Col = udf(code)
        df_res.withColumn("col_name", add_uuid_Col(df("col_name"))
    

    13.更改一列的列名

    df.withColumnRenamed("x", "y")

    14. 过滤操作(filter and where)

    14.1.filter & filterNot

    filter方法返回了所有使假设条件为真的集合元素组成的新集合。还有一个方法filterNot,可以返回所有使假设条件返回false的元素组成的新集合,用法一样。

    val df = sc.parallelize(Seq(("a", 2), ("b", 5), ("c", 2), ("d", 3), ("e", 1))).toDF("id", "num")
    整数类型
    逻辑运算符:>, <, ===

    import org.apache.spark.sql.functions
    val number = 2;
    df.filter(functions.col("num") === 2)
    df.filter(df.col("num") === 2)
    df.filter(df("num") === 2)
    df.filter($"num">number)    //传递参数过滤(新版本scala可能不能用)
    df.filter($"num"<2)
    

    或者

    df.filter("num=2")
    df.filter("num>2")
    df.filter("num is null")
    

    字符串类型
    df.filter(functions.col("id") === ("aaaaaa"))等于
    df.filter($"id".equalTo("aaaaaa")) 等于
    df.filter(functions.col("id") =!= ("aaaaaa")) 不等于

    df.filter($"id".length>1)这种写法已经弃用了,改成df.filter(length(col("id")) >1)

    Map、Array类型判断相等需要使用sameElements方法
    df .filter(col("array1" sameElements "array2"))

    传递参数过滤

    val str = s"a"
    
    df.filter($"id"equalTo(str))
    

    当dataframe没有字段名时,可以用默认的字段名[_1, _2, .....]来进行判断

    多条件判断
    逻辑连接符 &&(并)、||(或)

    df.filter($"num"===2 && $"id".equalTo("a")
    df.filter($"num"===1 || $"num"===3)
    
    val df1 = df
           .filter(row => row.getAs("col1")!= "") // 过滤掉异常数据
           // 如果存在,过滤掉col2日期不在当前执行周期内的数据
           .filter(row => row.getAs("col2") >= row.getAs("col1").split("_")(0) &&
            row.getAs("col2") <= row.getAs("col1").split("_")(1))
    
    这样写会报错
    因为使用getAs函数获取某列的数据时没有指明具体的类型,导致无法判断是否支持"<="或 ">="这类的运算符,因而会报错
    
    val df1 = df
           .filter(row => row.getAs[String]("col1")!= "") // 过滤掉异常数据
           // 如果存在,过滤掉col2日期不在当前执行周期内的数据
           .filter(row => row.getAs[String]("col2") >= row.getAs[String]("col1").split("_")(0) &&
            row.getAs[String]("col2") <= row.getAs("col1").split("_")(1))
    
    

    14.2.where

    df.where("id = 1 or sex = 'male' ").show
    用法和filter相同

    15.转化列类型

    15.1单列转换

    import org.apache.spark.sql.types._
    //import org.apache.spark.sql.types.{DoubleType, IntegerType}
    
    val data = Array(("1", "2", "3", "4", "5"), ("6", "7", "8", "9", "10"))
    val df = spark.createDataFrame(data).toDF("col1", "col2", "col3", "col4", "col5")
    
    import org.apache.spark.sql.functions._
    df.select(col("col1").cast(DoubleType)).show()
    

    15.2使用withColumn循环多列转换(遍历)

    import org.apache.spark.sql.types
    var df1 = df
    val colNames = df.columns
    
    for (colName <- colNames) {
      df1 = df1.withColumn(colName, col(colName).cast(types.DoubleType))
    // 有些type名称会和其他类里的名称起冲突,例如StringType,调用的时候手动加上类名如types.StringType调用。
    }
    df1.show()
    

    15.3 当一列类型为Array,需要转成字符串

    平时都是使用Array.mkString(',')这种。在dataframe中需要使用concat_ws,详细见【20.1.数据的合并】。

    16.创建一个空dataframe

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types
    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.EmptyRDD
    
    /**
     * Spark创建空DataFrame示例
     */
    object EmptyDataFrame {
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder().appName("EmptyDataFrame").master("local").getOrCreate()
    
        /**
         * 创建一个空的DataFrame,代表用户
         * 有四列,分别代表ID、名字、年龄、生日
         */
        val colNames = Array("id", "name", "age", "birth")
        //为了简单起见,字段类型都为String
        val schema = StructType(colNames.map(fieldName => StructField(fieldName, types.StringType, true)))
        //主要是利用了spark.sparkContext.emptyRDD
        val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    
        emptyDf.show
    
        /**
       也可以给每列指定相对应的类型(使用这种会比较多)
       通过StructType直接指定每个字段的schema
         */
        val schema1 = StructType(
          Seq(
            StructField("id", types.IntegerType, true),
            StructField("name", types.StringType, true),
            StructField("age", types.IntegerType, true),
            StructField("birth", types.StringType, true)))
        val emptyDf1 = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema1)
        emptyDf1.show
    
        //还有一种空的DataFrame,没有任何行任何列
        spark.emptyDataFrame.show
    
        spark.stop()
      }
    }
    

    17.指定列缺失值填充、删除以及替换

    17.1 填充

        val df1 = source_df.na.fill("新值")
        val df2 = source_df.na.fill(10)​
        val df3 = source_df.na.fill(value = 0.0,cols=Array("col1","col2"))
        val df4 = source_df.na.fill(value = "wang",cols=Array("col1","col2"))
    //传入一个值,以及所有需要用此值填充的Array,或者使用Map,按照列进行不同填充
        val df5 = source_df.na.fill(value = "male",cols=Array("col1"))
        val df6 = source_df.na.fill(Map("col1"->"wang","col2"->"yao"))
    

    17.2 删除

    val df7 = source_df.na.drop()
    val df8 = source_df.na.drop(Array("col1"))
    val df9 = source_df.na.drop(10,Array("col1","col2"))    //删除某一或几列的非空非NaN但是值低于10的
    

    17.3 替换

    df.na.replace("col1",Map(1->2))         //将col1列的值为1替换为2.
    df.na.replace(Array("col1","col2"),Map(1->2))​​
    df.na.replace[Int]("col1",Map(1->2))   //可以添加泛型,Map中的key和value类型必须与其保持一致。
    

    18.根据列分割成多行 explode

    explode 用来根据某列分割成多行
    通常用来展开array或map为多行。

    拆分List格式的列。
    df = df.withColumn("entityPair", explode(col("List_col")));

    拆分Map格式的列。
    df = df.select(explode(col("data"))).toDF("key", "value");
    可以看到,这里和List有一个不同的地方是需要在explode后接一个toDF操作,是因为Map进行展开操作后自然会得到两列,我们需要将其转化为DataFrame格式的两列,列名可以自己指定。

    18.2 网上的例子1:https://blog.csdn.net/strongyoung88/article/details/52227568

    有的时候JSON数据,会包含嵌套关系,比如像如下的JSON数据:

    {"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
    {"name":"Andy", "age":30,"myScore":[{"score1":29,"score2":33},{"score1":38,"score2":52},{"score1":88,"score2":71}]}
    {"name":"Justin", "age":19,"myScore":[{"score1":39,"score2":43},{"score1":28,"score2":53}]}
    

    用sparkql读取以后

    df.show()
    df.printSchema()
    
    +---+--------------------+-------+
    |age|             myScore|   name|
    +---+--------------------+-------+
    | 25|  [[23,19], [50,58]]|Michael|
    | 30|[[33,29], [52,38]...|   Andy|
    | 19|  [[43,39], [53,28]]| Justin|
    +---+--------------------+-------+
    root
     |-- age: long (nullable = true)
     |-- myScore: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- score2: long (nullable = true)
     |    |    |-- score1: long (nullable = true)
     |-- name: string (nullable = true)
    

    使用explode函数把myScore的数组类型展开

    val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")
    val dfMyScore = dfScore.select("name","myScore.score1", "myScore.score2")
    dfScore.show()
    

    此时,会得到如下结果,这个时候的表,就跟我们平时看到的关系型数据庫的表是一样的了,接下来,我们就可以执行相关的sql查询了。

    +-------+-----------------+------------------+
    |   name|           score1|            score2|
    +-------+-----------------+------------------+
    |Michael|               19|                23|
    |Michael|               58|                50|
    |   Andy|               29|                33|
    |   Andy|               38|                52|
    |   Andy|               88|                71|
    | Justin|               39|                43|
    | Justin|               28|                53|
    +-------+-----------------+------------------+
    

    18.3 网上的例子2:

    有时候需要根据某个字段内容进行分割,然后生成多行,这时可以使用explode方法
    根据c3字段中空格将字段内容分割,分割内容存在c3_字段中
    分割前:


    df.explode( "c3" , "c3_" ){time: String => time.split( " " )}
    结果:

    19.对dataframe中的值进行替换

    我们需要把第二列中的“Tesla”改成“S”,“Ford”改成“F”

    //初始化一个dataframe
    //import org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.{functions => F}
    import org.apache.spark.sql.{DataFrame,SparkSession}
    
    val sparkSession =  SparkSession.builder().enableHiveSupport().getOrCreate()
    val rdd = sc.parallelize(
          List( (2012,"Tesla","S"), (1997,"Ford","E350"), (2015,"Chevy","Volt"))
      )
     import sparkSession.implicits._
      val df = rdd.toDF("year","trademark","model_number")
    

    使用functions.udf

    val makeSIfTesla = udf {(trademark: String) => 
      if(trademark == "Tesla") "S"  else if(trademark == "Ford")"F"
     else trademark
    }
    var df1 = df.withColumn("trademark", makeSIfTesla(df("trademark")))
    

    使用functions.when

    val df2 = df.withColumn("trademark", F.when(col("trademark").equalTo("Tesla"), "S").when(col("trademark").equalTo("Ford"), "F").otherwise(col("trademark")))
    
    val df3 = df.withColumn("trademark", F.when(col("trademark") === "Tesla", "S").when(col("trademark") === "Ford", "F").otherwise(col("trademark")))
    

    20.数据的拆分与合并

    20.1.数据的合并

    concat合并列,列类型需要为字符串 
    concat_ws按照特殊符号分隔来合并列,返回col格式
    连接字符串:
    concat_ws("_", field1, field2),输出结果将会是:“field1_field2”。
    
    数组元素连接:
    concat_ws("_", [a,b,c]),输出结果将会是:"a_b_c"。
    多个列连接:
    concat_ws(",",col(col_name1),col(col_name2),col(col_name3))
    
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.types
    import org.apache.spark.sql.{DataFrame, Row}
    //import org.apache.spark.sql.types.{DoubleType, StringType}
    
    val data = Array(("1", "2", "3", "4", "5"), ("6", "7", "8", "9", "10"))
    val df = spark.createDataFrame(data).toDF("col1", "col2", "col3", "col4", "col5")
    
    //合并多个列,加在原dataframe的后面
    val new_df = df.withColumn("new_col_name", concat_ws(",", col("col1"),col("col2"),col("col3")).cast(types.StringType))
    
    //使用自定义UDF函数
        // 编写udf函数,对行做处理
    val separator = ","
        def mergeCols(merge_row: Row): String = {
            merge_row.toSeq.foldLeft("")(_ + separator + _).substring(1)
        }
        val mergeColsUDF = udf(mergeCols _)
        df.select($"col4",$"col5",mergeColsUDF(struct($"col1", $"col2", $"col3")).as("value")).show
    
    结果:
    +----+----+-----+
    |col4|col5|value|
    +----+----+-----+
    |   4|   5|1,2,3|
    |   9|  10|6,7,8|
    +----+----+-----+
    

    20.2.数据的拆分

    把一列(字符串),按照特殊的分隔符号切分成多列

    生成数据
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{DataFrame, Row}
    //import org.apache.spark.sql.types.{DoubleType, StringType}
    
    val data = Array(("1", "2", "3", "4", "5"), ("6", "7", "8", "9", "10"))
    var df = spark.createDataFrame(data).toDF("col1", "col2", "col3", "col4", "col5")
    val separator = ","
        def mergeCols(merge_row: Row): String = {
            merge_row.toSeq.foldLeft("")(_ + separator + _).substring(1)
        }
    
        val mergeColsUDF = udf(mergeCols _)
       df = df.select(mergeColsUDF(struct($"col1", $"col2", $"col3",$"col4",$"col5")).as("value"))
    
    使用内置函数split,然后遍历添加列
        val separator = ","
        lazy val first = df.first()
    使用lazy val first,原因是构造顺序问题,在使用.length等方法时,first可能还没有被赋值,所以先使用lazy val生成,等到用到这个变量的时候再初始化。
        val numAttrs = first.toString().split(separator).length
    tabulate返回指定长度数组,每个数组元素为指定函数的返回值,默认从0开始。
        val attrs = Array.tabulate(numAttrs)(n => "col_" + n)
        //按指定分隔符拆分value列,生成splitCols列
        var newDF = df.withColumn("splitCols", split($"value", separator))
        attrs.zipWithIndex.foreach(x => {
          newDF = newDF.withColumn(x._1, $"splitCols".getItem(x._2))
        })
        newDF.show()
    
    结果:
    +----------+----------------+-----+-----+-----+-----+-----+
    |     value|       splitCols|col_0|col_1|col_2|col_3|col_4|
    +----------+----------------+-----+-----+-----+-----+-----+
    | 1,2,3,4,5| [1, 2, 3, 4, 5]|    1|    2|    3|    4|    5|
    |6,7,8,9,10|[6, 7, 8, 9, 10]|    6|    7|    8|    9|   10|
    +----------+----------------+-----+-----+-----+-----+-----+
    

    21.查看分区以及重分区

    分区数太少,不能充分利用集群中所有可用的内核。 【影响较大】
    分区数太多,将管理许多小任务,使得产生过多的开销(更多的抓取,更多的磁盘搜索,driver app需要跟踪每个任务的状态)。 【小于1000的分区计数,调度太多的小任务对这一点影响相对较小】
    查看分区
    df.rdd.getNumPartitions
    重分区(10个分区)repartition可以用来减少或增多分区数
    df.repartition(10).rdd.getNumPartitions
    重分区(1个)coalesce为repartition的优化版本,只能用来减少分区数
    df.coalesce(1).rdd.getNumPartitions
    如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true
    df.coalesce(14,true) repartition相当于带true的coalesce

    注意:增加分区数会产生shuffle,但是减少分区数不会产生shuffle,重分区可以缓解数据倾斜的问题,一般指减少分区数

    22.Spark SQL符合类型struct

    df.selectExpr("(col1,col2) as new_col","*").show(1)
    df.selectExpr("struct(col1,col2) as new_col","*").show(1)
    val complexDF=df.select(struct("col1","col2").as("new_col3"))
    complexDF.createOrReplaceTempView("complexDF")
    complexDF.select("new_col3.col1").show(1)
    complexDF.select(col("new_col3").getField("col2")).show(1)
    complexDF.select("new_col3.*").show(1)
    spark.sql("select new_col3.* from complexDF").show(1)
    

    23.注册成表,并进行SparkSQL操作,以及df的map操作

    df.show
    这个数据代表用户有没有车,label = 1为有车。

    • 将DataFrame注册成表
      df.registerTempTable("people_car_table")

    • 利用sql方法进行SparkSQL操作,选出有车的用户
      val people_car = sparkSession.sql("SELECT imei,label FROM people_car_table WHERE label = 1")

    • 将返回结果看作是数据库操作的一行,(0)表示第一列,依次类推
      这里把imei前面加上字符串为“IMEI:”的前缀
      people_car.map(t => "IMEI: " + t(0)).collect().foreach(println)


      people_car.map(t => "IMEI: " + t(0)).show

      同时改变两列
      people_car.map(t => ("imei:"+t(0),"label:"+t(1))).show
    • 通过域的名称获取信息,结果和people_car.map(t => "IMEI: " + t(0)).show一样
      使用getAs。
      people_car.map(t => "Name: " + t.getAs[String]("imei")).show

    上面两张图可以看出dataframe的map操作和rdd基本是一样的,但是dataframe相比rdd的优势在于带有schema,更加直观,使用map以后schema就变化了,需要重新定义schema。
    • 通过sql语句读取hive表,dataframe格式
      val car_df = sparkSession.sql(select_sql)

    24.检查数据

    先生成数据

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{DataFrame, Row}
    //import org.apache.spark.sql.types.{DoubleType, StringType}
    
    val data = Array(("1", "2", "3", "4", "5"), ("6", "7", "8", "9", "10"))
    
    var df = spark.createDataFrame(data).toDF("col1", "col2", "col3", "col4", "col5")
    

    检查数据

    24.1

    df.dtypes 返回字段名和数据类型
    结果:
    res1: Array[(String, String)] = Array((col1,StringType), (col2,StringType), (col3,StringType), (col4,StringType), (col5,StringType))

    24.2

    【show方法参数默认为true,如果字段太长则会略去,改为false,会显示整个字段】
    df.show(false) 返回数据内容
    结果:

    +----+----+----+----+----+
    |col1|col2|col3|col4|col5|
    +----+----+----+----+----+
    |   1|   2|   3|   4|   5|
    |   6|   7|   8|   9|  10|
    +----+----+----+----+----+
    
    24.3

    df.schema 返回schema
    结果:
    res3: org.apache.spark.sql.types.StructType = StructType(StructField(col1,StringType,true), StructField(col2,StringType,true), StructField(col3,StringType,true), StructField(col4,StringType,true), StructField(col5,StringType,true))

    24.4

    df.columns    //返回列名
    df.count()      //返回行数
    df.distinct().count() //不重复行数
    df.printSchema()   //打印schema
    df.describe().show() 
    //导入数据后执行的第一个操作是了解它们的大致情况。
    //对于数字列, 了解描述性摘要统计信息对理解数据的分布有很大帮助。
    //可以使用describe函数来返回一个DataFrame, 其中会包含非空项目数, 平均值, 标准偏差以及每个数字列的最小值和最大值等信息。
    

    25.Spark SQL 加载数据

    import org.apache.spark.sql.types._
    
    // Create an RDD
    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
    
    // 转化rdd中的行为Row格式
    import org.apache.spark.sql.Row
    val rowRDD = peopleRDD.map(x => Row(x))
    
    val schemaString = "name age" //两列
    
    // 根据schemaString 生成schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    
    val schema = StructType(fields)
    
    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    
    // 创建临时表
    peopleDF.createOrReplaceTempView("people")
    
    // 可以直接使用sql操作dataframe了
    val results = spark.sql("SELECT name FROM people")
    
    // The results of SQL queries are DataFrames and support all the normal RDD operations
    // The columns of a row in the result can be accessed by field index or by field name
    results.map(attributes => "Name: " + attributes(0)).show()
    // +-------------+
    // |        value|
    // +-------------+
    // |Name: Michael|
    // |   Name: Andy|
    // | Name: Justin|
    // +-------------+
    

    Reference

    https://blog.csdn.net/strongyoung88/article/details/52227568
    https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
    https://stackoverflow.com/questions/32357774/scala-how-can-i-replace-value-in-dataframes-using-scala
    https://stackoverflow.com/questions/32788322/how-to-add-a-constant-column-in-a-spark-dataframe
    https://www.cnblogs.com/xiaoma0529/p/7126923.html
    https://dongkelun.com/2018/08/14/sparkEmptyDataFrame/
    https://dongkelun.com/2018/08/02/sparkUDF/
    https://blog.csdn.net/sinat_36121406/article/details/82755516

    相关文章

      网友评论

        本文标题:【译】MLXTEND之StackingCVRegressor

        本文链接:https://www.haomeiwen.com/subject/zuethftx.html