From 4181adaba0d8d9d2731eb2722a8da74d7d2a2e79 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 22 Aug 2024 14:10:48 -0700 Subject: [PATCH 1/7] feat: eliminate latest manifest --- rust/lance-table/src/io/commit.rs | 21 ------------------- .../src/io/commit/external_manifest.rs | 7 +------ 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 628ff004e3..97821fe5d6 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -226,21 +226,6 @@ fn make_staging_manifest_path(base: &Path) -> Result { }) } -async fn write_latest_manifest( - from_path: &Path, - base_path: &Path, - object_store: &dyn OSObjectStore, -) -> Result<()> { - let latest_path = latest_manifest_path(base_path); - let staging_path = make_staging_manifest_path(from_path)?; - object_store - .copy(from_path, &staging_path) - .await - .map_err(|err| CommitError::OtherError(err.into()))?; - object_store.rename(&staging_path, &latest_path).await?; - Ok(()) -} - #[cfg(feature = "dynamodb")] const DDB_URL_QUERY_KEY: &str = "ddbTableName"; @@ -538,8 +523,6 @@ impl CommitHandler for UnsafeCommitHandler { // Write the manifest naively manifest_writer(object_store, manifest, indices, &version_path).await?; - write_latest_manifest(&version_path, base_path, &object_store.inner).await?; - Ok(()) } } @@ -613,8 +596,6 @@ impl CommitHandler for T { } let res = manifest_writer(object_store, manifest, indices, &path).await; - write_latest_manifest(&path, base_path, &object_store.inner).await?; - // Release the lock lease.release(res.is_ok()).await?; @@ -694,8 +675,6 @@ impl CommitHandler for RenameCommitHandler { } }; - write_latest_manifest(&path, base_path, &object_store.inner).await?; - res } } diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index 7285518e95..bbbf9b97da 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -124,11 +124,7 @@ impl CommitHandler for ExternalManifestCommitHandler { .rename(&staging, &object_store_manifest_path) .await?; - // step 2: write _latest.manifest - write_latest_manifest(&manifest_path, base_path, object_store.inner.as_ref()) - .await?; - - // step 3: update external store to finalize path + // step 2: update external store to finalize path self.external_manifest_store .put_if_exists( base_path.as_ref(), @@ -264,7 +260,6 @@ impl CommitHandler for ExternalManifestCommitHandler { ))?; // update the _latest.manifest pointer - write_latest_manifest(&path, base_path, &object_store.inner).await?; // step 5: flip the external store to point to the final location self.external_manifest_store From 117cda8858f124c8de9e5002d82546a3a67b6851 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 22 Aug 2024 14:30:23 -0700 Subject: [PATCH 2/7] do cleanup --- .../src/io/commit/external_manifest.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index bbbf9b97da..4522954c03 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -11,12 +11,12 @@ use async_trait::async_trait; use lance_core::{Error, Result}; use lance_io::object_store::{ObjectStore, ObjectStoreExt}; use log::warn; -use object_store::{path::Path, ObjectStore as OSObjectStore}; +use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; use snafu::{location, Location}; use super::{ - current_manifest_path, make_staging_manifest_path, manifest_path, write_latest_manifest, - ManifestLocation, MANIFEST_EXTENSION, + current_manifest_path, make_staging_manifest_path, manifest_path, ManifestLocation, + MANIFEST_EXTENSION, }; use crate::format::{Index, Manifest}; use crate::io::commit::{CommitError, CommitHandler, ManifestWriter}; @@ -116,12 +116,8 @@ impl CommitHandler for ExternalManifestCommitHandler { // step 1: copy path -> object_store_manifest_path let object_store_manifest_path = manifest_path(base_path, version); let manifest_path = Path::parse(path)?; - let staging = make_staging_manifest_path(&manifest_path)?; - // TODO: remove copy-rename once we upgrade object_store crate - object_store.copy(&manifest_path, &staging).await?; object_store - .inner - .rename(&staging, &object_store_manifest_path) + .copy(&manifest_path, &object_store_manifest_path) .await?; // step 2: update external store to finalize path @@ -133,6 +129,13 @@ impl CommitHandler for ExternalManifestCommitHandler { ) .await?; + // step 3: delete the staging path + match object_store.inner.delete(&manifest_path).await { + Ok(_) => {} + Err(ObjectStoreError::NotFound { .. }) => {} + Err(e) => return Err(e.into()), + } + Ok(object_store_manifest_path) } // Dataset not found in the external store, this could be because the dataset did not From dac5cfc47050993946d7da088c2f5fdeb7a314f6 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 22 Aug 2024 14:43:41 -0700 Subject: [PATCH 3/7] cleanup --- .../src/io/commit/external_manifest.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index 4522954c03..a140eea20e 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use lance_core::{Error, Result}; use lance_io::object_store::{ObjectStore, ObjectStoreExt}; use log::warn; -use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; +use object_store::{path::Path, ObjectStore as OSObjectStore}; use snafu::{location, Location}; use super::{ @@ -116,8 +116,12 @@ impl CommitHandler for ExternalManifestCommitHandler { // step 1: copy path -> object_store_manifest_path let object_store_manifest_path = manifest_path(base_path, version); let manifest_path = Path::parse(path)?; + let staging = make_staging_manifest_path(&manifest_path)?; + // TODO: remove copy-rename once we upgrade object_store crate + object_store.copy(&manifest_path, &staging).await?; object_store - .copy(&manifest_path, &object_store_manifest_path) + .inner + .rename(&staging, &object_store_manifest_path) .await?; // step 2: update external store to finalize path @@ -129,13 +133,6 @@ impl CommitHandler for ExternalManifestCommitHandler { ) .await?; - // step 3: delete the staging path - match object_store.inner.delete(&manifest_path).await { - Ok(_) => {} - Err(ObjectStoreError::NotFound { .. }) => {} - Err(e) => return Err(e.into()), - } - Ok(object_store_manifest_path) } // Dataset not found in the external store, this could be because the dataset did not From c33affd8138bd3e6ddbf077ed5598600ad51fde7 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 26 Aug 2024 10:40:23 -0700 Subject: [PATCH 4/7] updates --- rust/lance-table/src/io/commit.rs | 5 --- .../src/io/commit/external_manifest.rs | 2 -- rust/lance/src/dataset/cleanup.rs | 34 +++++++++---------- rust/lance/src/io/commit/external_manifest.rs | 7 ++-- 4 files changed, 18 insertions(+), 30 deletions(-) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 97821fe5d6..4896836753 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -57,7 +57,6 @@ use { use crate::format::{Index, Manifest}; -const LATEST_MANIFEST_NAME: &str = "_latest.manifest"; const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; @@ -75,10 +74,6 @@ pub fn manifest_path(base: &Path, version: u64) -> Path { .child(format!("{version}.{MANIFEST_EXTENSION}")) } -pub fn latest_manifest_path(base: &Path) -> Path { - base.child(LATEST_MANIFEST_NAME) -} - #[derive(Debug)] pub struct ManifestLocation { /// The version the manifest corresponds to. diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index a140eea20e..f760259b3d 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -259,8 +259,6 @@ impl CommitHandler for ExternalManifestCommitHandler { ) ))?; - // update the _latest.manifest pointer - // step 5: flip the external store to point to the final location self.external_manifest_store .put_if_exists(base_path.as_ref(), manifest.version, path.as_ref()) diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 243541b5dc..174e1b560c 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -783,8 +783,6 @@ mod tests { ); assert_lt!(after_count.num_tx_files, before_count.num_tx_files); - // The latest manifest should still be there, even if it is older than - // the given time. assert_gt!(after_count.num_manifest_files, 0); assert_gt!(after_count.num_data_files, 0); // We should keep referenced tx files @@ -805,9 +803,9 @@ mod tests { let before_count = fixture.count_files().await.unwrap(); - // 3 versions (plus one extra latest.manifest) + // 3 versions assert_eq!(before_count.num_data_files, 3); - assert_eq!(before_count.num_manifest_files, 4); + assert_eq!(before_count.num_manifest_files, 3); let before = utc_now() - TimeDelta::try_days(7).unwrap(); let removed = fixture.run_cleanup(before).await.unwrap(); @@ -824,7 +822,7 @@ mod tests { // the latest version assert_eq!(after_count.num_data_files, 3); // Only the oldest manifest file should be removed - assert_eq!(after_count.num_manifest_files, 3); + assert_eq!(after_count.num_manifest_files, 2); assert_eq!(after_count.num_tx_files, 2); } @@ -942,7 +940,7 @@ mod tests { let before_count = fixture.count_files().await.unwrap(); assert_eq!(before_count.num_data_files, 2); - assert_eq!(before_count.num_manifest_files, 3); + assert_eq!(before_count.num_manifest_files, 2); // Not much time has passed but we can still delete the old manifest // and the related data files @@ -957,7 +955,7 @@ mod tests { ); assert_eq!(after_count.num_data_files, 1); - assert_eq!(after_count.num_manifest_files, 2); + assert_eq!(after_count.num_manifest_files, 1); } #[tokio::test] @@ -986,7 +984,7 @@ mod tests { let before_count = fixture.count_files().await.unwrap(); assert_eq!(before_count.num_data_files, 2); - assert_eq!(before_count.num_manifest_files, 2); + assert_eq!(before_count.num_manifest_files, 1); let before = utc_now(); let removed = fixture @@ -1025,8 +1023,8 @@ mod tests { assert_eq!(before_count.num_index_files, 1); // Two user data files assert_eq!(before_count.num_data_files, 2); - // Creating an index creates a new manifest so there are 4 total - assert_eq!(before_count.num_manifest_files, 4); + // Creating an index creates a new manifest so there are 3 total + assert_eq!(before_count.num_manifest_files, 3); let before = utc_now() - TimeDelta::try_days(8).unwrap(); let removed = fixture.run_cleanup(before).await.unwrap(); @@ -1040,7 +1038,7 @@ mod tests { assert_eq!(after_count.num_index_files, 0); assert_eq!(after_count.num_data_files, 1); - assert_eq!(after_count.num_manifest_files, 2); + assert_eq!(after_count.num_manifest_files, 1); assert_eq!(after_count.num_tx_files, 1); } @@ -1066,7 +1064,7 @@ mod tests { let before_count = fixture.count_files().await.unwrap(); assert_eq!(before_count.num_data_files, 3); assert_eq!(before_count.num_delete_files, 2); - assert_eq!(before_count.num_manifest_files, 6); + assert_eq!(before_count.num_manifest_files, 5); let before = utc_now() - TimeDelta::try_days(8).unwrap(); let removed = fixture.run_cleanup(before).await.unwrap(); @@ -1080,7 +1078,7 @@ mod tests { assert_eq!(after_count.num_data_files, 1); assert_eq!(after_count.num_delete_files, 1); - assert_eq!(after_count.num_manifest_files, 3); + assert_eq!(after_count.num_manifest_files, 2); assert_eq!(after_count.num_tx_files, 2); // Ensure we can still read the dataset @@ -1128,7 +1126,7 @@ mod tests { // This append will fail since the commit is blocked but it should have // deposited a data file assert_eq!(before_count.num_data_files, 2); - assert_eq!(before_count.num_manifest_files, 2); + assert_eq!(before_count.num_manifest_files, 1); assert_eq!(before_count.num_tx_files, 2); // All of our manifests are newer than the threshold but temp files @@ -1146,7 +1144,7 @@ mod tests { ); assert_eq!(after_count.num_data_files, 1); - assert_eq!(after_count.num_manifest_files, 2); + assert_eq!(after_count.num_manifest_files, 1); assert_eq!(after_count.num_tx_files, 1); } @@ -1198,7 +1196,7 @@ mod tests { let before_count = fixture.count_files().await.unwrap(); assert_eq!(before_count.num_data_files, 2); - assert_eq!(before_count.num_manifest_files, 3); + assert_eq!(before_count.num_manifest_files, 2); assert!(fixture .run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap()) @@ -1212,7 +1210,7 @@ mod tests { // has to finish the buffered tasks even if they are ignored. let mid_count = fixture.count_files().await.unwrap(); assert_eq!(mid_count.num_data_files, 1); - assert_eq!(mid_count.num_manifest_files, 3); + assert_eq!(mid_count.num_manifest_files, 2); fixture.unblock_delete_manifest(); @@ -1229,6 +1227,6 @@ mod tests { ); assert_eq!(after_count.num_data_files, 1); - assert_eq!(after_count.num_manifest_files, 2); + assert_eq!(after_count.num_manifest_files, 1); } } diff --git a/rust/lance/src/io/commit/external_manifest.rs b/rust/lance/src/io/commit/external_manifest.rs index 127b456045..9c5a1278ea 100644 --- a/rust/lance/src/io/commit/external_manifest.rs +++ b/rust/lance/src/io/commit/external_manifest.rs @@ -13,7 +13,7 @@ mod test { use lance_table::io::commit::external_manifest::{ ExternalManifestCommitHandler, ExternalManifestStore, }; - use lance_table::io::commit::{latest_manifest_path, manifest_path, CommitHandler}; + use lance_table::io::commit::{manifest_path, CommitHandler}; use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use object_store::local::LocalFileSystem; use snafu::{location, Location}; @@ -272,10 +272,7 @@ mod test { // manually simulate last version is out of sync let localfs: Box = Box::new(LocalFileSystem::new()); localfs.delete(&manifest_path(&ds.base, 6)).await.unwrap(); - localfs - .copy(&manifest_path(&ds.base, 5), &latest_manifest_path(&ds.base)) - .await - .unwrap(); + // set the store back to dataset path with -{uuid} suffix let mut version_six = localfs .list(Some(&ds.base)) From bb3c88beb4c3bff7d0a19fd36b8c52523af64407 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 26 Aug 2024 10:48:18 -0700 Subject: [PATCH 5/7] clippy --- rust/lance-table/src/io/commit.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 4896836753..5d4c8ab00d 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -651,7 +651,7 @@ impl CommitHandler for RenameCommitHandler { // Write the manifest to the temporary path manifest_writer(object_store, manifest, indices, &tmp_path).await?; - let res = match object_store + match object_store .inner .rename_if_not_exists(&tmp_path, &path) .await @@ -668,9 +668,7 @@ impl CommitHandler for RenameCommitHandler { // Something else went wrong return Err(CommitError::OtherError(e.into())); } - }; - - res + } } } From 55e67027893bb2248375130a247345a1d6f9f9c5 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 26 Aug 2024 11:14:10 -0700 Subject: [PATCH 6/7] fix dynamodb test --- rust/lance/src/io/commit/dynamodb.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/rust/lance/src/io/commit/dynamodb.rs b/rust/lance/src/io/commit/dynamodb.rs index 452240b91b..0b38dc76d1 100644 --- a/rust/lance/src/io/commit/dynamodb.rs +++ b/rust/lance/src/io/commit/dynamodb.rs @@ -11,7 +11,7 @@ // The tests are linux only because // GHA Mac runner doesn't have docker, which is required to run dynamodb-local // Windows FS can't handle concurrent copy -#[cfg(all(test, target_os = "linux", feature = "dynamodb_tests"))] +#[cfg(all(test, feature = "dynamodb_tests"))] mod test { macro_rules! base_uri { () => { @@ -46,7 +46,7 @@ mod test { use lance_table::io::commit::{ dynamodb::DynamoDBExternalManifestStore, external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore}, - latest_manifest_path, manifest_path, CommitHandler, + manifest_path, CommitHandler, }; fn read_params(handler: Arc) -> ReadParams { @@ -308,10 +308,6 @@ mod test { // manually simulate last version is out of sync let localfs: Box = Box::new(LocalFileSystem::new()); localfs.delete(&manifest_path(&ds.base, 6)).await.unwrap(); - localfs - .copy(&manifest_path(&ds.base, 5), &latest_manifest_path(&ds.base)) - .await - .unwrap(); // set the store back to dataset path with -{uuid} suffix let mut version_six = localfs From ebb4f75e1b18de7ddc40153976a516ffdb609cd8 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 26 Aug 2024 11:31:18 -0700 Subject: [PATCH 7/7] add guard --- rust/lance/src/io/commit/dynamodb.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/io/commit/dynamodb.rs b/rust/lance/src/io/commit/dynamodb.rs index 0b38dc76d1..b5e8d714ed 100644 --- a/rust/lance/src/io/commit/dynamodb.rs +++ b/rust/lance/src/io/commit/dynamodb.rs @@ -11,7 +11,7 @@ // The tests are linux only because // GHA Mac runner doesn't have docker, which is required to run dynamodb-local // Windows FS can't handle concurrent copy -#[cfg(all(test, feature = "dynamodb_tests"))] +#[cfg(all(test, target_os = "linux", feature = "dynamodb_tests"))] mod test { macro_rules! base_uri { () => {