Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix several situations where we were incorrectly inferring the storage version #2756

Merged
merged 5 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions python/python/tests/test_fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,29 @@ def test_fragment_v2(tmp_path):
assert len(fragments) == 1
ds = lance.dataset(dataset_uri)
assert "minor_version: 3" in format_fragment(fragments[0], ds)


def test_mixed_fragment_versions(tmp_path):
    """Committing fragments written with different storage versions must fail."""
    table = pa.table({"a": range(800), "b": range(800)})

    # Start from an empty dataset created with the "stable" (v2) storage version
    ds = lance.write_dataset(
        data_obj=[],
        uri=tmp_path / "dataset2",
        schema=table.schema,
        data_storage_version="stable",
    )

    # Write one fragment per storage version: "legacy" (v1) first, then "stable" (v2)
    fragments = [
        lance.LanceFragment.create(ds.uri, table, data_storage_version=version)
        for version in ("legacy", "stable")
    ]

    # The overwrite commit is expected to be rejected
    operation = lance.LanceOperation.Overwrite(ds.schema, fragments)
    with pytest.raises(OSError, match="All data files must have the same version"):
        lance.LanceDataset.commit(ds.uri, operation)
46 changes: 46 additions & 0 deletions python/python/tests/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,49 @@ def test_dataset_distributed_optimize(tmp_path: Path):
assert metrics.fragments_added == 1
# Compaction occurs in two transactions so it increments the version by 2.
assert dataset.version == 3


def test_migration_via_fragment_apis(tmp_path):
    """
    Regression test for a case where the fragment APIs were used to migrate from
    v1 to v2, which left the dataset in a state where it had v2 files but wasn't
    marked with the v2 writer flag.
    """
    table = pa.table({"a": range(800), "b": range(800)})
    source_uri = tmp_path / "dataset"
    target_uri = tmp_path / "dataset2"

    # Source dataset written with the "legacy" (v1) storage version
    ds = lance.write_dataset(
        table, source_uri, max_rows_per_file=200, data_storage_version="legacy"
    )

    # Empty target dataset written with the "stable" (v2) storage version
    lance.write_dataset(
        data_obj=[],
        uri=target_uri,
        schema=ds.schema,
        data_storage_version="stable",
    )

    # Rewrite every source fragment as a v2 fragment in the target dataset,
    # keeping the original fragment ids
    fragments = [
        lance.LanceFragment.create(
            dataset_uri=target_uri,
            data=ds.scanner(fragments=[frag]),
            fragment_id=frag.fragment_id,
            data_storage_version="stable",
        )
        for frag in ds.get_fragments()
    ]

    # Commit the new fragments as an overwrite of the target dataset
    operation = lance.LanceOperation.Overwrite(ds.schema, fragments)
    ds2 = lance.LanceDataset.commit(target_uri, operation)

    # Compact, then reopen: the dataset should still report v2
    ds2.optimize.compact_files()
    ds2 = lance.dataset(target_uri)

    assert ds2.data_storage_version == "2.0"
14 changes: 14 additions & 0 deletions rust/lance-encoding/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ impl LanceFileVersion {
// This will go away soon, but there are a few spots where the Legacy default doesn't make sense
Self::V2_0
}

pub fn try_from_major_minor(major: u32, minor: u32) -> Result<Self> {
match (major, minor) {
(0, 0) => Ok(Self::Legacy),
(0, 1) => Ok(Self::Legacy),
(0, 3) => Ok(Self::V2_0),
(2, 0) => Ok(Self::V2_0),
(2, 1) => Ok(Self::V2_1),
_ => Err(Error::InvalidInput {
source: format!("Unknown Lance storage version: {}.{}", major, minor).into(),
location: location!(),
}),
}
}
}

impl std::fmt::Display for LanceFileVersion {
Expand Down
28 changes: 28 additions & 0 deletions rust/lance-table/src/format/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use lance_core::Error;
use lance_file::format::{MAJOR_VERSION, MINOR_VERSION_NEXT};
use lance_file::version::LanceFileVersion;
use object_store::path::Path;
use serde::{Deserialize, Serialize};
use snafu::{location, Location};
Expand Down Expand Up @@ -301,6 +302,33 @@ impl Fragment {
// If any file in a fragment is legacy then all files in the fragment must be
self.files[0].is_legacy_file()
}

/// Infers the Lance file version shared by every data file in `fragments`.
///
/// The first data file found (scanning fragments in order) provides the reference
/// version; every other data file must map to the same [`LanceFileVersion`].
///
/// # Errors
///
/// Returns an invalid-input error if `fragments` contains no data files at all
/// (an empty slice, or only fragments without files) or if the data files do not
/// all share one version. Errors from [`LanceFileVersion::try_from_major_minor`]
/// are propagated unchanged.
pub fn try_infer_version(fragments: &[Self]) -> Result<LanceFileVersion> {
    // Lazily convert each file's recorded (major, minor) pair, in fragment order.
    let mut versions = fragments
        .iter()
        .flat_map(|frag| &frag.files)
        .map(|file| {
            LanceFileVersion::try_from_major_minor(
                file.file_major_version,
                file.file_minor_version,
            )
        });

    // Report a missing sample file as an error rather than indexing `[0]`,
    // which would panic on empty input.
    let file_version = match versions.next() {
        Some(version) => version?,
        None => {
            return Err(Error::invalid_input(
                "Cannot infer the data storage version: no data files were provided",
                location!(),
            ))
        }
    };

    // Ensure all remaining files match the reference version
    for this_file_version in versions {
        if this_file_version? != file_version {
            return Err(Error::invalid_input(
                "All data files must have the same version",
                location!(),
            ));
        }
    }
    Ok(file_version)
}
}

impl TryFrom<pb::DataFragment> for Fragment {
Expand Down
12 changes: 9 additions & 3 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,16 @@ impl TryFrom<pb::Manifest> for Manifest {

let data_storage_format = match p.data_format {
None => {
if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
DataStorageFormat::new(LanceFileVersion::V2_0)
if fragments.is_empty() {
// No fragments to inspect, best we can do is look at writer flags
if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
DataStorageFormat::new(LanceFileVersion::Legacy)
} else {
DataStorageFormat::new(LanceFileVersion::Stable)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this backwards?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Let me make some migration tests for these cases

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I added some regression test cases (confirmed they detected this) and switched the logic (now tests pass)

} else {
DataStorageFormat::new(LanceFileVersion::Legacy)
// If there are fragments, they are a better indicator
DataStorageFormat::new(Fragment::try_infer_version(fragments.as_ref())?)
}
}
Some(format) => DataStorageFormat::from(format),
Expand Down
42 changes: 36 additions & 6 deletions rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,31 @@ impl Transaction {
})
}

fn data_storage_format_from_files(
fragments: &[Fragment],
user_requested: Option<LanceFileVersion>,
) -> Result<DataStorageFormat> {
// If no files use user-requested or default
if fragments.is_empty() {
return Ok(user_requested
.map(DataStorageFormat::new)
.unwrap_or_default());
}
// Otherwise we need to check the actual file versions
let file_version = Fragment::try_infer_version(fragments)?;

// Ensure user-requested matches data files
if let Some(user_requested) = user_requested {
if user_requested != file_version {
return Err(Error::invalid_input(
format!("User requested data storage version ({}) does not match version in data files ({})", user_requested, file_version),
location!(),
));
}
}
Ok(DataStorageFormat::new(file_version))
}

pub(crate) async fn restore_old_manifest(
object_store: &ObjectStore,
commit_handler: &dyn CommitHandler,
Expand Down Expand Up @@ -568,21 +593,26 @@ impl Transaction {
// If a fragment was reserved then it may not belong at the end of the fragments list.
final_fragments.sort_by_key(|frag| frag.id);

let data_storage_format = match (&config.storage_format, config.use_legacy_format) {
(Some(storage_format), _) => storage_format.clone(),
(None, Some(true)) => DataStorageFormat::new(LanceFileVersion::Legacy),
(None, Some(false)) => DataStorageFormat::new(LanceFileVersion::V2_0),
(None, None) => DataStorageFormat::default(),
let user_requested_version = match (&config.storage_format, config.use_legacy_format) {
(Some(storage_format), _) => Some(storage_format.lance_file_version()?),
(None, Some(true)) => Some(LanceFileVersion::Legacy),
(None, Some(false)) => Some(LanceFileVersion::V2_0),
(None, None) => None,
};

let mut manifest = if let Some(current_manifest) = current_manifest {
let mut prev_manifest =
Manifest::new_from_previous(current_manifest, schema, Arc::new(final_fragments));
if matches!(self.operation, Operation::Overwrite { .. }) {
prev_manifest.data_storage_format = data_storage_format;
prev_manifest.data_storage_format = Self::data_storage_format_from_files(
&prev_manifest.fragments,
user_requested_version,
)?;
}
prev_manifest
} else {
let data_storage_format =
Self::data_storage_format_from_files(&final_fragments, user_requested_version)?;
Manifest::new(schema, Arc::new(final_fragments), data_storage_format)
};

Expand Down
Loading