Skip to content

Commit

Permalink
[SPARK-48886][SS] Add version info to changelog v2 to allow for easie…
Browse files Browse the repository at this point in the history
…r evolution

### What changes were proposed in this pull request?
Add version info to changelog v2 to allow for easier evolution

### Why are the changes needed?
Currently, the changelog file format does not record any version info. With format v2, we propose writing the version into the changelog file itself to make future evolution easier.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Augmented unit tests
```
===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.RocksDBSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true), ForkJoinPool.common...
[info] Run completed in 4 minutes, 23 seconds.
[info] Total number of tests run: 176
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 176, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47336 from anishshri-db/task/SPARK-48886.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
anishshri-db authored and HeartSaVioR committed Jul 15, 2024
1 parent 40899c1 commit 5ff6a52
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ class RocksDBFileManager(
fileMappings
}

/**
 * Maps the column-family flag to the changelog format version.
 *
 * @param useColumnFamilies whether column families are in use
 * @return 2 when `useColumnFamilies` is true, 1 otherwise
 */
private def getChangelogVersion(useColumnFamilies: Boolean): Short =
  if (useColumnFamilies) 2 else 1

def getChangeLogWriter(
version: Long,
useColumnFamilies: Boolean = false): StateStoreChangelogWriter = {
Expand All @@ -180,10 +189,16 @@ class RocksDBFileManager(
if (!fm.exists(rootDir)) fm.mkdirs(rootDir)
rootDirChecked = true
}
val changelogWriter = if (useColumnFamilies) {
new StateStoreChangelogWriterV2(fm, changelogFile, codec)
} else {
new StateStoreChangelogWriterV1(fm, changelogFile, codec)

val changelogVersion = getChangelogVersion(useColumnFamilies)
val changelogWriter = changelogVersion match {
case 1 =>
new StateStoreChangelogWriterV1(fm, changelogFile, codec)
case 2 =>
new StateStoreChangelogWriterV2(fm, changelogFile, codec)
case _ =>
throw new IllegalArgumentException(s"Failed to find changelog writer for " +
s"version=$changelogVersion")
}
changelogWriter
}
Expand All @@ -193,11 +208,23 @@ class RocksDBFileManager(
version: Long,
useColumnFamilies: Boolean = false): StateStoreChangelogReader = {
val changelogFile = dfsChangelogFile(version)
if (useColumnFamilies) {
new StateStoreChangelogReaderV2(fm, changelogFile, codec)
} else {
new StateStoreChangelogReaderV1(fm, changelogFile, codec)

// Note that ideally we should get the version for the reader from the
// changelog itself. However, since we don't record this for v1, we need to
// rely on external arguments to make this call today. Within the reader, we verify
// the correctness of the decided/expected version. We might revisit this pattern
// as we add more changelog versions in the future.
val changelogVersion = getChangelogVersion(useColumnFamilies)
val changelogReader = changelogVersion match {
case 1 =>
new StateStoreChangelogReaderV1(fm, changelogFile, codec)
case 2 =>
new StateStoreChangelogReaderV2(fm, changelogFile, codec)
case _ =>
throw new IllegalArgumentException(s"Failed to find changelog reader for " +
s"version=$changelogVersion")
}
changelogReader
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,16 @@ abstract class StateStoreChangelogWriter(
new DataOutputStream(compressed)
}

/**
 * Writes the changelog version tag to the compressed stream as a UTF string
 * of the form "v" followed by the value of `version`.
 */
protected def writeVersion(): Unit = {
  val versionTag = s"v${version}"
  compressedStream.writeUTF(versionTag)
}

protected var backingFileStream: CancellableFSDataOutputStream =
fm.createAtomic(file, overwriteIfPossible = true)
protected var compressedStream: DataOutputStream = compressStream(backingFileStream)

def version: Short

def put(key: Array[Byte], value: Array[Byte]): Unit

def delete(key: Array[Byte]): Unit
Expand Down Expand Up @@ -140,6 +146,9 @@ class StateStoreChangelogWriterV1(
compressionCodec: CompressionCodec)
extends StateStoreChangelogWriter(fm, file, compressionCodec) {

// Note that v1 does not record this value in the changelog file
override def version: Short = 1

override def put(key: Array[Byte], value: Array[Byte]): Unit = {
assert(compressedStream != null)
compressedStream.writeInt(key.size)
Expand Down Expand Up @@ -180,20 +189,25 @@ class StateStoreChangelogWriterV1(

/**
* Write changes to the key value state store instance to a changelog file.
* There are 2 types of data records, put and delete.
* A put record is written as: | record type | key length
* | key content | value length | value content | col family name length | col family name | -1 |
* There are 3 types of data records, put, merge and delete.
* A put record or merge record is written as: | record type | key length
* | key content | value length | value content | -1 |
* A delete record is written as: | record type | key length | key content | -1
* | col family name length | col family name | -1 |
* Write an EOF_RECORD to signal the end of file.
* The overall changelog format is: | put record | delete record | ... | put record | eof record |
* The overall changelog format is: version | put record | delete record
* | ... | put record | eof record |
*/
class StateStoreChangelogWriterV2(
fm: CheckpointFileManager,
file: Path,
compressionCodec: CompressionCodec)
extends StateStoreChangelogWriter(fm, file, compressionCodec) {

override def version: Short = 2

// append the version field to the changelog file starting from version 2
writeVersion()

override def put(key: Array[Byte], value: Array[Byte]): Unit = {
writePutOrMergeRecord(key, value, RecordType.PUT_RECORD)
}
Expand Down Expand Up @@ -265,6 +279,8 @@ abstract class StateStoreChangelogReader(
}
protected val input: DataInputStream = decompressStream(sourceStream)

def version: Short

override protected def close(): Unit = { if (input != null) input.close() }

override def getNext(): (RecordType.Value, Array[Byte], Array[Byte])
Expand All @@ -283,6 +299,9 @@ class StateStoreChangelogReaderV1(
compressionCodec: CompressionCodec)
extends StateStoreChangelogReader(fm, fileToRead, compressionCodec) {

// Note that v1 does not record this value in the changelog file
override def version: Short = 1

override def getNext(): (RecordType.Value, Array[Byte], Array[Byte]) = {
val keySize = input.readInt()
// A -1 key size mean end of file.
Expand Down Expand Up @@ -314,7 +333,7 @@ class StateStoreChangelogReaderV1(
* Read an iterator of change record from the changelog file.
* A record is represented by tuple(recordType: RecordType.Value,
* key: Array[Byte], value: Array[Byte])
* A put record is returned as a tuple(recordType, key, value)
* A put or merge record is returned as a tuple(recordType, key, value)
* A delete record is returned as a tuple(recordType, key, null)
*/
class StateStoreChangelogReaderV2(
Expand All @@ -330,6 +349,13 @@ class StateStoreChangelogReaderV2(
blockBuffer
}

override def version: Short = 2

// ensure that the version read is v2
val changelogVersionStr = input.readUTF()
assert(changelogVersionStr == "v2",
s"Changelog version mismatch: $changelogVersionStr != v2")

override def getNext(): (RecordType.Value, Array[Byte], Array[Byte]) = {
val recordType = RecordType.getRecordTypeFromByte(input.readByte())
// A EOF_RECORD means end of file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
val fileManager = new RocksDBFileManager(
dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
val changelogWriter = fileManager.getChangeLogWriter(1)
assert(changelogWriter.version === 1)

val ex = intercept[UnsupportedOperationException] {
changelogWriter.merge("a", "1")
Expand Down Expand Up @@ -639,12 +640,14 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
val fileManager = new RocksDBFileManager(
dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
val changelogWriter = fileManager.getChangeLogWriter(1)
assert(changelogWriter.version === 1)

(1 to 5).foreach(i => changelogWriter.put(i.toString, i.toString))
(2 to 4).foreach(j => changelogWriter.delete(j.toString))

changelogWriter.commit()
val changelogReader = fileManager.getChangelogReader(1)
assert(changelogReader.version === 1)
val entries = changelogReader.toSeq
val expectedEntries = (1 to 5).map { i =>
(RecordType.PUT_RECORD, i.toString.getBytes,
Expand All @@ -666,6 +669,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
val fileManager = new RocksDBFileManager(
dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
val changelogWriter = fileManager.getChangeLogWriter(1, true)
assert(changelogWriter.version === 2)
(1 to 5).foreach { i =>
changelogWriter.put(i.toString, i.toString)
}
Expand All @@ -679,6 +683,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared

changelogWriter.commit()
val changelogReader = fileManager.getChangelogReader(1, true)
assert(changelogReader.version === 2)
val entries = changelogReader.toSeq
val expectedEntries = (1 to 5).map { i =>
(RecordType.PUT_RECORD, i.toString.getBytes, i.toString.getBytes)
Expand Down

0 comments on commit 5ff6a52

Please sign in to comment.