diff --git a/Cargo.lock b/Cargo.lock index 9197e73c2708..27c18e30a344 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11483,6 +11483,7 @@ dependencies = [ "futures", "futures-async-stream", "iana-time-zone", + "iceberg", "icelake", "itertools 0.12.1", "jsonbb", diff --git a/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt b/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt index 7d1f9254e468..0a8a042a86ac 100644 --- a/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt +++ b/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt @@ -29,7 +29,7 @@ CREATE SINK sink1 AS select * from mv1 WITH ( table.name = 't1', catalog.name = 'demo', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', s3.region = 'us-east-1', s3.access.key = 'hummockadmin', @@ -45,7 +45,7 @@ CREATE SINK sink2 AS select * from mv1 WITH ( table.name = 't2', catalog.name = 'demo', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', s3.region = 'us-east-1', s3.access.key = 'hummockadmin', diff --git a/e2e_test/iceberg/test_case/cdc/load.slt b/e2e_test/iceberg/test_case/cdc/load.slt index e9f1d815d20c..df0c31999037 100644 --- a/e2e_test/iceberg/test_case/cdc/load.slt +++ b/e2e_test/iceberg/test_case/cdc/load.slt @@ -28,7 +28,7 @@ CREATE SINK s1 AS select * from products WITH ( database.name = 'demo_db', table.name = 'demo_table', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', s3.region = 'us-east-1', s3.access.key = 'hummockadmin', diff --git a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt index 505bf0ba7d21..658a9ae8563e 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt @@ -28,7 +28,7 @@ CREATE SINK s6 AS select * from mv6 WITH ( table.name = 'no_partition_append_only_table', catalog.name = 'demo', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', s3.region = 'us-east-1', s3.access.key = 'hummockadmin', diff --git a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table_verify.slt b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table_verify.slt index 74629053344b..a78978236726 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table_verify.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table_verify.slt @@ -7,7 +7,7 @@ WITH ( s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', database.name = 'demo_db', table.name = 'no_partition_append_only_table', ); diff --git a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_upsert_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_upsert_table.slt index b59c7013290e..1ff15295c745 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_upsert_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_upsert_table.slt @@ -16,7 +16,7 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, database.name = 'demo_db', table.name = 'no_partition_upsert_table', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', s3.region = 'us-east-1', s3.access.key = 'hummockadmin', diff --git a/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table.slt index da55b35626ff..bd328f7834d6 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table.slt @@ -28,7 +28,7 @@ CREATE SINK s6 AS select * from mv6 WITH ( table.name = 'partition_append_only_table', catalog.name = 'demo', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', s3.region = 'us-east-1', s3.access.key = 'hummockadmin', diff --git a/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table_verify.slt b/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table_verify.slt index 4e6beb709f92..58d848b54cb2 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table_verify.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table_verify.slt @@ -7,7 +7,7 @@ WITH ( s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', database.name = 'demo_db', table.name = 'partition_append_only_table', ); diff --git a/e2e_test/iceberg/test_case/iceberg_sink_partition_upsert_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_partition_upsert_table.slt index 20800c1b4787..1435e03877f4 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_partition_upsert_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_partition_upsert_table.slt @@ -16,7 +16,7 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, database.name = 'demo_db', table.name = 'partition_upsert_table', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', s3.region = 'us-east-1', s3.access.key = 'hummockadmin', diff --git a/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table.slt index 2a8d562b6606..e71fe49f31f9 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table.slt @@ -28,7 +28,7 @@ CREATE SINK s6 AS select * from mv6 WITH ( table.name = 'range_partition_append_only_table', catalog.name = 'demo', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', s3.region = 'us-east-1', s3.access.key = 'hummockadmin', diff --git a/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table_verify.slt b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table_verify.slt index 9d03d99aada1..eaa6b3d95eb6 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table_verify.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table_verify.slt @@ -7,7 +7,7 @@ WITH ( s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', database.name = 'demo_db', table.name = 'range_partition_append_only_table', ); diff --git a/e2e_test/iceberg/test_case/iceberg_sink_range_partition_upsert_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_upsert_table.slt index 6e963b0e1661..daf1ceaa40bf 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_range_partition_upsert_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_upsert_table.slt @@ -16,7 +16,7 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, database.name = 'demo_db', table.name = 'range_partition_upsert_table', catalog.type = 'storage', - warehouse.path = 's3://icebergdata/demo', + warehouse.path = 's3a://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', s3.region = 'us-east-1', s3.access.key = 'hummockadmin', diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index 05cf2d1108b3..a7f7567075e9 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -9,7 +9,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH connector = 'iceberg', type = 'upsert', primary_key = 'v1', - warehouse.path = 's3://iceberg', + warehouse.path = 's3a://iceberg', s3.endpoint = 'http://127.0.0.1:9301', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', @@ -23,7 +23,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH statement ok CREATE SOURCE iceberg_demo_source WITH ( connector = 'iceberg', - warehouse.path = 's3://iceberg', + warehouse.path = 's3a://iceberg', s3.endpoint = 'http://127.0.0.1:9301', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', diff --git a/integration_tests/iceberg-sink2/docker/hive/config.ini b/integration_tests/iceberg-sink2/docker/hive/config.ini index d644f3c0d46a..55a03c3f9475 100644 --- a/integration_tests/iceberg-sink2/docker/hive/config.ini +++ b/integration_tests/iceberg-sink2/docker/hive/config.ini @@ -10,7 +10,7 @@ type=append-only force_append_only = true catalog.type = hive catalog.uri = thrift://metastore:9083 -warehouse.path = s3://icebergdata/demo +warehouse.path = s3a://icebergdata/demo s3.endpoint=http://minio-0:9301 s3.access.key = hummockadmin s3.secret.key = hummockadmin diff --git a/integration_tests/iceberg-sink2/docker/storage/config.ini b/integration_tests/iceberg-sink2/docker/storage/config.ini index 13e912b8fc3b..b4e8263f2309 100644 --- a/integration_tests/iceberg-sink2/docker/storage/config.ini +++ b/integration_tests/iceberg-sink2/docker/storage/config.ini @@ -14,6 +14,6 @@ s3.secret.key = hummockadmin s3.region = ap-southeast-1 catalog.type = storage catalog.name = demo -warehouse.path = s3://icebergdata/demo +warehouse.path = s3a://icebergdata/demo database.name=s1 table.name=t1 \ No newline at end of file diff --git a/integration_tests/iceberg-source/docker/hive/config.ini b/integration_tests/iceberg-source/docker/hive/config.ini index df07c7525825..80ce27ab14fd 100644 --- a/integration_tests/iceberg-source/docker/hive/config.ini +++ b/integration_tests/iceberg-source/docker/hive/config.ini @@ -8,7 +8,7 @@ port=4566 connector = iceberg catalog.type = hive catalog.uri = thrift://metastore:9083 -warehouse.path = s3://icebergdata/demo +warehouse.path = s3a://icebergdata/demo s3.endpoint=http://minio-0:9301 s3.access.key = hummockadmin s3.secret.key = hummockadmin diff --git a/integration_tests/iceberg-source/docker/storage/config.ini b/integration_tests/iceberg-source/docker/storage/config.ini index dd795fd3ef68..6439453dcaae 100644 --- a/integration_tests/iceberg-source/docker/storage/config.ini +++ b/integration_tests/iceberg-source/docker/storage/config.ini @@ -11,6 +11,6 @@ s3.access.key = hummockadmin s3.secret.key = hummockadmin s3.region = ap-southeast-1 catalog.type = storage -warehouse.path = s3://icebergdata/demo +warehouse.path = s3a://icebergdata/demo database.name=s1 table.name=t1 \ No newline at end of file diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 1ea9599dad69..7bf3835944b3 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -43,7 +43,7 @@ use crate::executor::{DataChunk, Executor}; /// database_name: Some("demo_db".into()), /// table_name: "demo_table".into(), /// catalog_type: Some("storage".into()), -/// path: "s3a://hummock001/".into(), +/// path: "s3://hummock001/".into(), /// endpoint: Some("http://127.0.0.1:9301".into()), /// access_key: "hummockadmin".into(), /// secret_key: "hummockadmin".into(), diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index f38f867e1925..92880341b588 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -16,7 +16,8 @@ use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; -use icelake::types::DataContentType; +use futures::StreamExt; +use iceberg::spec::{DataContentType, ManifestList}; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::types::JsonbVal; @@ -171,58 +172,64 @@ impl IcebergSplitEnumerator { if batch_parallelism == 0 { bail!("Batch parallelism is 0. Cannot split the iceberg files."); } - let table = self.config.load_table().await?; + let table = self.config.load_table_v2().await?; let snapshot_id = match time_traval_info { Some(IcebergTimeTravelInfo::Version(version)) => { - let Some(snapshot) = table.current_table_metadata().snapshot(version) else { + let Some(snapshot) = table.metadata().snapshot_by_id(version) else { bail!("Cannot find the snapshot id in the iceberg table."); }; - snapshot.snapshot_id + snapshot.snapshot_id() } Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => { - match &table.current_table_metadata().snapshots { - Some(snapshots) => { - let snapshot = snapshots - .iter() - .filter(|snapshot| snapshot.timestamp_ms <= timestamp) - .max_by_key(|snapshot| snapshot.timestamp_ms); - match snapshot { - Some(snapshot) => snapshot.snapshot_id, - None => { - // convert unix time to human readable time - let time = chrono::DateTime::from_timestamp_millis(timestamp); - if time.is_some() { - bail!("Cannot find a snapshot older than {}", time.unwrap()); - } else { - bail!("Cannot find a snapshot"); - } - } - } - } + let snapshot = table + .metadata() + .snapshots() + .filter(|snapshot| snapshot.timestamp().timestamp_millis() <= timestamp) + .max_by_key(|snapshot| snapshot.timestamp().timestamp_millis()); + match snapshot { + Some(snapshot) => snapshot.snapshot_id(), None => { - bail!("Cannot find the snapshots in the iceberg table."); + // convert unix time to human readable time + let time = chrono::DateTime::from_timestamp_millis(timestamp); + if time.is_some() { + bail!("Cannot find a snapshot older than {}", time.unwrap()); + } else { + bail!("Cannot find a snapshot"); + } } } } - None => match table.current_table_metadata().current_snapshot_id { - Some(snapshot_id) => snapshot_id, + None => match table.metadata().current_snapshot() { + Some(snapshot) => snapshot.snapshot_id(), None => bail!("Cannot find the current snapshot id in the iceberg table."), }, }; let mut files = vec![]; - for file in table - .data_files_of_snapshot( - table - .current_table_metadata() - .snapshot(snapshot_id) - .expect("snapshot must exists"), - ) - .await? - { - if file.content != DataContentType::Data { - bail!("Reading iceberg table with delete files is unsupported. Please try to compact the table first."); + + let snapshot = table + .metadata() + .snapshot_by_id(snapshot_id) + .expect("snapshot must exist"); + + let manifest_list: ManifestList = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .map_err(|e| anyhow!(e))?; + for entry in manifest_list.entries() { + let manifest = entry + .load_manifest(table.file_io()) + .await + .map_err(|e| anyhow!(e))?; + let mut manifest_entries_stream = + futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive())); + + while let Some(manifest_entry) = manifest_entries_stream.next().await { + let file = manifest_entry.data_file(); + if file.content_type() != DataContentType::Data { + bail!("Reading iceberg table with delete files is unsupported. Please try to compact the table first."); + } + files.push(file.file_path().to_string()); } - files.push(file.file_path); } let split_num = batch_parallelism; // evenly split the files into splits based on the parallelism. diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index a59ce2e55f67..418ab86e4307 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -38,6 +38,7 @@ fixedbitset = "0.5" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } iana-time-zone = "0.1" +iceberg = { workspace = true } icelake = { workspace = true } itertools = { workspace = true } jsonbb = { workspace = true } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_files.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_files.rs index 2bb764892089..aaeb8aaa064c 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_files.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_files.rs @@ -15,7 +15,9 @@ use std::ops::Deref; use anyhow::anyhow; -use icelake::Table; +use futures::StreamExt; +use iceberg::spec::ManifestList; +use iceberg::table::Table; use risingwave_common::types::Fields; use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::ConnectorProperties; @@ -46,7 +48,7 @@ struct RwIcebergFiles { /// Required when content is `EqualityDeletes` and should be null /// otherwise. Fields with ids listed in this column must be present /// in the delete file - pub equality_ids: Option>, + pub equality_ids: Vec, /// ID representing sort order for this file. /// /// If sort order ID is missing or unknown, then the order is assumed to @@ -81,26 +83,37 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result> { let config = ConnectorProperties::extract(source_props, false)?; if let ConnectorProperties::Iceberg(iceberg_properties) = config { let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config(); - let table: Table = iceberg_config.load_table().await?; - result.extend( - table - .current_data_files() + let table: Table = iceberg_config.load_table_v2().await?; + if let Some(snapshot) = table.metadata().current_snapshot() { + let manifest_list: ManifestList = snapshot + .load_manifest_list(table.file_io(), table.metadata()) .await - .map_err(|e| anyhow!(e))? - .iter() - .map(|file| RwIcebergFiles { - source_id: source.id as i32, - schema_name: schema_name.clone(), - source_name: source.name.clone(), - content: file.content as i32, - file_path: file.file_path.clone(), - file_format: file.file_format.to_string(), - record_count: file.record_count, - file_size_in_bytes: file.file_size_in_bytes, - equality_ids: file.equality_ids.clone(), - sort_order_id: file.sort_order_id, - }), - ); + .map_err(|e| anyhow!(e))?; + for entry in manifest_list.entries() { + let manifest = entry + .load_manifest(table.file_io()) + .await + .map_err(|e| anyhow!(e))?; + let mut manifest_entries_stream = + futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive())); + + while let Some(manifest_entry) = manifest_entries_stream.next().await { + let file = manifest_entry.data_file(); + result.push(RwIcebergFiles { + source_id: source.id as i32, + schema_name: schema_name.clone(), + source_name: source.name.clone(), + content: file.content_type() as i32, + file_path: file.file_path().to_string(), + file_format: file.file_format().to_string(), + record_count: file.record_count() as i64, + file_size_in_bytes: file.file_size_in_bytes() as i64, + equality_ids: file.equality_ids().to_vec(), + sort_order_id: file.sort_order_id(), + }); + } + } + } } else { unreachable!() } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs index 6e32f48eec84..d7491cfeb0ca 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs @@ -14,7 +14,7 @@ use std::ops::Deref; -use icelake::Table; +use iceberg::table::Table; use jsonbb::{Value, ValueRef}; use risingwave_common::types::{Fields, JsonbVal, Timestamptz}; use risingwave_connector::sink::iceberg::IcebergConfig; @@ -61,27 +61,27 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result> let config = ConnectorProperties::extract(source_props, false)?; if let ConnectorProperties::Iceberg(iceberg_properties) = config { let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config(); - let table: Table = iceberg_config.load_table().await?; - if let Some(snapshots) = &table.current_table_metadata().snapshots { - result.extend(snapshots.iter().map(|snapshot| { - RwIcebergSnapshots { - source_id: source.id as i32, - schema_name: schema_name.clone(), - source_name: source.name.clone(), - sequence_number: snapshot.sequence_number, - snapshot_id: snapshot.snapshot_id, - timestamp_ms: Timestamptz::from_millis(snapshot.timestamp_ms), - manifest_list: snapshot.manifest_list.clone(), - summary: Value::object( - snapshot - .summary - .iter() - .map(|(k, v)| (k.as_str(), ValueRef::String(v))), - ) - .into(), - } - })); - } + let table: Table = iceberg_config.load_table_v2().await?; + + result.extend(table.metadata().snapshots().map(|snapshot| { + RwIcebergSnapshots { + source_id: source.id as i32, + schema_name: schema_name.clone(), + source_name: source.name.clone(), + sequence_number: snapshot.sequence_number(), + snapshot_id: snapshot.snapshot_id(), + timestamp_ms: Timestamptz::from_millis(snapshot.timestamp().timestamp_millis()), + manifest_list: snapshot.manifest_list().to_string(), + summary: Value::object( + snapshot + .summary() + .other + .iter() + .map(|(k, v)| (k.as_str(), ValueRef::String(v))), + ) + .into(), + } + })); } } Ok(result)