Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the drop-duplicate functionality #4095

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
883644c
add SG support for dropping multi-edges through the CAPI
jnke2016 Jan 16, 2024
6113946
add MG support for dropping multi-edges and deprecaate parameter
jnke2016 Jan 17, 2024
f92d936
fix style
jnke2016 Jan 17, 2024
64ec680
fix copyright
jnke2016 Jan 17, 2024
2e4b0a7
Merge remote-tracking branch 'upstream/branch-24.02' into branch-24.0…
jnke2016 Jan 17, 2024
0695fed
Merge remote-tracking branch 'upstream/branch-24.02' into branch-24.0…
jnke2016 Jan 22, 2024
0442e53
fix typo
jnke2016 Jan 24, 2024
fd98039
fix typo
jnke2016 Jan 24, 2024
1d87370
reorder arguments
jnke2016 Jan 24, 2024
1d39ec6
Merge remote-tracking branch 'upstream/branch-24.02' into branch-24.0…
jnke2016 Jan 24, 2024
c4d9580
add 'do_expensive_check'
jnke2016 Jan 24, 2024
ad62f33
fix style
jnke2016 Jan 24, 2024
779bd2d
update graph creation warning description
jnke2016 Jan 24, 2024
5900bd8
update docstrings
jnke2016 Jan 26, 2024
705788f
fix style
jnke2016 Jan 26, 2024
5fa81c4
drop duplicates when viewing the edgelist
jnke2016 Jan 30, 2024
c85f622
drop duplicates when viewing edges and update tests
jnke2016 Jan 30, 2024
8f7436e
fix style
jnke2016 Jan 30, 2024
7c165e7
remove debug print
jnke2016 Jan 30, 2024
fc5e627
remove unused import
jnke2016 Jan 30, 2024
50a11c0
Merge remote-tracking branch 'upstream/branch-24.02' into branch-24.0…
jnke2016 Jan 30, 2024
f3f1c3f
undo changes to test
jnke2016 Jan 31, 2024
77fe0f5
drop duplicate edges, update tests and copyright
jnke2016 Jan 31, 2024
7db4a10
fix style
jnke2016 Jan 31, 2024
b3077cf
uncommment test
jnke2016 Jan 31, 2024
bdcd215
fix typo
jnke2016 Jan 31, 2024
2c6892a
remove outdated comment
jnke2016 Jan 31, 2024
ff5373a
remove debug print
jnke2016 Jan 31, 2024
1efef76
update tests
jnke2016 Jan 31, 2024
ebbd33f
fix style
jnke2016 Jan 31, 2024
ed76013
update number of edges count
jnke2016 Jan 31, 2024
890f885
fix style
jnke2016 Jan 31, 2024
ccc5189
reset changes to sg
jnke2016 Feb 1, 2024
63a669b
update tests
jnke2016 Feb 1, 2024
a6d767f
fix style
jnke2016 Feb 1, 2024
e7ea077
update copyright
jnke2016 Feb 1, 2024
ad2602c
Merge remote-tracking branch 'upstream/branch-24.02' into branch-24.0…
jnke2016 Feb 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)
from cugraph.dask.common.mg_utils import run_gc_on_dask_cluster
import cugraph.dask.comms.comms as Comms
from cugraph.structure.symmetrize import _memory_efficient_drop_duplicates


class simpleDistributedGraphImpl:
Expand Down Expand Up @@ -95,6 +96,7 @@ def _make_plc_graph(
weight_type,
edge_id_type,
edge_type_id,
drop_multi_edges,
):
weights = None
edge_ids = None
Expand Down Expand Up @@ -149,6 +151,7 @@ def _make_plc_graph(
num_arrays=num_arrays,
store_transposed=store_transposed,
do_expensive_check=False,
drop_multi_edges=drop_multi_edges,
)
del edata_x
gc.collect()
Expand Down Expand Up @@ -267,7 +270,7 @@ def __from_edgelist(
input_ddf,
source,
destination,
multi=self.properties.multi_edge,
multi=True, # Deprecated parameter
symmetrize=not self.properties.directed,
)
value_col = None
Expand All @@ -277,7 +280,7 @@ def __from_edgelist(
source,
destination,
value_col_names,
multi=self.properties.multi_edge,
multi=True, # Deprecated parameter
symmetrize=not self.properties.directed,
)

Expand Down Expand Up @@ -364,6 +367,7 @@ def __from_edgelist(
self.weight_type,
self.edge_id_type,
self.edge_type_id_type,
not self.properties.multi_edge,
)
for w, edata in persisted_keys_d.items()
}
Expand Down Expand Up @@ -455,6 +459,15 @@ def view_edge_list(self):
else:
is_multi_column = True

if not self.properties.multi_edge:
# Drop parallel edges for non MultiGraph
# FIXME: Drop multi edges with the CAPI instead.
_client = default_client()
workers = _client.scheduler_info()["workers"]
edgelist_df = _memory_efficient_drop_duplicates(
edgelist_df, [srcCol, dstCol], len(workers)
)

edgelist_df[srcCol], edgelist_df[dstCol] = edgelist_df[
[srcCol, dstCol]
].min(axis=1), edgelist_df[[srcCol, dstCol]].max(axis=1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -264,7 +264,7 @@ def __from_edgelist(
source,
destination,
edge_attr,
multi=self.properties.multi_edge,
multi=self.properties.multi_edge, # Deprecated parameter
symmetrize=not self.properties.directed,
)

Expand All @@ -279,7 +279,7 @@ def __from_edgelist(
elist,
source,
destination,
multi=self.properties.multi_edge,
multi=self.properties.multi_edge, # Deprecated parameter
symmetrize=not self.properties.directed,
)

Expand All @@ -298,7 +298,10 @@ def __from_edgelist(
self._replicate_edgelist()

self._make_plc_graph(
value_col=value_col, store_transposed=store_transposed, renumber=renumber
value_col=value_col,
store_transposed=store_transposed,
renumber=renumber,
drop_multi_edges=not self.properties.multi_edge,
)

def to_pandas_edgelist(
Expand Down Expand Up @@ -477,13 +480,15 @@ def view_edge_list(self):
edgelist_df[simpleGraphImpl.srcCol]
<= edgelist_df[simpleGraphImpl.dstCol]
]

elif not use_initial_input_df and self.properties.renumbered:
# Do not unrenumber the vertices if the initial input df was used
if not self.properties.directed:
edgelist_df = edgelist_df[
edgelist_df[simpleGraphImpl.srcCol]
<= edgelist_df[simpleGraphImpl.dstCol]
]

edgelist_df = self.renumber_map.unrenumber(
edgelist_df, simpleGraphImpl.srcCol
)
Expand Down Expand Up @@ -1084,6 +1089,7 @@ def _make_plc_graph(
value_col: Dict[str, cudf.DataFrame] = None,
store_transposed: bool = False,
renumber: bool = True,
drop_multi_edges: bool = False,
):
"""
Parameters
Expand All @@ -1100,6 +1106,8 @@ def _make_plc_graph(
Whether to renumber the vertices of the graph.
Required if inputted vertex ids are not of
int32 or int64 type.
drop_multi_edges: bool (default=False)
Whether to drop multi edges
"""

if value_col is None:
Expand Down Expand Up @@ -1163,6 +1171,7 @@ def _make_plc_graph(
renumber=renumber,
do_expensive_check=True,
input_array_format=input_array_format,
drop_multi_edges=drop_multi_edges,
)

def to_directed(self, DiG, store_transposed=False):
Expand Down
37 changes: 34 additions & 3 deletions python/cugraph/cugraph/structure/symmetrize.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2023, NVIDIA CORPORATION.
# Copyright (c) 2019-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -15,6 +15,7 @@
import cudf
import dask_cudf
from dask.distributed import default_client
import warnings


def symmetrize_df(
Expand Down Expand Up @@ -54,6 +55,11 @@ def symmetrize_df(
Name of the column in the data frame containing the weight ids

multi : bool, optional (default=False)
[Deprecated, Multi will be removed in future version, and the removal
of multi edges will no longer be supported from 'symmetrize'.
Multi edges will be removed upon creation of graph instance directly
based on if the graph is `curgaph.MultiGraph` or `cugraph.Graph`.]

Set to True if graph is a Multi(Di)Graph. This allows multiple
edges instead of dropping them.

Expand Down Expand Up @@ -84,6 +90,12 @@ def symmetrize_df(
if multi:
return result
else:
warnings.warn(
"Multi is deprecated and the removal of multi edges will no longer be "
"supported from 'symmetrize'. Multi edges will be removed upon creation "
"of graph instance.",
FutureWarning,
)
vertex_col_name = src_name + dst_name
result = result.groupby(by=[*vertex_col_name], as_index=False).min()
return result
Expand Down Expand Up @@ -128,6 +140,11 @@ def symmetrize_ddf(
Name of the column in the data frame containing the weight ids

multi : bool, optional (default=False)
[Deprecated, Multi will be removed in future version, and the removal
of multi edges will no longer be supported from 'symmetrize'.
Multi edges will be removed upon creation of graph instance directly
based on if the graph is `curgaph.MultiGraph` or `cugraph.Graph`.]

Set to True if graph is a Multi(Di)Graph. This allows multiple
edges instead of dropping them.

Expand Down Expand Up @@ -165,8 +182,15 @@ def symmetrize_ddf(
else:
result = ddf
if multi:
result = result.reset_index(drop=True).repartition(npartitions=len(workers) * 2)
return result
else:
warnings.warn(
"Multi is deprecated and the removal of multi edges will no longer be "
"supported from 'symmetrize'. Multi edges will be removed upon creation "
"of graph instance.",
FutureWarning,
)
vertex_col_name = src_name + dst_name
result = _memory_efficient_drop_duplicates(
result, vertex_col_name, len(workers)
Expand All @@ -181,6 +205,7 @@ def symmetrize(
value_col_name=None,
multi=False,
symmetrize=True,
do_expensive_check=False,
):
"""
Take a dataframe of source destination pairs along with associated
Expand Down Expand Up @@ -208,6 +233,11 @@ def symmetrize(
weights column name.

multi : bool, optional (default=False)
[Deprecated, Multi will be removed in future version, and the removal
of multi edges will no longer be supported from 'symmetrize'.
Multi edges will be removed upon creation of graph instance directly
based on if the graph is `curgaph.MultiGraph` or `cugraph.Graph`.]

Set to True if graph is a Multi(Di)Graph. This allows multiple
edges instead of dropping them.

Expand All @@ -234,8 +264,9 @@ def symmetrize(
if "edge_id" in input_df.columns and symmetrize:
raise ValueError("Edge IDs are not supported on undirected graphs")

csg.null_check(input_df[source_col_name])
csg.null_check(input_df[dest_col_name])
if do_expensive_check: # FIXME: Optimize this check as it is currently expensive
csg.null_check(input_df[source_col_name])
csg.null_check(input_df[dest_col_name])

if isinstance(input_df, dask_cudf.DataFrame):
output_df = symmetrize_ddf(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -26,6 +26,7 @@
from cugraph.testing import UNDIRECTED_DATASETS
from cugraph.dask import uniform_neighbor_sample
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.structure.symmetrize import _memory_efficient_drop_duplicates
from cugraph.datasets import email_Eu_core, small_tree
from pylibcugraph.testing.utils import gen_fixture_params_product

Expand Down Expand Up @@ -135,6 +136,14 @@ def test_mg_uniform_neighbor_sample_simple(dask_client, input_combo):
dg = input_combo["MGGraph"]

input_df = dg.input_df
# Drop parallel edges for non MultiGraph
# FIXME: Drop multi edges with the CAPI instead.
vertex_col_name = ["src", "dst"]
workers = dask_client.scheduler_info()["workers"]
input_df = _memory_efficient_drop_duplicates(
input_df, vertex_col_name, len(workers)
)

result_nbr = uniform_neighbor_sample(
dg,
input_combo["start_list"],
Expand Down
2 changes: 1 addition & 1 deletion python/pylibcugraph/pylibcugraph/graphs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,9 @@ cdef class MGGraph(_GPUGraph):
edge_type_view_ptr_ptr,
store_transposed,
num_arrays,
do_expensive_check,
drop_self_loops,
drop_multi_edges,
do_expensive_check,
&(self.c_graph_ptr),
&error_ptr)

Expand Down
Loading