美文网首页大数据学习
Spark SQL:分类讨论UDF对DataFrame列存在空值

Spark SQL:分类讨论UDF对DataFrame列存在空值

作者: xiaogp | 来源:发表于2022-02-19 20:03 被阅读0次

    摘要:Spark SQL

    先上结论

    • 空指针的情况:在UDF中正确指定了对应类型的前提下,DataFrame中的列如果是stringtimestamp,集合(arraymap)类型,null值都会被匹配到,如果UDF逻辑需要对null进行操作,会导致空指针
    • null略过返回null的情况:在UDF中正确指定了对应类型的前提下,DataFrame中的数值类型(IntLongDouble),以及布尔类型,null值都会被略过,此时UDF还是返回null,如果UDF中需要apply多列,只要有其中任何一列是null,整一行都不会被匹配到
    • functions下自带函数null略过spark.sql.functions下的函数会自动略过null,依旧返回null,如果针对string等不会自动略过null的情况,UDF中要对null进行判断,如果需要依旧返回null则UDF返回None: Option

    按照数据类型测试

    先准备一个DataFrame,使用Scala的Option来构造出含有null的列

    scala> val a = Seq((Some("a_b"), Some(1), Some(1L), Some(3.3), Some(false)), (None, None,None, None, None)).toDF("a", "b", "c", "d", "e")
    a: org.apache.spark.sql.DataFrame = [a: string, b: int ... 3 more fields]
    
    scala> a.show()
    +----+----+----+----+-----+
    |   a|   b|   c|   d|    e|
    +----+----+----+----+-----+
    | a_b|   1|   1| 3.3|false|
    |null|null|null|null| null|
    +----+----+----+----+-----+
    
    scala> a.printSchema
    root
     |-- a: string (nullable = true)
     |-- b: integer (nullable = true)
     |-- c: long (nullable = true)
     |-- d: double (nullable = true)
     |-- e: boolean (nullable = true)
    
    
    (1)字符串列(Scala/Java:String)

    Scala的String和Java是一样的,测试以下:
    测试字符串split,报错空指针,说明null(None: Option[String])被处理

    scala> a.withColumn("a_1", udf((s: String) => s.split("_")(0)).apply($"a")).show()
    Caused by: java.lang.NullPointerException
    

    测试字符串slice,报错空指针,说明null(None: Option[String])被处理

    scala> a.withColumn("a_1", udf((s: String) => s.slice(2, 3)).apply($"a")).show()
    Caused by: java.lang.NullPointerException
    

    测试不对字符串做任何操作,只要匹配上就给到一个任意输出,正常运行

    scala> a.withColumn("a_1", udf((s: String) => "3").apply($"a")).show()
    +----+----+----+----+-----+---+
    |   a|   b|   c|   d|    e|a_1|
    +----+----+----+----+-----+---+
    | a_b|   1|   1| 3.3|false|  3|
    |null|null|null|null| null|  3|
    +----+----+----+----+-----+---+
    

    测试字符串相加,正常运行

    scala> a.withColumn("a_1", udf((s: String) => s + "123").apply($"a")).show()
    +----+----+----+----+-----+-------+
    |   a|   b|   c|   d|    e|    a_1|
    +----+----+----+----+-----+-------+
    | a_b|   1|   1| 3.3|false| a_b123|
    |null|null|null|null| null|null123|
    +----+----+----+----+-----+-------+
    

    测试使用spark.sql.functions.split下的自带函数,null值略过

    scala> a.withColumn("a", split($"a", "_")).show()
    +------+----+----+----+-----+
    |     a|   b|   c|   d|    e|
    +------+----+----+----+-----+
    |[a, b]|   1|   1| 3.3|false|
    |  null|null|null|null| null|
    +------+----+----+----+-----
    

    字符串列结论

    • 字符串列中null会被UDF处理
    • 在匹配上字符串之后,UDF中的逻辑会按照Scala的语法进行处理,如果Scala语法报错例如空指针则UDF执行报错,如果Scala语法无误则UDF执行成功
    • UDF不会自动对null值做异常处理,但是spark.sql.function下的函数会对null做异常处理,处理方式是略过处理,直接返回null

    如果要实现functions下的split返回null,UDF需要加null判断

    scala> a.withColumn("a_1", udf((s: String) => if (s == null) null else s.split("_")).apply($"a")).show()
    +----+----+----+----+-----+------+
    |   a|   b|   c|   d|    e|   a_1|
    +----+----+----+----+-----+------+
    | a_b|   1|   1| 3.3|false|[a, b]|
    |null|null|null|null| null|  null|
    +----+----+----+----+-----+------+
    
    scala> a.withColumn("a_1", udf((s: String) => if (s == null) None else Some(s.split("_"))).apply($"a")).show()
    +----+----+----+----+-----+------+
    |   a|   b|   c|   d|    e|   a_1|
    +----+----+----+----+-----+------+
    | a_b|   1|   1| 3.3|false|[a, b]|
    |null|null|null|null| null|  null|
    +----+----+----+----+-----+------+
    

    (2)整数,长整数,小数列(Scala:Int,Long,Double)

    Int和Long是Scala的类型,测试一下
    测试整数相加,正常运行,null略过没有进行计算

    scala> a.withColumn("b_1", udf((s: Int) => s + 1).apply($"b")).show()
    +----+----+----+----+-----+----+
    |   a|   b|   c|   d|    e| b_1|
    +----+----+----+----+-----+----+
    | a_b|   1|   1| 3.3|false|   2|
    |null|null|null|null| null|null|
    +----+----+----+----+-----+----+
    

    测试数组包含,正常运行,null略过没有进行计算

    scala> a.withColumn("b_1", udf((s: Int) => Array(1, 2, 3).contains(s)).apply($"b")).show()
    +----+----+----+----+-----+----+
    |   a|   b|   c|   d|    e| b_1|
    +----+----+----+----+-----+----+
    | a_b|   1|   1| 3.3|false|true|
    |null|null|null|null| null|null|
    +----+----+----+----+-----+----+
    

    测试长整数大小判断,正常运行,null略过没有进行计算

    scala> a.withColumn("c_1", udf((s: Long) => if (s >= 1L) 10L else 5L).apply($"c")).show()
    +----+----+----+----+-----+----+
    |   a|   b|   c|   d|    e| c_1|
    +----+----+----+----+-----+----+
    | a_b|   1|   1| 3.3|false|  10|
    |null|null|null|null| null|null|
    +----+----+----+----+-----+----+
    

    测试小数大小判断,正常运行,null略过没有进行计算

    scala> a.withColumn("d_1", udf((s:Double) => s == 3.3).apply($"d")).show()
    +----+----+----+----+-----+----+
    |   a|   b|   c|   d|    e| d_1|
    +----+----+----+----+-----+----+
    | a_b|   1|   1| 3.3|false|true|
    |null|null|null|null| null|null|
    +----+----+----+----+-----+----+
    

    怀疑在UDF中指定了数值类型则匹配不到null,因此在UDF中使用Any尝试以下

    scala> a.withColumn("d_1", udf((s:Any) => s.toString.toDouble +1).apply($"d")).show()
    Caused by: java.lang.NullPointerException
    

    直接报错空指针,因此null值没有被处理是因为UDF中类型没有匹配上null

    数值列结论

    • 数值列列中null不会被UDF(指定Int,Long,Double)处理,会直接略过
    • UDF中指定了Int,Long,Double,实际上根部匹配不到null值,从而导致null直接略过,可以使用Scala: Any匹配
    • UDF由于不会匹配到null,因此不会产生异常

    (3)Scala集合(Seq,Map)

    使用Scala的集合作为列的元素,同样使用Option来实现带有空值和Scala集合的列

    scala> val a = Seq((Some(Seq(1, 2)), Some(Seq("1", "2")), Some(Seq(Seq(1,2), Seq(2, 3))), Some(Map("a" -> 1)), Some(Set(1, 2))), (None, None,None,  None, None)).toDF("a", "b", "c", "d", "e") 
    a: org.apache.spark.sql.DataFrame = [a: array<int>, b: array<string> ... 3 more fields]
    
    scala> a.printSchema
    root
     |-- a: array (nullable = true)
     |    |-- element: integer (containsNull = false)
     |-- b: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- c: array (nullable = true)
     |    |-- element: array (containsNull = true)
     |    |    |-- element: integer (containsNull = false)
     |-- d: map (nullable = true)
     |    |-- key: string
     |    |-- value: integer (valueContainsNull = false)
     |-- e: array (nullable = true)
     |    |-- element: integer (containsNull = false)
    
    scala> a.show()
    +------+------+----------------+--------+------+
    |     a|     b|               c|       d|     e|
    +------+------+----------------+--------+------+
    |[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]|
    |  null|  null|            null|    null|  null|
    +------+------+----------------+--------+------+
    

    其中Scala的Set,Array都被DataFrame推断为了array(Seq)
    测试数组取某个或者某些元素,null值被处理,报错空指针

    scala> a.withColumn("b_1", udf((s: Seq[String]) => s(0)).apply($"b")).show()
    Caused by: java.lang.NullPointerException
    
    scala> a.withColumn("a_1", udf((s: Seq[Int]) => s(0)).apply($"a")).show()
    Caused by: java.lang.NullPointerException
    

    测试多维数组取元素,null值被处理,报错空指针

    scala> a.withColumn("c_1", udf((s: Seq[Seq[Int]]) => s(0)).apply($"c")).show()
    Caused by: java.lang.NullPointerException
    

    测试数组包含,null值被处理,报错空指针

    scala> a.withColumn("e_1", udf((s: Seq[String]) => s.contains("1")).apply($"e")).show()
    Caused by: java.lang.NullPointerException
    

    测试Map字段提取Key Value,null值被处理,报错空指针

    scala> a.withColumn("d_1", udf((s: Map[String, Int]) => s.get("a")).apply($"d")).show()
    Caused by: java.lang.NullPointerException
    

    尝试一下匹配上但是不使用会导致空指针的操作

    scala> a.withColumn("e_1", udf((s: Seq[String]) => 100).apply($"e")).show()
    +------+------+----------------+--------+------+---+
    |     a|     b|               c|       d|     e|e_1|
    +------+------+----------------+--------+------+---+
    |[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]|100|
    |  null|  null|            null|    null|  null|100|
    +------+------+----------------+--------+------+---+
    

    结果是可以通过不报错,和String类型结论一致
    最后测试一下使用spark.sql.functions.array_contains下的函数操作array,执行成功,null值直接略过,非null处理逻辑正确

    scala> a.withColumn("a_1", array_contains($"a", 1)).show()
    +------+------+----------------+--------+------+----+
    |     a|     b|               c|       d|     e| a_1|
    +------+------+----------------+--------+------+----+
    |[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]|true|
    |  null|  null|            null|    null|  null|null|
    +------+------+----------------+--------+------+----+
    
    

    Scala集合列结论

    • Scala集合列(array,map)中null会被UDF处理
    • 在匹配上字符串之后,UDF中的逻辑会按照Scala的语法进行处理,如果Scala语法报错例如空指针则UDF执行报错,如果Scala语法无误则UDF执行成功
    • UDF不会自动对null值做异常处理,但是spark.sql.function下的函数会对null做异常处理,处理方式是略过处理,直接返回null

    如果要实现functions下自动处理为null,UDF需要加入null判断

    scala> a.withColumn("a_1", udf((s: Seq[Int]) => if (s == null) None else Some(s(0))).apply($"a")).show()
    +------+------+----------------+--------+------+----+
    |     a|     b|               c|       d|     e| a_1|
    +------+------+----------------+--------+------+----+
    |[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]|   1|
    |  null|  null|            null|    null|  null|null|
    +------+------+----------------+--------+------+----+
    

    (4)布尔(Scala:Boolean)

    重新构建一个含有null的Boolean列的DataFrame

    scala> val a = Seq((Some(false)),(None)).toDF("a")
    scala> a.show()
    +-----+
    |    a|
    +-----+
    |false|
    | null|
    +-----+
    

    测试Boolean匹配,但不不对Boolean做任何操作,结果匹配不上,null略过

    scala> a.withColumn("a_1", udf((s: Boolean) =>10).apply($"a")).show()
    +-----+----+
    |    a| a_1|
    +-----+----+
    |false|  10|
    | null|null|
    +-----+----+
    

    测试Boolean并且对Boolean取反,结果null匹配不上,null略过

    scala> a.withColumn("a_1", udf((s: Boolean) => !s).apply($"a")).show()
    +-----+----+
    |    a| a_1|
    +-----+----+
    |false|true|
    | null|null|
    +-----+----+
    

    如果采用spark.sql.functions.not也是同样略过

    scala> a.withColumn("a_1", not($"a")).show()
    +-----+----+
    |    a| a_1|
    +-----+----+
    |false|true|
    | null|null|
    +-----+----+
    

    布尔列结论

    • 布尔列中null不会被UDF中的Boolean处理,会直接略过,可以使用Any匹配

    (5)时间日期(Java:java.sql.Timestamp)

    DataFrame的timestamp列是java.sql.Timestamp,先创建一个带有null的timestmap列

    scala> val a = Seq((Some("2020-01-01")),(None)).toDF("a").select($"a".cast("timestamp"))
    a: org.apache.spark.sql.DataFrame = [a: timestamp]
    
    scala> a.show()
    +-------------------+
    |                  a|
    +-------------------+
    |2020-01-01 00:00:00|
    |               null|
    +-------------------+
    

    测试调用java.sql.TImestamp的getYear方法(计算离1900年多少年),匹配成功,null值被处理,报错空指针

    scala> a.withColumn("a_1", udf((s: java.sql.Timestamp) => s.getYear ).apply($"a")).show()
    Caused by: java.lang.NullPointerException
    

    测试调用spark.sql.function下的方法,null值略过

    scala> a.withColumn("a_1", date_format($"a", "yyyy-MM-dd")).show()
    +-------------------+----------+
    |                  a|       a_1|
    +-------------------+----------+
    |2020-01-01 00:00:00|2020-01-01|
    |               null|      null|
    +-------------------+----------+
    

    时间戳列结论

    • 时间戳列中null会被UDF中的java.sql.Timestamp处理
    • 在匹配上java.sql.Timestamp之后,UDF中的逻辑会按照Java的语法进行处理,如果Java语法报错例如空指针则UDF执行报错,如果Java语法无误则UDF执行成功
    • UDF不会自动对null值做异常处理,但是spark.sql.function下的函数会对null做异常处理,处理方式是略过处理,直接返回null

    相关文章

      网友评论

        本文标题:Spark SQL:分类讨论UDF对DataFrame列存在空值

        本文链接:https://www.haomeiwen.com/subject/jevflrtx.html