Skip to content

Commit

Permalink
SAMZA-2784: Remove excessive commit logs
Browse files Browse the repository at this point in the history
  • Loading branch information
dxichen committed Jan 19, 2024
1 parent 082899d commit 37b2abd
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId,
CompletionStage<String> snapshotIndexBlobIdFuture =
snapshotIndexFuture
.thenComposeAsync(si -> {
LOG.info("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
LOG.debug("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
return blobStoreUtil.putSnapshotIndex(si);
}, executor);

Expand Down Expand Up @@ -296,7 +296,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
// 1. remove TTL of index blob and all of its files and sub-dirs marked for retention
CompletionStage<Void> removeTTLFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
LOG.info("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
LOG.debug("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, requestMetadata);
}, executor);
Expand All @@ -305,7 +305,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
// 2. delete the files/subdirs marked for deletion in the snapshot index.
CompletionStage<Void> cleanupRemoteSnapshotFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
LOG.info("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
LOG.debug("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), requestMetadata);
}, executor);
Expand All @@ -317,7 +317,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) {
String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get();
LOG.info("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
LOG.debug("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
blobId, taskName, storeName);
return blobStoreUtil.deleteSnapshotIndexBlob(blobId, requestMetadata);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileC
if (!compareLargeFileChecksums && isLargeFile) {
// Since RocksDB SST files are immutable after creation, we can skip the expensive checksum computations
// which requires reading the entire file.
LOG.debug("Local file: {} and remote file: {} are same. " +
LOG.debug("Local file: {} and remote file: {} both present. " +
"Skipping checksum calculation for large file of size: {}.",
localFile.getAbsolutePath(), remoteFile.getFileName(), localFileAttrs.size());
return true;
Expand All @@ -234,7 +234,7 @@ public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileC

boolean areSameChecksum = localFileChecksum == remoteFile.getChecksum();
if (!areSameChecksum) {
LOG.warn("Local file: {} and remote file: {} are not same. " +
LOG.debug("Local file: {} and remote file: {} are not same. " +
"Local checksum: {}. Remote checksum: {}",
localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class TaskInstance(
// WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for
// tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may
// be deleted in the background by the blob store, leading to data loss.
info("Cleaning up stale state from previous run for taskName: %s" format taskName)
debug("Cleaning up stale state from previous run for taskName: %s" format taskName)
commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers)
}

Expand Down Expand Up @@ -474,7 +474,7 @@ class TaskInstance(
new Function[util.Map[String, util.Map[String, String]], CompletableFuture[Void]] {
override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = {
// Perform cleanup on unused checkpoints
info("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
val cleanUpStartTime = System.nanoTime()
try {
commitManager.cleanUp(checkpointId, uploadSCMs)
Expand Down

0 comments on commit 37b2abd

Please sign in to comment.