Skip to content

Commit

Permalink
squash commits
Browse files Browse the repository at this point in the history
  • Loading branch information
vikrantpuppala committed Oct 8, 2024
1 parent e0daf1e commit fe44950
Show file tree
Hide file tree
Showing 6 changed files with 398 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,7 @@
import io.unitycatalog.server.persist.utils.ServerPropertiesUtils;
import io.unitycatalog.server.security.SecurityConfiguration;
import io.unitycatalog.server.security.SecurityContext;
import io.unitycatalog.server.service.AuthDecorator;
import io.unitycatalog.server.service.AuthService;
import io.unitycatalog.server.service.CatalogService;
import io.unitycatalog.server.service.FunctionService;
import io.unitycatalog.server.service.IcebergRestCatalogService;
import io.unitycatalog.server.service.ModelService;
import io.unitycatalog.server.service.PermissionService;
import io.unitycatalog.server.service.SchemaService;
import io.unitycatalog.server.service.Scim2UserService;
import io.unitycatalog.server.service.TableService;
import io.unitycatalog.server.service.TemporaryModelVersionCredentialsService;
import io.unitycatalog.server.service.TemporaryPathCredentialsService;
import io.unitycatalog.server.service.TemporaryTableCredentialsService;
import io.unitycatalog.server.service.TemporaryVolumeCredentialsService;
import io.unitycatalog.server.service.VolumeService;
import io.unitycatalog.server.service.*;
import io.unitycatalog.server.service.credential.CredentialOperations;
import io.unitycatalog.server.service.iceberg.FileIOFactory;
import io.unitycatalog.server.service.iceberg.MetadataService;
Expand All @@ -49,13 +35,7 @@
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import java.nio.file.Path;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.*;
import org.apache.logging.log4j.core.config.Configurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -136,7 +116,8 @@ private void addServices(ServerBuilder sb) {
TableService tableService = new TableService(authorizer);
FunctionService functionService = new FunctionService(authorizer);
ModelService modelService = new ModelService(authorizer);
// TODO: combine these into a single service in a follow-up PR
CoordinatedCommitsService coordinatedCommitsService = new CoordinatedCommitsService(authorizer);
// TODO: combine these into a single service in a follow-up PR
TemporaryTableCredentialsService temporaryTableCredentialsService =
new TemporaryTableCredentialsService(authorizer, credentialOperations);
TemporaryVolumeCredentialsService temporaryVolumeCredentialsService =
Expand All @@ -159,6 +140,8 @@ private void addServices(ServerBuilder sb) {
.annotatedService(basePath + "tables", tableService, unityConverterFunction)
.annotatedService(basePath + "functions", functionService, unityConverterFunction)
.annotatedService(basePath + "models", modelService, unityConverterFunction)
.annotatedService(
basePath + "delta/commits", coordinatedCommitsService, unityConverterFunction)
.annotatedService(
basePath + "temporary-table-credentials", temporaryTableCredentialsService)
.annotatedService(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.unitycatalog.server.exception;

import java.util.Map;

public class CommitException extends BaseException {
public CommitException(
ErrorCode errorCode, String errorMessage, Throwable cause, Map<String, String> metadata) {
super(errorCode, errorMessage, cause, metadata);
}

public CommitException(ErrorCode errorCode, String errorMessage, Throwable cause) {
super(errorCode, errorMessage, cause);
}

public CommitException(ErrorCode errorCode, String errorMessage) {
super(errorCode, errorMessage);
}

public CommitException(ErrorCode errorCode, Throwable cause) {
super(errorCode, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package io.unitycatalog.server.handler;

import static io.unitycatalog.server.persist.CommitRepository.MAX_NUM_COMMITS_PER_TABLE;

import io.unitycatalog.server.exception.CommitException;
import io.unitycatalog.server.exception.ErrorCode;
import io.unitycatalog.server.model.Commit;
import io.unitycatalog.server.model.DataSourceFormat;
import io.unitycatalog.server.model.TableInfo;
import io.unitycatalog.server.persist.CommitRepository;
import io.unitycatalog.server.persist.TableRepository;
import io.unitycatalog.server.persist.dao.CommitDAO;
import java.util.Objects;
import java.util.UUID;
import org.hibernate.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoordinatedCommitsHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(CoordinatedCommitsHandler.class);
public static final TableRepository TABLE_REPOSITORY = TableRepository.getInstance();
public static final CommitRepository COMMIT_REPOSITORY = CommitRepository.getInstance();

public static void validate(Commit commit) {
// We do not support disown commits
if (commit.getCommitInfo() != null && commit.getCommitInfo().getIsDisownCommit()) {
throw new CommitException(ErrorCode.UNIMPLEMENTED, "Disown commits are not supported!");
}
// Validate the commit object
assert commit.getTableId() != null;
// assert commit.getTableUri() != null;

// TODO: Add other assertions like the table URI path exists

// Validate the commit info object
if (commit.getCommitInfo() != null) {
assert commit.getCommitInfo().getIsDisownCommit() != null;
assert commit.getCommitInfo().getFileSize() > 0;
assert commit.getCommitInfo().getFileName() != null;
assert commit.getCommitInfo().getVersion() != null;
assert commit.getCommitInfo().getTimestamp() != null;
assert commit.getCommitInfo().getFileModificationTimestamp() != null;
} else {
// If commit info is null, then it should be a backfill only commit
assert commit.getLatestBackfilledVersion() != null;
}
}

public static void validateTablePath(Commit commit) {
TableInfo tableInfo = TABLE_REPOSITORY.getTableById(commit.getTableId());
// assert tableInfo.getTableType() == TableType.MANAGED;
assert tableInfo.getDataSourceFormat() == DataSourceFormat.DELTA;

// TODO: Add other assertions like verifying the table path (tableInfo.getStorageLocation)
}

public static void validateOnboardingCommit(Commit commit) {
assert commit.getCommitInfo() != null;
// Onboarding commit cannot be a disown commit
assert !commit.getCommitInfo().getIsDisownCommit();
}

public static void handleFirstCommit(Session session, Commit commit) {
validateOnboardingCommit(commit);
COMMIT_REPOSITORY.saveCommit(session, commit);
}

public static void handleBackfillOnlyCommit(
Session session,
String tableId,
Long latestBackfilledVersion,
CommitDAO firstCommit,
CommitDAO lastCommit) {
// Nothing to delete, return
if (latestBackfilledVersion < firstCommit.getCommitVersion()) {
return;
}
// We only need to delete when there is more than one commit. We always keep the last commit.
if (firstCommit.getCommitVersion() < lastCommit.getCommitVersion()) {
COMMIT_REPOSITORY.backfillCommits(
session,
UUID.fromString(tableId),
Math.min(latestBackfilledVersion, lastCommit.getCommitVersion() - 1),
firstCommit,
lastCommit.getCommitVersion());
}
if (latestBackfilledVersion == lastCommit.getCommitVersion()) {
// Mark the last commit as the latest backfilled version
COMMIT_REPOSITORY.markCommitAsLatestBackfilled(
session, UUID.fromString(tableId), lastCommit.getCommitVersion());
}

// TODO: Should we also retain the disown commit if it's backfilled?
}

public static void handleReboardCommit() {
throw new CommitException(ErrorCode.UNIMPLEMENTED, "Reboarding of tables not allowed!");
}

public static void handleNormalCommit(
Session session, Commit commit, CommitDAO firstCommit, CommitDAO lastCommit) {
if (commit.getCommitInfo().getVersion() <= lastCommit.getCommitVersion()) {
throw new CommitException(
ErrorCode.ALREADY_EXISTS,
"Commit version should be greater than the last commit version = "
+ lastCommit.getCommitVersion());
}
if (commit.getCommitInfo().getVersion() > lastCommit.getCommitVersion() + 1) {
throw new CommitException(
ErrorCode.INVALID_ARGUMENT,
"Commit version should be the next version after the last commit version = "
+ lastCommit.getCommitVersion());
}
if (commit.getLatestBackfilledVersion() != null
&& commit.getLatestBackfilledVersion() > lastCommit.getCommitVersion()) {
throw new CommitException(
ErrorCode.INVALID_ARGUMENT,
"Latest backfilled version cannot be greater than the last commit version = "
+ lastCommit.getCommitVersion());
}
long latestBackfilledVersion = getLatestBackfilledVersion(commit, firstCommit, lastCommit);
if (commit.getCommitInfo().getVersion() - latestBackfilledVersion > MAX_NUM_COMMITS_PER_TABLE) {
throw new CommitException(
ErrorCode.RESOURCE_EXHAUSTED,
"Max number of commits per table reached = " + MAX_NUM_COMMITS_PER_TABLE);
}
COMMIT_REPOSITORY.saveCommit(session, commit);
if (latestBackfilledVersion >= firstCommit.getCommitVersion()) {
COMMIT_REPOSITORY.backfillCommits(
session,
UUID.fromString(commit.getTableId()),
latestBackfilledVersion,
firstCommit,
commit.getCommitInfo().getVersion());
}
}

private static Long getLatestBackfilledVersion(
Commit commit, CommitDAO firstCommit, CommitDAO lastCommit) {
long latestBackfilledVersion;
if (lastCommit.getIsBackfilledLatestCommit()) {
if (!Objects.equals(firstCommit.getCommitVersion(), lastCommit.getCommitVersion())) {
LOGGER.error(
"When the last commit is the latest backfilled commit, there should be exactly one commit");
// This is recoverable
}
latestBackfilledVersion = lastCommit.getCommitVersion();
} else {
latestBackfilledVersion =
Math.max(
commit.getLatestBackfilledVersion() == null ? 0 : commit.getLatestBackfilledVersion(),
firstCommit.getCommitVersion() - 1);
}
return latestBackfilledVersion;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.unitycatalog.server.persist;

import io.unitycatalog.server.model.Commit;
import io.unitycatalog.server.model.Metadata;
import io.unitycatalog.server.persist.dao.ColumnInfoDAO;
import io.unitycatalog.server.persist.dao.CommitDAO;
import io.unitycatalog.server.persist.dao.PropertyDAO;
import io.unitycatalog.server.persist.dao.TableInfoDAO;
import io.unitycatalog.server.utils.Constants;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import org.hibernate.Session;
import org.hibernate.query.NativeQuery;
import org.hibernate.query.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitRepository {
private static final CommitRepository INSTANCE = new CommitRepository();
private static final Logger LOGGER = LoggerFactory.getLogger(CommitRepository.class);

// The maximum number of commits per table.
public static final Integer MAX_NUM_COMMITS_PER_TABLE = 50;
private static final Integer NUM_COMMITS_PER_BATCH = 100;

public static CommitRepository getInstance() {
return INSTANCE;
}

public void saveCommit(Session session, Commit commit) {
CommitDAO commitDAO = CommitDAO.from(commit);
session.persist(commitDAO);
}

public void backfillCommits(
Session session, UUID tableId, Long upTo, CommitDAO firstCommit, Long highestCommitVersion) {
assert upTo >= firstCommit.getCommitVersion();
assert upTo < highestCommitVersion;
long numCommitsToDelete = upTo - firstCommit.getCommitVersion() + 1;
if (numCommitsToDelete <= 0) {
return;
}
// Retry backfilling 5 times to prioritize cleaning of the commit table and log bugs where there
// are more
// commits in the table than MAX_NUM_COMMITS_PER_TABLE
for (int i = 0; i < 5 && numCommitsToDelete > 0; i++) {
numCommitsToDelete -= deleteCommits(session, tableId, upTo);
if (numCommitsToDelete > 0) {
LOGGER.error(
"Failed to backfill commits for tableId: {}, upTo: {}, in batch: {}, commits left: {}",
tableId,
upTo,
i,
numCommitsToDelete);
}
}
}

public int deleteCommits(Session session, UUID tableId, Long upTo) {
NativeQuery<CommitDAO> query =
session.createNativeQuery(
"DELETE FROM uc_commits WHERE table_id = :tableId AND commit_version <= :upTo LIMIT :numCommitsPerBatch",
CommitDAO.class);
query.setParameter("tableId", tableId);
query.setParameter("upTo", upTo);
query.setParameter("numCommitsPerBatch", NUM_COMMITS_PER_BATCH);
return query.executeUpdate();
}

public void markCommitAsLatestBackfilled(Session session, UUID tableId, Long commitVersion) {
NativeQuery<CommitDAO> query =
session.createNativeQuery(
"UPDATE uc_commits SET is_backfilled_latest_commit = true WHERE table_id = :tableId "
+ "AND commit_version = :commitVersion",
CommitDAO.class);
query.setParameter("tableId", tableId);
query.setParameter("commitVersion", commitVersion);
query.executeUpdate();
}

public List<CommitDAO> getFirstAndLastCommits(Session session, UUID tableId) {
// Use native SQL to get the first and last commits since HQL doesn't support UNION ALL
String sql =
"(SELECT * FROM uc_commits WHERE table_id = :tableId ORDER BY commit_version ASC LIMIT 1) "
+ "UNION ALL "
+ "(SELECT * FROM uc_commits WHERE table_id = :tableId ORDER BY commit_version DESC LIMIT 1)";

Query<CommitDAO> query = session.createNativeQuery(sql, CommitDAO.class);
query.setParameter("tableId", tableId);
List<CommitDAO> result = query.getResultList();
// Sort to ensure the first commit is at index 0
result.sort(Comparator.comparing(CommitDAO::getCommitVersion));
return result;
}

public void updateTableMetadata(Session session, Commit commit) {
TableInfoDAO tableInfoDAO =
session.get(TableInfoDAO.class, UUID.fromString(commit.getTableId()));
Metadata metadata = commit.getMetadata();
// Update properties
PropertyRepository.findProperties(session, tableInfoDAO.getId(), Constants.TABLE)
.forEach(session::remove);
PropertyDAO.from(metadata.getProperties(), tableInfoDAO.getId(), Constants.TABLE)
.forEach(session::persist);
// Update columns
tableInfoDAO.setColumns(ColumnInfoDAO.fromList(metadata.getSchema().getColumns()));
session.merge(tableInfoDAO);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private static void deleteLocalDirectory(Path dirPath) throws IOException {
});
}
} else {
throw new IOException("Directory does not exist: " + dirPath);
LOGGER.info("Directory does not exist: {}", dirPath);
}
}

Expand Down
Loading

0 comments on commit fe44950

Please sign in to comment.