From 38bf91acd5e71f20c7d0466bf2a7e9c4224af01e Mon Sep 17 00:00:00 2001 From: VersusFacit Date: Mon, 30 Sep 2024 11:19:35 -0700 Subject: [PATCH 1/3] Add iceberg ddl generation. --- dbt/include/snowflake/macros/adapters.sql | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index aa8895819..b60cea0b0 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -195,7 +195,7 @@ {% macro snowflake__alter_column_type(relation, column_name, new_column_type) -%} {% call statement('alter_column_type') %} - alter table {{ relation.render() }} alter {{ adapter.quote(column_name) }} set data type {{ new_column_type }}; + alter {{ relation.get_ddl_prefix_for_alter() }} table {{ relation.render() }} alter {{ adapter.quote(column_name) }} set data type {{ new_column_type }}; {% endcall %} {% endmacro %} @@ -216,7 +216,7 @@ {% else -%} {% set relation_type = relation.type %} {% endif %} - alter {{ relation_type }} {{ relation.render() }} alter + alter {{ relation.get_ddl_prefix_for_alter() }} {{ relation_type }} {{ relation.render() }} alter {% for column_name in existing_columns if (column_name in existing_columns) or (column_name|lower in existing_columns) %} {{ get_column_comment_sql(column_name, column_dict) }} {{- ',' if not loop.last else ';' }} {% endfor %} @@ -275,7 +275,7 @@ {% if add_columns %} {% set sql -%} - alter {{ relation_type }} {{ relation.render() }} add column + alter {{ relation.get_ddl_prefix_for_alter() }} {{ relation_type }} {{ relation.render() }} add column {% for column in add_columns %} {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }} {% endfor %} @@ -288,7 +288,7 @@ {% if remove_columns %} {% set sql -%} - alter {{ relation_type }} {{ relation.render() }} drop column + alter {{ relation.get_ddl_prefix_for_alter() }} {{ relation_type }} {{ relation.render() }} drop column {% for column in remove_columns %} {{ column.name }}{{ ',' if not loop.last }} {% endfor %} From 695ea07c351e02a9b0ce36d77ef271d364989409 Mon Sep 17 00:00:00 2001 From: VersusFacit Date: Mon, 30 Sep 2024 11:20:53 -0700 Subject: [PATCH 2/3] Add changelog. --- .changes/unreleased/Features-20240930-112041.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20240930-112041.yaml diff --git a/.changes/unreleased/Features-20240930-112041.yaml b/.changes/unreleased/Features-20240930-112041.yaml new file mode 100644 index 000000000..1395a8bf7 --- /dev/null +++ b/.changes/unreleased/Features-20240930-112041.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add support for all on_schema_change incremental model strategies. +time: 2024-09-30T11:20:41.99589-07:00 +custom: + Author: versusfacit + Issue: "321" From a3da64792a90ebcb553f88c154b0e03e62cb703e Mon Sep 17 00:00:00 2001 From: VersusFacit Date: Mon, 30 Sep 2024 12:05:31 -0700 Subject: [PATCH 3/3] add test. --- .../iceberg/test_incremental_models.py | 46 ++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/tests/functional/iceberg/test_incremental_models.py b/tests/functional/iceberg/test_incremental_models.py index f8f1d6b89..35ccdcd89 100644 --- a/tests/functional/iceberg/test_incremental_models.py +++ b/tests/functional/iceberg/test_incremental_models.py @@ -3,7 +3,7 @@ from pathlib import Path -from dbt.tests.util import run_dbt, run_dbt_and_capture +from dbt.tests.util import run_dbt, run_dbt_and_capture, write_file _SEED_INCREMENTAL_STRATEGIES = """ @@ -32,6 +32,7 @@ incremental_strategy='{strategy}', unique_key="world_id", external_volume = "s3_iceberg_snow", + on_schema_change = "sync_all_columns" ) }}}} select * from {{{{ ref('upstream_table') }}}} @@ -123,3 +124,46 @@ def test_incremental_strategies_with_update(self, project, setup_class): self.__check_correct_operations(self.append, rows_affected=2) self.__check_correct_operations("merge", rows_affected=1) self.__check_correct_operations("delete_insert", rows_affected=1) + + +class TestIcebergIncrementalOnSchemaChangeMutatesRelations: + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": True}} + + @pytest.fixture(scope="class") + def seeds(self): + return { + "seed.csv": _SEED_INCREMENTAL_STRATEGIES, + } + + @pytest.fixture(scope="function", autouse=True) + def setup_class(self, project): + run_dbt(["seed"]) + run_dbt(["run"]) + yield + + @pytest.fixture(scope="class") + def models(self): + return { + "upstream_table.sql": _MODEL_BASIC_TABLE_MODEL, + "merge.sql": _MODEL_INCREMENTAL_ICEBERG_MERGE, + } + + def test_sync_and_append_semantics(self, project, setup_class): + model_file = project.project_root / Path("models") / Path("merge.sql") + sql = f"show columns in {project.database}.{project.test_schema}.merge;" + column_names = [column[2] for column in project.run_sql(sql, fetch="all")] + assert len(column_names) == 3 + + write_file(_MODEL_INCREMENTAL_ICEBERG_MERGE.replace("*", "*, 1 as new_column"), model_file) + run_dbt() + column_names = [column[2].lower() for column in project.run_sql(sql, fetch="all")] + assert len(column_names) == 4 + assert "new_column" in column_names + + write_file(_MODEL_INCREMENTAL_ICEBERG_MERGE, model_file) + run_dbt() + column_names = [column[2].lower() for column in project.run_sql(sql, fetch="all")] + assert len(column_names) == 3 + assert "new_column" not in column_names