Skip to content

Commit

Permalink
fix: cleanup external staging manifests
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Aug 26, 2024
1 parent ebb4f75 commit 7a69608
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 62 deletions.
129 changes: 69 additions & 60 deletions rust/lance-table/src/io/commit/external_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use async_trait::async_trait;
use lance_core::{Error, Result};
use lance_io::object_store::{ObjectStore, ObjectStoreExt};
use log::warn;
use object_store::{path::Path, ObjectStore as OSObjectStore};
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
use snafu::{location, Location};

use super::{
Expand Down Expand Up @@ -76,6 +76,50 @@ pub struct ExternalManifestCommitHandler {
pub external_manifest_store: Arc<dyn ExternalManifestStore>,
}

impl ExternalManifestCommitHandler {
    /// Finish a commit whose staging manifest is already recorded in the
    /// external store.
    ///
    /// A version counts as committed as soon as its staging manifest has been
    /// written to the object store and that staging path has been committed to
    /// the external store. Finalizing consists of three follow-up steps:
    ///
    /// 1. copy the staging manifest to the final manifest path,
    /// 2. update the external store entry to point at the final path,
    /// 3. delete the staging manifest.
    ///
    /// Any number of readers or writers may run these steps concurrently, so
    /// duplicate work must neither lose the manifest nor surface as an error.
    ///
    /// Returns the final manifest path for `version`.
    ///
    /// # Errors
    ///
    /// Propagates any object store error other than `NotFound` from the copy
    /// or the delete, and any error returned by `put_if_exists`.
    async fn finalize_manifest(
        &self,
        base_path: &Path,
        staging_manifest_path: &Path,
        version: u64,
        store: &dyn OSObjectStore,
    ) -> std::result::Result<Path, Error> {
        let destination = manifest_path(base_path, version);

        // Step 1: materialize the manifest at its final location.
        if let Err(copy_err) = store.copy(staging_manifest_path, &destination).await {
            return match copy_err {
                // The staging file is already gone: another writer beat us to it.
                ObjectStoreError::NotFound { .. } => Ok(destination),
                other => Err(other.into()),
            };
        }

        // Step 2: flip the external store entry over to the final location.
        self.external_manifest_store
            .put_if_exists(base_path.as_ref(), version, destination.as_ref())
            .await?;

        // Step 3: remove the staging manifest. A missing file is not an error
        // here; only other failures are reported.
        match store.delete(staging_manifest_path).await {
            Ok(_) | Err(ObjectStoreError::NotFound { .. }) => Ok(destination),
            Err(other) => Err(other.into()),
        }
    }
}

#[async_trait]
impl CommitHandler for ExternalManifestCommitHandler {
async fn resolve_latest_location(
Expand Down Expand Up @@ -110,30 +154,10 @@ impl CommitHandler for ExternalManifestCommitHandler {
if path.ends_with(&format!(".{MANIFEST_EXTENSION}")) {
return Ok(Path::parse(path)?);
}
// path is not finalized yet, we should try to finalize the path before loading
// if sync/finalize fails, return error
//
// step 1: copy path -> object_store_manifest_path
let object_store_manifest_path = manifest_path(base_path, version);
let manifest_path = Path::parse(path)?;
let staging = make_staging_manifest_path(&manifest_path)?;
// TODO: remove copy-rename once we upgrade object_store crate
object_store.copy(&manifest_path, &staging).await?;
object_store
.inner
.rename(&staging, &object_store_manifest_path)
.await?;

// step 2: update external store to finalize path
self.external_manifest_store
.put_if_exists(
base_path.as_ref(),
version,
object_store_manifest_path.as_ref(),
)
.await?;

Ok(object_store_manifest_path)
let staged_path = Path::parse(&path)?;
self.finalize_manifest(base_path, &staged_path, version, &object_store.inner)
.await
}
// Dataset not found in the external store, this could be because the dataset did not
// use external store for commit before. In this case, we search for the latest manifest
Expand Down Expand Up @@ -204,24 +228,8 @@ impl CommitHandler for ExternalManifestCommitHandler {
return Ok(Path::parse(path)?);
}

let manifest_path = manifest_path(base_path, version);
let staging_path = make_staging_manifest_path(&manifest_path)?;

// step1: try to materialize the manifest from external store to object store
// multiple writers could try to copy at the same time, this is okay
// as the content is immutable and copy is atomic
// We can't use `copy_if_not_exists` here because not all store supports it
object_store
.copy(&Path::parse(path)?, &staging_path)
.await?;
object_store.rename(&staging_path, &manifest_path).await?;

// finalize the external store
self.external_manifest_store
.put_if_exists(base_path.as_ref(), version, manifest_path.as_ref())
.await?;

Ok(manifest_path)
self.finalize_manifest(base_path, &Path::parse(&path)?, version, object_store)
.await
}

async fn commit(
Expand All @@ -241,28 +249,29 @@ impl CommitHandler for ExternalManifestCommitHandler {
manifest_writer(object_store, manifest, indices, &staging_path).await?;

// step 2 & 3: Try to commit this version to external store, return err on failure
// TODO: add logic to clean up orphaned staged manifests, the ones that failed to commit to external store
// https://github.com/lancedb/lance/issues/1201
self.external_manifest_store
let res = self
.external_manifest_store
.put_if_not_exists(base_path.as_ref(), manifest.version, staging_path.as_ref())
.await
.map_err(|_| CommitError::CommitConflict {})?;
.map_err(|_| CommitError::CommitConflict {});

// step 4: copy the manifest to the final location
object_store.inner.copy(
&staging_path,
&path,
).await.map_err(|e| CommitError::OtherError(
Error::io(
format!("commit to external store is successful, but could not copy manifest to object store, with error: {}.", e),
location!(),
)
))?;
if res.is_err() {
// delete the staging manifest
match object_store.inner.delete(&staging_path).await {
Ok(_) => {}
Err(ObjectStoreError::NotFound { .. }) => {}
Err(e) => return Err(CommitError::OtherError(e.into())),
}
return res;
}

// step 5: flip the external store to point to the final location
self.external_manifest_store
.put_if_exists(base_path.as_ref(), manifest.version, path.as_ref())
.await?;
self.finalize_manifest(
base_path,
&staging_path,
manifest.version,
&object_store.inner,
)
.await?;

Ok(())
}
Expand Down
58 changes: 56 additions & 2 deletions rust/lance/src/io/commit/external_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod test {
use lance_table::io::commit::{manifest_path, CommitHandler};
use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use snafu::{location, Location};
use tokio::sync::Mutex;

Expand Down Expand Up @@ -237,6 +238,32 @@ mod test {
.await
.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap(), 60);

// No temporary manifests left over
let manifest_path = dir.path().join("_versions/");
let unexpected_entries = std::fs::read_dir(manifest_path)
.unwrap()
.filter(|entry| {
let entry = entry.as_ref().unwrap();
!entry
.file_name()
.as_os_str()
.to_string_lossy()
.ends_with(".manifest")
})
// There is a bug in local fs where concurrent commits can leave behind
// temporary `x.manifest#n` files. This might be a bug in object-store.
// TODO: fix this.
.filter(|entry| {
let entry = entry.as_ref().unwrap();
!entry
.file_name()
.as_os_str()
.to_string_lossy()
.contains(".manifest#")
})
.collect::<Vec<_>>();
assert!(unexpected_entries.is_empty(), "{:?}", unexpected_entries);
}
}

Expand Down Expand Up @@ -271,8 +298,20 @@ mod test {

// manually simulate last version is out of sync
let localfs: Box<dyn object_store::ObjectStore> = Box::new(LocalFileSystem::new());
localfs.delete(&manifest_path(&ds.base, 6)).await.unwrap();

// Move version 6 to a temporary location, put that in the store.
let base_path = Path::parse(ds_uri).unwrap();
let version_six_staging_location =
base_path.child(format!("6.manifest-{}", uuid::Uuid::new_v4()));
localfs
.rename(&manifest_path(&ds.base, 6), &version_six_staging_location)
.await
.unwrap();
{
inner_store.lock().await.insert(
(ds.base.to_string(), 6),
version_six_staging_location.to_string(),
);
}
// set the store back to dataset path with -{uuid} suffix
let mut version_six = localfs
.list(Some(&ds.base))
Expand Down Expand Up @@ -310,5 +349,20 @@ mod test {
let ds = DatasetBuilder::from_uri(ds_uri).load().await.unwrap();
assert_eq!(ds.version().version, 6);
assert_eq!(ds.count_rows(None).await.unwrap(), 60);

// No temporary manifests left over
let manifest_path = dir.path().join("_versions/");
let unexpected_entries = std::fs::read_dir(manifest_path)
.unwrap()
.filter(|entry| {
let entry = entry.as_ref().unwrap();
!entry
.file_name()
.as_os_str()
.to_string_lossy()
.ends_with(".manifest")
})
.collect::<Vec<_>>();
assert!(unexpected_entries.is_empty(), "{:?}", unexpected_entries);
}
}

0 comments on commit 7a69608

Please sign in to comment.