Skip to content

Commit

Permalink
fix: stop reading latest manifest (#1365)
Browse files Browse the repository at this point in the history
As a quick fix to address safety/consistency issues, we will find the
latest manifest by scanning the `_versions` directory.

We continue to write the `_latest.manifest` file, so older readers are
still compatible.

Performance considerations: in object stores like S3, we can get 1,000
objects listed with 1 request. So right now this adds 1 IO for those
tables. Tables with more versions should probably have
`cleanup_old_versions` applied.

Fixes: #1356

We will optimize this in follow ups, as described in #1362
  • Loading branch information
wjones127 committed Oct 5, 2023
1 parent 0bed6a1 commit 299aab1
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 19 deletions.
28 changes: 18 additions & 10 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,24 +327,28 @@ impl Dataset {
.await?;

// Read expected manifest path for the dataset
let latest_manifest_path = object_store
let dataset_exists = match object_store
.commit_handler
.resolve_latest_version(&base, &object_store)
.await?;
let flag_dataset_exists = object_store.exists(&latest_manifest_path).await?;
.await
{
Ok(_) => true,
Err(Error::NotFound { .. }) => false,
Err(e) => return Err(e),
};

let (stream, schema) = reader_to_stream(batches)?;

// Running checks for the different write modes
// create + dataset already exists = error
if flag_dataset_exists && matches!(params.mode, WriteMode::Create) {
if dataset_exists && matches!(params.mode, WriteMode::Create) {
return Err(Error::DatasetAlreadyExists {
uri: uri.to_owned(),
});
}

// append + dataset doesn't already exists = warn + switch to create mode
if !flag_dataset_exists
if !dataset_exists
&& (matches!(params.mode, WriteMode::Append)
|| matches!(params.mode, WriteMode::Overwrite))
{
Expand Down Expand Up @@ -642,20 +646,24 @@ impl Dataset {
.await?;

// Test if the dataset exists
let latest_manifest = object_store
let dataset_exists = match object_store
.commit_handler
.resolve_latest_version(&base, &object_store)
.await?;
let flag_dataset_exists = object_store.exists(&latest_manifest).await?;
.await
{
Ok(_) => true,
Err(Error::NotFound { .. }) => false,
Err(e) => return Err(e),
};

if !flag_dataset_exists && !matches!(operation, Operation::Overwrite { .. }) {
if !dataset_exists && !matches!(operation, Operation::Overwrite { .. }) {
return Err(Error::DatasetNotFound {
path: base.to_string(),
source: "The dataset must already exist unless the operation is Overwrite".into(),
});
}

let dataset = if flag_dataset_exists {
let dataset = if dataset_exists {
Some(
Self::open_with_params(
base_uri,
Expand Down
46 changes: 41 additions & 5 deletions rust/lance/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,49 @@ fn manifest_path(base: &Path, version: u64) -> Path {
.child(format!("{version}.{MANIFEST_EXTENSION}"))
}

/// Get the latest manifest path
fn latest_manifest_path(base: &Path) -> Path {
base.child(LATEST_MANIFEST_NAME)
}

/// Get the latest manifest path
async fn current_manifest_path(object_store: &ObjectStore, base: &Path) -> Result<Path> {
// TODO: list gives us the size, so we could also return the size of the manifest.
// That avoids a HEAD request later.

// We use `list_with_delimiter` to avoid listing the contents of child directories.
let manifest_files = object_store
.inner
.list_with_delimiter(Some(&base.child(VERSIONS_DIR)))
.await?;

let current = manifest_files
.objects
.into_iter()
.map(|meta| meta.location)
.filter(|path| {
path.filename().is_some() && path.filename().unwrap().ends_with(MANIFEST_EXTENSION)
})
.filter_map(|path| {
let version = path
.filename()
.unwrap()
.split_once('.')
.and_then(|(version_str, _)| version_str.parse::<u64>().ok())?;
Some((version, path))
})
.max_by_key(|(version, _)| *version)
.map(|(_, path)| path);

if let Some(path) = current {
Ok(path)
} else {
Err(Error::NotFound {
uri: manifest_path(base, 1).to_string(),
location: location!(),
})
}
}

fn make_staging_manifest_path(base: &Path) -> Result<Path> {
let id = uuid::Uuid::new_v4().to_string();
Path::parse(format!("{base}-{id}")).map_err(|e| crate::Error::IO {
Expand Down Expand Up @@ -138,12 +176,10 @@ pub(crate) trait CommitHandler: Debug + Send + Sync {
async fn resolve_latest_version(
&self,
base_path: &Path,
_object_store: &ObjectStore,
object_store: &ObjectStore,
) -> std::result::Result<Path, crate::Error> {
// use the _latest.manifest file to get the latest version
// TODO: this isn't 100% safe, we should list the /_versions directory and find the latest version
// TODO: we need to pade 0's to the version number on the manifest file path
Ok(latest_manifest_path(base_path))
Ok(current_manifest_path(object_store, base_path).await?)
}

/// Get the path to a specific versioned manifest of a dataset at the base_path
Expand Down
8 changes: 4 additions & 4 deletions rust/lance/src/io/commit/external_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::io::ObjectStore;
use crate::{Error, Result};

use super::{
latest_manifest_path, make_staging_manifest_path, manifest_path, write_latest_manifest,
current_manifest_path, make_staging_manifest_path, manifest_path, write_latest_manifest,
MANIFEST_EXTENSION,
};

Expand Down Expand Up @@ -119,8 +119,8 @@ impl CommitHandler for ExternalManifestCommitHandler {
Ok(object_store_manifest_path)
}
// Dataset not found in the external store, this could be because the dataset did not
// use external store for commit before. In this case, we use the _latest.manifest file
None => Ok(latest_manifest_path(base_path)),
// use external store for commit before. In this case, we search for the latest manifest
None => current_manifest_path(object_store, base_path).await,
}
}

Expand Down Expand Up @@ -251,7 +251,7 @@ mod test {

use crate::{
dataset::{ReadParams, WriteMode, WriteParams},
io::object_store::ObjectStoreParams,
io::{commit::latest_manifest_path, object_store::ObjectStoreParams},
Dataset,
};

Expand Down

0 comments on commit 299aab1

Please sign in to comment.