Spark SQL: Notes on a function.array Data Type Problem


Author: xiaogp | Published 2022-01-27 16:29

    Abstract: Spark SQL

    Reproducing the problem

    We need to groupBy one column of a Spark SQL DataFrame and aggregate all the other feature columns. The approach is to wrap the other columns with array and collect them into a list of arrays with collect_list (both from org.apache.spark.sql.functions). The code is as follows:

        // Assumes import spark.implicits._ and import org.apache.spark.sql.functions.{array, collect_list}
        val joinData = data.join(announAmountData, Seq("ent_name"), "left_outer").groupBy($"ent_name")
          .agg(collect_list(array("publish_date", "target_amt", "case_position", "case_reason", "plaintiff")).as("items"))
          .map {
            row => {
              // Initialize the counters
              var involvedCount: Int = 0
              var involvedCountInc: Int = 0
              var involvedDefendantCount: Int = 0
              var involvedDefendantCountInc: Int = 0
              var lzjfCount: Int = 0
              var lzjfCountInc: Int = 0
              var msqqCount: Int = 0
              var msqqCountInc: Int = 0
              // Parse the row
              val entName: String = row.getAs[String]("ent_name")
              val items: Seq[Seq[String]] = row.getAs[Seq[Seq[String]]]("items")
              for (item: Seq[String] <- items) {
                if (item.head != null) {
                  val createCaseDate: String = item.head.split(" ")(0)
                  val targetAmt: String = item(1)
                  val casePosition: String = item(2)
                  val caseReason: String = item(3)
                  val plaintiff: String = item(4)
                  // TODO business statistics logic
                }
              }
              // Emit the aggregated result
              (entName, involvedCount, involvedCountInc, involvedDefendantCount, involvedDefendantCountInc, lzjfCount, lzjfCountInc,
                msqqCount, msqqCountInc)
            }
          }.toDF("ent_name", "involved_count", "involved_count_inc", "involved_defendant_count", "involved_defendant_count_inc",
          "lzjf_count", "lzjf_count_inc", "msqq_count", "msqq_count_inc")
    

    Running it throws the following error:

    org.apache.spark.sql.AnalysisException: ... 
    due to data type mismatch: input to function array should all be the same type, 
    but it's [timestamp, double, string, string, string];;
    

    The error message is clear: the columns passed into array do not all have the same data type. Let's look at the schema of the source data:

    scala> announAmountData.printSchema
    root
     |-- ent_name: string (nullable = true)
     |-- publish_date: timestamp (nullable = true)
     |-- target_amt: double (nullable = true)
     |-- case_position: string (nullable = true)
     |-- case_reason: string (nullable = true)
     |-- plaintiff: string (nullable = true)
    

    The data contains three types: string, double, and timestamp, so the error is understandable. However, I have found that this problem does not always occur. I have written similar code many times, also with mixed column types, and never hit this error (I suspect array automatically converts non-string columns to string). In theory those cases should have failed too, so let's test it below.


    Testing function.array

    Below, we test which combinations of string, double, and timestamp columns raise the type mismatch error when used with array.

    scala> val a = Seq(("a", "2021-01-1", 3.3, "1"),("b", "2022-01-01", 4.4, "2")).toDF("a", "b", "c", "d")
    a: org.apache.spark.sql.DataFrame = [a: string, b: string ... 2 more fields]
    
    scala> val b = a.select($"a", $"b".cast("timestamp"), $"c", $"d")
    b: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 2 more fields]
    
    scala> b.printSchema
    root
     |-- a: string (nullable = true)
     |-- b: timestamp (nullable = true)
     |-- c: double (nullable = false)
     |-- d: string (nullable = true)
    
    (1) array(timestamp, string) and array(string, timestamp)
    scala> b.withColumn("e", array("b", "d"))
    res51: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 2 more fields]
    
    scala> b.withColumn("e", array("d", "b"))
    res52: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 2 more fields]
    
    scala> b.withColumn("e", array("d", "b")).show(false)
    +---+-------------------+---+---+------------------------+
    |a  |b                  |c  |d  |e                       |
    +---+-------------------+---+---+------------------------+
    |a  |2021-01-01 00:00:00|3.3|1  |[1, 2021-01-01 00:00:00]|
    |b  |2022-01-01 00:00:00|4.4|2  |[2, 2022-01-01 00:00:00]|
    +---+-------------------+---+---+------------------------+
    
    scala> b.withColumn("e", array("d", "b")).printSchema
    root
     |-- a: string (nullable = true)
     |-- b: timestamp (nullable = true)
     |-- c: double (nullable = false)
     |-- d: string (nullable = true)
     |-- e: array (nullable = false)
     |    |-- element: string (containsNull = true)
    

    Both of these work, so the types clearly do not have to match here. The elements of the resulting array are all strings: Spark SQL automatically casts the non-string column to string.
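
    As a quick check, the element type of the generated column can also be inspected programmatically. A minimal sketch against the DataFrame b from the REPL session above (column names as in that example):

    import org.apache.spark.sql.types.ArrayType

    // Inspect the data type of the generated "e" column directly
    val eType = b.withColumn("e", array("d", "b")).schema("e").dataType
    eType match {
      case ArrayType(elementType, _) => println(s"array element type: $elementType") // StringType here
      case other                     => println(s"unexpected type: $other")
    }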

    (2) array(double, string) and array(string, double)
    scala> b.withColumn("e", array("c", "d"))
    res58: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]
    
    scala> b.withColumn("e", array("d", "c"))
    res59: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]
    

    These also run without error: double and string columns can be combined in array.

    (3) array(double, timestamp) and array(timestamp, double)
    scala> b.withColumn("e", array("c", "b"))
    org.apache.spark.sql.AnalysisException: cannot resolve 'array(`c`, `b`)' due to data type mismatch: input to function array should all be the same type, but it's [double, timestamp];;
    'Project [a#271, b#279, c#273, d#274, array(c#273, b#279) AS e#478]
    +- Project [a#271, cast(b#272 as timestamp) AS b#279, c#273, d#274]
       +- Project [_1#266 AS a#271, _2#267 AS b#272, _3#268 AS c#273, _4#269 AS d#274]
          +- LocalRelation [_1#266, _2#267, _3#268, _4#269]
    

    These fail immediately: both array(double, timestamp) and array(timestamp, double) report the type mismatch. The preliminary conclusion is that the problem arises when the array contains no string column at all, because casting either one of the two columns to string makes it run:

    scala> b.withColumn("e", array($"c".cast("string"), $"b"))
    res77: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]
    
    scala> b.withColumn("e", array($"c", $"b".cast("string")))
    res78: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]
    
    (4) Combinations of timestamp, double, and string together

    These sometimes fail and sometimes don't, depending on the column order. Test results:

    Combination                        Errors?
    array(double, timestamp, string)   yes
    array(timestamp, double, string)   yes
    array(string, timestamp, double)   no
    array(string, double, timestamp)   no
    array(double, string, timestamp)   no
    array(timestamp, string, double)   no

    A preliminary guess: the written order matters. The two failing combinations are exactly the ones where the double and timestamp columns appear before any string column; as long as a string column comes before the incompatible non-string pair, the call succeeds.
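
    One plausible explanation (an assumption on my part, not verified against the Spark source) is that Spark resolves the array element type by folding over the column types from left to right, two at a time: string paired with another atomic type widens to string, while double paired with timestamp has no common type and resolution fails. A minimal Scala sketch of that mental model (plain strings stand in for Spark data types):

    // Rough mental model only -- not Spark's actual implementation.
    def widerType(a: String, b: String): Option[String] = (a, b) match {
      case (x, y) if x == y              => Some(x)        // identical types: keep
      case ("string", _) | (_, "string") => Some("string") // string absorbs the other atomic type
      case _                             => None           // e.g. double vs timestamp: no common type
    }

    def arrayElementType(cols: Seq[String]): Option[String] =
      cols.tail.foldLeft(Option(cols.head)) {
        case (Some(acc), next) => widerType(acc, next)
        case (None, _)         => None
      }

    arrayElementType(Seq("double", "timestamp", "string")) // None           -> fails, like the first two rows
    arrayElementType(Seq("double", "string", "timestamp")) // Some("string") -> works, like the last four rows

    Under this model, the two failing rows in the table are exactly the ones where double and timestamp meet before any string column has been folded in.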


    Solution

    The fix is simply to cast all non-string columns to string manually up front, which also removes any concern about column order. Rewriting the code as follows works:

        val joinData = data.join(announAmountData, Seq("ent_name"), "left_outer").groupBy($"ent_name")
          .agg(collect_list(array($"publish_date".cast("string"), $"target_amt".cast("string"), $"case_position", $"case_reason", $"plaintiff")).as("items"))
          .map {
            row => {
              // ... same parsing and statistics logic as before ...
              ()
            }
          }.toDF()
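
    If you do not want to enumerate the casts by hand, the non-string columns can also be cast generically from the schema. A minimal sketch, assuming a DataFrame whose remaining columns should all be collected (the helper name allAsString and the usage below are illustrative, not from the original job):

        import org.apache.spark.sql.{Column, DataFrame}
        import org.apache.spark.sql.functions.{array, col, collect_list}
        import org.apache.spark.sql.types.StringType

        // Cast every non-string column to string so that array() always
        // resolves to array<string>, regardless of column order.
        def allAsString(df: DataFrame): Seq[Column] =
          df.schema.fields.toSeq.map { f =>
            if (f.dataType == StringType) col(f.name) else col(f.name).cast("string")
          }

        // Illustrative usage with the announcement data:
        // announAmountData.groupBy($"ent_name")
        //   .agg(collect_list(array(allAsString(announAmountData.drop("ent_name")): _*)).as("items"))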
    
    
