Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stop writing _latest.manifest #2776

Merged
merged 7 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 2 additions & 30 deletions rust/lance-table/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ use {

use crate::format::{Index, Manifest};

const LATEST_MANIFEST_NAME: &str = "_latest.manifest";
const VERSIONS_DIR: &str = "_versions";
const MANIFEST_EXTENSION: &str = "manifest";

Expand All @@ -75,10 +74,6 @@ pub fn manifest_path(base: &Path, version: u64) -> Path {
.child(format!("{version}.{MANIFEST_EXTENSION}"))
}

pub fn latest_manifest_path(base: &Path) -> Path {
base.child(LATEST_MANIFEST_NAME)
}

#[derive(Debug)]
pub struct ManifestLocation {
/// The version the manifest corresponds to.
Expand Down Expand Up @@ -226,21 +221,6 @@ fn make_staging_manifest_path(base: &Path) -> Result<Path> {
})
}

async fn write_latest_manifest(
from_path: &Path,
base_path: &Path,
object_store: &dyn OSObjectStore,
) -> Result<()> {
let latest_path = latest_manifest_path(base_path);
let staging_path = make_staging_manifest_path(from_path)?;
object_store
.copy(from_path, &staging_path)
.await
.map_err(|err| CommitError::OtherError(err.into()))?;
object_store.rename(&staging_path, &latest_path).await?;
Ok(())
}

#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";

Expand Down Expand Up @@ -538,8 +518,6 @@ impl CommitHandler for UnsafeCommitHandler {
// Write the manifest naively
manifest_writer(object_store, manifest, indices, &version_path).await?;

write_latest_manifest(&version_path, base_path, &object_store.inner).await?;

Ok(())
}
}
Expand Down Expand Up @@ -613,8 +591,6 @@ impl<T: CommitLock + Send + Sync> CommitHandler for T {
}
let res = manifest_writer(object_store, manifest, indices, &path).await;

write_latest_manifest(&path, base_path, &object_store.inner).await?;

// Release the lock
lease.release(res.is_ok()).await?;

Expand Down Expand Up @@ -675,7 +651,7 @@ impl CommitHandler for RenameCommitHandler {
// Write the manifest to the temporary path
manifest_writer(object_store, manifest, indices, &tmp_path).await?;

let res = match object_store
match object_store
.inner
.rename_if_not_exists(&tmp_path, &path)
.await
Expand All @@ -692,11 +668,7 @@ impl CommitHandler for RenameCommitHandler {
// Something else went wrong
return Err(CommitError::OtherError(e.into()));
}
};

write_latest_manifest(&path, base_path, &object_store.inner).await?;

res
}
}
}

Expand Down
13 changes: 3 additions & 10 deletions rust/lance-table/src/io/commit/external_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use object_store::{path::Path, ObjectStore as OSObjectStore};
use snafu::{location, Location};

use super::{
current_manifest_path, make_staging_manifest_path, manifest_path, write_latest_manifest,
ManifestLocation, MANIFEST_EXTENSION,
current_manifest_path, make_staging_manifest_path, manifest_path, ManifestLocation,
MANIFEST_EXTENSION,
};
use crate::format::{Index, Manifest};
use crate::io::commit::{CommitError, CommitHandler, ManifestWriter};
Expand Down Expand Up @@ -124,11 +124,7 @@ impl CommitHandler for ExternalManifestCommitHandler {
.rename(&staging, &object_store_manifest_path)
.await?;

// step 2: write _latest.manifest
write_latest_manifest(&manifest_path, base_path, object_store.inner.as_ref())
.await?;

// step 3: update external store to finalize path
// step 2: update external store to finalize path
self.external_manifest_store
.put_if_exists(
base_path.as_ref(),
Expand Down Expand Up @@ -263,9 +259,6 @@ impl CommitHandler for ExternalManifestCommitHandler {
)
))?;

// update the _latest.manifest pointer
write_latest_manifest(&path, base_path, &object_store.inner).await?;

// step 5: flip the external store to point to the final location
self.external_manifest_store
.put_if_exists(base_path.as_ref(), manifest.version, path.as_ref())
Expand Down
34 changes: 16 additions & 18 deletions rust/lance/src/dataset/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,8 +783,6 @@ mod tests {
);
assert_lt!(after_count.num_tx_files, before_count.num_tx_files);

// The latest manifest should still be there, even if it is older than
// the given time.
assert_gt!(after_count.num_manifest_files, 0);
assert_gt!(after_count.num_data_files, 0);
// We should keep referenced tx files
Expand All @@ -805,9 +803,9 @@ mod tests {

let before_count = fixture.count_files().await.unwrap();

// 3 versions (plus one extra latest.manifest)
// 3 versions
assert_eq!(before_count.num_data_files, 3);
assert_eq!(before_count.num_manifest_files, 4);
assert_eq!(before_count.num_manifest_files, 3);

let before = utc_now() - TimeDelta::try_days(7).unwrap();
let removed = fixture.run_cleanup(before).await.unwrap();
Expand All @@ -824,7 +822,7 @@ mod tests {
// the latest version
assert_eq!(after_count.num_data_files, 3);
// Only the oldest manifest file should be removed
assert_eq!(after_count.num_manifest_files, 3);
assert_eq!(after_count.num_manifest_files, 2);
assert_eq!(after_count.num_tx_files, 2);
}

Expand Down Expand Up @@ -942,7 +940,7 @@ mod tests {

let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 2);
assert_eq!(before_count.num_manifest_files, 3);
assert_eq!(before_count.num_manifest_files, 2);

// Not much time has passed but we can still delete the old manifest
// and the related data files
Expand All @@ -957,7 +955,7 @@ mod tests {
);

assert_eq!(after_count.num_data_files, 1);
assert_eq!(after_count.num_manifest_files, 2);
assert_eq!(after_count.num_manifest_files, 1);
}

#[tokio::test]
Expand Down Expand Up @@ -986,7 +984,7 @@ mod tests {

let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 2);
assert_eq!(before_count.num_manifest_files, 2);
assert_eq!(before_count.num_manifest_files, 1);

let before = utc_now();
let removed = fixture
Expand Down Expand Up @@ -1025,8 +1023,8 @@ mod tests {
assert_eq!(before_count.num_index_files, 1);
// Two user data files
assert_eq!(before_count.num_data_files, 2);
// Creating an index creates a new manifest so there are 4 total
assert_eq!(before_count.num_manifest_files, 4);
// Creating an index creates a new manifest so there are 3 total
assert_eq!(before_count.num_manifest_files, 3);

let before = utc_now() - TimeDelta::try_days(8).unwrap();
let removed = fixture.run_cleanup(before).await.unwrap();
Expand All @@ -1040,7 +1038,7 @@ mod tests {

assert_eq!(after_count.num_index_files, 0);
assert_eq!(after_count.num_data_files, 1);
assert_eq!(after_count.num_manifest_files, 2);
assert_eq!(after_count.num_manifest_files, 1);
assert_eq!(after_count.num_tx_files, 1);
}

Expand All @@ -1066,7 +1064,7 @@ mod tests {
let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 3);
assert_eq!(before_count.num_delete_files, 2);
assert_eq!(before_count.num_manifest_files, 6);
assert_eq!(before_count.num_manifest_files, 5);

let before = utc_now() - TimeDelta::try_days(8).unwrap();
let removed = fixture.run_cleanup(before).await.unwrap();
Expand All @@ -1080,7 +1078,7 @@ mod tests {

assert_eq!(after_count.num_data_files, 1);
assert_eq!(after_count.num_delete_files, 1);
assert_eq!(after_count.num_manifest_files, 3);
assert_eq!(after_count.num_manifest_files, 2);
assert_eq!(after_count.num_tx_files, 2);

// Ensure we can still read the dataset
Expand Down Expand Up @@ -1128,7 +1126,7 @@ mod tests {
// This append will fail since the commit is blocked but it should have
// deposited a data file
assert_eq!(before_count.num_data_files, 2);
assert_eq!(before_count.num_manifest_files, 2);
assert_eq!(before_count.num_manifest_files, 1);
assert_eq!(before_count.num_tx_files, 2);

// All of our manifests are newer than the threshold but temp files
Expand All @@ -1146,7 +1144,7 @@ mod tests {
);

assert_eq!(after_count.num_data_files, 1);
assert_eq!(after_count.num_manifest_files, 2);
assert_eq!(after_count.num_manifest_files, 1);
assert_eq!(after_count.num_tx_files, 1);
}

Expand Down Expand Up @@ -1198,7 +1196,7 @@ mod tests {

let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 2);
assert_eq!(before_count.num_manifest_files, 3);
assert_eq!(before_count.num_manifest_files, 2);

assert!(fixture
.run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
Expand All @@ -1212,7 +1210,7 @@ mod tests {
// has to finish the buffered tasks even if they are ignored.
let mid_count = fixture.count_files().await.unwrap();
assert_eq!(mid_count.num_data_files, 1);
assert_eq!(mid_count.num_manifest_files, 3);
assert_eq!(mid_count.num_manifest_files, 2);

fixture.unblock_delete_manifest();

Expand All @@ -1229,6 +1227,6 @@ mod tests {
);

assert_eq!(after_count.num_data_files, 1);
assert_eq!(after_count.num_manifest_files, 2);
assert_eq!(after_count.num_manifest_files, 1);
}
}
6 changes: 1 addition & 5 deletions rust/lance/src/io/commit/dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ mod test {
use lance_table::io::commit::{
dynamodb::DynamoDBExternalManifestStore,
external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
latest_manifest_path, manifest_path, CommitHandler,
manifest_path, CommitHandler,
};

fn read_params(handler: Arc<dyn CommitHandler>) -> ReadParams {
Expand Down Expand Up @@ -308,10 +308,6 @@ mod test {
// manually simulate last version is out of sync
let localfs: Box<dyn object_store::ObjectStore> = Box::new(LocalFileSystem::new());
localfs.delete(&manifest_path(&ds.base, 6)).await.unwrap();
localfs
.copy(&manifest_path(&ds.base, 5), &latest_manifest_path(&ds.base))
.await
.unwrap();

// set the store back to dataset path with -{uuid} suffix
let mut version_six = localfs
Expand Down
7 changes: 2 additions & 5 deletions rust/lance/src/io/commit/external_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod test {
use lance_table::io::commit::external_manifest::{
ExternalManifestCommitHandler, ExternalManifestStore,
};
use lance_table::io::commit::{latest_manifest_path, manifest_path, CommitHandler};
use lance_table::io::commit::{manifest_path, CommitHandler};
use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
use object_store::local::LocalFileSystem;
use snafu::{location, Location};
Expand Down Expand Up @@ -272,10 +272,7 @@ mod test {
// manually simulate last version is out of sync
let localfs: Box<dyn object_store::ObjectStore> = Box::new(LocalFileSystem::new());
localfs.delete(&manifest_path(&ds.base, 6)).await.unwrap();
localfs
.copy(&manifest_path(&ds.base, 5), &latest_manifest_path(&ds.base))
.await
.unwrap();

// set the store back to dataset path with -{uuid} suffix
let mut version_six = localfs
.list(Some(&ds.base))
Expand Down
Loading