Skip to content

Commit

Permalink
Schema evolution in merge for UPDATE/INSERT non-star actions
Browse files Browse the repository at this point in the history
This change implements support for schema evolution in merge for UPDATE SET <assignments> and INSERT (...) VALUES (...) actions. Before this, schema evolution was only triggered with UPDATE SET * and INSERT * actions.
The following example fails on resolving `target.newColumn` before this change. With schema evolution enabled, it now succeeds and adds `newColumn` to the target table schema:

Target schema: `key: int, value: int`
Source schema: `key: int, value: int, newColumn: int`
```
MERGE INTO target
USING source
ON target.key = source.key
WHEN MATCHED THEN UPDATE SET target.newColumn = source.newColumn
```

Changes:
- When schema evolution is enabled, allow resolving assignments in merge actions against the source table when resolving against the target table fails.
- When schema evolution is enabled, collect all new columns and nested fields in the source table that are assigned to by any merge action and add them to the table schema, taking into account * actions.

Extensive tests added to MergeIntoSuiteBase covering schema evolution for both top level columns and nested attributes.

GitOrigin-RevId: 3387f19fd987d769fdc719255bbdbcaf92db6bba
  • Loading branch information
johanl-db authored and allisonport-db committed Feb 10, 2023
1 parent 8a70d11 commit affd577
Show file tree
Hide file tree
Showing 3 changed files with 306 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaIllegalArgumentE
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnaryExpression}
Expand Down Expand Up @@ -429,7 +430,6 @@ object DeltaMergeInto {
}

val canAutoMigrate = conf.getConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE)

/**
* Resolves a clause using the given plan (used for resolving the action exprs) and
* returns the resolved clause.
Expand Down Expand Up @@ -516,6 +516,14 @@ object DeltaMergeInto {
resolveOrFail(unresolvedAttrib, fakeTargetPlan, s"$typ clause"),
resolutionErrorMsg)
} catch {
// Allow schema evolution for update and insert non-star when the column is not in
// the target.
case _: AnalysisException
if canAutoMigrate && (clause.isInstanceOf[DeltaMergeIntoMatchedUpdateClause] ||
clause.isInstanceOf[DeltaMergeIntoNotMatchedClause]) =>
DeltaUpdateTable.getTargetColNameParts(
resolveOrFail(unresolvedAttrib, fakeSourcePlan, s"$typ clause"),
resolutionErrorMsg)
case e: Throwable => throw e
}
}
Expand Down Expand Up @@ -548,20 +556,55 @@ object DeltaMergeInto {
val resolvedNotMatchedBySourceClauses = notMatchedBySourceClauses.map {
resolveClause(_, fakeTargetPlan)
}
val actions = (matchedClauses ++ notMatchedClauses ++ notMatchedBySourceClauses)
.flatMap(_.actions)
val containsStarAction = actions.exists(_.isInstanceOf[UnresolvedStar])

val migrateSchema = canAutoMigrate && containsStarAction
val finalSchema = if (canAutoMigrate) {
// When schema evolution is enabled, add to the target table new columns or nested fields that
// are assigned to in merge actions and not already part of the target schema. This is done by
// collecting all assignments from merge actions and using them to filter the source schema
// before merging it with the target schema. We don't consider NOT MATCHED BY SOURCE clauses
// since, by definition, these can't reference source columns and thus can't introduce new
// columns in the target schema.
val actions = (matchedClauses ++ notMatchedClauses).flatMap(_.actions)
val assignments = actions.collect { case a: DeltaMergeAction => a.targetColNameParts }
val containsStarAction = actions.exists {
case _: UnresolvedStar => true
case _ => false
}

val finalSchema = if (migrateSchema) {
var sourceSchema = source.schema

// Filter the source schema to retain only fields that are referenced by at least one merge
// clause, then merge this schema with the target to give the final schema.
//
// `sourceSchema` is the (possibly nested) struct being filtered and `basePath` is the lowercased
// path of the enclosing struct fields (empty for the top-level schema). Returns a struct
// containing only the fields that are assigned to, directly or through one of their nested
// fields, by a merge action, plus every field when any action is a * action.
def filterSchema(sourceSchema: StructType, basePath: Seq[String]): StructType = {
  // Field paths below are lowercased before being compared, so the assignment paths must be
  // lowercased the same way. Otherwise an assignment whose resolved column name contains an
  // uppercase character would never match and the new column would silently be left out.
  val assignedPaths = assignments.map(_.map(_.toLowerCase(Locale.ROOT)))

  StructType(sourceSchema.flatMap { field =>
    val fieldPath = basePath :+ field.name.toLowerCase(Locale.ROOT)
    val childAssignedInMergeClause = assignedPaths.exists(_.startsWith(fieldPath))

    field.dataType match {
      // Specifically assigned to in one clause: always keep, including all nested attributes
      case _ if assignedPaths.contains(fieldPath) => Some(field)
      // If this is a struct and one of its children is being assigned to in a merge clause,
      // keep it and continue filtering children.
      case struct: StructType if childAssignedInMergeClause =>
        Some(field.copy(dataType = filterSchema(struct, fieldPath)))
      // The field isn't assigned to directly or indirectly (i.e. its children) in any non-*
      // clause. Check if it should be kept with any * action.
      case struct: StructType if containsStarAction =>
        Some(field.copy(dataType = filterSchema(struct, fieldPath)))
      case _ if containsStarAction => Some(field)
      // The field and its children are not assigned to in any * or non-* action, drop it.
      case _ => None
    }
  })
}

val migrationSchema = filterSchema(source.schema, Seq.empty)
// The implicit conversions flag allows any type to be merged from source to target if Spark
// SQL considers the source type implicitly castable to the target. Normally, mergeSchemas
// enforces Parquet-level write compatibility, which would mean an INT source can't be merged
// into a LONG target.
SchemaMergingUtils.mergeSchemas(target.schema, sourceSchema, allowImplicitConversions = true)
SchemaMergingUtils.mergeSchemas(
target.schema,
migrationSchema,
allowImplicitConversions = true)
} else {
target.schema
}
Expand All @@ -573,7 +616,7 @@ object DeltaMergeInto {
resolvedMatchedClauses,
resolvedNotMatchedClauses,
resolvedNotMatchedBySourceClauses,
migrateSchema = migrateSchema,
migrateSchema = canAutoMigrate,
finalSchema = Some(finalSchema))

// It's possible that pre-resolved expressions (e.g. `sourceDF("key") = targetDF("key")`) have
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,4 +535,13 @@ trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBase {
(3, 31, null) // Not matched by source, updated
).toDF("key", "value", "extra"),
expectedWithoutEvolution = Seq((0, 0), (1, 1), (3, 31)).toDF("key", "value"))

// Migrating a new column via WHEN NOT MATCHED BY SOURCE is not allowed: `extra` exists only in
// the source, and the same resolution error is expected with and without schema evolution.
// NOTE(review): `updateNotMatched` is defined outside this hunk; presumably it builds a
// WHEN NOT MATCHED BY SOURCE ... UPDATE clause — confirm.
testEvolution("update new column with not matched by source fails")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra3"), (2, 2, "extra2")).toDF("key", "value", "extra"),
clauses = updateNotMatched("extra = s.extra") :: Nil,
expectErrorContains = "cannot resolve extra in UPDATE clause",
expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause")

}
Original file line number Diff line number Diff line change
Expand Up @@ -2550,6 +2550,251 @@ abstract class MergeIntoSuiteBase
expectedWithoutEvolution = ((0, 0) +: (3, 30) +: (1, 1) +: Nil).toDF("key", "value")
)

// Schema evolution with UPDATE SET alone: `extra` exists only in the source and is assigned to
// explicitly, so it is expected to be added to the target when evolution is enabled.
testEvolution("new column with update set")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"),
clauses = update(set = "key = s.key, value = s.value, extra = s.extra") :: Nil,
// Rows that are not updated get null for the new column.
expected = ((0, 0, null) +: (3, 30, null) +: (1, 1, "extra1") +: Nil)
.toDF("key", "value", "extra"),
expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause")

// The new target column `extra` is assigned from the existing source column `value` rather
// than from the source column of the same name.
testEvolution("new column updated with value from existing column")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, -1), (2, 2, -2))
.toDF("key", "value", "extra"),
clauses = update(set = "extra = s.value") :: Nil,
// Only `extra` is assigned: the matched row keeps value 10 and gets extra = 1 (s.value).
expected = ((0, 0, null) +: (1, 10, 1) +: (3, 30, null) +: Nil)
.asInstanceOf[List[(Integer, Integer, Integer)]]
.toDF("key", "value", "extra"),
expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause")

// Schema evolution with INSERT alone: `extra` exists only in the source and is listed in the
// INSERT column list, so it is expected to be added to the target when evolution is enabled.
testEvolution("new column with insert values")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"),
clauses = insert(values = "(key, value, extra) VALUES (s.key, s.value, s.extra)") :: Nil,
// Pre-existing rows get null for the new column; only the inserted row (key 2) has a value.
expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, 2, "extra2") +: Nil)
.toDF("key", "value", "extra"),
expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause")

// The new target column `extra` is inserted from the existing source column `value`; the
// target column `value` is not in the INSERT column list and is null in the inserted row.
testEvolution("new column inserted with value from existing column")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, -1), (2, 2, -2))
.toDF("key", "value", "extra"),
clauses = insert(values = "(key, extra) VALUES (s.key, s.value)") :: Nil,
expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, null, 2) +: Nil)
.asInstanceOf[List[(Integer, Integer, Integer)]]
.toDF("key", "value", "extra"),
expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause")

// Schema evolution (UPDATE) with two new columns in the source but only one added to the target.
// `unused` is never an assignment target, so the expected result has no `unused` column.
testEvolution("new column with update set and column not updated")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2"))
.toDF("key", "value", "extra", "unused"),
clauses = update(set = "extra = s.extra") :: Nil,
expected = ((0, 0, null) +: (1, 10, "extra1") +: (3, 30, null) +: Nil)
.asInstanceOf[List[(Integer, Integer, String)]]
.toDF("key", "value", "extra"),
expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause")

// `unused` is read as a value but is not an assignment target: only `extra` is added, holding
// the values of `s.unused`.
testEvolution("new column updated from other new column")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2"))
.toDF("key", "value", "extra", "unused"),
clauses = update(set = "extra = s.unused") :: Nil,
expected = ((0, 0, null) +: (1, 10, "unused1") +: (3, 30, null) +: Nil)
.asInstanceOf[List[(Integer, Integer, String)]]
.toDF("key", "value", "extra"),
expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause")

// Schema evolution (INSERT) with two new columns in the source but only one added to the target.
// `unused` is not in the INSERT column list, so the expected result has no `unused` column.
testEvolution("new column with insert values and column not inserted")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2"))
.toDF("key", "value", "extra", "unused"),
clauses = insert(values = "(key, extra) VALUES (s.key, s.extra)") :: Nil,
expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, null, "extra2") +: Nil)
.asInstanceOf[List[(Integer, Integer, String)]]
.toDF("key", "value", "extra"),
expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause")

// `unused` is read as a value but is not an insert target: only `extra` is added, holding the
// value of `s.unused` for the inserted row.
testEvolution("new column inserted from other new column")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2"))
.toDF("key", "value", "extra", "unused"),
clauses = insert(values = "(key, extra) VALUES (s.key, s.unused)") :: Nil,
expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, null, "unused2") +: Nil)
.asInstanceOf[List[(Integer, Integer, String)]]
.toDF("key", "value", "extra"),
expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause")

// Schema evolution with two new columns, one added by the UPDATE clause (`extra`) and the other
// by the INSERT clause (`other`). Both are expected in the resulting target schema.
testEvolution("new column added by insert and other new column added by update")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra1", "other1"), (2, 2, "extra2", "other2"))
.toDF("key", "value", "extra", "other"),
clauses = update(set = "extra = s.extra") ::
insert(values = "(key, other) VALUES (s.key, s.other)") :: Nil,
// The updated row (key 1) only gets `extra`; the inserted row (key 2) only gets `other`.
expected =
((0, 0, null, null) +:
(1, 10, "extra1", null) +:
(3, 30, null, null) +:
(2, null, null, "other2") +: Nil)
.asInstanceOf[List[(Integer, Integer, String, String)]]
.toDF("key", "value", "extra", "other"),
expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause")

// Nested Schema evolution with UPDATE alone
// NOTE(review): the JSON literals in these tests open two `{` but close only one `}` as shown
// here — confirm against the original file whether a closing brace was lost.

// Assigning the whole `value` struct brings the new nested field `b` into the target schema.
testNestedStructsEvolution("new nested source field added when updating top-level column")(
target = """{ "key": "A", "value": { "a": 1 }""",
source = """{ "key": "A", "value": { "a": 2, "b": 3 }""",
targetSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)),
sourceSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)
.add("b", IntegerType)),
clauses = update("value = s.value") :: Nil,
result = """{ "key": "A", "value": { "a": 2, "b": 3 }""",
resultSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)
.add("b", IntegerType)),
expectErrorWithoutEvolutionContains = "Cannot cast")

// Only the existing nested field `value.a` is assigned, so the new source field `value.b` is
// not added: the expected result is the same with and without schema evolution.
testNestedStructsEvolution("new nested source field not in update is ignored")(
target = """{ "key": "A", "value": { "a": 1 }""",
source = """{ "key": "A", "value": { "a": 2, "b": 3 }""",
targetSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)),
sourceSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)
.add("b", IntegerType)),
clauses = update("value.a = s.value.a") :: Nil,
result = """{ "key": "A", "value": { "a": 2 }""",
resultWithoutEvolution = """{ "key": "A", "value": { "a": 2 }""")

// `value.b` is assigned and is expected in the result schema; `value.c` is never assigned and
// is expected to be left out. `value.a` is not assigned and keeps its target value.
testNestedStructsEvolution("two new nested source fields with update: one added, one ignored")(
target = """{ "key": "A", "value": { "a": 1 }""",
source = """{ "key": "A", "value": { "a": 2, "b": 3, "c": 4 }""",
targetSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)),
sourceSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)
.add("b", IntegerType)
.add("c", IntegerType)),
clauses = update("value.b = s.value.b") :: Nil,
result = """{ "key": "A", "value": { "a": 1, "b": 3 }""",
resultSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)
.add("b", IntegerType)),
expectErrorWithoutEvolutionContains = "No such struct field")


// Nested Schema evolution with INSERT alone
// NOTE(review): the JSON literals in these tests open two `{` but close only one `}` as shown
// here — confirm against the original file whether a closing brace was lost.

// Inserting the whole `value` struct brings the new nested field `b` into the target schema;
// the pre-existing row gets null for `b`.
// NOTE(review): the INSERT only lists `value`, yet the expected inserted row has key "B" —
// confirm this expectation. Also, `stripMargin` is applied to a literal with no `|` margin.
testNestedStructsEvolution("new nested source field added when inserting top-level column")(
target = """{ "key": "A", "value": { "a": 1 }""",
source = """{ "key": "B", "value": { "a": 2, "b": 3 }""",
targetSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)),
sourceSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)
.add("b", IntegerType)),
clauses = insert("(value) VALUES (s.value)") :: Nil,
result =
"""{ "key": "A", "value": { "a": 1, "b": null }
{ "key": "B", "value": { "a": 2, "b": 3 }""".stripMargin,
resultSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)
.add("b", IntegerType)),
expectErrorWithoutEvolutionContains = "Cannot cast")

// Listing a nested field (`value.b`) as an INSERT column is expected to fail in both modes,
// with a different error message depending on whether schema evolution is enabled.
testNestedStructsEvolution("insert new nested source field not supported")(
target = """{ "key": "A", "value": { "a": 1 }""",
source = """{ "key": "A", "value": { "a": 2, "b": 3, "c": 4 }""",
targetSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)),
sourceSchema = new StructType()
.add("key", StringType)
.add("value", new StructType()
.add("a", IntegerType)
.add("b", IntegerType)
.add("c", IntegerType)),
clauses = insert("(value.b) VALUES (s.value.b)") :: Nil,
expectErrorContains = "Nested field is not supported in the INSERT clause of MERGE operation",
expectErrorWithoutEvolutionContains = "No such struct field")

// No schema evolution: the new source column `extra` is at most read as a value and is never an
// assignment target, so the expected result is the same with and without schema evolution.
testEvolution("old column updated from new column")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, -1), (2, 2, -2))
.toDF("key", "value", "extra"),
clauses = update(set = "value = s.extra") :: Nil,
expected = ((0, 0) +: (1, -1) +: (3, 30) +: Nil).toDF("key", "value"),
expectedWithoutEvolution = ((0, 0) +: (1, -1) +: (3, 30) +: Nil).toDF("key", "value"))

// The existing target column `key` is inserted from the new source column `extra`.
testEvolution("old column inserted from new column")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, -1), (2, 2, -2))
.toDF("key", "value", "extra"),
clauses = insert(values = "(key) VALUES (s.extra)") :: Nil,
expected = ((0, 0) +: (1, 10) +: (3, 30) +: (-2, null) +: Nil)
.asInstanceOf[List[(Integer, Integer)]]
.toDF("key", "value"),
expectedWithoutEvolution = ((0, 0) +: (1, 10) +: (3, 30) +: (-2, null) +: Nil)
.asInstanceOf[List[(Integer, Integer)]]
.toDF("key", "value"))

// The source has a new column `extra`, but the INSERT only references the existing column `key`.
testEvolution("new column with insert existing column")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"),
clauses = insert(values = "(key) VALUES (s.key)") :: Nil,
expected = ((0, 0) +: (1, 10) +: (2, null) +: (3, 30) +: Nil)
.asInstanceOf[List[(Integer, Integer)]]
.toDF("key", "value"),
expectedWithoutEvolution = ((0, 0) +: (1, 10) +: (2, null) +: (3, 30) +: Nil)
.asInstanceOf[List[(Integer, Integer)]]
.toDF("key", "value"))

// Column doesn't exist with UPDATE/INSERT alone: `nonexistent` is in neither the target nor the
// source, so the same resolution error is expected with and without schema evolution.
testEvolution("update set nonexistent column")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"),
clauses = update(set = "nonexistent = s.extra") :: Nil,
expectErrorContains = "cannot resolve nonexistent in UPDATE clause",
expectErrorWithoutEvolutionContains = "cannot resolve nonexistent in UPDATE clause")

// Same as above for the INSERT clause.
testEvolution("insert values nonexistent column")(
targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"),
clauses = insert(values = "(nonexistent) VALUES (s.extra)") :: Nil,
expectErrorContains = "cannot resolve nonexistent in INSERT clause",
expectErrorWithoutEvolutionContains = "cannot resolve nonexistent in INSERT clause")

testEvolution("new column with update set and update *")(
targetData = Seq((0, 0), (1, 10), (2, 20)).toDF("key", "value"),
sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"),
Expand Down

0 comments on commit affd577

Please sign in to comment.