diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/search/SearchAcrossLineageResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/search/SearchAcrossLineageResolver.java index 1a8b7734c093e..4a0eacaf09671 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/search/SearchAcrossLineageResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/search/SearchAcrossLineageResolver.java @@ -6,6 +6,7 @@ import com.google.common.collect.ImmutableSet; import com.linkedin.common.urn.Urn; +import com.linkedin.datahub.graphql.generated.AndFilterInput; import com.linkedin.datahub.graphql.generated.EntityType; import com.linkedin.datahub.graphql.generated.FacetFilterInput; import com.linkedin.datahub.graphql.generated.LineageDirection; @@ -92,9 +93,14 @@ public CompletableFuture get(DataFetchingEnvironment final int start = input.getStart() != null ? input.getStart() : DEFAULT_START; final int count = input.getCount() != null ? input.getCount() : DEFAULT_COUNT; - final List filters = - input.getFilters() != null ? input.getFilters() : new ArrayList<>(); - final Integer maxHops = getMaxHops(filters); + final List filters = + input.getOrFilters() != null ? input.getOrFilters() : new ArrayList<>(); + final List facetFilters = + filters.stream() + .map(AndFilterInput::getAnd) + .flatMap(List::stream) + .collect(Collectors.toList()); + final Integer maxHops = getMaxHops(facetFilters); @Nullable final Long startTimeMillis = @@ -117,7 +123,8 @@ public CompletableFuture get(DataFetchingEnvironment start, count); - final Filter filter = ResolverUtils.buildFilter(filters, input.getOrFilters()); + final Filter filter = + ResolverUtils.buildFilter(input.getFilters(), input.getOrFilters()); SearchFlags searchFlags = null; com.linkedin.datahub.graphql.generated.SearchFlags inputFlags = input.getSearchFlags(); if (inputFlags != null) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index 270615aa0e356..5fc3dfc779fa4 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -302,6 +302,7 @@ public LineageResponse getLineage( exploreMultiplePaths); for (LineageRelationship oneHopRelnship : oneHopRelationships) { if (result.containsKey(oneHopRelnship.getEntity())) { + log.debug("Urn encountered again during graph walk {}", oneHopRelnship.getEntity()); result.put( oneHopRelnship.getEntity(), mergeLineageRelationships(result.get(oneHopRelnship.getEntity()), oneHopRelnship)); @@ -553,26 +554,6 @@ public static void addEdgeToPaths( addEdgeToPaths(existingPaths, parentUrn, null, childUrn); } - /** - * Utility method to log paths to the debug log. - * - * @param paths - * @param message - */ - private static void logPaths(UrnArrayArray paths, String message) { - if (log.isDebugEnabled()) { - log.debug("xxxxxxxxxx"); - log.debug(message); - log.debug("---------"); - if (paths != null) { - paths.forEach(path -> log.debug("{}", path)); - } else { - log.debug("EMPTY"); - } - log.debug("xxxxxxxxxx"); - } - } - private static boolean containsCycle(final UrnArray path) { Set urnSet = path.stream().collect(Collectors.toUnmodifiableSet()); // path contains a cycle if any urn is repeated twice @@ -587,8 +568,6 @@ public static boolean addEdgeToPaths( boolean edgeAdded = false; // Collect all full-paths to this child node. This is what will be returned. UrnArrayArray pathsToParent = existingPaths.get(parentUrn); - logPaths(pathsToParent, String.format("Paths to Parent: %s, Child: %s", parentUrn, childUrn)); - logPaths(existingPaths.get(childUrn), String.format("Existing Paths to Child: %s", childUrn)); if (pathsToParent != null && !pathsToParent.isEmpty()) { // If there are existing paths to this parent node, then we attempt // to append the child to each of the existing paths (lengthen it). @@ -630,7 +609,6 @@ public static boolean addEdgeToPaths( existingPaths.get(childUrn).add(pathToChild); edgeAdded = true; } - logPaths(existingPaths.get(childUrn), String.format("New paths to Child: %s", childUrn)); return edgeAdded; } @@ -655,7 +633,6 @@ private static List extractRelationships( for (SearchHit hit : hits) { index++; final Map document = hit.getSourceAsMap(); - log.debug("{}: hit: {}", index, document); final Urn sourceUrn = UrnUtils.getUrn(((Map) document.get(SOURCE)).get("urn").toString()); final Urn destinationUrn = @@ -808,7 +785,6 @@ private static List extractRelationships( } List result = new ArrayList<>(lineageRelationshipMap.values()); log.debug("Number of lineage relationships in list: {}", result.size()); - log.debug("Result: {}", result); return result; } catch (Exception e) { // This exception handler merely exists to log the exception at an appropriate point and diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml index 617982e53e4d6..dc49590425b6a 100644 --- a/metadata-service/configuration/src/main/resources/application.yml +++ b/metadata-service/configuration/src/main/resources/application.yml @@ -230,7 +230,7 @@ elasticsearch: timeoutSeconds: ${ELASTICSEARCH_SEARCH_GRAPH_TIMEOUT_SECONDS:50} # graph dao timeout seconds batchSize: ${ELASTICSEARCH_SEARCH_GRAPH_BATCH_SIZE:1000} # graph dao batch size maxResult: ${ELASTICSEARCH_SEARCH_GRAPH_MAX_RESULT:10000} # graph dao max result size - enableMultiPathSearch: ${ELASTICSEARCH_SEARCH_GRAPH_MULTI_PATH_SEARCH:true} + enableMultiPathSearch: ${ELASTICSEARCH_SEARCH_GRAPH_MULTI_PATH_SEARCH:false} # TODO: Kafka topic convention kafka: @@ -315,7 +315,7 @@ systemUpdate: backOffFactor: ${BOOTSTRAP_SYSTEM_UPDATE_BACK_OFF_FACTOR:2} # Multiplicative factor for back off, default values will result in waiting 5min 15s waitForSystemUpdate: ${BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE:true} dataJobNodeCLL: - enabled: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED:true} + enabled: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED:false} batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_BATCH_SIZE:200} browsePathsV2: enabled: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_ENABLED:true} diff --git a/smoke-test/tests/lineage/test_lineage.py b/smoke-test/tests/lineage/test_lineage.py index 9cd98d1245bbb..a24a700593378 100644 --- a/smoke-test/tests/lineage/test_lineage.py +++ b/smoke-test/tests/lineage/test_lineage.py @@ -8,11 +8,8 @@ import pytest from datahub.cli.cli_utils import get_url_and_token from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.ingestion.graph.client import ( - DatahubClientConfig, - DataHubGraph, - get_default_graph, -) +from datahub.ingestion.graph.client import DataHubGraph # get_default_graph, +from datahub.ingestion.graph.client import DatahubClientConfig from datahub.metadata.schema_classes import ( AuditStampClass, ChangeAuditStampsClass, @@ -959,32 +956,33 @@ def ingest_multipath_metadata( wait_for_writes_to_sync() -@pytest.mark.dependency(depends=["test_healthchecks"]) -def test_simple_lineage_multiple_paths( - ingest_multipath_metadata, - chart_urn_fixture, - intermediates_fixture, - destination_urn_fixture, -): - chart_urn = chart_urn_fixture - intermediates = intermediates_fixture - destination_urn = destination_urn_fixture - results = search_across_lineage( - get_default_graph(), - chart_urn, - direction="UPSTREAM", - convert_schema_fields_to_datasets=True, - ) - assert destination_urn in [ - x["entity"]["urn"] for x in results["searchAcrossLineage"]["searchResults"] - ] - for search_result in results["searchAcrossLineage"]["searchResults"]: - if search_result["entity"]["urn"] == destination_urn: - assert ( - len(search_result["paths"]) == 2 - ) # 2 paths from the chart to the dataset - for path in search_result["paths"]: - assert len(path["path"]) == 3 - assert path["path"][-1]["urn"] == destination_urn - assert path["path"][0]["urn"] == chart_urn - assert path["path"][1]["urn"] in intermediates +# TODO: Reenable once fixed +# @pytest.mark.dependency(depends=["test_healthchecks"]) +# def test_simple_lineage_multiple_paths( +# ingest_multipath_metadata, +# chart_urn_fixture, +# intermediates_fixture, +# destination_urn_fixture, +# ): +# chart_urn = chart_urn_fixture +# intermediates = intermediates_fixture +# destination_urn = destination_urn_fixture +# results = search_across_lineage( +# get_default_graph(), +# chart_urn, +# direction="UPSTREAM", +# convert_schema_fields_to_datasets=True, +# ) +# assert destination_urn in [ +# x["entity"]["urn"] for x in results["searchAcrossLineage"]["searchResults"] +# ] +# for search_result in results["searchAcrossLineage"]["searchResults"]: +# if search_result["entity"]["urn"] == destination_urn: +# assert ( +# len(search_result["paths"]) == 2 +# ) # 2 paths from the chart to the dataset +# for path in search_result["paths"]: +# assert len(path["path"]) == 3 +# assert path["path"][-1]["urn"] == destination_urn +# assert path["path"][0]["urn"] == chart_urn +# assert path["path"][1]["urn"] in intermediates