From 7a696084c62d0fa2746b5e3d4e29ab86c5b4439b Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 26 Aug 2024 12:53:55 -0700 Subject: [PATCH] fix: cleanup external staging manifests --- .../src/io/commit/external_manifest.rs | 129 ++++++++++-------- rust/lance/src/io/commit/external_manifest.rs | 58 +++++++- 2 files changed, 125 insertions(+), 62 deletions(-) diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index f760259b3d..d205f99c1c 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use lance_core::{Error, Result}; use lance_io::object_store::{ObjectStore, ObjectStoreExt}; use log::warn; -use object_store::{path::Path, ObjectStore as OSObjectStore}; +use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; use snafu::{location, Location}; use super::{ @@ -76,6 +76,50 @@ pub struct ExternalManifestCommitHandler { pub external_manifest_store: Arc, } +impl ExternalManifestCommitHandler { + /// The manifest is considered committed once the staging manifest is writen + /// to object store and that path is committed to the external store. + /// + /// However, to fully complete this, the staging manifest should be materialized + /// into the final path, the final path should be committed to the external store + /// and the staging manifest should be deleted. These steps may be completed + /// by any number of readers or writers, so care should be taken to ensure + /// that the manifest is not lost nor any errors occur due to duplicate + /// operations. + async fn finalize_manifest( + &self, + base_path: &Path, + staging_manifest_path: &Path, + version: u64, + store: &dyn OSObjectStore, + ) -> std::result::Result { + // step 1: copy the manifest to the final location + let final_manifest_path = manifest_path(base_path, version); + match store + .copy(staging_manifest_path, &final_manifest_path) + .await + { + Ok(_) => {} + Err(ObjectStoreError::NotFound { .. }) => return Ok(final_manifest_path), // Another writer beat us to it. + Err(e) => return Err(e.into()), + }; + + // step 2: flip the external store to point to the final location + self.external_manifest_store + .put_if_exists(base_path.as_ref(), version, final_manifest_path.as_ref()) + .await?; + + // step 3: delete the staging manifest + match store.delete(staging_manifest_path).await { + Ok(_) => {} + Err(ObjectStoreError::NotFound { .. }) => {} + Err(e) => return Err(e.into()), + } + + Ok(final_manifest_path) + } +} + #[async_trait] impl CommitHandler for ExternalManifestCommitHandler { async fn resolve_latest_location( @@ -110,30 +154,10 @@ impl CommitHandler for ExternalManifestCommitHandler { if path.ends_with(&format!(".{MANIFEST_EXTENSION}")) { return Ok(Path::parse(path)?); } - // path is not finalized yet, we should try to finalize the path before loading - // if sync/finalize fails, return error - // - // step 1: copy path -> object_store_manifest_path - let object_store_manifest_path = manifest_path(base_path, version); - let manifest_path = Path::parse(path)?; - let staging = make_staging_manifest_path(&manifest_path)?; - // TODO: remove copy-rename once we upgrade object_store crate - object_store.copy(&manifest_path, &staging).await?; - object_store - .inner - .rename(&staging, &object_store_manifest_path) - .await?; - // step 2: update external store to finalize path - self.external_manifest_store - .put_if_exists( - base_path.as_ref(), - version, - object_store_manifest_path.as_ref(), - ) - .await?; - - Ok(object_store_manifest_path) + let staged_path = Path::parse(&path)?; + self.finalize_manifest(base_path, &staged_path, version, &object_store.inner) + .await } // Dataset not found in the external store, this could be because the dataset did not // use external store for commit before. In this case, we search for the latest manifest @@ -204,24 +228,8 @@ impl CommitHandler for ExternalManifestCommitHandler { return Ok(Path::parse(path)?); } - let manifest_path = manifest_path(base_path, version); - let staging_path = make_staging_manifest_path(&manifest_path)?; - - // step1: try to materialize the manifest from external store to object store - // multiple writers could try to copy at the same time, this is okay - // as the content is immutable and copy is atomic - // We can't use `copy_if_not_exists` here because not all store supports it - object_store - .copy(&Path::parse(path)?, &staging_path) - .await?; - object_store.rename(&staging_path, &manifest_path).await?; - - // finalize the external store - self.external_manifest_store - .put_if_exists(base_path.as_ref(), version, manifest_path.as_ref()) - .await?; - - Ok(manifest_path) + self.finalize_manifest(base_path, &Path::parse(&path)?, version, object_store) + .await } async fn commit( @@ -241,28 +249,29 @@ impl CommitHandler for ExternalManifestCommitHandler { manifest_writer(object_store, manifest, indices, &staging_path).await?; // step 2 & 3: Try to commit this version to external store, return err on failure - // TODO: add logic to clean up orphaned staged manifests, the ones that failed to commit to external store - // https://github.com/lancedb/lance/issues/1201 - self.external_manifest_store + let res = self + .external_manifest_store .put_if_not_exists(base_path.as_ref(), manifest.version, staging_path.as_ref()) .await - .map_err(|_| CommitError::CommitConflict {})?; + .map_err(|_| CommitError::CommitConflict {}); - // step 4: copy the manifest to the final location - object_store.inner.copy( - &staging_path, - &path, - ).await.map_err(|e| CommitError::OtherError( - Error::io( - format!("commit to external store is successful, but could not copy manifest to object store, with error: {}.", e), - location!(), - ) - ))?; + if res.is_err() { + // delete the staging manifest + match object_store.inner.delete(&staging_path).await { + Ok(_) => {} + Err(ObjectStoreError::NotFound { .. }) => {} + Err(e) => return Err(CommitError::OtherError(e.into())), + } + return res; + } - // step 5: flip the external store to point to the final location - self.external_manifest_store - .put_if_exists(base_path.as_ref(), manifest.version, path.as_ref()) - .await?; + self.finalize_manifest( + base_path, + &staging_path, + manifest.version, + &object_store.inner, + ) + .await?; Ok(()) } diff --git a/rust/lance/src/io/commit/external_manifest.rs b/rust/lance/src/io/commit/external_manifest.rs index 9c5a1278ea..adabbd33c6 100644 --- a/rust/lance/src/io/commit/external_manifest.rs +++ b/rust/lance/src/io/commit/external_manifest.rs @@ -16,6 +16,7 @@ mod test { use lance_table::io::commit::{manifest_path, CommitHandler}; use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use object_store::local::LocalFileSystem; + use object_store::path::Path; use snafu::{location, Location}; use tokio::sync::Mutex; @@ -237,6 +238,32 @@ mod test { .await .unwrap(); assert_eq!(ds.count_rows(None).await.unwrap(), 60); + + // No temporary manifests left over + let manifest_path = dir.path().join("_versions/"); + let unexpected_entries = std::fs::read_dir(manifest_path) + .unwrap() + .filter(|entry| { + let entry = entry.as_ref().unwrap(); + !entry + .file_name() + .as_os_str() + .to_string_lossy() + .ends_with(".manifest") + }) + // There is a bug in local fs where concurrent commits can leave behind + // temporary `x.manifest#n` files. This might be a bug in object-store. + // TODO: fix this. + .filter(|entry| { + let entry = entry.as_ref().unwrap(); + !entry + .file_name() + .as_os_str() + .to_string_lossy() + .contains(".manifest#") + }) + .collect::>(); + assert!(unexpected_entries.is_empty(), "{:?}", unexpected_entries); } } @@ -271,8 +298,20 @@ mod test { // manually simulate last version is out of sync let localfs: Box = Box::new(LocalFileSystem::new()); - localfs.delete(&manifest_path(&ds.base, 6)).await.unwrap(); - + // Move version 6 to a temporary location, put that in the store. + let base_path = Path::parse(ds_uri).unwrap(); + let version_six_staging_location = + base_path.child(format!("6.manifest-{}", uuid::Uuid::new_v4())); + localfs + .rename(&manifest_path(&ds.base, 6), &version_six_staging_location) + .await + .unwrap(); + { + inner_store.lock().await.insert( + (ds.base.to_string(), 6), + version_six_staging_location.to_string(), + ); + } // set the store back to dataset path with -{uuid} suffix let mut version_six = localfs .list(Some(&ds.base)) @@ -310,5 +349,20 @@ mod test { let ds = DatasetBuilder::from_uri(ds_uri).load().await.unwrap(); assert_eq!(ds.version().version, 6); assert_eq!(ds.count_rows(None).await.unwrap(), 60); + + // No temporary manifests left over + let manifest_path = dir.path().join("_versions/"); + let unexpected_entries = std::fs::read_dir(manifest_path) + .unwrap() + .filter(|entry| { + let entry = entry.as_ref().unwrap(); + !entry + .file_name() + .as_os_str() + .to_string_lossy() + .ends_with(".manifest") + }) + .collect::>(); + assert!(unexpected_entries.is_empty(), "{:?}", unexpected_entries); } }