Skip to content

Commit

Permalink
feat(ingest/looker): browse path followups (datahub-project#10217)
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate committed Apr 12, 2024
1 parent 223b72f commit 8b79461
Show file tree
Hide file tree
Showing 32 changed files with 7,331 additions and 2,038 deletions.
7 changes: 6 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,12 @@ def close(self) -> None:

def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor:
config = self.get_config()
platform = getattr(self, "platform", None) or getattr(config, "platform", None)

platform = (
getattr(config, "platform_name", None)
or getattr(self, "platform", None)
or getattr(config, "platform", None)
)
env = getattr(config, "env", None)
browse_path_drop_dirs = [
platform,
Expand Down
65 changes: 50 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def auto_browse_path_v2(
platform: Optional[str] = None,
platform_instance: Optional[str] = None,
) -> Iterable[MetadataWorkUnit]:
"""Generate BrowsePathsV2 from Container and BrowsePaths aspects.
"""Generate BrowsePathsV2 from Container, BrowsePaths, and BrowsePathsV2 aspects.
Generates browse paths v2 on demand, rather than waiting for end of ingestion,
for better UI experience while ingestion is running.
Expand All @@ -251,37 +251,60 @@ def auto_browse_path_v2(
Calculates the correct BrowsePathsV2 at end of workunit stream,
and emits "corrections", i.e. a final BrowsePathsV2 for any urns that have changed.
Source-generated original BrowsePathsV2 are assumed to be correct and are preferred
over other aspects when generating BrowsePathsV2 of an entity or its children.
This helper also prepends the platform instance BrowsePathEntry to BrowsePathsV2,
so the source need not include it in its browse paths v2.
"""

# For telemetry, to see if our sources violate assumptions
num_out_of_order = 0
num_out_of_batch = 0

# Set for all containers and urns with a Container aspect
# Used to construct container paths while iterating through stream
# Assumes topological order of entities in stream
# Used to construct browse path v2 while iterating through stream
# Assumes topological order of entities in stream, i.e. parent's
# browse path/container is seen before child's browse path/container.
paths: Dict[str, List[BrowsePathEntryClass]] = {}

emitted_urns: Set[str] = set()
containers_used_as_parent: Set[str] = set()
for urn, batch in _batch_workunits_by_urn(stream):
container_path: Optional[List[BrowsePathEntryClass]] = None
legacy_path: Optional[List[BrowsePathEntryClass]] = None
has_browse_path_v2 = False
browse_path_v2: Optional[List[BrowsePathEntryClass]] = None

for wu in batch:
yield wu
if not wu.is_primary_source:
yield wu
continue

browse_path_v2_aspect = wu.get_aspect_of_type(BrowsePathsV2Class)
if browse_path_v2_aspect is None:
yield wu
else:
# This is a browse path v2 aspect. It is processed and emitted
# later, with the platform instance prepended as required.
browse_path_v2 = browse_path_v2_aspect.path
if guess_entity_type(urn) == "container":
paths[urn] = browse_path_v2

container_aspect = wu.get_aspect_of_type(ContainerClass)
if container_aspect:
parent_urn = container_aspect.container
containers_used_as_parent.add(parent_urn)
paths[urn] = [
*paths.setdefault(parent_urn, []), # Guess parent has no parents
BrowsePathEntryClass(id=parent_urn, urn=parent_urn),
]
# If a container has both a parent container and a browsePathsV2
# emitted from the source, prefer browsePathsV2; hence setdefault.
paths.setdefault(
urn,
[
*paths.setdefault(
parent_urn, []
), # Guess parent has no parents
BrowsePathEntryClass(id=parent_urn, urn=parent_urn),
],
)
container_path = paths[urn]

if urn in containers_used_as_parent:
Expand All @@ -297,16 +320,28 @@ def auto_browse_path_v2(
if p.strip() and p.strip() not in drop_dirs
]

if wu.get_aspect_of_type(BrowsePathsV2Class):
has_browse_path_v2 = True

path = container_path or legacy_path
if (path is not None or has_browse_path_v2) and urn in emitted_urns:
# Order of preference: browse path v2, container path, legacy browse path
path = browse_path_v2 or container_path or legacy_path
if path is not None and urn in emitted_urns:
# Batch invariant violated
# TODO: Add sentry alert
num_out_of_batch += 1
elif has_browse_path_v2:
elif browse_path_v2 is not None:
emitted_urns.add(urn)
if not dry_run:
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=BrowsePathsV2Class(
path=_prepend_platform_instance(
browse_path_v2, platform, platform_instance
)
),
).as_workunit()
else:
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=BrowsePathsV2Class(path=browse_path_v2),
).as_workunit()
elif path is not None:
emitted_urns.add(urn)
if not dry_run:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class DatasetContainerSubTypes(str, Enum):

class BIContainerSubTypes(str, Enum):
LOOKER_FOLDER = "Folder"
LOOKML_PROJECT = "LookML Project"
LOOKML_MODEL = "LookML Model"
TABLEAU_WORKBOOK = "Workbook"
POWERBI_WORKSPACE = "Workspace"
POWERBI_DATASET = "PowerBI Dataset"
Expand Down
Loading

0 comments on commit 8b79461

Please sign in to comment.