
[Spark] Fix AttributeReference mismatch for readChangeFeed queries coming from Spark Connect #3451


@dillitz (Contributor) commented on Jul 31, 2024

Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

Description

This fixes an issue in the DeltaAnalysis rule, specifically in fromV2Relation, that causes Spark Connect readChangeFeed queries to fail with an AttributeReference mismatch whenever a projection, selection, etc. is applied to the underlying table's columns. A sketch of the fix idea follows the stack trace below.
Spark Connect Query:

spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("main.dillitz.test")
  .select("id")
  .show()

Unresolved Logical Plan:

common {
  plan_id: 15
}
project {
  input {
    common {
      plan_id: 14
    }
    read {
      named_table {
        unparsed_identifier: "main.dillitz.test"
        options {
          key: "startingVersion"
          value: "0"
        }
        options {
          key: "readChangeFeed"
          value: "true"
        }
      }
    }
  }
  expressions {
    unresolved_attribute {
      unparsed_identifier: "id"
    }
  }
}

Resolved Logical Plan:

'Project ['id]
+- 'UnresolvedRelation [dillitz, default, test], [startingVersion=0, readChangeFeed=true], false 

Plan before DeltaAnalysis rule:

Project [id#594L]
+- SubqueryAlias dillitz.default.test
   +- RelationV2[id#594L] dillitz.default.test dillitz.default.test

Plan after DeltaAnalysis rule:

!Project [id#594L]
+- SubqueryAlias spark_catalog.delta.`/private/var/folders/11/kfrr0zqj4w3_lb6mpjk76q_00000gp/T/spark-8f2dc5b0-6722-4928-90bb-fba73bd9ce87`
   +- Relation [id#595L,_change_type#596,_commit_version#597L,_commit_timestamp#598] DeltaCDFRelation(SnapshotWithSchemaMode(...))

Note that the rewritten relation carries fresh expression IDs (id#595L instead of id#594L) while the parent Project still references the old ones, which leaves the plan unresolved (hence the !Project marker).

Error:

org.apache.spark.sql.catalyst.ExtendedAnalysisException: [MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION] Resolved attribute(s) "id" missing from "id", "_change_type", "_commit_version", "_commit_timestamp" in operator !Project [id#493L]. Attribute(s) with the same name appear in the operation: "id".
Please check if the right attribute(s) are used. SQLSTATE: XX000;
!Project [id#493L]
+- SubqueryAlias dillitz.default.test
   +- Relation dillitz.default.test[id#494L,_change_type#495,_commit_version#496L,_commit_timestamp#497] DeltaCDFRelation(SnapshotWithSchemaMode(...))

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:55)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:694)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:197)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:287)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:197)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:179)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:341)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:167)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:155)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:155)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:341)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$2(Analyzer.scala:396)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:169)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:396)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:443)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:393)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:260)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:441)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:600)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1152)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:600)
	at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:596)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:596)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:254)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:253)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:235)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:105)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.SparkSession.$anonfun$withActiveAndFrameProfiler$1(SparkSession.scala:1187)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.SparkSession.withActiveAndFrameProfiler(SparkSession.scala:1187)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:103)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformShowString(SparkConnectPlanner.scala:323)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:169)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$3(SessionHolder.scala:480)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:479)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:166)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:90)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handlePlan(ExecuteThreadRunner.scala:312)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:244)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:176)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:343)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:343)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:83)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:237)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:82)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:342)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:176)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:530)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:529)
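
The gist of the fix is to keep attribute references stable across the rewrite. As a minimal sketch (hypothetical code, not the exact patch; buildOutput, v2Output, and cdfSchema are illustrative names), fromV2Relation can construct the replacement relation's output by reusing the expression IDs from the original DataSourceV2Relation for columns that match by name, so parent operators such as the Project above still resolve:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StructType

// Hypothetical sketch: columns already present in the v2 relation keep their
// exprIds, while CDF-only columns (_change_type, _commit_version,
// _commit_timestamp) get fresh ones.
def buildOutput(
    v2Output: Seq[AttributeReference],
    cdfSchema: StructType): Seq[AttributeReference] = {
  val byName = v2Output.map(a => a.name -> a).toMap
  cdfSchema.map { field =>
    byName.get(field.name) match {
      case Some(existing) =>
        // Reuse the existing exprId and qualifier, but take the data type,
        // nullability, and metadata from the freshly retrieved schema.
        AttributeReference(field.name, field.dataType, field.nullable, field.metadata)(
          exprId = existing.exprId, qualifier = existing.qualifier)
      case None =>
        AttributeReference(field.name, field.dataType, field.nullable, field.metadata)()
    }
  }
}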

How was this patch tested?

I created an E2E Spark Connect test that runs queries like the one above. I am not sure how to merge it into this repository.
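
For reference, such a check could look roughly like this (a sketch only; the Connect endpoint, table name, and data below are placeholders):

import org.apache.spark.sql.SparkSession

// Hypothetical E2E check against a Spark Connect endpoint (placeholders).
val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
spark.sql(
  "CREATE TABLE cdf_test (id LONG) USING delta " +
    "TBLPROPERTIES (delta.enableChangeDataFeed = true)")
spark.sql("INSERT INTO cdf_test VALUES (1), (2)")

// Before the fix, the select() below failed analysis with
// MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION.
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("cdf_test")
  .select("id")
  .show()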

Does this PR introduce any user-facing changes?

No.

@dillitz changed the title from "fix output of the LogicalRelation for readChangeFeed queries coming f…" to "[Spark] Fix AttributeReference mismatch for readChangeFeed queries coming from Spark Connect" on Jul 31, 2024
@dillitz force-pushed the fix-logical-relation-output-readChangeFeed-spark-connect branch from 3ed1ac6 to 9a3dd90 on August 1, 2024
@xupefei (Contributor) commented on Aug 1, 2024

Looks good to me!

@dillitz (Contributor, Author) commented on Aug 6, 2024

Following test failures in DeltaSharingDataSourceCMSuite, this now also copies the column metadata from the retrieved schema.
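
In sketch form (hypothetical, not the exact diff; withSchemaMetadata is an illustrative name), that amounts to taking the column metadata from the freshly retrieved schema field when an attribute is reused:

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StructField

// Hypothetical sketch: keep the reused attribute (and its exprId) but carry
// over the metadata of the freshly retrieved schema field, which e.g. the
// Delta Sharing column-mapping suite depends on.
def withSchemaMetadata(existing: AttributeReference, field: StructField): Attribute =
  existing.withMetadata(field.metadata)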

@scottsand-db merged commit 4f768f1 into delta-io:master on Aug 7, 2024. 10 checks passed.