Skip to content

Commit

Permalink
[Spark] Managed Commits: add a DynamoDB-based commit owner (#3107)
Browse files Browse the repository at this point in the history
## Description
Taking inspiration from #339, this
PR adds a Commit Owner Client which uses DynamoDB as the backend. Each
Delta table managed by a DynamoDB instance will have one corresponding
entry in a DynamoDB table. The table schema is as follows:

* tableId: String --- The unique identifier for the entry. This is a
UUID.
* path: String --- The fully qualified path of the table in the file
system. e.g. s3://bucket/path.
* acceptingCommits: Boolean --- Whether the commit owner is accepting
new commits. This will only
* be set to false when the table is converted from managed commits to
file system commits.
* tableVersion: Number --- The version of the latest commit.
* tableTimestamp: Number --- The inCommitTimestamp of the latest commit.
* schemaVersion: Number --- The version of the schema used to store the
data.
* commits: --- The list of unbackfilled commits.
  -  version: Number --- The version of the commit.
  -  inCommitTimestamp: Number --- The inCommitTimestamp of the commit.
  -  fsName: String --- The name of the unbackfilled file.
  -  fsLength: Number --- The length of the unbackfilled file.
- fsTimestamp: Number --- The modification time of the unbackfilled
file.

For a table to be managed by DynamoDB, `registerTable` must be called
for that Delta table. This will create a new entry in the db for this
Delta table. Every `commit` invocation appends the UUID delta file
status to the `commits` list in the table entry. `commit` is performed
through a conditional write in DynamoDB.

## How was this patch tested?
Added a new suite called `DynamoDBCommitOwnerClient5BackfillSuite` which
uses a mock DynamoDB client. + plus manual testing against a DynamoDB
instance.
  • Loading branch information
dhruvarya-db authored May 20, 2024
1 parent 57df2c0 commit 7b4ee63
Show file tree
Hide file tree
Showing 9 changed files with 1,177 additions and 44 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ lazy val spark = (project in file("spark"))
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",
// For DynamoDBCommitStore
"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided",

// Test deps
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.dynamodbcommitstore;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import org.apache.spark.sql.delta.managedcommit.CommitOwnerBuilder;
import org.apache.spark.sql.delta.managedcommit.CommitOwnerClient;
import org.apache.spark.sql.SparkSession;
import scala.collection.immutable.Map;

import java.lang.reflect.InvocationTargetException;

public class DynamoDBCommitOwnerClientBuilder implements CommitOwnerBuilder {

private final long BACKFILL_BATCH_SIZE = 1L;

@Override
public String getName() {
return "dynamodb";
}

/**
* Key for the name of the DynamoDB table which stores all the unbackfilled
* commits for this owner. The value of this key is stored in the `conf`
* which is passed to the `build` method.
*/
private static final String MANAGED_COMMITS_TABLE_NAME_KEY = "managedCommitsTableName";
/**
* Key for the endpoint of the DynamoDB service. The value of this key is stored in the
* `conf` which is passed to the `build` method.
*/
private static final String DYNAMO_DB_ENDPOINT_KEY = "dynamoDBEndpoint";

/**
* The AWS credentials provider chain to use when creating the DynamoDB client.
* This has temporarily been hardcoded until we have a way to read from sparkSession.
*/
private static final String AWS_CREDENTIALS_PROVIDER =
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain";

// TODO: update this interface so that it can take a sparkSession.
@Override
public CommitOwnerClient build(SparkSession spark, Map<String, String> conf) {
String managedCommitsTableName = conf.get(MANAGED_COMMITS_TABLE_NAME_KEY).getOrElse(() -> {
throw new RuntimeException(MANAGED_COMMITS_TABLE_NAME_KEY + " not found");
});
String dynamoDBEndpoint = conf.get(DYNAMO_DB_ENDPOINT_KEY).getOrElse(() -> {
throw new RuntimeException(DYNAMO_DB_ENDPOINT_KEY + " not found");
});
try {
AmazonDynamoDBClient client =
createAmazonDDBClient(dynamoDBEndpoint, AWS_CREDENTIALS_PROVIDER);
return new DynamoDBCommitOwnerClient(
managedCommitsTableName, dynamoDBEndpoint, client, BACKFILL_BATCH_SIZE);
} catch (Exception e) {
throw new RuntimeException("Failed to create DynamoDB client", e);
}
}

private AmazonDynamoDBClient createAmazonDDBClient(
String endpoint,
String credentialProviderName
) throws NoSuchMethodException,
ClassNotFoundException,
InvocationTargetException,
InstantiationException,
IllegalAccessException {
Class<?> awsCredentialsProviderClass = Class.forName(credentialProviderName);
AWSCredentialsProvider awsCredentialsProvider =
(AWSCredentialsProvider) awsCredentialsProviderClass.getConstructor().newInstance();
AmazonDynamoDBClient client = new AmazonDynamoDBClient(awsCredentialsProvider);
client.setEndpoint(endpoint);
return client;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.dynamodbcommitstore;

/**
* Defines the field names used in the DynamoDB table entry.
*/
final class DynamoDBTableEntryConstants {
private DynamoDBTableEntryConstants() {}

/** The primary key of the DynamoDB table. */
public static final String TABLE_ID = "tableId";
/** The version of the latest commit in the corresponding Delta table. */
public static final String TABLE_LATEST_VERSION = "tableVersion";
/** The inCommitTimestamp of the latest commit in the corresponding Delta table. */
public static final String TABLE_LATEST_TIMESTAMP = "tableTimestamp";
/** Whether this commit owner is accepting more commits for the corresponding Delta table. */
public static final String ACCEPTING_COMMITS = "acceptingCommits";
/** The path of the corresponding Delta table. */
public static final String TABLE_PATH = "path";
/** The schema version of this DynamoDB table entry. */
public static final String SCHEMA_VERSION = "schemaVersion";
/** The name of the field used to store unbackfilled commits. */
public static final String COMMITS = "commits";
/** The unbackfilled commit version. */
public static final String COMMIT_VERSION = "version";
/** The inCommitTimestamp of the unbackfilled commit. */
public static final String COMMIT_TIMESTAMP = "timestamp";
/** The name of the unbackfilled file. e.g. 00001.uuid.json */
public static final String COMMIT_FILE_NAME = "fsName";
/** The length of the unbackfilled file as per the file status. */
public static final String COMMIT_FILE_LENGTH = "fsLength";
/** The modification timestamp of the unbackfilled file as per the file status. */
public static final String COMMIT_FILE_MODIFICATION_TIMESTAMP = "fsTimestamp";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.dynamodbcommitstore;

import org.apache.spark.sql.delta.managedcommit.AbstractMetadata;
import org.apache.spark.sql.delta.managedcommit.UpdatedActions;
import org.apache.hadoop.fs.Path;

import java.util.UUID;

public class ManagedCommitUtils {

private ManagedCommitUtils() {}

/** The subdirectory in which to store the unbackfilled commit files. */
final static String COMMIT_SUBDIR = "_commits";

/** The configuration key for the managed commit owner. */
private static final String MANAGED_COMMIT_OWNER_CONF_KEY =
"delta.managedCommits.commitOwner-dev";

/**
* Creates a new unbackfilled delta file path for the given commit version.
* The path is of the form `tablePath/_delta_log/_commits/00000000000000000001.uuid.json`.
*/
public static Path generateUnbackfilledDeltaFilePath(
Path logPath,
long version) {
String uuid = UUID.randomUUID().toString();
Path basePath = new Path(logPath, COMMIT_SUBDIR);
return new Path(basePath, String.format("%020d.%s.json", version, uuid));
}

/**
* Returns the path to the backfilled delta file for the given commit version.
* The path is of the form `tablePath/_delta_log/00000000000000000001.json`.
*/
public static Path getBackfilledDeltaFilePath(
Path logPath,
Long version) {
return new Path(logPath, String.format("%020d.json", version));
}

private static String getManagedCommitOwner(AbstractMetadata metadata) {
return metadata
.getConfiguration()
.get(MANAGED_COMMIT_OWNER_CONF_KEY)
.getOrElse(() -> "");
}

/**
* Returns true if the commit is a managed commit to filesystem conversion.
*/
public static boolean isManagedCommitToFSConversion(
Long commitVersion,
UpdatedActions updatedActions) {
boolean oldMetadataHasManagedCommits =
!getManagedCommitOwner(updatedActions.getOldMetadata()).isEmpty();
boolean newMetadataHasManagedCommits =
!getManagedCommitOwner(updatedActions.getNewMetadata()).isEmpty();
return oldMetadataHasManagedCommits && !newMetadataHasManagedCommits && commitVersion > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.managedcommit
import scala.collection.mutable

import org.apache.spark.sql.delta.storage.LogStore
import io.delta.dynamodbcommitstore.DynamoDBCommitOwnerClientBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

Expand All @@ -45,7 +46,7 @@ case class Commit(
case class CommitFailedException(
private val retryable: Boolean,
private val conflict: Boolean,
private val message: String) extends Exception(message) {
private val message: String) extends RuntimeException(message) {
def getRetryable: Boolean = retryable
def getConflict: Boolean = conflict
}
Expand Down Expand Up @@ -237,7 +238,7 @@ object CommitOwnerProvider {
}

private val initialCommitOwnerBuilders = Seq[CommitOwnerBuilder](
// Any new commit-owner builder will be registered here.
new DynamoDBCommitOwnerClientBuilder()
)
initialCommitOwnerBuilders.foreach(registerBuilder)
}
Loading

0 comments on commit 7b4ee63

Please sign in to comment.