diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 628ff004e3..5d4c8ab00d 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -57,7 +57,6 @@ use { use crate::format::{Index, Manifest}; -const LATEST_MANIFEST_NAME: &str = "_latest.manifest"; const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; @@ -75,10 +74,6 @@ pub fn manifest_path(base: &Path, version: u64) -> Path { .child(format!("{version}.{MANIFEST_EXTENSION}")) } -pub fn latest_manifest_path(base: &Path) -> Path { - base.child(LATEST_MANIFEST_NAME) -} - #[derive(Debug)] pub struct ManifestLocation { /// The version the manifest corresponds to. @@ -226,21 +221,6 @@ fn make_staging_manifest_path(base: &Path) -> Result { }) } -async fn write_latest_manifest( - from_path: &Path, - base_path: &Path, - object_store: &dyn OSObjectStore, -) -> Result<()> { - let latest_path = latest_manifest_path(base_path); - let staging_path = make_staging_manifest_path(from_path)?; - object_store - .copy(from_path, &staging_path) - .await - .map_err(|err| CommitError::OtherError(err.into()))?; - object_store.rename(&staging_path, &latest_path).await?; - Ok(()) -} - #[cfg(feature = "dynamodb")] const DDB_URL_QUERY_KEY: &str = "ddbTableName"; @@ -538,8 +518,6 @@ impl CommitHandler for UnsafeCommitHandler { // Write the manifest naively manifest_writer(object_store, manifest, indices, &version_path).await?; - write_latest_manifest(&version_path, base_path, &object_store.inner).await?; - Ok(()) } } @@ -613,8 +591,6 @@ impl CommitHandler for T { } let res = manifest_writer(object_store, manifest, indices, &path).await; - write_latest_manifest(&path, base_path, &object_store.inner).await?; - // Release the lock lease.release(res.is_ok()).await?; @@ -675,7 +651,7 @@ impl CommitHandler for RenameCommitHandler { // Write the manifest to the temporary path manifest_writer(object_store, manifest, indices, &tmp_path).await?; - let res = match object_store + match object_store .inner .rename_if_not_exists(&tmp_path, &path) .await @@ -692,11 +668,7 @@ impl CommitHandler for RenameCommitHandler { // Something else went wrong return Err(CommitError::OtherError(e.into())); } - }; - - write_latest_manifest(&path, base_path, &object_store.inner).await?; - - res + } } } diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index 7285518e95..f760259b3d 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -15,8 +15,8 @@ use object_store::{path::Path, ObjectStore as OSObjectStore}; use snafu::{location, Location}; use super::{ - current_manifest_path, make_staging_manifest_path, manifest_path, write_latest_manifest, - ManifestLocation, MANIFEST_EXTENSION, + current_manifest_path, make_staging_manifest_path, manifest_path, ManifestLocation, + MANIFEST_EXTENSION, }; use crate::format::{Index, Manifest}; use crate::io::commit::{CommitError, CommitHandler, ManifestWriter}; @@ -124,11 +124,7 @@ impl CommitHandler for ExternalManifestCommitHandler { .rename(&staging, &object_store_manifest_path) .await?; - // step 2: write _latest.manifest - write_latest_manifest(&manifest_path, base_path, object_store.inner.as_ref()) - .await?; - - // step 3: update external store to finalize path + // step 2: update external store to finalize path self.external_manifest_store .put_if_exists( base_path.as_ref(), @@ -263,9 +259,6 @@ impl CommitHandler for ExternalManifestCommitHandler { ) ))?; - // update the _latest.manifest pointer - write_latest_manifest(&path, base_path, &object_store.inner).await?; - // step 5: flip the external store to point to the final location self.external_manifest_store .put_if_exists(base_path.as_ref(), manifest.version, path.as_ref()) diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 243541b5dc..174e1b560c 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -783,8 +783,6 @@ mod tests { ); assert_lt!(after_count.num_tx_files, before_count.num_tx_files); - // The latest manifest should still be there, even if it is older than - // the given time. assert_gt!(after_count.num_manifest_files, 0); assert_gt!(after_count.num_data_files, 0); // We should keep referenced tx files @@ -805,9 +803,9 @@ mod tests { let before_count = fixture.count_files().await.unwrap(); - // 3 versions (plus one extra latest.manifest) + // 3 versions assert_eq!(before_count.num_data_files, 3); - assert_eq!(before_count.num_manifest_files, 4); + assert_eq!(before_count.num_manifest_files, 3); let before = utc_now() - TimeDelta::try_days(7).unwrap(); let removed = fixture.run_cleanup(before).await.unwrap(); @@ -824,7 +822,7 @@ mod tests { // the latest version assert_eq!(after_count.num_data_files, 3); // Only the oldest manifest file should be removed - assert_eq!(after_count.num_manifest_files, 3); + assert_eq!(after_count.num_manifest_files, 2); assert_eq!(after_count.num_tx_files, 2); } @@ -942,7 +940,7 @@ mod tests { let before_count = fixture.count_files().await.unwrap(); assert_eq!(before_count.num_data_files, 2); - assert_eq!(before_count.num_manifest_files, 3); + assert_eq!(before_count.num_manifest_files, 2); // Not much time has passed but we can still delete the old manifest // and the related data files @@ -957,7 +955,7 @@ mod tests { ); assert_eq!(after_count.num_data_files, 1); - assert_eq!(after_count.num_manifest_files, 2); + assert_eq!(after_count.num_manifest_files, 1); } #[tokio::test] @@ -986,7 +984,7 @@ mod tests { let before_count = fixture.count_files().await.unwrap(); assert_eq!(before_count.num_data_files, 2); - assert_eq!(before_count.num_manifest_files, 2); + assert_eq!(before_count.num_manifest_files, 1); let before = utc_now(); let removed = fixture @@ -1025,8 +1023,8 @@ mod tests { assert_eq!(before_count.num_index_files, 1); // Two user data files assert_eq!(before_count.num_data_files, 2); - // Creating an index creates a new manifest so there are 4 total - assert_eq!(before_count.num_manifest_files, 4); + // Creating an index creates a new manifest so there are 3 total + assert_eq!(before_count.num_manifest_files, 3); let before = utc_now() - TimeDelta::try_days(8).unwrap(); let removed = fixture.run_cleanup(before).await.unwrap(); @@ -1040,7 +1038,7 @@ mod tests { assert_eq!(after_count.num_index_files, 0); assert_eq!(after_count.num_data_files, 1); - assert_eq!(after_count.num_manifest_files, 2); + assert_eq!(after_count.num_manifest_files, 1); assert_eq!(after_count.num_tx_files, 1); } @@ -1066,7 +1064,7 @@ mod tests { let before_count = fixture.count_files().await.unwrap(); assert_eq!(before_count.num_data_files, 3); assert_eq!(before_count.num_delete_files, 2); - assert_eq!(before_count.num_manifest_files, 6); + assert_eq!(before_count.num_manifest_files, 5); let before = utc_now() - TimeDelta::try_days(8).unwrap(); let removed = fixture.run_cleanup(before).await.unwrap(); @@ -1080,7 +1078,7 @@ mod tests { assert_eq!(after_count.num_data_files, 1); assert_eq!(after_count.num_delete_files, 1); - assert_eq!(after_count.num_manifest_files, 3); + assert_eq!(after_count.num_manifest_files, 2); assert_eq!(after_count.num_tx_files, 2); // Ensure we can still read the dataset @@ -1128,7 +1126,7 @@ mod tests { // This append will fail since the commit is blocked but it should have // deposited a data file assert_eq!(before_count.num_data_files, 2); - assert_eq!(before_count.num_manifest_files, 2); + assert_eq!(before_count.num_manifest_files, 1); assert_eq!(before_count.num_tx_files, 2); // All of our manifests are newer than the threshold but temp files @@ -1146,7 +1144,7 @@ mod tests { ); assert_eq!(after_count.num_data_files, 1); - assert_eq!(after_count.num_manifest_files, 2); + assert_eq!(after_count.num_manifest_files, 1); assert_eq!(after_count.num_tx_files, 1); } @@ -1198,7 +1196,7 @@ mod tests { let before_count = fixture.count_files().await.unwrap(); assert_eq!(before_count.num_data_files, 2); - assert_eq!(before_count.num_manifest_files, 3); + assert_eq!(before_count.num_manifest_files, 2); assert!(fixture .run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap()) @@ -1212,7 +1210,7 @@ mod tests { // has to finish the buffered tasks even if they are ignored. let mid_count = fixture.count_files().await.unwrap(); assert_eq!(mid_count.num_data_files, 1); - assert_eq!(mid_count.num_manifest_files, 3); + assert_eq!(mid_count.num_manifest_files, 2); fixture.unblock_delete_manifest(); @@ -1229,6 +1227,6 @@ mod tests { ); assert_eq!(after_count.num_data_files, 1); - assert_eq!(after_count.num_manifest_files, 2); + assert_eq!(after_count.num_manifest_files, 1); } } diff --git a/rust/lance/src/io/commit/dynamodb.rs b/rust/lance/src/io/commit/dynamodb.rs index 452240b91b..b5e8d714ed 100644 --- a/rust/lance/src/io/commit/dynamodb.rs +++ b/rust/lance/src/io/commit/dynamodb.rs @@ -46,7 +46,7 @@ mod test { use lance_table::io::commit::{ dynamodb::DynamoDBExternalManifestStore, external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore}, - latest_manifest_path, manifest_path, CommitHandler, + manifest_path, CommitHandler, }; fn read_params(handler: Arc) -> ReadParams { @@ -308,10 +308,6 @@ mod test { // manually simulate last version is out of sync let localfs: Box = Box::new(LocalFileSystem::new()); localfs.delete(&manifest_path(&ds.base, 6)).await.unwrap(); - localfs - .copy(&manifest_path(&ds.base, 5), &latest_manifest_path(&ds.base)) - .await - .unwrap(); // set the store back to dataset path with -{uuid} suffix let mut version_six = localfs diff --git a/rust/lance/src/io/commit/external_manifest.rs b/rust/lance/src/io/commit/external_manifest.rs index 127b456045..9c5a1278ea 100644 --- a/rust/lance/src/io/commit/external_manifest.rs +++ b/rust/lance/src/io/commit/external_manifest.rs @@ -13,7 +13,7 @@ mod test { use lance_table::io::commit::external_manifest::{ ExternalManifestCommitHandler, ExternalManifestStore, }; - use lance_table::io::commit::{latest_manifest_path, manifest_path, CommitHandler}; + use lance_table::io::commit::{manifest_path, CommitHandler}; use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use object_store::local::LocalFileSystem; use snafu::{location, Location}; @@ -272,10 +272,7 @@ mod test { // manually simulate last version is out of sync let localfs: Box = Box::new(LocalFileSystem::new()); localfs.delete(&manifest_path(&ds.base, 6)).await.unwrap(); - localfs - .copy(&manifest_path(&ds.base, 5), &latest_manifest_path(&ds.base)) - .await - .unwrap(); + // set the store back to dataset path with -{uuid} suffix let mut version_six = localfs .list(Some(&ds.base))