Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tag filtering and speedup #88

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion discoverx/dx.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def __init__(

def _can_read_columns_table(self) -> bool:
try:
self.spark.sql(f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' LIMIT 1")
self.spark.sql(
f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' AND table_schema = 'columns' LIMIT 1"
edurdevic marked this conversation as resolved.
Show resolved Hide resolved
)
return True
except Exception as e:
self.logger.error(f"Error while reading table {self.INFORMATION_SCHEMA}.columns: {e}")
Expand Down
24 changes: 21 additions & 3 deletions discoverx/explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, from_tables, spark: SparkSession, info_fetcher: InfoFetcher)
self._sql_query_template = None
self._max_concurrency = 10
self._with_tags = False
self._having_tags = []

@staticmethod
def validate_from_components(from_tables: str):
Expand Down Expand Up @@ -70,6 +71,19 @@ def having_columns(self, *columns) -> "DataExplorer":
new_obj._having_columns.extend(columns)
return new_obj

def having_tag(self, tag_name: str, tag_value: str = None) -> "DataExplorer":
    """Return a copy of this explorer restricted to tables carrying a tag.

    The tag may be set at table, schema, or catalog level. Calling this
    method several times accumulates tag filters on the returned copies.

    Args:
        tag_name (str): Tag name.
        tag_value (str, optional): Tag value. Defaults to None.
            NOTE(review): the filter appears to compare tags by equality, so
            a None value presumably matches only tags stored without a
            value rather than "any value" - confirm against the fetcher.

    Returns:
        DataExplorer: A deep copy of this explorer with the tag filter added
        and tag loading switched on. The instance the method is called on
        is left unchanged.
    """
    new_obj = copy.deepcopy(self)
    # append, not extend: extend() would try to iterate over the single
    # TagInfo object instead of storing it as one filter entry.
    new_obj._having_tags.append(TagInfo(tag_name, tag_value))
    # Filtering on tags only makes sense if tags are requested with the
    # table info, so the flag is turned on together with the filter.
    new_obj._with_tags = True
    return new_obj

def with_concurrency(self, max_concurrency) -> "DataExplorer":
"""Sets the maximum number of concurrent queries to run"""
new_obj = copy.deepcopy(self)
Expand Down Expand Up @@ -140,7 +154,9 @@ def scan(
self._catalogs,
self._schemas,
self._tables,
self._info_fetcher.get_tables_info(self._catalogs, self._schemas, self._tables, self._having_columns),
self._info_fetcher.get_tables_info(
self._catalogs, self._schemas, self._tables, self._having_columns, self._having_tags
),
custom_rules=custom_rules,
locale=locale,
)
Expand All @@ -163,6 +179,7 @@ def map(self, f) -> list[any]:
self._tables,
self._having_columns,
self._with_tags,
self._having_tags,
)
with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_concurrency) as executor:
# Submit tasks to the thread pool
Expand Down Expand Up @@ -203,9 +220,9 @@ def _get_stack_string_columns_expression(table_info: TableInfo) -> str:
@staticmethod
def _build_sql(sql_template: str, table_info: TableInfo) -> str:
if table_info.catalog and table_info.catalog != "None":
full_table_name = f"{table_info.catalog}.{table_info.schema}.{table_info.table}"
full_table_name = f"`{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}`"
else:
full_table_name = f"{table_info.schema}.{table_info.table}"
full_table_name = f"`{table_info.schema}`.`{table_info.table}`"

stack_string_columns = DataExplorerActions._get_stack_string_columns_expression(table_info)

Expand Down Expand Up @@ -244,6 +261,7 @@ def _get_sql_commands(self, data_explorer: DataExplorer) -> list[tuple[str, Tabl
data_explorer._tables,
data_explorer._having_columns,
data_explorer._with_tags,
data_explorer._having_tags,
)
sql_commands = [
(
Expand Down
26 changes: 24 additions & 2 deletions discoverx/table_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def get_tables_info(
schemas: str,
tables: str,
columns: list[str] = [],
with_tags=False,
with_tags: bool = False,
having_tags: list[TagInfo] = [],
) -> list[TableInfo]:
# Filter tables by matching filter
table_list_sql = self._get_table_list_sql(catalogs, schemas, tables, columns, with_tags)
Expand All @@ -120,7 +121,28 @@ def get_tables_info(
if len(filtered_tables) == 0:
raise ValueError(f"No tables found matching filter: {catalogs}.{schemas}.{tables}")

return self._to_info_list(filtered_tables)
info_list = self._to_info_list(filtered_tables)
return [info for info in info_list if InfoFetcher._contains_all_tags(info.tags, having_tags)]

@staticmethod
def _contains_all_tags(tags_info: TagsInfo, tags: list[TagInfo]) -> bool:
    """Tell whether every requested tag is present on the given tags object.

    Catalog, schema, and table tags are pooled together, and each requested
    tag must be equal to one of the pooled tags.

    Args:
        tags_info (TagsInfo): Tags attached to a table and its parents.
        tags (list[TagInfo]): Tags that must all be present.

    Returns:
        bool: True when ``tags`` is empty or every tag in it is found;
        False when ``tags_info`` is falsy or at least one tag is missing.
    """
    # Nothing requested: every table qualifies.
    if not tags:
        return True
    # Something requested but no tag information available.
    if not tags_info:
        return False

    # Pool the three levels; an unset or empty level contributes nothing.
    available = [
        *(tags_info.catalog_tags or []),
        *(tags_info.schema_tags or []),
        *(tags_info.table_tags or []),
    ]

    return all(wanted in available for wanted in tags)

def _get_table_list_sql(
self,
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/explorer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_validate_from_components():

def test_build_sql(sample_table_info):
sql_template = "SELECT * FROM {full_table_name}"
expected_sql = "SELECT * FROM catalog1.schema1.table1"
expected_sql = "SELECT * FROM `catalog1`.`schema1`.`table1`"
assert DataExplorerActions._build_sql(sql_template, sample_table_info) == expected_sql


Expand Down
23 changes: 23 additions & 0 deletions tests/unit/table_info_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pytest
from discoverx.explorer import InfoFetcher, TagsInfo, TagInfo


def test_validate_from_components():
    """Exercise InfoFetcher._contains_all_tags with the tag held in each TagsInfo slot."""
    no_tags = TagsInfo([], [], [], [])
    table_level = TagsInfo([], [TagInfo("a", "v1")], [], [])
    schema_level = TagsInfo([], [], [TagInfo("a", "v1")], [])
    catalog_level = TagsInfo([], [], [], [TagInfo("a", "v1")])

    # The exact tag is found whichever slot holds it.
    for tagged in (table_level, schema_level, catalog_level):
        assert InfoFetcher._contains_all_tags(tagged, [TagInfo("a", "v1")])

    # A different value, a different name, or a missing value does not match.
    for mismatch in (TagInfo("a", "v2"), TagInfo("b", "v1"), TagInfo("a", None)):
        assert not InfoFetcher._contains_all_tags(table_level, [mismatch])

    # If no tags to check, then it should be true, tagged or not.
    assert InfoFetcher._contains_all_tags(table_level, [])
    assert InfoFetcher._contains_all_tags(no_tags, [])

    # A non-empty filter rejects an object that carries no tags.
    assert not InfoFetcher._contains_all_tags(no_tags, [TagInfo("a", "v1")])
Loading