Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(frontend): use iceberg-rust instead of icelake to enumerate iceberg files and snapshots. #17558

Merged
merged 6 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ CREATE SINK sink1 AS select * from mv1 WITH (
table.name = 't1',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand All @@ -45,7 +45,7 @@ CREATE SINK sink2 AS select * from mv1 WITH (
table.name = 't2',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/iceberg/test_case/cdc/load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE SINK s1 AS select * from products WITH (
database.name = 'demo_db',
table.name = 'demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE SINK s6 AS select * from mv6 WITH (
table.name = 'no_partition_append_only_table',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ WITH (
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 'no_partition_append_only_table',
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
database.name = 'demo_db',
table.name = 'no_partition_upsert_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE SINK s6 AS select * from mv6 WITH (
table.name = 'partition_append_only_table',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ WITH (
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 'partition_append_only_table',
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
database.name = 'demo_db',
table.name = 'partition_upsert_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE SINK s6 AS select * from mv6 WITH (
table.name = 'range_partition_append_only_table',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ WITH (
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 'range_partition_append_only_table',
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
database.name = 'demo_db',
table.name = 'range_partition_upsert_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
connector = 'iceberg',
type = 'upsert',
primary_key = 'v1',
warehouse.path = 's3://iceberg',
warehouse.path = 's3a://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
Expand All @@ -23,7 +23,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
statement ok
CREATE SOURCE iceberg_demo_source WITH (
connector = 'iceberg',
warehouse.path = 's3://iceberg',
warehouse.path = 's3a://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/iceberg-sink2/docker/hive/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type=append-only
force_append_only = true
catalog.type = hive
catalog.uri = thrift://metastore:9083
warehouse.path = s3://icebergdata/demo
warehouse.path = s3a://icebergdata/demo
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/iceberg-sink2/docker/storage/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.type = storage
catalog.name = demo
warehouse.path = s3://icebergdata/demo
warehouse.path = s3a://icebergdata/demo
database.name=s1
table.name=t1
2 changes: 1 addition & 1 deletion integration_tests/iceberg-source/docker/hive/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ port=4566
connector = iceberg
catalog.type = hive
catalog.uri = thrift://metastore:9083
warehouse.path = s3://icebergdata/demo
warehouse.path = s3a://icebergdata/demo
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/iceberg-source/docker/storage/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.type = storage
warehouse.path = s3://icebergdata/demo
warehouse.path = s3a://icebergdata/demo
database.name=s1
table.name=t1
2 changes: 1 addition & 1 deletion src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::executor::{DataChunk, Executor};
/// database_name: Some("demo_db".into()),
/// table_name: "demo_table".into(),
/// catalog_type: Some("storage".into()),
/// path: "s3a://hummock001/".into(),
/// path: "s3://hummock001/".into(),
/// endpoint: Some("http://127.0.0.1:9301".into()),
/// access_key: "hummockadmin".into(),
/// secret_key: "hummockadmin".into(),
Expand Down
83 changes: 45 additions & 38 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use std::collections::HashMap;

use anyhow::anyhow;
use async_trait::async_trait;
use icelake::types::DataContentType;
use futures::StreamExt;
use iceberg::spec::{DataContentType, ManifestList};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::types::JsonbVal;
Expand Down Expand Up @@ -171,58 +172,64 @@ impl IcebergSplitEnumerator {
if batch_parallelism == 0 {
bail!("Batch parallelism is 0. Cannot split the iceberg files.");
}
let table = self.config.load_table().await?;
let table = self.config.load_table_v2().await?;
let snapshot_id = match time_traval_info {
Some(IcebergTimeTravelInfo::Version(version)) => {
let Some(snapshot) = table.current_table_metadata().snapshot(version) else {
let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
bail!("Cannot find the snapshot id in the iceberg table.");
};
snapshot.snapshot_id
snapshot.snapshot_id()
}
Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
match &table.current_table_metadata().snapshots {
Some(snapshots) => {
let snapshot = snapshots
.iter()
.filter(|snapshot| snapshot.timestamp_ms <= timestamp)
.max_by_key(|snapshot| snapshot.timestamp_ms);
match snapshot {
Some(snapshot) => snapshot.snapshot_id,
None => {
// convert unix time to human readable time
let time = chrono::DateTime::from_timestamp_millis(timestamp);
if time.is_some() {
bail!("Cannot find a snapshot older than {}", time.unwrap());
} else {
bail!("Cannot find a snapshot");
}
}
}
}
let snapshot = table
.metadata()
.snapshots()
.filter(|snapshot| snapshot.timestamp().timestamp_millis() <= timestamp)
.max_by_key(|snapshot| snapshot.timestamp().timestamp_millis());
match snapshot {
Some(snapshot) => snapshot.snapshot_id(),
None => {
bail!("Cannot find the snapshots in the iceberg table.");
// convert unix time to human readable time
let time = chrono::DateTime::from_timestamp_millis(timestamp);
if time.is_some() {
bail!("Cannot find a snapshot older than {}", time.unwrap());
} else {
bail!("Cannot find a snapshot");
}
}
}
}
None => match table.current_table_metadata().current_snapshot_id {
Some(snapshot_id) => snapshot_id,
None => match table.metadata().current_snapshot() {
Some(snapshot) => snapshot.snapshot_id(),
None => bail!("Cannot find the current snapshot id in the iceberg table."),
},
};
let mut files = vec![];
for file in table
.data_files_of_snapshot(
table
.current_table_metadata()
.snapshot(snapshot_id)
.expect("snapshot must exists"),
)
.await?
{
if file.content != DataContentType::Data {
bail!("Reading iceberg table with delete files is unsupported. Please try to compact the table first.");

let snapshot = table
.metadata()
.snapshot_by_id(snapshot_id)
.expect("snapshot must exist");

let manifest_list: ManifestList = snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
.map_err(|e| anyhow!(e))?;
for entry in manifest_list.entries() {
let manifest = entry
.load_manifest(table.file_io())
.await
.map_err(|e| anyhow!(e))?;
let mut manifest_entries_stream =
futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));

while let Some(manifest_entry) = manifest_entries_stream.next().await {
let file = manifest_entry.data_file();
if file.content_type() != DataContentType::Data {
bail!("Reading iceberg table with delete files is unsupported. Please try to compact the table first.");
}
files.push(file.file_path().to_string());
}
files.push(file.file_path);
}
let split_num = batch_parallelism;
// evenly split the files into splits based on the parallelism.
Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ fixedbitset = "0.5"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
iana-time-zone = "0.1"
iceberg = { workspace = true }
icelake = { workspace = true }
itertools = { workspace = true }
jsonbb = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use std::ops::Deref;

use anyhow::anyhow;
use icelake::Table;
use futures::StreamExt;
use iceberg::spec::ManifestList;
use iceberg::table::Table;
use risingwave_common::types::Fields;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::ConnectorProperties;
Expand Down Expand Up @@ -46,7 +48,7 @@ struct RwIcebergFiles {
/// Required when content is `EqualityDeletes` and should be null
/// otherwise. Fields with ids listed in this column must be present
/// in the delete file
pub equality_ids: Option<Vec<i32>>,
pub equality_ids: Vec<i32>,
/// ID representing sort order for this file.
///
/// If sort order ID is missing or unknown, then the order is assumed to
Expand Down Expand Up @@ -81,26 +83,37 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwIcebergFiles>> {
let config = ConnectorProperties::extract(source_props, false)?;
if let ConnectorProperties::Iceberg(iceberg_properties) = config {
let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config();
let table: Table = iceberg_config.load_table().await?;
result.extend(
table
.current_data_files()
let table: Table = iceberg_config.load_table_v2().await?;
if let Some(snapshot) = table.metadata().current_snapshot() {
let manifest_list: ManifestList = snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
.map_err(|e| anyhow!(e))?
.iter()
.map(|file| RwIcebergFiles {
source_id: source.id as i32,
schema_name: schema_name.clone(),
source_name: source.name.clone(),
content: file.content as i32,
file_path: file.file_path.clone(),
file_format: file.file_format.to_string(),
record_count: file.record_count,
file_size_in_bytes: file.file_size_in_bytes,
equality_ids: file.equality_ids.clone(),
sort_order_id: file.sort_order_id,
}),
);
.map_err(|e| anyhow!(e))?;
for entry in manifest_list.entries() {
let manifest = entry
.load_manifest(table.file_io())
.await
.map_err(|e| anyhow!(e))?;
let mut manifest_entries_stream =
futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));

while let Some(manifest_entry) = manifest_entries_stream.next().await {
let file = manifest_entry.data_file();
result.push(RwIcebergFiles {
source_id: source.id as i32,
schema_name: schema_name.clone(),
source_name: source.name.clone(),
content: file.content_type() as i32,
file_path: file.file_path().to_string(),
file_format: file.file_format().to_string(),
record_count: file.record_count() as i64,
file_size_in_bytes: file.file_size_in_bytes() as i64,
equality_ids: file.equality_ids().to_vec(),
sort_order_id: file.sort_order_id(),
});
}
}
}
} else {
unreachable!()
}
Expand Down
Loading
Loading