美文网首页
Spark3.0+Delta Lake0.7不支持update

Spark3.0+Delta Lake0.7不支持update

作者: 大数据扫地僧 | 来源:发表于2020-09-03 09:53 被阅读0次

    官网(https://docs.delta.io/latest/quick-start.html)例子,如下所示:

    
    import io.delta.tables._
    
    import org.apache.spark.sql.functions._
    
    val deltaTable = DeltaTable.forPath("/tmp/delta-table")
    
    deltaTable.update(
    
          condition = expr("id % 2 == 0"),
    
          set = Map("id" -> expr("id + 100"))
    
    )
    
    运行上述代码时报错,异常堆栈如下:
    
    Caused by: java.lang.UnsupportedOperationException: UPDATE TABLE is not supported temporarily.
    
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:767)
    
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
    
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
    
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    
    at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
    
    at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
    
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
    
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
    
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
    
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
    
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:330)
    
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:94)
    
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
    
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
    
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:94)
    
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:87)
    
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:107)
    
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
    
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
    
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:107)
    
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
    
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:199)
    
    at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
    
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:199)
    
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
    
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95)
    
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
    
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
    
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
    
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
    
    at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset(AnalysisHelper.scala:45)
    
    at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset$(AnalysisHelper.scala:44)
    
    at io.delta.tables.DeltaTable.toDataset(DeltaTable.scala:42)
    
    at io.delta.tables.execution.DeltaTableOperations.$anonfun$executeUpdate$1(DeltaTableOperations.scala:64)
    
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:60)
    
    ... 8 more
    
    

    pom文件如下所示:

    
    <dependency>
    
      <groupId>org.apache.spark</groupId>
    
      <artifactId>spark-core_2.12</artifactId>
    
      <version>3.0.0</version>
    
    </dependency>
    
    <dependency>
    
      <groupId>io.delta</groupId>
    
      <artifactId>delta-core_2.12</artifactId>
    
      <version>0.7.0</version> 
    
    </dependency>
    
    

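    定位到源码发现,如下所示,在Spark自身的SparkStrategies.scala中,UpdateTable和MergeIntoTable这两个逻辑计划如果一直保留到物理计划阶段,就会被默认的规划策略直接抛出“暂不支持”的异常。也就是说,这个报错来自Spark的默认策略,并不代表Delta Lake 0.7本身不支持update:在Spark 3.0下使用Delta Lake 0.7.0时,需要在创建SparkSession时配置spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension以及spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog,由Delta的扩展规则把这些逻辑计划改写成自己的执行命令之后,update和merge才能正常执行。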

    
    (省略)...
    
    case _: UpdateTable =>
    
      throw new UnsupportedOperationException(s"UPDATE TABLE is not supported temporarily.")
    
    case _: MergeIntoTable =>
    
      throw new UnsupportedOperationException(s"MERGE INTO TABLE is not supported temporarily.")
    
    

    相关文章

      网友评论

          本文标题:Spark3.0+Delta Lake0.7不支持update

          本文链接:https://www.haomeiwen.com/subject/htzwsktx.html