Commit
Merge branch 'main' into leahwicz/splitEnv
mikealfare authored Oct 11, 2024
2 parents 5b13f43 + 257e390 commit d692b17
Showing 25 changed files with 396 additions and 214 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240903-161003.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Allow configuring snapshot column names
time: 2024-09-03T16:10:03.021221-04:00
custom:
Author: gshank
Issue: "1096"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240910-175846.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Support custom ODBC connection parameters via `connection_string_suffix` config
time: 2024-09-10T17:58:46.141332-04:00
custom:
Author: colin-rogers-dbt jpoley nilan3
Issue: "1092"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240925-125242.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Add Microbatch Strategy to dbt-spark
time: 2024-09-25T12:52:42.872017+01:00
custom:
Author: michelleark
Issue: "1109"
1 change: 1 addition & 0 deletions .github/workflows/integration.yml
@@ -76,6 +76,7 @@ jobs:
test:
- "apache_spark"
- "spark_session"
- "spark_http_odbc"
- "databricks_sql_endpoint"
- "databricks_cluster"
- "databricks_http_cluster"
1 change: 1 addition & 0 deletions .github/workflows/release-internal.yml
@@ -79,6 +79,7 @@ jobs:
test:
- "apache_spark"
- "spark_session"
- "spark_http_odbc"
- "databricks_sql_endpoint"
- "databricks_cluster"
- "databricks_http_cluster"
1 change: 1 addition & 0 deletions .github/workflows/release-prep.yml
@@ -482,6 +482,7 @@ jobs:
test:
- "apache_spark"
- "spark_session"
- "spark_http_odbc"
- "databricks_sql_endpoint"
- "databricks_cluster"
- "databricks_http_cluster"
1 change: 1 addition & 0 deletions CONTRIBUTING.md
@@ -78,6 +78,7 @@ python dagger/run_dbt_spark_tests.py --profile databricks_sql_endpoint --test-pa
_options_:
- "apache_spark"
- "spark_session"
- "spark_http_odbc"
- "databricks_sql_endpoint"
- "databricks_cluster"
- "databricks_http_cluster"
2 changes: 1 addition & 1 deletion dagger/run_dbt_spark_tests.py
@@ -137,7 +137,7 @@ async def test_spark(test_args):
spark_ctr, spark_host = get_spark_container(client)
tst_container = tst_container.with_service_binding(alias=spark_host, service=spark_ctr)

elif test_profile in ["databricks_cluster", "databricks_sql_endpoint"]:
elif test_profile in ["databricks_cluster", "databricks_sql_endpoint", "spark_http_odbc"]:
tst_container = (
tst_container.with_workdir("/")
.with_exec(["./scripts/configure_odbc.sh"])
54 changes: 34 additions & 20 deletions dbt/adapters/spark/connections.py
@@ -78,6 +78,7 @@ class SparkCredentials(Credentials):
auth: Optional[str] = None
kerberos_service_name: Optional[str] = None
organization: str = "0"
connection_string_suffix: Optional[str] = None
connect_retries: int = 0
connect_timeout: int = 10
use_ssl: bool = False
@@ -483,38 +484,51 @@ def open(cls, connection: Connection) -> Connection:
http_path = cls.SPARK_SQL_ENDPOINT_HTTP_PATH.format(
endpoint=creds.endpoint
)
elif creds.connection_string_suffix is not None:
required_fields = ["driver", "host", "port", "connection_string_suffix"]
else:
raise DbtConfigError(
"Either `cluster` or `endpoint` must set when"
"Either `cluster`, `endpoint`, `connection_string_suffix` must set when"
" using the odbc method to connect to Spark"
)

cls.validate_creds(creds, required_fields)

dbt_spark_version = __version__.version
user_agent_entry = (
f"dbt-labs-dbt-spark/{dbt_spark_version} (Databricks)" # noqa
)

# http://simba.wpengine.com/products/Spark/doc/ODBC_InstallGuide/unix/content/odbc/hi/configuring/serverside.htm
ssp = {f"SSP_{k}": f"{{{v}}}" for k, v in creds.server_side_parameters.items()}

# https://www.simba.com/products/Spark/doc/v2/ODBC_InstallGuide/unix/content/odbc/options/driver.htm
connection_str = _build_odbc_connnection_string(
DRIVER=creds.driver,
HOST=creds.host,
PORT=creds.port,
UID="token",
PWD=creds.token,
HTTPPath=http_path,
AuthMech=3,
SparkServerType=3,
ThriftTransport=2,
SSL=1,
UserAgentEntry=user_agent_entry,
LCaseSspKeyName=0 if ssp else 1,
**ssp,
)
if creds.token is not None:
# https://www.simba.com/products/Spark/doc/v2/ODBC_InstallGuide/unix/content/odbc/options/driver.htm
connection_str = _build_odbc_connnection_string(
DRIVER=creds.driver,
HOST=creds.host,
PORT=creds.port,
UID="token",
PWD=creds.token,
HTTPPath=http_path,
AuthMech=3,
SparkServerType=3,
ThriftTransport=2,
SSL=1,
UserAgentEntry=user_agent_entry,
LCaseSspKeyName=0 if ssp else 1,
**ssp,
)
else:
connection_str = _build_odbc_connnection_string(
DRIVER=creds.driver,
HOST=creds.host,
PORT=creds.port,
ThriftTransport=2,
SSL=1,
UserAgentEntry=user_agent_entry,
LCaseSspKeyName=0 if ssp else 1,
**ssp,
)
if creds.connection_string_suffix is not None:
connection_str = connection_str + ";" + creds.connection_string_suffix

conn = pyodbc.connect(connection_str, autocommit=True)
handle = PyodbcConnectionWrapper(conn)
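The new branch above builds the ODBC connection string without embedding token credentials and then appends the user-supplied `connection_string_suffix`. The sketch below illustrates that assembly pattern in plain Python; the helper name `build_connection_string` and the sample credential values are illustrative assumptions, not the adapter's actual `_build_odbc_connnection_string` internals.

```python
# Illustrative sketch (assumed helper and values) of the suffix path:
# join KEY=value pairs with ";" and append the user-supplied suffix.

def build_connection_string(**kwargs) -> str:
    # Standard ODBC convention: semicolon-separated KEY=value pairs.
    return ";".join(f"{key}={value}" for key, value in kwargs.items())


creds = {
    "driver": "Simba Spark ODBC Driver",          # hypothetical driver name
    "host": "dbc-example.cloud.databricks.com",   # hypothetical host
    "port": 443,
    # Authentication details live in the suffix when no token is configured.
    "connection_string_suffix": (
        "UID=token;PWD=<token>;HTTPPath=/sql/1.0/endpoints/<endpoint>;"
        "AuthMech=3;SparkServerType=3"
    ),
}

connection_str = build_connection_string(
    DRIVER=creds["driver"],
    HOST=creds["host"],
    PORT=creds["port"],
    ThriftTransport=2,
    SSL=1,
)
if creds.get("connection_string_suffix"):
    connection_str = connection_str + ";" + creds["connection_string_suffix"]

print(connection_str)
```

The conftest change further down uses exactly this shape of suffix for the new `spark_http_odbc` test profile.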
@@ -24,7 +24,7 @@
{%- endif -%}

{#-- Set Overwrite Mode --#}
{%- if strategy == 'insert_overwrite' and partition_by -%}
{%- if strategy in ['insert_overwrite', 'microbatch'] and partition_by -%}
{%- call statement() -%}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{%- endcall -%}
@@ -75,6 +75,17 @@
{%- elif strategy == 'insert_overwrite' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target, existing) }}
{%- elif strategy == 'microbatch' -%}
{#-- microbatch wraps insert_overwrite, and requires a partition_by config #}
{% set missing_partition_key_microbatch_msg -%}
dbt-spark 'microbatch' incremental strategy requires a `partition_by` config.
Ensure you are using a `partition_by` column that is of grain {{ config.get('batch_size') }}.
{%- endset %}

{%- if not config.get('partition_by') -%}
{{ exceptions.raise_compiler_error(missing_partition_key_microbatch_msg) }}
{%- endif -%}
{{ get_insert_overwrite_sql(source, target, existing) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns for datasources which implement MERGE INTO (e.g. databricks, iceberg) - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, incremental_predicates=incremental_predicates) }}
@@ -21,7 +21,7 @@

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'append', 'merge', 'insert_overwrite'
Expected one of: 'append', 'merge', 'insert_overwrite', 'microbatch'
{%- endset %}

{% set invalid_merge_msg -%}
@@ -35,13 +35,13 @@
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'microbatch'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and target.endpoint %}
{% if raw_strategy in ['insert_overwrite', 'microbatch'] and target.endpoint %}
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
{% endif %}
{% endif %}
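Taken together, the incremental-strategy hunks above admit 'microbatch' as a valid strategy, route it through the insert-overwrite SQL, and subject it to the same SQL-endpoint restriction as 'insert_overwrite'. The following is a hedged Python paraphrase of those Jinja checks, included only to summarize the rules; it is not code from the adapter.

```python
# Assumption-level paraphrase of the strategy validation performed by the Jinja macro above.

VALID_STRATEGIES = ["append", "merge", "insert_overwrite", "microbatch"]
MERGE_FILE_FORMATS = ["delta", "iceberg", "hudi"]


def validate_incremental_strategy(raw_strategy: str, file_format: str, is_endpoint: bool) -> None:
    if raw_strategy not in VALID_STRATEGIES:
        raise ValueError(
            f"Invalid incremental strategy provided: {raw_strategy}. "
            "Expected one of: 'append', 'merge', 'insert_overwrite', 'microbatch'"
        )
    if raw_strategy == "merge" and file_format not in MERGE_FILE_FORMATS:
        raise ValueError("The 'merge' strategy requires a delta, iceberg, or hudi file format")
    if raw_strategy in ("insert_overwrite", "microbatch") and is_endpoint:
        raise ValueError(
            "insert_overwrite and microbatch are not supported against a SQL endpoint; "
            "use the 'append' or 'merge' strategy instead"
        )


# Example: microbatch against a cluster (not an endpoint) passes validation,
# but it still requires a `partition_by` config at materialization time.
validate_incremental_strategy("microbatch", "parquet", is_endpoint=False)
```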
16 changes: 9 additions & 7 deletions dbt/include/spark/macros/materializations/snapshot.sql
@@ -13,6 +13,7 @@


{% macro spark__snapshot_merge_sql(target, source, insert_cols) -%}
{%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}

merge into {{ target }} as DBT_INTERNAL_DEST
{% if target.is_iceberg %}
@@ -21,12 +22,12 @@
{% else %}
using {{ source }} as DBT_INTERNAL_SOURCE
{% endif %}
on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id
on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }}
when matched
and DBT_INTERNAL_DEST.dbt_valid_to is null
and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null
and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
then update
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }}

when not matched
and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
@@ -81,13 +82,12 @@


{% materialization snapshot, adapter='spark' %}
{%- set config = model['config'] -%}

{%- set target_table = model.get('alias', model.get('name')) -%}

{%- set strategy_name = config.get('strategy') -%}
{%- set unique_key = config.get('unique_key') %}
{%- set file_format = config.get('file_format', 'parquet') -%}
{%- set file_format = config.get('file_format') or 'parquet' -%}
{%- set grant_config = config.get('grants') -%}

{% set target_relation_exists, target_relation = get_or_create_relation(
@@ -126,7 +126,7 @@
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %}

{% if not target_relation_exists %}

@@ -135,7 +135,9 @@

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{{ adapter.valid_snapshot_target(target_relation, columns) }}

{% set staging_table = spark_build_snapshot_staging_table(strategy, sql, target_relation) %}

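The snapshot changes replace hard-coded `dbt_scd_id`/`dbt_valid_to` references with names resolved from the `snapshot_table_column_names` config, falling back to dbt's defaults. Below is a small Python sketch of that fallback pattern; the default mapping and the `resolve_snapshot_columns` helper are assumptions for illustration — the real defaults come from dbt-core's `get_snapshot_table_column_names()`.

```python
# Sketch (assumed defaults) of the `config.get(...) or defaults` resolution used above.

DEFAULT_SNAPSHOT_COLUMNS = {
    "dbt_scd_id": "dbt_scd_id",
    "dbt_valid_from": "dbt_valid_from",
    "dbt_valid_to": "dbt_valid_to",
    "dbt_updated_at": "dbt_updated_at",
}


def resolve_snapshot_columns(config: dict) -> dict:
    # Mirrors: config.get("snapshot_table_column_names") or get_snapshot_table_column_names()
    return config.get("snapshot_table_column_names") or DEFAULT_SNAPSHOT_COLUMNS


# With a custom mapping, the merge predicates use the configured names...
custom = resolve_snapshot_columns(
    {"snapshot_table_column_names": {"dbt_scd_id": "scd_id", "dbt_valid_to": "valid_to"}}
)
print(custom["dbt_scd_id"])  # -> scd_id

# ...and without one, the familiar dbt column names are used.
print(resolve_snapshot_columns({})["dbt_valid_to"])  # -> dbt_valid_to
```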
4 changes: 2 additions & 2 deletions setup.py
@@ -65,8 +65,8 @@ def _get_plugin_version_dict():
include_package_data=True,
install_requires=[
"sqlparams>=3.0.0",
"dbt-common>=1.0.4,<2.0",
"dbt-adapters>=1.1.1,<2.0",
"dbt-common>=1.10,<2.0",
"dbt-adapters>=1.7,<2.0",
# add dbt-core to ensure backwards compatibility of installation, this is not a functional dependency
"dbt-core>=1.8.0",
],
16 changes: 16 additions & 0 deletions tests/conftest.py
@@ -30,6 +30,8 @@ def dbt_profile_target(request):
target = databricks_http_cluster_target()
elif profile_type == "spark_session":
target = spark_session_target()
elif profile_type == "spark_http_odbc":
target = spark_http_odbc_target()
else:
raise ValueError(f"Invalid profile type '{profile_type}'")
return target
@@ -102,6 +104,20 @@ def spark_session_target():
}


def spark_http_odbc_target():
return {
"type": "spark",
"method": "odbc",
"host": os.getenv("DBT_DATABRICKS_HOST_NAME"),
"port": 443,
"driver": os.getenv("ODBC_DRIVER"),
"connection_string_suffix": f'UID=token;PWD={os.getenv("DBT_DATABRICKS_TOKEN")};HTTPPath=/sql/1.0/endpoints/{os.getenv("DBT_DATABRICKS_ENDPOINT")};AuthMech=3;SparkServerType=3',
"connect_retries": 3,
"connect_timeout": 5,
"retry_all": True,
}


@pytest.fixture(autouse=True)
def skip_by_profile_type(request):
profile_type = request.config.getoption("--profile")
@@ -21,7 +21,7 @@ def test_run_incremental_fail_on_schema_change(self, project):
assert "Compilation Error" in results_two[1].message


@pytest.mark.skip_profile("databricks_sql_endpoint")
@pytest.mark.skip_profile("databricks_sql_endpoint", "spark_http_odbc")
class TestAppendOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail):
@pytest.fixture(scope="class")
def project_config_update(self):
@@ -32,7 +32,7 @@ def project_config_update(self):
}


@pytest.mark.skip_profile("databricks_sql_endpoint", "spark_session")
@pytest.mark.skip_profile("databricks_sql_endpoint", "spark_session", "spark_http_odbc")
class TestInsertOverwriteOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail):
@pytest.fixture(scope="class")
def project_config_update(self):
@@ -55,7 +55,7 @@ def run_and_test(self, project):
check_relations_equal(project.adapter, ["default_append", "expected_append"])

@pytest.mark.skip_profile(
"databricks_http_cluster", "databricks_sql_endpoint", "spark_session"
"databricks_http_cluster", "databricks_sql_endpoint", "spark_session", "spark_http_odbc"
)
def test_default_append(self, project):
self.run_and_test(project)
@@ -77,7 +77,7 @@ def run_and_test(self, project):
check_relations_equal(project.adapter, ["insert_overwrite_partitions", "expected_upsert"])

@pytest.mark.skip_profile(
"databricks_http_cluster", "databricks_sql_endpoint", "spark_session"
"databricks_http_cluster", "databricks_sql_endpoint", "spark_session", "spark_http_odbc"
)
def test_insert_overwrite(self, project):
self.run_and_test(project)
@@ -103,7 +103,11 @@ def run_and_test(self, project):
check_relations_equal(project.adapter, ["merge_update_columns", "expected_partial_upsert"])

@pytest.mark.skip_profile(
"apache_spark", "databricks_http_cluster", "databricks_sql_endpoint", "spark_session"
"apache_spark",
"databricks_http_cluster",
"databricks_sql_endpoint",
"spark_session",
"spark_http_odbc",
)
def test_delta_strategies(self, project):
self.run_and_test(project)
21 changes: 21 additions & 0 deletions tests/functional/adapter/incremental_strategies/test_microbatch.py
@@ -0,0 +1,21 @@
import pytest

from dbt.tests.adapter.incremental.test_incremental_microbatch import (
BaseMicrobatch,
)

# No requirement for a unique_id for spark microbatch!
_microbatch_model_no_unique_id_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), partition_by=['date_day'], file_format='parquet') }}
select *, cast(event_time as date) as date_day
from {{ ref('input_model') }}
"""


@pytest.mark.skip_profile(
"databricks_http_cluster", "databricks_sql_endpoint", "spark_session", "spark_http_odbc"
)
class TestMicrobatch(BaseMicrobatch):
@pytest.fixture(scope="class")
def microbatch_model_sql(self) -> str:
return _microbatch_model_no_unique_id_sql
(Remaining changed files not loaded in this view.)