Skip to content

Commit

Permalink
FileNotFoundException does not imply InitialSnapshot
Browse files Browse the repository at this point in the history
This PR changes the Delta snapshot management and file listing code to return `Option`s, with `None` meaning the directory was empty or missing. Otherwise, these methods return `Some(logSegment)` -- possibly with an empty file list, if the search found no usable commit files. That way, we can reliably distinguish a truly empty/missing Delta table from one whose log files are corrupted or missing in a way that prevents snapshot construction. The former should produce an `InitialSnapshot`, while the latter should propagate an error.

Previously, Delta snapshot management code made the unsafe assumption that a `FileNotFoundException` necessarily meant the directory was empty, and several code locations caught the exception in order to create an `InitialSnapshot` that designates an empty table. This led to an awkward and brittle design, where code had to either avoid throwing `FileNotFoundException` -- even if the problem was, in fact, a file not found -- or else catch and wrap the exception to ensure it propagated past the catch clauses that would wrongly create an `InitialSnapshot`.

Existing unit tests cover this code.

GitOrigin-RevId: 6d4330b43cdfa11f69f64ee3849eab9192c9b268
  • Loading branch information
ryan-johnson-databricks authored and scottsand-db committed Mar 3, 2022
1 parent 01ce4f5 commit 582bc95
Showing 1 changed file with 69 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ trait SnapshotManagement { self: DeltaLog =>

/**
* Get the LogSegment that will help in computing the Snapshot of the table at DeltaLog
* initialization.
* initialization, or None if the directory was empty/missing.
*
* @param startingCheckpoint A checkpoint that we can start our listing from
*/
protected def getLogSegmentFrom(
startingCheckpoint: Option[CheckpointMetaData]): LogSegment = {
startingCheckpoint: Option[CheckpointMetaData]): Option[LogSegment] = {
getLogSegmentForVersion(startingCheckpoint.map(_.version))
}

Expand All @@ -81,11 +81,22 @@ trait SnapshotManagement { self: DeltaLog =>
*
* @param startVersion the version to start. Inclusive.
* @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
* @return Some array of files found (possibly empty, if no usable commit files are present), or
* None if the listing returned no files at all.
*/
private final def listDeltaAndCheckpointFiles(
startVersion: Long,
versionToLoad: Option[Long]): Array[FileStatus] = {
listFrom(startVersion)
versionToLoad: Option[Long]): Option[Array[FileStatus]] = {
// LIST the directory, starting from the provided lower bound (treat missing dir as empty).
// NOTE: "empty/missing" is _NOT_ equivalent to "contains no useful commit files."
val listing = try {
listFrom(startVersion)
} catch {
case _: FileNotFoundException => Iterator.empty
}
if (listing.isEmpty) return None

val files = listing
// Pick up all checkpoint and delta files
.filter { file => isDeltaCommitOrCheckpointFile(file.getPath) }
// Checkpoint files of 0 size are invalid but Spark will ignore them silently when reading
Expand All @@ -94,6 +105,9 @@ trait SnapshotManagement { self: DeltaLog =>
// take files until the version we want to load
.takeWhile(f => versionToLoad.forall(v => getFileVersion(f.getPath) <= v))
.toArray

// NOTE: The file list may be empty if the listing returned no usable files.
Some(files)
}

/**
Expand All @@ -109,16 +123,32 @@ trait SnapshotManagement { self: DeltaLog =>
* Delta streaming source. If not provided, we will try to load the latest
* version of the table.
* @return Some LogSegment to build a Snapshot if files do exist after the given
* startCheckpoint. None, if there are no new files after `startCheckpoint`.
* startCheckpoint. None, if the directory was missing or empty.
*/
protected def getLogSegmentForVersion(
startCheckpoint: Option[Long],
versionToLoad: Option[Long] = None): LogSegment = {
versionToLoad: Option[Long] = None): Option[LogSegment] = {
recordFrameProfile("Delta", "SnapshotManagement.getLogSegmentForVersion") {
// List from the starting checkpoint. If a checkpoint doesn't exist, this will still return
// deltaVersion=0.
val newFiles = listDeltaAndCheckpointFiles(startCheckpoint.getOrElse(0L), versionToLoad)
.getOrElse {
// No files found even when listing from 0 => empty directory => table does not exist yet.
if (startCheckpoint.isEmpty) return None

// [SC-95011] FIXME(ryan.johnson): We always write the commit and checkpoint files
// before updating _last_checkpoint. If the listing came up empty, then we either
// encountered a list-after-put inconsistency in the underlying log store, or somebody
// corrupted the table by deleting files. Either way, we can't safely continue.
//
// For now, we preserve existing behavior by returning Array.empty, which will trigger a
// recursive call to [[getLogSegmentForVersion]] below (same as before the refactor).
Array.empty[FileStatus]
}

if (newFiles.isEmpty && startCheckpoint.isEmpty) {
// We can't construct a snapshot because the directory contained no usable commit
// files... but we can't return None either, because it was not truly empty.
throw DeltaErrors.emptyDirectoryException(logPath.toString)
} else if (newFiles.isEmpty) {
// The directory may be deleted and recreated and we may have stale state in our DeltaLog
Expand All @@ -135,19 +165,22 @@ trait SnapshotManagement { self: DeltaLog =>
val newCheckpointVersion = newCheckpoint.map(_.version).getOrElse {
// If we do not have any checkpoint, pass new checkpoint version as -1 so that first
// delta version can be 0.
if (startCheckpoint.isDefined) {
startCheckpoint.foreach { startCheckpoint =>
// `startCheckpoint` was given but no checkpoint found on delta log. This means that the
// last checkpoint we thought should exist (the `_last_checkpoint` file) no longer exists.
// Try to look up another valid checkpoint and create `LogSegment` from it.
//
// [SC-95011] FIXME(ryan.johnson): Something has gone very wrong if the checkpoint doesn't
// exist at all. This code should only handle rejected incomplete checkpoints.
recordDeltaEvent(this, "delta.checkpoint.error.partial")
val alternativeLogSegment = getLogSegmentWithMaxExclusiveCheckpointVersion(
snapshotVersion = versionToLoad.getOrElse(deltaVersion(deltas.last.getPath)),
startCheckpoint.get)
return alternativeLogSegment.getOrElse {
throw DeltaErrors.missingPartFilesException(
startCheckpoint.get, new FileNotFoundException(
s"Checkpoint file to load version: ${startCheckpoint.get} is missing."))
}
val snapshotVersion = versionToLoad.getOrElse(deltaVersion(deltas.last.getPath))
getLogSegmentWithMaxExclusiveCheckpointVersion(snapshotVersion, startCheckpoint)
.foreach { alternativeLogSegment => return Some(alternativeLogSegment) }

// No alternative found, but the directory contains files so we cannot return None.
throw DeltaErrors.missingPartFilesException(
startCheckpoint, new FileNotFoundException(
s"Checkpoint file to load version: $startCheckpoint is missing."))
}
-1L
}
Expand Down Expand Up @@ -190,13 +223,13 @@ trait SnapshotManagement { self: DeltaLog =>
}
val lastCommitTimestamp = deltas.last.getModificationTime

LogSegment(
Some(LogSegment(
logPath,
newVersion,
deltasAfterCheckpoint,
newCheckpointFiles,
newCheckpoint.map(_.version),
lastCommitTimestamp)
lastCommitTimestamp))
}
}

Expand All @@ -207,8 +240,7 @@ trait SnapshotManagement { self: DeltaLog =>
*/
protected def getSnapshotAtInit: Snapshot = {
recordFrameProfile("Delta", "SnapshotManagement.getSnapshotAtInit") {
try {
val segment = getLogSegmentFrom(lastCheckpoint)
getLogSegmentFrom(lastCheckpoint).map { segment =>
val startCheckpoint = segment.checkpointVersionOpt
.map(v => s" starting from checkpoint $v.").getOrElse(".")
logInfo(s"Loading version ${segment.version}$startCheckpoint")
Expand All @@ -217,11 +249,9 @@ trait SnapshotManagement { self: DeltaLog =>
lastUpdateTimestamp = clock.getTimeMillis()
logInfo(s"Returning initial snapshot $snapshot")
snapshot
} catch {
case e: FileNotFoundException =>
logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
// The log directory may not exist
new InitialSnapshot(logPath, this)
}.getOrElse {
logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
new InitialSnapshot(logPath, this)
}
}
}
Expand Down Expand Up @@ -272,6 +302,7 @@ trait SnapshotManagement { self: DeltaLog =>
val filesSinceCheckpointVersion = listDeltaAndCheckpointFiles(
startVersion = cp.version,
versionToLoad = Some(snapshotVersion))
.getOrElse(Array.empty)
val (checkpoints, deltas) =
filesSinceCheckpointVersion.partition(f => isCheckpointFile(f.getPath))
if (deltas.isEmpty) {
Expand Down Expand Up @@ -315,6 +346,7 @@ trait SnapshotManagement { self: DeltaLog =>
case None =>
val deltas =
listDeltaAndCheckpointFiles(startVersion = 0, versionToLoad = Some(snapshotVersion))
.getOrElse(Array.empty)
.filter(file => isDeltaFile(file.getPath))
val deltaVersions = deltas.map(f => deltaVersion(f.getPath))
try {
Expand Down Expand Up @@ -438,8 +470,8 @@ trait SnapshotManagement { self: DeltaLog =>
*/
protected def updateInternal(isAsync: Boolean): Snapshot =
recordDeltaOperation(this, "delta.log.update", Map(TAG_ASYNC -> isAsync.toString)) {
try {
val segment = getLogSegmentForVersion(currentSnapshot.logSegment.checkpointVersionOpt)
val segmentOpt = getLogSegmentForVersion(currentSnapshot.logSegment.checkpointVersionOpt)
val newSnapshot = segmentOpt.map { segment =>
if (segment == currentSnapshot.logSegment) {
// Exit early if there is no new file
lastUpdateTimestamp = clock.getTimeMillis()
Expand All @@ -464,17 +496,13 @@ trait SnapshotManagement { self: DeltaLog =>
"nextSnapshotMetadata" -> newSnapshot.metadata))
}

replaceSnapshot(newSnapshot)
logInfo(s"Updated snapshot to $newSnapshot")
} catch {
case e: FileNotFoundException =>
if (Option(e.getMessage).exists(_.contains("reconstruct state at version"))) {
throw e
}
val message = s"No delta log found for the Delta table at $logPath"
logInfo(message)
replaceSnapshot(new InitialSnapshot(logPath, this))
newSnapshot
}.getOrElse {
logInfo(s"No delta log found for the Delta table at $logPath")
new InitialSnapshot(logPath, this)
}
replaceSnapshot(newSnapshot)
lastUpdateTimestamp = clock.getTimeMillis()
currentSnapshot
}
Expand All @@ -501,9 +529,12 @@ trait SnapshotManagement { self: DeltaLog =>
// Do not use the hint if the version we're asking for is smaller than the last checkpoint hint
val startingCheckpoint = lastCheckpointHint.collect { case ci if ci.version <= version => ci }
.orElse(findLastCompleteCheckpoint(CheckpointInstance(version, None)))
val segment = getLogSegmentForVersion(startingCheckpoint.map(_.version), Some(version))

createSnapshot(segment, minFileRetentionTimestamp)
getLogSegmentForVersion(startingCheckpoint.map(_.version), Some(version)).map { segment =>
createSnapshot(segment, minFileRetentionTimestamp)
}.getOrElse {
// We can't return InitialSnapshot because our caller asked for a specific snapshot version.
throw DeltaErrors.emptyDirectoryException(logPath.toString)
}
}
}

Expand Down

0 comments on commit 582bc95

Please sign in to comment.