Fix a bug in Delta source that could miss read compatibility checks in some cases.

Added a check against the stream start schema to detect an edge case in which there is only one schema change that looks exactly like the stream source schema. Enhanced `isReadCompatible` with an option to determine the correct nullability conversion fallback.
GitOrigin-RevId: 6fd79e5703da9ead16fc54e717a156e1b4f15fdd
jackierwzhang authored and allisonport-db committed Dec 15, 2022
1 parent 7fa42c3 commit 4e51a99
Showing 9 changed files with 280 additions and 79 deletions.
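To make the fixed edge case concrete, here is a hypothetical repro sketch (the table name, versions, and setup are illustrative and not taken from this commit; the API calls are standard Spark/Delta usage):

// version 0: create a table with column mapping enabled so columns can be renamed
spark.sql("CREATE TABLE t (a INT, b INT) USING delta " +
  "TBLPROPERTIES ('delta.columnMapping.mode' = 'name')")
spark.sql("INSERT INTO t VALUES (1, 2)")          // version 1
spark.sql("ALTER TABLE t RENAME COLUMN b TO c")   // version 2: the only schema change

// A stream starting at version 0 sees exactly one schema change, and that change
// looks exactly like the latest (stream source) schema, so verifyStreamHygiene
// alone cannot flag it. The new check compares against the stream start schema.
val df = spark.readStream
  .format("delta")
  .option("startingVersion", 0)
  .table("t")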
12 changes: 6 additions & 6 deletions core/src/main/resources/error/delta-error-classes.json
@@ -1340,9 +1340,9 @@
"DELTA_SCHEMA_CHANGED" : {
"message" : [
"Detected schema change:",
"old schema: <oldSchema>",
"streaming source schema: <readSchema>",
"",
"new schema: <newSchema>",
"data file schema: <dataSchema>",
"",
"Please try restarting the query. If this issue repeats across query restarts without",
"making progress, you have made an incompatible schema change and need to start your",
@@ -1354,9 +1354,9 @@
"DELTA_SCHEMA_CHANGED_WITH_STARTING_OPTIONS" : {
"message" : [
"Detected schema change in version <version>:",
"old schema: <oldSchema>",
"streaming source schema: <readSchema>",
"",
"new schema: <newSchema>",
"data file schema: <dataSchema>",
"",
"Please try restarting the query. If this issue repeats across query restarts without",
"making progress, you have made an incompatible schema change and need to start your",
@@ -1371,9 +1371,9 @@
"DELTA_SCHEMA_CHANGED_WITH_VERSION" : {
"message" : [
"Detected schema change in version <version>:",
"old schema: <oldSchema>",
"streaming source schema: <readSchema>",
"",
"new schema: <newSchema>",
"data file schema: <dataSchema>",
"",
"Please try restarting the query. If this issue repeats across query restarts without",
"making progress, you have made an incompatible schema change and need to start your",
21 changes: 11 additions & 10 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -921,8 +921,8 @@ trait DeltaErrorsBase


def schemaChangedException(
oldSchema: StructType,
newSchema: StructType,
readSchema: StructType,
dataSchema: StructType,
retryable: Boolean,
version: Option[Long],
includeStartingVersionOrTimestampMessage: Boolean): Throwable = {
@@ -932,20 +932,20 @@ trait DeltaErrorsBase

if (version.isEmpty) {
newException("DELTA_SCHEMA_CHANGED", Array(
formatSchema(oldSchema),
formatSchema(newSchema)
formatSchema(readSchema),
formatSchema(dataSchema)
))
} else if (!includeStartingVersionOrTimestampMessage) {
newException("DELTA_SCHEMA_CHANGED_WITH_VERSION", Array(
version.get.toString,
formatSchema(oldSchema),
formatSchema(newSchema)
formatSchema(readSchema),
formatSchema(dataSchema)
))
} else {
newException("DELTA_SCHEMA_CHANGED_WITH_STARTING_OPTIONS", Array(
version.get.toString,
formatSchema(oldSchema),
formatSchema(newSchema),
formatSchema(readSchema),
formatSchema(dataSchema),
version.get.toString
))
}
@@ -2505,7 +2505,7 @@ trait DeltaErrorsBase
if (isCdfRead) "Streaming read of Change Data Feed (CDF)" else "Streaming read",
readSchema,
incompatibleSchema,
DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key,
DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES.key,
additionalProperties = Map(
"detectedDuringStreaming" -> detectedDuringStreaming.toString
))
@@ -2514,7 +2514,8 @@ trait DeltaErrorsBase
def failedToGetSnapshotDuringColumnMappingStreamingReadCheck(cause: Throwable): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_STREAM_CHECK_COLUMN_MAPPING_NO_SNAPSHOT",
Array(DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key),
Array(DeltaSQLConf
.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES.key),
Some(cause))
}

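For illustration, a minimal sketch of how the renamed `schemaChangedException` parameters line up with the new message wording (the schemas here are hypothetical):

import org.apache.spark.sql.types._

// readSchema: the fixed schema the running stream was analyzed with.
val readSchema = new StructType().add("id", LongType).add("name", StringType)
// dataSchema: the schema of the data files about to be read.
val dataSchema = new StructType().add("id", LongType)

// Renders "streaming source schema:" / "data file schema:" instead of the old
// "old schema:" / "new schema:" wording.
val ex = DeltaErrors.schemaChangedException(
  readSchema,
  dataSchema,
  retryable = true,
  version = Some(3L),
  includeStartingVersionOrTimestampMessage = false)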
core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
@@ -230,51 +230,73 @@ object SchemaUtils extends DeltaLogging {
* As the Delta snapshots update, the schema may change as well. This method defines whether the
* new schema of a Delta table can be used with a previously analyzed LogicalPlan. Our
* rules are to return false if:
* - Dropping any column that was present in the DataFrame schema
* - Converting nullable=false to nullable=true for any column
* - Dropping any column that was present in the existing schema
* - Any change of datatype
* - If `forbidTightenNullability` = true:
* - Forbids tightening the nullability (existing nullable=true -> read nullable=false)
* - Typically used when the existing schema refers to the schema of written data, such as
* when a Delta streaming source reads a schema change (existingSchema) which
* has nullable=true, using the latest schema which has nullable=false; we should not
* project nulls from the data into the non-nullable read schema.
* - Otherwise:
* - Forbids relaxing the nullability (existing nullable=false -> read nullable=true)
* - Typically used when the read schema refers to the schema of written data, such as during
* a Delta scan: the latest schema during execution (readSchema) has nullable=true but the
* schema at analysis time (existingSchema) was nullable=false, so we should not project
* nulls from the later data onto a non-nullable schema analyzed in the past.
*/
def isReadCompatible(existingSchema: StructType, readSchema: StructType): Boolean = {
def isReadCompatible(
existingSchema: StructType,
readSchema: StructType,
forbidTightenNullability: Boolean = false): Boolean = {

def isNullabilityCompatible(existingNullable: Boolean, readNullable: Boolean): Boolean = {
if (forbidTightenNullability) {
readNullable || !existingNullable
} else {
existingNullable || !readNullable
}
}

def isDatatypeReadCompatible(existing: DataType, newtype: DataType): Boolean = {
(existing, newtype) match {
case (e: StructType, n: StructType) =>
isReadCompatible(e, n)
isReadCompatible(e, n, forbidTightenNullability)
case (e: ArrayType, n: ArrayType) =>
// if existing elements are non-nullable, so should be the new element
(e.containsNull || !n.containsNull) &&
isNullabilityCompatible(e.containsNull, n.containsNull) &&
isDatatypeReadCompatible(e.elementType, n.elementType)
case (e: MapType, n: MapType) =>
// if existing value is non-nullable, so should be the new value
(e.valueContainsNull || !n.valueContainsNull) &&
isNullabilityCompatible(e.valueContainsNull, n.valueContainsNull) &&
isDatatypeReadCompatible(e.keyType, n.keyType) &&
isDatatypeReadCompatible(e.valueType, n.valueType)
case (a, b) => a == b
}
}

def isStructReadCompatible(existing: StructType, newtype: StructType): Boolean = {
val existing = toFieldMap(existingSchema)
val existingFields = toFieldMap(existing)
// scalastyle:off caselocale
val existingFieldNames = existingSchema.fieldNames.map(_.toLowerCase).toSet
assert(existingFieldNames.size == existingSchema.length,
val existingFieldNames = existing.fieldNames.map(_.toLowerCase).toSet
assert(existingFieldNames.size == existing.length,
"Delta tables don't allow field names that only differ by case")
val newFields = readSchema.fieldNames.map(_.toLowerCase).toSet
assert(newFields.size == readSchema.length,
val newFields = newtype.fieldNames.map(_.toLowerCase).toSet
assert(newFields.size == newtype.length,
"Delta tables don't allow field names that only differ by case")
// scalastyle:on caselocale

if (!existingFieldNames.subsetOf(newFields)) {
// Dropped a column that was present in the DataFrame schema
return false
}
readSchema.forall { newField =>
newtype.forall { newField =>
// new fields are fine, they just won't be returned
existing.get(newField.name).forall { existingField =>
existingFields.get(newField.name).forall { existingField =>
// we know the name matches modulo case - now verify exact match
(existingField.name == newField.name
// if existing value is non-nullable, so should be the new value
&& (existingField.nullable || !newField.nullable)
&& isNullabilityCompatible(existingField.nullable, newField.nullable)
// and the type of the field must be compatible, too
&& isDatatypeReadCompatible(existingField.dataType, newField.dataType))
}
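A minimal sketch of the two nullability directions of `isReadCompatible` (flat, hypothetical schemas for brevity):

import org.apache.spark.sql.types._

val nullable    = new StructType().add("a", IntegerType, nullable = true)
val nonNullable = new StructType().add("a", IntegerType, nullable = false)

// Default (scan) direction: existing nullable=false must not relax to read nullable=true.
SchemaUtils.isReadCompatible(nonNullable, nullable)   // false
SchemaUtils.isReadCompatible(nullable, nonNullable)   // true

// Streaming direction: the read schema must not tighten nullability of written data.
SchemaUtils.isReadCompatible(nullable, nonNullable, forbidTightenNullability = true)  // false
SchemaUtils.isReadCompatible(nonNullable, nullable, forbidTightenNullability = true)  // true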
core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -881,8 +881,8 @@ trait DeltaSQLConfBase
.createWithDefault(false)
}

val DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES =
buildConf("streaming.unsafeReadOnIncompatibleSchemaChanges.enabled")
val DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES =
buildConf("streaming.unsafeReadOnIncompatibleColumnMappingSchemaChanges.enabled")
.doc(
"Streaming read on Delta table with column mapping schema operations " +
"(e.g. rename or drop column) is currently blocked due to potential data loss and " +
@@ -892,6 +892,29 @@ trait DeltaSQLConfBase
.booleanConf
.createWithDefault(false)


val DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES_DURING_STREAM_START =
buildConf("streaming.unsafeReadOnIncompatibleSchemaChangesDuringStreamStart.enabled")
.doc(
"""A legacy config to disable schema read-compatibility check on the start version schema
|when starting a streaming query. The config is added to allow legacy problematic queries
|disabling the check to keep running if users accept the potential risks of incompatible
|schema reading.""".stripMargin)
.internal()
.booleanConf
.createWithDefault(false)

val DELTA_STREAM_UNSAFE_READ_ON_NULLABILITY_CHANGE =
buildConf("streaming.unsafeReadOnNullabilityChange.enabled")
.doc(
"""A legacy config to disable unsafe nullability check. The config is added to allow legacy
|problematic queries disabling the check to keep running if users accept the potential
|risks of incompatible schema reading.""".stripMargin)
.internal()
.booleanConf
.createWithDefault(false)


val DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES =
buildConf("changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled")
.doc(
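If a legacy query needs the old behavior, the two escape hatches above can be set per session; a sketch, assuming Delta's usual "spark.databricks.delta." conf prefix applied by `buildConf`:

spark.conf.set(
  "spark.databricks.delta.streaming.unsafeReadOnIncompatibleSchemaChangesDuringStreamStart.enabled",
  "true")
spark.conf.set(
  "spark.databricks.delta.streaming.unsafeReadOnNullabilityChange.enabled",
  "true")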
core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -21,8 +21,8 @@ import java.io.FileNotFoundException
import java.sql.Timestamp

import scala.collection.mutable
import scala.util.{Success, Try}
import scala.util.control.NonFatal

import scala.util.matching.Regex

import org.apache.spark.sql.delta._
@@ -111,14 +111,31 @@ trait DeltaSourceBase extends Source
* Flag that allows the user to force-enable unsafe streaming reads on a Delta table with
* column mapping enabled AND drop/rename actions.
*/
protected lazy val forceEnableStreamingRead: Boolean = spark.sessionState.conf
.getConf(DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES)
protected lazy val forceEnableStreamingReadOnColumnMappingSchemaChanges: Boolean =
spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES)

/**
* Flag that allows the user to disable the read-compatibility check during stream start, which
* protects against a corner case that verifyStreamHygiene could not detect.
* This is a bug fix but also a potential behavior change, so we add a flag to fall back.
*/
protected lazy val forceEnableStreamingReadOnReadIncompatibleSchemaChangesDuringStreamStart =
spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES_DURING_STREAM_START)

/**
* Flag that allows the user to fall back to the legacy behavior in which a nullable=false read
* schema is allowed to read nullable=true data, which is incorrect, but fixing it is a behavior
* change regardless.
*/
protected lazy val forceEnableUnsafeReadOnNullabilityChange =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STREAM_UNSAFE_READ_ON_NULLABILITY_CHANGE)

/**
* A global flag to mark whether we have done a per-stream-start check for read-incompatible
* schema changes (e.g. rename / drop / nullability changes).
*/
protected var hasCheckedColumnMappingChangesOnStreamStart: Boolean = false
protected var hasCheckedReadIncompatibleSchemaChangesOnStreamStart: Boolean = false

override val schema: StructType = {
val schemaWithoutCDC =
@@ -274,7 +291,7 @@
} else {
// Block latestOffset() from generating an invalid offset by proactively verifying
// incompatible schema changes under column mapping. See more details in the method doc.
checkColumnMappingSchemaChangesOnStreamStartOnce(fromVersion)
checkReadIncompatibleSchemaChangeOnStreamStartOnce(fromVersion)
buildOffsetFromIndexedFile(lastFileChange.get, fromVersion, isStartingVersion)
}
}
@@ -297,7 +314,7 @@
} else {
// Similarly, block latestOffset() from generating an invalid offset by proactively verifying
// incompatible schema changes under column mapping. See more details in the method doc.
checkColumnMappingSchemaChangesOnStreamStartOnce(previousOffset.reservoirVersion)
checkReadIncompatibleSchemaChangeOnStreamStartOnce(previousOffset.reservoirVersion)
buildOffsetFromIndexedFile(lastFileChange.get, previousOffset.reservoirVersion,
previousOffset.isStartingVersion)
}
@@ -354,7 +371,7 @@
}

/**
* Check column mapping changes during stream (re)start so we could fail fast.
* Check read-incompatible schema changes during stream (re)start so we could fail fast.
*
* This only needs to be called ONCE in the life cycle of a stream, either at the very first
* latestOffset, or the very first getBatch to make sure we have detected an incompatible
@@ -364,18 +381,19 @@
* 1. User starts a new stream @ startingVersion 1
* 2. latestOffset is called before getBatch() because there were no previous commits, so
* getBatch won't be called as a recovery mechanism.
* Suppose there's a single rename/drop column change S during computing next offset, S
* Suppose there's a single rename/drop/nullability change S during computing next offset, S
* would look exactly the same as the latest schema, so verifyStreamHygiene would not work.
* 3. latestOffset would then return this new offset, crossing the schema boundary.
*
* TODO: unblock this after we roll out the proper semantics.
* TODO: unblock the column mapping side after we roll out the proper semantics.
*/
protected def checkColumnMappingSchemaChangesOnStreamStartOnce(startVersion: Long): Unit = {
if (hasCheckedColumnMappingChangesOnStreamStart) {
protected def checkReadIncompatibleSchemaChangeOnStreamStartOnce(startVersion: Long): Unit = {
if (hasCheckedReadIncompatibleSchemaChangesOnStreamStart) {
return
}
// Column-mapping related schema changes; none of them should be retryable
if (snapshotAtSourceInit.metadata.columnMappingMode != NoMapping &&
!forceEnableStreamingRead) {
!forceEnableStreamingReadOnColumnMappingSchemaChanges) {

val snapshotAtStartVersionToScan = try {
getSnapshotFromDeltaLog(startVersion)
@@ -397,8 +415,18 @@
)
}
}

if (!forceEnableStreamingReadOnReadIncompatibleSchemaChangesDuringStreamStart) {
// Other read-incompatible schema changes; some of them can be retryable
Try(getSnapshotFromDeltaLog(startVersion)) match {
case Success(startSnapshot) =>
verifySchemaChange(startSnapshot.schema, startSnapshot.version)
case _ =>
// Don't regress and fail if we could not compute such a snapshot.
}
}
// Mark as checked
hasCheckedColumnMappingChangesOnStreamStart = true
hasCheckedReadIncompatibleSchemaChangesOnStreamStart = true
}

/**
@@ -422,7 +450,8 @@

val metadataAtSourceInit = snapshotAtSourceInit.metadata

if (metadataAtSourceInit.columnMappingMode != NoMapping && !forceEnableStreamingRead) {
if (metadataAtSourceInit.columnMappingMode != NoMapping &&
!forceEnableStreamingReadOnColumnMappingSchemaChanges) {
if (curVersion < snapshotAtSourceInit.version) {
// Stream version is newer, ensure there's no column mapping schema changes
// from cur -> stream.
@@ -674,15 +703,30 @@ case class DeltaSource(
* Verify whether the schema change in `version` is safe to continue. If not, throw an exception
* to fail the query.
*/
protected def verifySchemaChange(newSchema: StructType, version: Long): Unit = {
// There is a schema change. All of files after this commit will use `newSchema`. Hence, we
protected def verifySchemaChange(schemaChange: StructType, version: Long): Unit = {
// There is a schema change. All files after this commit will use `schemaChange`. Hence, we
// check whether we can use `schema` (the fixed source schema we use in the same run of the
// query) to read these new files safely.
if (!SchemaUtils.isReadCompatible(newSchema, schema)) {
val retryable = SchemaUtils.isReadCompatible(schema, newSchema)
val backfilling = version < snapshotAtSourceInit.version
// We forbid the case when the schemaChange is nullable while the read schema is NOT
// nullable, or in other words, `schema` should not tighten nullability from `schemaChange`,
// because we don't ever want to read back any nulls when the read schema is non-nullable.
val shouldForbidTightenNullability = !forceEnableUnsafeReadOnNullabilityChange
if (!SchemaUtils.isReadCompatible(
schemaChange, schema, forbidTightenNullability = shouldForbidTightenNullability)) {
// Only a schema change later than the current read snapshot/schema can be retried; in other
// words, backfills can never be retryable, because we have no way to refresh
// the latest schema to "catch up" when the schema change happened before the current read
// schema version.
// If not backfilling, we do another check to determine retryability, in which we assume that
// we will be reading with this later `schemaChange` in place of the current outdated `schema`,
// and if that works (including that `schemaChange` should not tighten the nullability
// constraint from `schema`), it is a retryable exception.
val retryable = !backfilling && SchemaUtils.isReadCompatible(
schema, schemaChange, forbidTightenNullability = shouldForbidTightenNullability)
throw DeltaErrors.schemaChangedException(
schema,
newSchema,
schemaChange,
retryable = retryable,
Some(version),
includeStartingVersionOrTimestampMessage = options.containsStartingVersionOrTimestamp)
Expand Down Expand Up @@ -843,7 +887,7 @@ case class DeltaSource(
// Check for read-incompatible schema changes (e.g. column mapping changes) on stream start.
// Note that for the initial snapshot, the startVersion should be the same as latestOffset's
// version, and therefore this check won't have any effect.
checkColumnMappingSchemaChangesOnStreamStartOnce(startVersion)
checkReadIncompatibleSchemaChangeOnStreamStartOnce(startVersion)

val createdDf = createDataFrameBetweenOffsets(startVersion, startIndex, isStartingVersion,
startSourceVersion, startOffsetOption, endOffset)
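To see how the new retryability rule in `verifySchemaChange` plays out, a sketch with hypothetical schemas for an additive change that is not a backfill:

import org.apache.spark.sql.types._

val schema       = new StructType().add("id", LongType)                     // analyzed read schema
val schemaChange = new StructType().add("id", LongType).add("v", LongType)  // later data schema

// Reading the new files with the outdated analyzed schema would drop a column,
// so the check fails and verifySchemaChange throws:
SchemaUtils.isReadCompatible(schemaChange, schema, forbidTightenNullability = true)  // false

// But reading with `schemaChange` going forward works, and this is not a backfill,
// so the exception is marked retryable: a restart picks up the new schema.
val backfilling = false  // i.e. version >= snapshotAtSourceInit.version
val retryable = !backfilling &&
  SchemaUtils.isReadCompatible(schema, schemaChange, forbidTightenNullability = true)  // true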