Skip to content

Commit

Permalink
Ct 1873/support insert overwrite (#700)
Browse files Browse the repository at this point in the history
* Remove thrown exception when using insert_overwrite with delta

* Add changelog entry

* Update integration tests

* Fix missing comma in model config

* Fix the typo

* Tweaking the skip profiles for tests

* readd the other validate

* disabling test but leaving feature

* disabling test but leaving feature

* commit for ci

* fix test

---------

Co-authored-by: flvndh <17010377+flvndh@users.noreply.github.com>
Co-authored-by: Mila Page <versusfacit@users.noreply.github.com>
  • Loading branch information
3 people authored Apr 28, 2023
1 parent 8bc8c2c commit cb41ab0
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 22 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20220812-091652.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: Support insert_overwrite strategy with delta
time: 2022-08-12T09:16:52.7995122+02:00
custom:
Author: flvndh
Issue: "1013"
PR: "430"
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,12 @@
You cannot use this strategy when connecting via endpoint
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_delta_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and target.endpoint %}
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
{% endif %}
Expand Down
34 changes: 18 additions & 16 deletions tests/functional/adapter/incremental_strategies/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@
{% endif %}
""".lstrip()

bad_insert_overwrite_delta_sql = """
bad_merge_not_delta_sql = """
{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
file_format = 'delta',
incremental_strategy = 'merge',
) }}
{% if not is_incremental() %}
Expand All @@ -69,10 +68,10 @@
{% endif %}
""".lstrip()

bad_merge_not_delta_sql = """
bad_strategy_sql = """
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_strategy = 'something_else',
) }}
{% if not is_incremental() %}
Expand All @@ -90,10 +89,15 @@
{% endif %}
""".lstrip()

bad_strategy_sql = """
#
# Delta Models
#

append_delta_sql = """
{{ config(
materialized = 'incremental',
incremental_strategy = 'something_else',
incremental_strategy = 'append',
file_format = 'delta',
) }}
{% if not is_incremental() %}
Expand All @@ -111,15 +115,12 @@
{% endif %}
""".lstrip()

#
# Delta Models
#

append_delta_sql = """
insert_overwrite_partitions_delta_sql = """
{{ config(
materialized = 'incremental',
incremental_strategy = 'append',
file_format = 'delta',
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by='id',
file_format='delta'
) }}
{% if not is_incremental() %}
Expand All @@ -135,7 +136,8 @@
select cast(3 as bigint) as id, 'anyway' as msg
{% endif %}
""".lstrip()
"""


delta_merge_no_key_sql = """
{{ config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
)
from tests.functional.adapter.incremental_strategies.fixtures import (
bad_file_format_sql,
bad_insert_overwrite_delta_sql,
bad_merge_not_delta_sql,
bad_strategy_sql,
default_append_sql,
Expand All @@ -20,6 +19,7 @@
delta_merge_no_key_sql,
delta_merge_unique_key_sql,
delta_merge_update_columns_sql,
# Skip: CT-1873 insert_overwrite_partitions_delta_sql,
)


Expand Down Expand Up @@ -91,6 +91,8 @@ def models(self):
"merge_no_key.sql": delta_merge_no_key_sql,
"merge_unique_key.sql": delta_merge_unique_key_sql,
"merge_update_columns.sql": delta_merge_update_columns_sql,
# Skip: cannot be acnive on any endpoint with grants
# "insert_overwrite_partitions_delta.sql": insert_overwrite_partitions_delta_sql,
}

def run_and_test(self, project):
Expand All @@ -106,13 +108,21 @@ def run_and_test(self, project):
def test_delta_strategies(self, project):
self.run_and_test(project)

@pytest.mark.skip(
reason="this feature is incompatible with databricks settings required for grants"
)
def test_delta_strategies_overwrite(self, project):
self.seed_and_run_twice()
check_relations_equal(
project.adapter, ["insert_overwrite_partitions_delta", "expected_upsert"]
)


class TestBadStrategies(BaseIncrementalStrategies):
@pytest.fixture(scope="class")
def models(self):
return {
"bad_file_format.sql": bad_file_format_sql,
"bad_insert_overwrite_delta.sql": bad_insert_overwrite_delta_sql,
"bad_merge_not_delta.sql": bad_merge_not_delta_sql,
"bad_strategy.sql": bad_strategy_sql,
}
Expand Down

0 comments on commit cb41ab0

Please sign in to comment.