Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: stop reading latest manifest #1365

Merged
merged 3 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,24 +327,28 @@ impl Dataset {
.await?;

// Read expected manifest path for the dataset
let latest_manifest_path = object_store
let dataset_exists = match object_store
.commit_handler
.resolve_latest_version(&base, &object_store)
.await?;
let flag_dataset_exists = object_store.exists(&latest_manifest_path).await?;
.await
{
Ok(_) => true,
Err(Error::NotFound { .. }) => false,
Err(e) => return Err(e),
};

let (stream, schema) = reader_to_stream(batches)?;

// Running checks for the different write modes
// create + dataset already exists = error
if flag_dataset_exists && matches!(params.mode, WriteMode::Create) {
if dataset_exists && matches!(params.mode, WriteMode::Create) {
return Err(Error::DatasetAlreadyExists {
uri: uri.to_owned(),
});
}

// append + dataset doesn't already exists = warn + switch to create mode
if !flag_dataset_exists
if !dataset_exists
&& (matches!(params.mode, WriteMode::Append)
|| matches!(params.mode, WriteMode::Overwrite))
{
Expand Down Expand Up @@ -642,20 +646,24 @@ impl Dataset {
.await?;

// Test if the dataset exists
let latest_manifest = object_store
let dataset_exists = match object_store
.commit_handler
.resolve_latest_version(&base, &object_store)
.await?;
let flag_dataset_exists = object_store.exists(&latest_manifest).await?;
.await
{
Ok(_) => true,
Err(Error::NotFound { .. }) => false,
Err(e) => return Err(e),
};

if !flag_dataset_exists && !matches!(operation, Operation::Overwrite { .. }) {
if !dataset_exists && !matches!(operation, Operation::Overwrite { .. }) {
return Err(Error::DatasetNotFound {
path: base.to_string(),
source: "The dataset must already exist unless the operation is Overwrite".into(),
});
}

let dataset = if flag_dataset_exists {
let dataset = if dataset_exists {
Some(
Self::open_with_params(
base_uri,
Expand Down
46 changes: 41 additions & 5 deletions rust/lance/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,49 @@ fn manifest_path(base: &Path, version: u64) -> Path {
.child(format!("{version}.{MANIFEST_EXTENSION}"))
}

/// Get the latest manifest path
fn latest_manifest_path(base: &Path) -> Path {
base.child(LATEST_MANIFEST_NAME)
}

/// Get the latest manifest path
async fn current_manifest_path(object_store: &ObjectStore, base: &Path) -> Result<Path> {
// TODO: list gives us the size, so we could also return the size of the manifest.
// That avoids a HEAD request later.

// We use `list_with_delimiter` to avoid listing the contents of child directories.
let manifest_files = object_store
.inner
.list_with_delimiter(Some(&base.child(VERSIONS_DIR)))
.await?;

let current = manifest_files
.objects
.into_iter()
.map(|meta| meta.location)
.filter(|path| {
path.filename().is_some() && path.filename().unwrap().ends_with(MANIFEST_EXTENSION)
})
.filter_map(|path| {
let version = path
.filename()
.unwrap()
.split_once('.')
.and_then(|(version_str, _)| version_str.parse::<u64>().ok())?;
Some((version, path))
})
.max_by_key(|(version, _)| *version)
.map(|(_, path)| path);

if let Some(path) = current {
Ok(path)
} else {
Err(Error::NotFound {
uri: manifest_path(base, 1).to_string(),
location: location!(),
})
}
}

fn make_staging_manifest_path(base: &Path) -> Result<Path> {
let id = uuid::Uuid::new_v4().to_string();
Path::parse(format!("{base}-{id}")).map_err(|e| crate::Error::IO {
Expand Down Expand Up @@ -138,12 +176,10 @@ pub(crate) trait CommitHandler: Debug + Send + Sync {
async fn resolve_latest_version(
&self,
base_path: &Path,
_object_store: &ObjectStore,
object_store: &ObjectStore,
) -> std::result::Result<Path, crate::Error> {
// use the _latest.manifest file to get the latest version
// TODO: this isn't 100% safe, we should list the /_versions directory and find the latest version
// TODO: we need to pade 0's to the version number on the manifest file path
Ok(latest_manifest_path(base_path))
Ok(current_manifest_path(object_store, base_path).await?)
}

/// Get the path to a specific versioned manifest of a dataset at the base_path
Expand Down
8 changes: 4 additions & 4 deletions rust/lance/src/io/commit/external_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::io::ObjectStore;
use crate::{Error, Result};

use super::{
latest_manifest_path, make_staging_manifest_path, manifest_path, write_latest_manifest,
current_manifest_path, make_staging_manifest_path, manifest_path, write_latest_manifest,
MANIFEST_EXTENSION,
};

Expand Down Expand Up @@ -119,8 +119,8 @@ impl CommitHandler for ExternalManifestCommitHandler {
Ok(object_store_manifest_path)
}
// Dataset not found in the external store, this could be because the dataset did not
// use external store for commit before. In this case, we use the _latest.manifest file
None => Ok(latest_manifest_path(base_path)),
// use external store for commit before. In this case, we search for the latest manifest
None => current_manifest_path(object_store, base_path).await,
}
}

Expand Down Expand Up @@ -251,7 +251,7 @@ mod test {

use crate::{
dataset::{ReadParams, WriteMode, WriteParams},
io::object_store::ObjectStoreParams,
io::{commit::latest_manifest_path, object_store::ObjectStoreParams},
Dataset,
};

Expand Down