
[Managed Iceberg] Allow updating partition specs at runtime #32879

Open · wants to merge 7 commits into base: master
Conversation

ahmedabu98
Contributor

Users may update a table's partition spec while a write pipeline is running (e.g. a streaming pipeline). Sometimes this update happens after DataFiles have already been serialized. The partition spec itself is not serializable, so we don't preserve it, but we do keep its ID. We can use that ID to fetch the correct partition spec and recreate the DataFile before appending to the table.

Fixes #32862
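The lookup described above can be sketched in plain Java. This is an illustrative model, not the actual Beam/Iceberg code: Iceberg's `Table#specs()` returns a map from spec ID to `PartitionSpec`, which is modeled here as a simple `Map<Integer, String>`; the class and method names are hypothetical.

```java
import java.util.Map;

// Sketch of the recovery step: a serialized DataFile carries only its
// partition spec ID, so before appending we look the full spec back up
// in the table's map of historical specs (modeled as Map<Integer, String>).
public class SpecLookup {
    static String resolveSpec(Map<Integer, String> tableSpecs, int specId) {
        String spec = tableSpecs.get(specId);
        if (spec == null) {
            throw new IllegalStateException("Unknown partition spec id: " + specId);
        }
        return spec;
    }

    public static void main(String[] args) {
        // The table evolved from spec 0 (unpartitioned) to spec 1, but a
        // DataFile serialized under spec 0 can still be resolved correctly,
        // because Iceberg keeps every historical spec in table metadata.
        Map<Integer, String> specs = Map.of(0, "unpartitioned", 1, "day(ts)");
        System.out.println(resolveSpec(specs, 0));
    }
}
```

The key point is that the spec ID survives serialization even though the spec object does not, and the table metadata keeps all historical specs, so the lookup is always possible on the commit side.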

@ahmedabu98 ahmedabu98 changed the title [Managed Iceberg] Allowed updating partition specs at runtime [Managed Iceberg] Allow updating partition specs at runtime Oct 18, 2024
@ahmedabu98
Contributor Author

R: @DanielMorales9


Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@github-actions github-actions bot added the build label Oct 18, 2024
@DanielMorales9

I've looked into the Iceberg codebase to check for any violations of snapshot consistency. Interestingly, it seems possible to commit data files produced with an old partitioning scheme (and therefore from a previous snapshot). Could this actually be a feature of partition evolution?! 🤔
Any thoughts?

@ahmedabu98
Contributor Author

Interestingly, it seems possible to commit data files produced with an old partitioning scheme

Yep, this is what I found empirically as well. The docs mention that metadata for each partition spec is kept separate. This allows Iceberg to carry out a separate query plan for each partition spec, then combine all the files afterwards.

Old data written with the old partition spec doesn't get affected. What I'm seeing is that new data written with the old spec is also unaffected. This is convenient for streaming writes: after updating the table's partition spec, there may be some in-flight data still using the old spec. Iceberg will still accept that data when it gets committed and manage the metadata accordingly.

@DanielMorales9

DanielMorales9 commented Oct 21, 2024

Yep, makes sense! 👍
I checked once again and can confirm that manifest files are stored in the manifest list along with their partition spec ID, and that the write process groups data files by partition spec ID. We can merge ✅

Example Manifest list:

{
  "manifest_path": "gs://<bucket>/<schema>/<table>/metadata/c4aa6dc6-5e0b-454c-b126-1d97d2f223f8-m1.avro",
  "manifest_length": 8363467,
  "partition_spec_id": 0,
  "content": 0,
  "sequence_number": 4814,
  "min_sequence_number": 4093,
  "added_snapshot_id": 481579842725173797,
  "added_data_files_count": 0,
  "existing_data_files_count": 24356,
  "deleted_data_files_count": 0,
  "added_rows_count": 0,
  "existing_rows_count": 785677,
  "deleted_rows_count": 0,
  "partitions": {
    "array": []
  }
}
{
  "manifest_path": "gs://<bucket>/<schema>/<table>/metadata/9753c692-b48e-49bb-9c27-713c8a41842e-m1.avro",
  "manifest_length": 8372685,
  "partition_spec_id": 0,
  "content": 0,
  "sequence_number": 4093,
  "min_sequence_number": 3557,
  "added_snapshot_id": 1992827298894205835,
  "added_data_files_count": 0,
  "existing_data_files_count": 19878,
  "deleted_data_files_count": 0,
  "added_rows_count": 0,
  "existing_rows_count": 1263244,
  "deleted_rows_count": 0,
  "partitions": {
    "array": []
  }
}


@DanielMorales9 DanielMorales9 left a comment


lgtm

@ahmedabu98
Contributor Author

@DanielMorales9 sorry, but I did some more experimenting and found this needed more work. Namely, what was missing:

  • Refreshing our cached tables, so that we can use the updated partition spec when writing
  • Handling batches that contain files written with both the old and new specs. Performing a single commit over all the data files is no longer an option, because a commit is limited to one partition spec. Performing multiple commits is not safe either: if a bundle fails between commits, we may end up with duplicated data. Instead, we can create a manifest file for each spec and append all the manifests in one commit.

I fixed this up in the previous commit baba789, can you take another look?
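The grouping step described above can be sketched with plain Java collections. This is an illustrative model, not the actual Beam code: `WrittenFile` is a stand-in for Beam's `FileWriteResult`, reduced to a file path plus the ID of the partition spec the file was written with.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: bucket a batch's files by partition spec ID so each bucket can
// become one manifest file, and all manifests can then be appended to the
// table in a single atomic commit.
public class GroupBySpec {
    // Stand-in for FileWriteResult (hypothetical, for illustration only).
    static final class WrittenFile {
        final String path;
        final int specId;
        WrittenFile(String path, int specId) { this.path = path; this.specId = specId; }
    }

    static Map<Integer, List<WrittenFile>> groupBySpecId(Iterable<WrittenFile> files) {
        Map<Integer, List<WrittenFile>> groups = new HashMap<>();
        for (WrittenFile f : files) {
            groups.computeIfAbsent(f.specId, k -> new ArrayList<>()).add(f);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<WrittenFile> batch = List.of(
            new WrittenFile("data/a.parquet", 0),  // in-flight file, old spec
            new WrittenFile("data/b.parquet", 1),  // written after spec update
            new WrittenFile("data/c.parquet", 1));
        // Two groups -> two manifest files -> one commit appending both.
        System.out.println(groupBySpecId(batch).size());
    }
}
```

Because all the resulting manifests go into one commit, a bundle failure leaves the table either fully updated or untouched, avoiding the duplication risk of multiple commits.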

// in the rare case we get a batch that contains multiple partition specs, we will group
// data into manifest files and append.
// note: either way, we must use a single commit operation for atomicity.
if (containsMultiplePartitionSpecs(fileWriteResults)) {

@DanielMorales9 DanielMorales9 Oct 25, 2024


I would rather distinguish between partitioned and unpartitioned tables


Contributor Author


Iceberg allows us to use a single commit operation that appends data files across different partitions. I think having a manifest file per partition would be overkill.

The only reason we need manifest files is to differentiate between two specs. Technically, this code path addresses an edge case, and realistically will only be used during the few moments after a table's spec gets updated -- in these moments, we will potentially have two specs in the same batch of files. So we'd need one manifest file for each spec.

I'm open to other perspectives though in case I'm missing something


@DanielMorales9 DanielMorales9 Oct 26, 2024


Yes, you are right. I got confused with something else.

if (containsMultiplePartitionSpecs(fileWriteResults)) {
appendManifestFiles(table, fileWriteResults);
} else {
appendDataFiles(table, fileWriteResults);

@DanielMorales9 DanielMorales9 Oct 25, 2024


Can we return the AppendFiles update and commit?

Contributor Author


Any particular reason? I'm leaning towards keeping it this way for simplicity.


Yep, to reduce duplicated code.


But it's not a big duplication issue, so feel free to discard.

// To handle this, we create a manifest file for each partition spec, and group data files
// accordingly.
// Afterward, we append all manifests using a single commit operation.
private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults)

@DanielMorales9 DanielMorales9 Oct 25, 2024


This is a long method 😲

Successfully merging this pull request may close these issues.

[Bug]: IcebergIO fails when the table's partition layout is changed at runtime