Spark 3.0 + Delta Lake 0.7 does not support UPDATE


Author: 大数据扫地僧 | Published 2020-09-03 09:53

The example from the official quick start (https://docs.delta.io/latest/quick-start.html) is as follows:


import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath("/tmp/delta-table")

deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100"))
)
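For context, the quick start creates the table before updating it; a minimal setup sketch, assuming spark is the active SparkSession:

// From the quick start: write ids 0..4 as a Delta table at /tmp/delta-table
spark.range(0, 5).write.format("delta").save("/tmp/delta-table")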

Running the update then fails with:

Caused by: java.lang.UnsupportedOperationException: UPDATE TABLE is not supported temporarily.
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:767)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
    at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:330)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:107)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:199)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:199)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
    at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset(AnalysisHelper.scala:45)
    at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset$(AnalysisHelper.scala:44)
    at io.delta.tables.DeltaTable.toDataset(DeltaTable.scala:42)
    at io.delta.tables.execution.DeltaTableOperations.$anonfun$executeUpdate$1(DeltaTableOperations.scala:64)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:60)
    ... 8 more

The pom.xml dependencies are as follows:


<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_2.12</artifactId>
  <version>0.7.0</version>
</dependency>
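Note that this pom lists only spark-core, while the DeltaTable and DataFrame APIs live in spark-sql; delta-core declares Spark as a provided dependency, so the project itself most likely also needs (a sketch, matching the versions above):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.0.0</version>
</dependency>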

Tracing into the Spark source, we can see in SparkStrategies.scala that Spark's default planner rejects both UpdateTable and MergeIntoTable:


// ...
case _: UpdateTable =>
  throw new UnsupportedOperationException(s"UPDATE TABLE is not supported temporarily.")
case _: MergeIntoTable =>
  throw new UnsupportedOperationException(s"MERGE INTO TABLE is not supported temporarily.")
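These fallback cases are only reached when Delta's own planning rules are not installed. On Spark 3.0, Delta Lake 0.7 injects its UPDATE/MERGE support through a session extension, so the SparkSession must be built with the Delta extension and catalog configured, as the 0.7 documentation describes; a minimal sketch:

import org.apache.spark.sql.SparkSession

// Without these two settings, Delta's UpdateTable plan falls through to the
// default SparkStrategies case above and throws the UnsupportedOperationException.
val spark = SparkSession.builder()
  .appName("delta-update-example") // hypothetical app name
  .master("local[*]")              // assumption: local run, for illustration only
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

With the session configured this way, the deltaTable.update(...) call from the quick start should be planned by Delta's own command instead of hitting Spark's fallback.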
