Skip to content

Commit

Permalink
Merge pull request #799 from dyvenia/dev
Browse files Browse the repository at this point in the history
Release 0.4.21 PR
  • Loading branch information
m-paz committed Oct 26, 2023
2 parents a6ced4d + 6be1ad8 commit 584fa05
Show file tree
Hide file tree
Showing 44 changed files with 2,195 additions and 51 deletions.
12 changes: 10 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [0.4.21] - 2023-10-26
### Added
- Added `validate_df` task to task_utils.
- Added `SharepointList` source class.
- Added `SharepointListToDF` task class.
- Added `SharepointListToADLS` flow class.
- Added tests for `SharepointList`.
- Added `get_nested_dict` to untils.py.

### Fixed

### Changed

- Changed `GenesysToCSV` logic for end_point == "conversations". Added new fields to extraction.

## [0.4.20] - 2023-10-12
### Added
Expand Down Expand Up @@ -618,4 +626,4 @@ specified in the `SUPERMETRICS_DEFAULT_USER` secret
- Moved from poetry to pip

### Fixed
- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part
- Fix `AzureBlobStorage`'s `to_storage()` method is missing the final upload blob part
69 changes: 68 additions & 1 deletion tests/integration/flows/test_bigquery_to_adls.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import os

import pendulum
from prefect.tasks.secrets import PrefectSecret
import pytest
from unittest import mock
import pandas as pd

from prefect.tasks.secrets import PrefectSecret
from viadot.flows import BigQueryToADLS
from viadot.tasks import AzureDataLakeRemove

from viadot.exceptions import ValidationError

ADLS_DIR_PATH = "raw/tests/"
ADLS_FILE_NAME = str(pendulum.now("utc")) + ".parquet"
BIGQ_CREDENTIAL_KEY = "BIGQUERY-TESTS"
Expand Down Expand Up @@ -72,6 +77,68 @@ def test_bigquery_to_adls_false():
assert result.is_failed()
os.remove("test_bigquery_to_adls_overwrite_false.parquet")
os.remove("test_bigquery_to_adls_overwrite_false.json")


DATA = {
"type": ["banner", "banner"],
"country": ["PL", "DE"],
}


@mock.patch(
"viadot.tasks.BigQueryToDF.run",
return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_bigquery_to_adls_validate_df_fail(mocked_data):
flow_bigquery = BigQueryToADLS(
name="Test BigQuery to ADLS validate df fail",
dataset_name="official_empty",
table_name="space",
credentials_key=BIGQ_CREDENTIAL_KEY,
adls_file_name=ADLS_FILE_NAME,
overwrite_adls=True,
adls_dir_path=ADLS_DIR_PATH,
adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
validate_df_dict={"column_list_to_match": ["type", "country", "test"]},
)
try:
result = flow_bigquery.run()
except ValidationError:
pass

os.remove("test_bigquery_to_adls_validate_df_fail.parquet")
os.remove("test_bigquery_to_adls_validate_df_fail.json")


@mock.patch(
"viadot.tasks.BigQueryToDF.run",
return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_bigquery_to_adls_validate_df_success(mocked_data):
flow_bigquery = BigQueryToADLS(
name="Test BigQuery to ADLS validate df success",
dataset_name="official_empty",
table_name="space",
credentials_key=BIGQ_CREDENTIAL_KEY,
adls_file_name=ADLS_FILE_NAME,
overwrite_adls=True,
adls_dir_path=ADLS_DIR_PATH,
adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
validate_df_dict={"column_list_to_match": ["type", "country"]},
)
result = flow_bigquery.run()

result = flow_bigquery.run()
assert result.is_successful()

task_results = result.result.values()
assert all([task_result.is_successful() for task_result in task_results])

os.remove("test_bigquery_to_adls_validate_df_success.parquet")
os.remove("test_bigquery_to_adls_validate_df_success.json")

rm = AzureDataLakeRemove(
path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s"
)
Expand Down
59 changes: 59 additions & 0 deletions tests/integration/flows/test_cloud_for_customers_report_to_adls.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from viadot.config import local_config
from viadot.flows import CloudForCustomersReportToADLS
from viadot.exceptions import ValidationError


def test_cloud_for_customers_report_to_adls():
Expand Down Expand Up @@ -27,3 +28,61 @@ def test_cloud_for_customers_report_to_adls():

task_results = result.result.values()
assert all([task_result.is_successful() for task_result in task_results])

assert len(flow.tasks) == 6


def test_cloud_for_customers_report_to_adls_validation_fail(caplog):
credentials = local_config.get("CLOUD_FOR_CUSTOMERS")
credentials_prod = credentials["Prod"]
channels = ["VEL_B_AFS", "VEL_B_ASA"]
month = ["01"]
year = ["2021"]
flow = CloudForCustomersReportToADLS(
report_url=credentials_prod["server"],
env="Prod",
channels=channels,
months=month,
years=year,
name="test_c4c_report_to_adls",
local_file_path=f"test_c4c_report_to_adls.csv",
adls_sp_credentials_secret=credentials["adls_sp_credentials_secret"],
adls_dir_path=credentials["adls_dir_path"],
validate_df_dict={"column_size": {"ChannelName ID": 10}},
)
try:
result = flow.run()
except ValidationError:
pass


def test_cloud_for_customers_report_to_adls_validation_success():
credentials = local_config.get("CLOUD_FOR_CUSTOMERS")
credentials_prod = credentials["Prod"]
channels = ["VEL_B_AFS", "VEL_B_ASA"]
month = ["01"]
year = ["2021"]
flow = CloudForCustomersReportToADLS(
report_url=credentials_prod["server"],
env="Prod",
channels=channels,
months=month,
years=year,
name="test_c4c_report_to_adls",
local_file_path=f"test_c4c_report_to_adls.csv",
adls_sp_credentials_secret=credentials["adls_sp_credentials_secret"],
adls_dir_path=credentials["adls_dir_path"],
validate_df_dict={"column_size": {"ChannelName ID": 13}},
)

try:
result = flow.run()
except ValidationError:
assert False, "Validation failed but was expected to pass"

assert result.is_successful()

task_results = result.result.values()
assert all([task_result.is_successful() for task_result in task_results])

assert len(flow.tasks) == 7
53 changes: 53 additions & 0 deletions tests/integration/flows/test_customer_gauge_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pytest

from viadot.flows import CustomerGaugeToADLS
from viadot.exceptions import ValidationError

DATA = {
"user_name": ["Jane", "Bob"],
Expand All @@ -15,6 +16,7 @@
"user_address_country_name": "United States",
"user_address_country_code": "US",
}

COLUMNS = ["user_name", "user_address_street"]
ADLS_FILE_NAME = "test_customer_gauge.parquet"
ADLS_DIR_PATH = "raw/tests/"
Expand All @@ -40,3 +42,54 @@ def test_customer_gauge_to_adls_run_flow(mocked_class):
assert result.is_successful()
os.remove("test_customer_gauge_to_adls_flow_run.parquet")
os.remove("test_customer_gauge_to_adls_flow_run.json")


@mock.patch(
"viadot.tasks.CustomerGaugeToDF.run",
return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_customer_gauge_to_adls_run_flow_validation_success(mocked_class):
flow = CustomerGaugeToADLS(
"test_customer_gauge_to_adls_run_flow_validation_success",
endpoint="responses",
total_load=False,
anonymize=True,
columns_to_anonymize=COLUMNS,
adls_dir_path=ADLS_DIR_PATH,
adls_file_name=ADLS_FILE_NAME,
overwrite_adls=True,
validate_df_dict={"column_size": {"user_address_state": 2}},
)
result = flow.run()
assert result.is_successful()
assert len(flow.tasks) == 11

os.remove("test_customer_gauge_to_adls_run_flow_validation_success.parquet")
os.remove("test_customer_gauge_to_adls_run_flow_validation_success.json")


@mock.patch(
"viadot.tasks.CustomerGaugeToDF.run",
return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_customer_gauge_to_adls_run_flow_validation_failure(mocked_class):
flow = CustomerGaugeToADLS(
"test_customer_gauge_to_adls_run_flow_validation_failure",
endpoint="responses",
total_load=False,
anonymize=True,
columns_to_anonymize=COLUMNS,
adls_dir_path=ADLS_DIR_PATH,
adls_file_name=ADLS_FILE_NAME,
overwrite_adls=True,
validate_df_dict={"column_size": {"user_name": 5}},
)
try:
flow.run()
except ValidationError:
pass

os.remove("test_customer_gauge_to_adls_run_flow_validation_failure.parquet")
os.remove("test_customer_gauge_to_adls_run_flow_validation_failure.json")
26 changes: 25 additions & 1 deletion tests/integration/flows/test_eurostat_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

from viadot.flows import EurostatToADLS

DATA = {"geo": ["PL", "DE", "NL"], "indicator": [35, 55, 77]}
DATA = {
"geo": ["PL", "DE", "NL"],
"indicator": [35, 55, 77],
"time": ["2023-01", "2023-51", "2023-07"],
}
ADLS_FILE_NAME = "test_eurostat.parquet"
ADLS_DIR_PATH = "raw/tests/"

Expand All @@ -28,3 +32,23 @@ def test_eurostat_to_adls_run_flow(mocked_class):
assert result.is_successful()
os.remove("test_eurostat_to_adls_flow_run.parquet")
os.remove("test_eurostat_to_adls_flow_run.json")


@mock.patch(
"viadot.tasks.EurostatToDF.run",
return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_validate_df(mocked_class):
flow = EurostatToADLS(
"test_validate_df",
dataset_code="ILC_DI04",
overwrite_adls=True,
validate_df_dict={"column_size": {"time": 7}},
adls_dir_path=ADLS_DIR_PATH,
adls_file_name=ADLS_FILE_NAME,
)
result = flow.run()
assert result.is_successful()
os.remove("test_validate_df.parquet")
os.remove("test_validate_df.json")
79 changes: 79 additions & 0 deletions tests/integration/flows/test_hubspot_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest

from viadot.flows import HubspotToADLS
from viadot.exceptions import ValidationError

DATA = {
"id": {"0": "820306930"},
Expand Down Expand Up @@ -60,3 +61,81 @@ def test_hubspot_to_adls_flow_run(mocked_class):
assert result.is_successful()
os.remove("test_hubspot_to_adls_flow_run.parquet")
os.remove("test_hubspot_to_adls_flow_run.json")


@mock.patch(
"viadot.tasks.HubspotToDF.run",
return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_hubspot_to_adls_flow_run_validate_fail(mocked_class):
flow = HubspotToADLS(
"test_hubspot_to_adls_flow_run",
hubspot_credentials_key="HUBSPOT",
endpoint="line_items",
filters=[
{
"filters": [
{
"propertyName": "createdate",
"operator": "BETWEEN",
"highValue": "2021-01-01",
"value": "2021-01-01",
},
{"propertyName": "quantity", "operator": "EQ", "value": "2"},
]
},
{
"filters": [
{"propertyName": "amount", "operator": "EQ", "value": "3744.000"}
]
},
],
overwrite_adls=True,
adls_dir_path=ADLS_DIR_PATH,
adls_file_name=ADLS_FILE_NAME,
validate_df_dict={"column_size": {"id": 0}},
)
try:
flow.run()
except ValidationError:
pass


@mock.patch(
"viadot.tasks.HubspotToDF.run",
return_value=pd.DataFrame(data=DATA),
)
@pytest.mark.run
def test_hubspot_to_adls_flow_run_validate_success(mocked_class):
flow = HubspotToADLS(
"test_hubspot_to_adls_flow_run",
hubspot_credentials_key="HUBSPOT",
endpoint="line_items",
filters=[
{
"filters": [
{
"propertyName": "createdate",
"operator": "BETWEEN",
"highValue": "2021-01-01",
"value": "2021-01-01",
},
{"propertyName": "quantity", "operator": "EQ", "value": "2"},
]
},
{
"filters": [
{"propertyName": "amount", "operator": "EQ", "value": "3744.000"}
]
},
],
overwrite_adls=True,
adls_dir_path=ADLS_DIR_PATH,
adls_file_name=ADLS_FILE_NAME,
validate_df_dict={"column_unique_values": ["id"]},
)
result = flow.run()
assert result.is_successful()
os.remove("test_hubspot_to_adls_flow_run.parquet")
os.remove("test_hubspot_to_adls_flow_run.json")
Loading

0 comments on commit 584fa05

Please sign in to comment.