From f590fd116cd900855cf8f0b40155f0955752d48d Mon Sep 17 00:00:00 2001 From: Tao He Date: Thu, 27 Jul 2023 17:20:27 +0800 Subject: [PATCH] feat(analytical): Add the Graph.consolidate_columns() interface in Python client (#3060) Signed-off-by: Tao He --- analytical_engine/core/grape_instance.cc | 34 ++++++++ analytical_engine/core/grape_instance.h | 4 + .../core/object/fragment_wrapper.h | 62 +++++++++++++++ .../core/object/i_fragment_wrapper.h | 6 ++ .../frame/property_graph_frame.cc | 1 + coordinator/gscoordinator/dag_manager.py | 1 + coordinator/gscoordinator/op_executor.py | 8 +- coordinator/gscoordinator/utils.py | 13 ++++ docs/reference/graph.rst | 2 +- proto/types.proto | 4 + python/graphscope/framework/dag_utils.py | 53 +++++++++++++ python/graphscope/framework/graph.py | 77 +++++++++++++++++++ python/graphscope/framework/utils.py | 18 ++++- python/graphscope/tests/conftest.py | 14 ++++ .../graphscope/tests/unittest/test_graph.py | 38 +++++++++ 15 files changed, 329 insertions(+), 6 deletions(-) diff --git a/analytical_engine/core/grape_instance.cc b/analytical_engine/core/grape_instance.cc index c39c03b39a09..1e9db96e91bf 100644 --- a/analytical_engine/core/grape_instance.cc +++ b/analytical_engine/core/grape_instance.cc @@ -1179,6 +1179,35 @@ bl::result GrapeInstance::addLabelsToGraph( return dst_wrapper->graph_def(); } +bl::result GrapeInstance::consolidateColumns( + const rpc::GSParams& params) { + BOOST_LEAF_AUTO(src_graph_name, params.Get(rpc::GRAPH_NAME)); + BOOST_LEAF_AUTO(label, + params.Get(rpc::CONSOLIDATE_COLUMNS_LABEL)); + BOOST_LEAF_AUTO(columns, + params.Get(rpc::CONSOLIDATE_COLUMNS_COLUMNS)); + BOOST_LEAF_AUTO(result_column, params.Get( + rpc::CONSOLIDATE_COLUMNS_RESULT_COLUMN)); + BOOST_LEAF_AUTO( + src_wrapper, + object_manager_.GetObject(src_graph_name)); + if (src_wrapper->graph_def().graph_type() != rpc::graph::ARROW_PROPERTY) { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, + "ConsolidateColumns is only avaiable for 
ArrowFragment"); + } + std::string dst_graph_name = "graph_" + generateId(); + + VLOG(1) << "Consolidate columns from " << src_graph_name + << ", graph name: " << dst_graph_name << ":" + << "\nlabel = " << label << "\ncolumns = " << columns + << "\nresult_column = " << result_column; + BOOST_LEAF_AUTO(dst_wrapper, src_wrapper->ConsolidateColumns( + comm_spec_, dst_graph_name, label, columns, + result_column)); + BOOST_LEAF_CHECK(object_manager_.PutObject(dst_wrapper)); + return dst_wrapper->graph_def(); +} + bl::result> GrapeInstance::graphToNumpy( const rpc::GSParams& params) { std::pair range; @@ -1413,6 +1442,11 @@ bl::result> GrapeInstance::OnReceive( r->set_graph_def(graph_def); break; } + case rpc::CONSOLIDATE_COLUMNS: { + BOOST_LEAF_AUTO(graph_def, consolidateColumns(params)); + r->set_graph_def(graph_def); + break; + } case rpc::CONTEXT_TO_NUMPY: { BOOST_LEAF_AUTO(arc, contextToNumpy(params)); r->set_data(*arc, DispatchResult::AggregatePolicy::kPickFirst, true); diff --git a/analytical_engine/core/grape_instance.h b/analytical_engine/core/grape_instance.h index 03e32fbb6fa8..75f31c359a89 100644 --- a/analytical_engine/core/grape_instance.h +++ b/analytical_engine/core/grape_instance.h @@ -164,6 +164,10 @@ class GrapeInstance : public Subscriber { bl::result addLabelsToGraph( const rpc::GSParams& params); + + bl::result consolidateColumns( + const rpc::GSParams& params); + bl::result getContextData(const rpc::GSParams& params); bl::result> graphToNumpy( diff --git a/analytical_engine/core/object/fragment_wrapper.h b/analytical_engine/core/object/fragment_wrapper.h index aded8aaaea68..bcec84edc323 100644 --- a/analytical_engine/core/object/fragment_wrapper.h +++ b/analytical_engine/core/object/fragment_wrapper.h @@ -257,6 +257,7 @@ class FragmentWrapper< using fragment_t = vineyard::ArrowFragment; using label_id_t = typename fragment_t::label_id_t; + using prop_id_t = typename fragment_t::prop_id_t; public: FragmentWrapper(const std::string& id, 
rpc::graph::GraphDefPb graph_def, @@ -357,6 +358,67 @@ class FragmentWrapper< return std::dynamic_pointer_cast(wrapper); } + bl::result> ConsolidateColumns( + const grape::CommSpec& comm_spec, const std::string& dst_graph_name, + const std::string& label, const std::string& columns, + const std::string& result_column) override { + auto& schema = fragment_->schema(); + + label_id_t vertex_label_id = schema.GetVertexLabelId(label); + label_id_t edge_label_id = schema.GetEdgeLabelId(label); + + std::vector column_names; + boost::split(column_names, columns, boost::is_any_of(",;")); + + if (vertex_label_id == -1 && edge_label_id == -1) { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, + "Invalid vertex or edge label: " + label); + } + + auto& meta = fragment_->meta(); + auto* client = dynamic_cast(meta.GetClient()); + vineyard::ObjectID new_frag_id = vineyard::InvalidObjectID(); + if (vertex_label_id != -1) { + BOOST_LEAF_ASSIGN(new_frag_id, fragment_->ConsolidateVertexColumns( + *client, vertex_label_id, column_names, + result_column)); + } else if (edge_label_id != -1) { + BOOST_LEAF_ASSIGN(new_frag_id, fragment_->ConsolidateEdgeColumns( + *client, edge_label_id, column_names, + result_column)); + } + + VINEYARD_CHECK_OK(client->Persist(new_frag_id)); + BOOST_LEAF_AUTO(frag_group_id, vineyard::ConstructFragmentGroup( + *client, new_frag_id, comm_spec)); + auto fg = std::dynamic_pointer_cast( + client->GetObject(frag_group_id)); + auto new_frag = client->GetObject(new_frag_id); + + rpc::graph::GraphDefPb new_graph_def; + + new_graph_def.set_key(dst_graph_name); + new_graph_def.set_compact_edges(new_frag->compact_edges()); + new_graph_def.set_use_perfect_hash(new_frag->use_perfect_hash()); + + gs::rpc::graph::VineyardInfoPb vy_info; + if (graph_def_.has_extension()) { + graph_def_.extension().UnpackTo(&vy_info); + } + vy_info.set_vineyard_id(frag_group_id); + vy_info.clear_fragments(); + for (auto const& item : fg->Fragments()) { + 
vy_info.add_fragments(item.second); + } + new_graph_def.mutable_extension()->PackFrom(vy_info); + + set_graph_def(new_frag, new_graph_def); + + auto wrapper = std::make_shared>( + dst_graph_name, new_graph_def, new_frag); + return std::dynamic_pointer_cast(wrapper); + } + bl::result> AddColumn( const grape::CommSpec& comm_spec, const std::string& dst_graph_name, std::shared_ptr& ctx_wrapper, diff --git a/analytical_engine/core/object/i_fragment_wrapper.h b/analytical_engine/core/object/i_fragment_wrapper.h index 91837214d8f8..f0878604dec1 100644 --- a/analytical_engine/core/object/i_fragment_wrapper.h +++ b/analytical_engine/core/object/i_fragment_wrapper.h @@ -90,6 +90,12 @@ class ILabeledFragmentWrapper : public IFragmentWrapper { const std::map>& vertices, const std::map>& edges) = 0; + virtual bl::result> + ConsolidateColumns(const grape::CommSpec& comm_spec, + const std::string& dst_graph_name, + const std::string& label, const std::string& columns, + const std::string& result_column) = 0; + virtual bl::result> AddColumn( const grape::CommSpec& comm_spec, const std::string& dst_graph_name, std::shared_ptr& ctx_wrapper, diff --git a/analytical_engine/frame/property_graph_frame.cc b/analytical_engine/frame/property_graph_frame.cc index 50ac6e9d89e7..992f618e4202 100644 --- a/analytical_engine/frame/property_graph_frame.cc +++ b/analytical_engine/frame/property_graph_frame.cc @@ -378,6 +378,7 @@ AddLabelsToGraph(vineyard::ObjectID origin_frag_id, graph_name, graph_def, frag); return std::dynamic_pointer_cast(wrapper); } + } // namespace detail /** diff --git a/coordinator/gscoordinator/dag_manager.py b/coordinator/gscoordinator/dag_manager.py index 50e579e25cd4..dade1bb0c72a 100644 --- a/coordinator/gscoordinator/dag_manager.py +++ b/coordinator/gscoordinator/dag_manager.py @@ -49,6 +49,7 @@ class DAGManager(object): types_pb2.CREATE_GRAPH, # spawn an io stream to read/write data from/to vineyard types_pb2.BIND_APP, # need loaded graph to compile 
types_pb2.ADD_LABELS, # need loaded graph + types_pb2.CONSOLIDATE_COLUMNS, # need loaded graph to fill in its name types_pb2.RUN_APP, # need loaded app types_pb2.CONTEXT_TO_NUMPY, # need loaded graph to transform selector types_pb2.CONTEXT_TO_DATAFRAME, # need loaded graph to transform selector diff --git a/coordinator/gscoordinator/op_executor.py b/coordinator/gscoordinator/op_executor.py index 4c5499ec78b3..a4ef6a3dcc61 100644 --- a/coordinator/gscoordinator/op_executor.py +++ b/coordinator/gscoordinator/op_executor.py @@ -130,7 +130,11 @@ def pre_process(self, dag_def, dag_bodies, loader_op_bodies): ) # Handle op that depends on loader (data source) - if op.op == types_pb2.CREATE_GRAPH or op.op == types_pb2.ADD_LABELS: + if op.op in [ + types_pb2.CREATE_GRAPH, + types_pb2.CONSOLIDATE_COLUMNS, + types_pb2.ADD_LABELS, + ]: for key_of_parent_op in op.parents: parent_op = self._key_to_op[key_of_parent_op] if parent_op.op == types_pb2.DATA_SOURCE: @@ -153,6 +157,7 @@ def pre_process(self, dag_def, dag_bodies, loader_op_bodies): ) or op.op == types_pb2.TRANSFORM_GRAPH or op.op == types_pb2.PROJECT_TO_SIMPLE + or op.op == types_pb2.CONSOLIDATE_COLUMNS or op.op == types_pb2.ADD_LABELS or op.op == types_pb2.ARCHIVE_GRAPH ): @@ -187,6 +192,7 @@ def post_process(self, response_head, response_bodies): if op.op in ( types_pb2.CREATE_GRAPH, types_pb2.PROJECT_GRAPH, + types_pb2.CONSOLIDATE_COLUMNS, types_pb2.PROJECT_TO_SIMPLE, types_pb2.TRANSFORM_GRAPH, types_pb2.ADD_LABELS, diff --git a/coordinator/gscoordinator/utils.py b/coordinator/gscoordinator/utils.py index d20cfe611570..49ded0164cb0 100644 --- a/coordinator/gscoordinator/utils.py +++ b/coordinator/gscoordinator/utils.py @@ -674,6 +674,8 @@ def op_pre_process(op, op_result_pool, key_to_op, **kwargs): # noqa: C901 _pre_process_for_bind_app_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.PROJECT_GRAPH: _pre_process_for_project_op(op, op_result_pool, key_to_op, **kwargs) + if op.op == 
types_pb2.CONSOLIDATE_COLUMNS: + _pre_process_for_consolidate_columns_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.PROJECT_TO_SIMPLE: _pre_process_for_project_to_simple_op(op, op_result_pool, key_to_op, **kwargs) if op.op == types_pb2.ADD_COLUMN: @@ -1261,6 +1263,17 @@ def _get_all_e_props_id(schema, label): del op.attr[types_pb2.EDGE_COLLECTIONS] +def _pre_process_for_consolidate_columns_op(op, op_result_pool, key_to_op, **kwargs): + assert len(op.parents) == 1 + # get parent graph schema + key_of_parent_op = op.parents[0] + r = op_result_pool[key_of_parent_op] + graph_name = r.graph_def.key + op.attr[types_pb2.GRAPH_NAME].CopyFrom( + attr_value_pb2.AttrValue(s=graph_name.encode("utf-8", errors="ignore")) + ) + + def _pre_process_for_archive_graph_op(op, op_result_pool, key_to_op, **kwargs): assert len(op.parents) == 1 key_of_parent_op = op.parents[0] diff --git a/docs/reference/graph.rst b/docs/reference/graph.rst index f269f99477ab..cd331ab2ae62 100644 --- a/docs/reference/graph.rst +++ b/docs/reference/graph.rst @@ -9,7 +9,7 @@ Graph object .. autoclass:: GraphDAGNode :special-members: __init__ - :members: add_vertices, add_edges, add_column, project, unload + :members: add_vertices, add_edges, add_column, project, unload, consolidate_columns .. 
autoclass:: Graph :special-members: __init__ diff --git a/proto/types.proto b/proto/types.proto index 86b8039828ba..05ce8a3ed0c7 100644 --- a/proto/types.proto +++ b/proto/types.proto @@ -105,6 +105,7 @@ enum OperationType { ARCHIVE_GRAPH = 24; // archive graph SERIALIZE_GRAPH = 25; // serialize graph DESERIALIZE_GRAPH = 26; // desrialize graph + CONSOLIDATE_COLUMNS = 27; // consolidate property columns in the graph SUBGRAPH = 32; // subgraph in interactive query @@ -182,6 +183,9 @@ enum ParamKey { VERTEX_MAP_TYPE = 45; COMPACT_EDGES = 46; USE_PERFECT_HASH = 47; + CONSOLIDATE_COLUMNS_LABEL = 48; + CONSOLIDATE_COLUMNS_COLUMNS = 49; + CONSOLIDATE_COLUMNS_RESULT_COLUMN = 50; // project VERTEX_COLLECTIONS = 51; diff --git a/python/graphscope/framework/dag_utils.py b/python/graphscope/framework/dag_utils.py index 490330ca47ad..b27e96a80f24 100644 --- a/python/graphscope/framework/dag_utils.py +++ b/python/graphscope/framework/dag_utils.py @@ -18,6 +18,10 @@ import json import pickle +from typing import Dict +from typing import List +from typing import Tuple +from typing import Union from graphscope.framework import utils from graphscope.framework.errors import check_argument @@ -224,6 +228,55 @@ def add_labels_to_graph(graph, loader_op): return op +def consolidate_columns( + graph, + label: str, + columns: Union[List[str], Tuple[str]], + result_column: str, +): + """Consolidate property columns in the graph. + + Args: + graph (:class:`Graph`) + label (str): The label of the vertex/edge to be consolidated. + columns: The columns to be consolidated. + result_column: The column name of the result. 
+ + Returns: + Operation + """ + check_argument(graph.graph_type == graph_def_pb2.ARROW_PROPERTY) + config = { + types_pb2.GRAPH_TYPE: utils.graph_type_to_attr(graph._graph_type), + types_pb2.DIRECTED: utils.b_to_attr(graph._directed), + types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type), + types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid), + types_pb2.RETAIN_OID: utils.b_to_attr(graph._retain_oid), + types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map), + types_pb2.COMPACT_EDGES: utils.b_to_attr(graph._compact_edges), + types_pb2.USE_PERFECT_HASH: utils.b_to_attr(graph._use_perfect_hash), + types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"), + types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False), + types_pb2.IS_FROM_GAR: utils.b_to_attr(False), + types_pb2.CONSOLIDATE_COLUMNS_LABEL: utils.s_to_attr(label), + types_pb2.CONSOLIDATE_COLUMNS_COLUMNS: utils.s_to_attr(",".join(columns)), + types_pb2.CONSOLIDATE_COLUMNS_RESULT_COLUMN: utils.s_to_attr(result_column), + } + + # The key maybe filled later in coordinator + if hasattr(graph, "key"): + config[types_pb2.GRAPH_NAME] = utils.s_to_attr(graph.key) + + op = Operation( + graph.session_id, + types_pb2.CONSOLIDATE_COLUMNS, + config=config, + inputs=[graph.op], + output_types=types_pb2.GRAPH, + ) + return op + + def dynamic_to_arrow(graph): """Create an op to transform a :class:`nx.Graph` object to :class:`Graph`. 
diff --git a/python/graphscope/framework/graph.py b/python/graphscope/framework/graph.py index 78dcc6ec2ac7..911d3051f8e6 100644 --- a/python/graphscope/framework/graph.py +++ b/python/graphscope/framework/graph.py @@ -23,8 +23,10 @@ from abc import ABCMeta from abc import abstractmethod from copy import deepcopy +from typing import Dict from typing import List from typing import Mapping +from typing import Tuple from typing import Union try: @@ -41,6 +43,7 @@ from graphscope.framework.graph_utils import EdgeSubLabel from graphscope.framework.graph_utils import VertexLabel from graphscope.framework.operation import Operation +from graphscope.framework.utils import apply_docstring from graphscope.framework.utils import data_type_to_cpp from graphscope.proto import attr_value_pb2 from graphscope.proto import graph_def_pb2 @@ -87,6 +90,15 @@ def add_edges( ): raise NotImplementedError + @abstractmethod + def consolidate_columns( + self, + label: str, + columns: Union[List[str], Tuple[str]], + result_column: str, + ): + raise NotImplementedError + def is_directed(self): return self._directed @@ -683,6 +695,53 @@ def add_edges( graph_dag_node._base_graph = parent return graph_dag_node + def consolidate_columns( + self, + label: str, + columns: Union[List[str], Tuple[str]], + result_column: str, + ): + """Consolidate columns of given vertex / edge properties (of same type) into one column. + + For example, if we have a graph with vertex label "person", and edge labels "knows" + and "follows", and we want to consolidate the "weight0", "weight1" properties of the + vertex and both edges into a new column "weight", we can do: + + .. code:: python + + >>> g = ... + >>> g = g.consolidate_columns("person", ["weight0", "weight1"], "weight") + >>> g = g.consolidate_columns("knows", ["weight0", "weight1"], "weight") + >>> g = g.consolidate_columns("follows", ["weight0", "weight1"], "weight") + + Args: + label: the label of the vertex or edge. 
+ columns (list or tuple of str): the properties of given vertex or edge to be consolidated. + result_column: the name of the new column. + + Returns: + :class:`graphscope.framework.graph.GraphDAGNode`: + A new graph with column consolidated, evaluated in eager mode. + """ + check_argument( + isinstance(columns, (list, tuple)), + "columns must be a list or tuple of strings", + ) + op = dag_utils.consolidate_columns(self, label, columns, result_column) + graph_dag_node = GraphDAGNode( + self._session, + op, + self._oid_type, + self._directed, + self._generate_eid, + self._retain_oid, + self._vertex_map, + self._compact_edges, + self._use_perfect_hash, + ) + graph_dag_node._base_graph = self + return graph_dag_node + def _backtrack_graph_dag_node_by_op_key(self, key): if self.op.key == key: return self @@ -1010,11 +1069,13 @@ def __del__(self): except Exception: # pylint: disable=broad-except pass + @apply_docstring(GraphDAGNode._project_to_simple) def _project_to_simple(self, v_prop=None, e_prop=None): return self._session._wrapper( self._graph_node._project_to_simple(v_prop, e_prop) ) + @apply_docstring(GraphDAGNode.add_column) def add_column(self, results, selector): return self._session._wrapper(self._graph_node.add_column(results, selector)) @@ -1126,6 +1187,7 @@ def archive(self, path): """ return self._session._wrapper(self._graph_node.archive(path)) + @apply_docstring(GraphDAGNode.add_vertices) def add_vertices( self, vertices, label="_", properties=None, vid_field: Union[int, str] = 0 ) -> Union["Graph", GraphDAGNode]: @@ -1135,6 +1197,7 @@ def add_vertices( self._graph_node.add_vertices(vertices, label, properties, vid_field) ) + @apply_docstring(GraphDAGNode.add_edges) def add_edges( self, edges, @@ -1153,6 +1216,20 @@ def add_edges( ) ) + @apply_docstring(GraphDAGNode.consolidate_columns) + def consolidate_columns( + self, + label: str, + columns: Union[List[str], Tuple[str]], + result_column: str, + ) -> Union["Graph", GraphDAGNode]: + if not self.loaded(): + raise 
RuntimeError("The graph is not loaded") + return self._session._wrapper( + self._graph_node.consolidate_columns(label, columns, result_column) + ) + + @apply_docstring(GraphDAGNode.project) def project( self, vertices: Mapping[str, Union[List[str], None]], diff --git a/python/graphscope/framework/utils.py b/python/graphscope/framework/utils.py index 67572567d043..94214241b22b 100644 --- a/python/graphscope/framework/utils.py +++ b/python/graphscope/framework/utils.py @@ -502,13 +502,13 @@ def _unify_str_type(t): return graph_def_pb2.DataTypePb.STRING elif t == "bytes": return graph_def_pb2.DataTypePb.BYTES - elif t == "int_list": + elif t == "int_list" or t.startswith("fixedlistint"): return graph_def_pb2.DataTypePb.INT_LIST - elif t == "long_list": + elif t == "long_list" or t.startswith("fixedlistlong"): return graph_def_pb2.DataTypePb.LONG_LIST - elif t == "float_list": + elif t == "float_list" or t.startswith("fixedlistfloat"): return graph_def_pb2.DataTypePb.FLOAT_LIST - elif t == "double_list": + elif t == "double_list" or t.startswith("fixedlistdouble"): return graph_def_pb2.DataTypePb.DOUBLE_LIST elif t in ("empty", "grape::emptytype"): return graph_def_pb2.NULLVALUE @@ -692,3 +692,13 @@ def wrapper(*args, **kwargs): return wrapper return decorator + + +def apply_docstring(fn): + """Apply the docstring of `fn` to annotated function.""" + + def decorator(func): + func.__doc__ = fn.__doc__ + return func + + return decorator diff --git a/python/graphscope/tests/conftest.py b/python/graphscope/tests/conftest.py index 9b00dc3ebfc5..3439609c0218 100644 --- a/python/graphscope/tests/conftest.py +++ b/python/graphscope/tests/conftest.py @@ -403,6 +403,20 @@ def p2p_property_graph(graphscope_session): del g +@pytest.fixture(scope="module") +def p2p_multi_property_graph(graphscope_session): + g = graphscope_session.g(generate_eid=False, retain_oid=True, directed=True) + g = g.add_vertices(f"{property_dir}/p2p-31_multi_property_v_0", "person") + g = g.add_edges( + 
f"{property_dir}/p2p-31_multi_property_e_0", + label="knows", + src_label="person", + dst_label="person", + ) + yield g + del g + + @pytest.fixture(scope="module") def p2p_graph_from_pandas(graphscope_session): # set chunk size to 1k diff --git a/python/graphscope/tests/unittest/test_graph.py b/python/graphscope/tests/unittest/test_graph.py index bae36c787f2d..57c9c5d3fbf7 100644 --- a/python/graphscope/tests/unittest/test_graph.py +++ b/python/graphscope/tests/unittest/test_graph.py @@ -510,6 +510,44 @@ def test_project_project(ldbc_graph): assert pg2.schema.edge_labels == ["isSubclassOf"] +def test_consolidate_columns(p2p_multi_property_graph): + def assert_graph_has_props(g, vps, eps): + vprops = [p.name for p in g.schema.get_vertex_properties("person")] + eprops = [p.name for p in g.schema.get_edge_properties("knows")] + for p in vps: + assert p in vprops + for p in eps: + assert p in eprops + + def assert_graph_not_has_props(g, vps, eps): + vprops = [p.name for p in g.schema.get_vertex_properties("person")] + eprops = [p.name for p in g.schema.get_edge_properties("knows")] + for p in vps: + assert p not in vprops + for p in eps: + assert p not in eprops + + g = p2p_multi_property_graph + assert_graph_has_props( + g, + ["prop_0", "prop_1", "prop_2", "prop_3", "prop_4"], + ["prop_0", "prop_1", "prop_2", "prop_3", "prop_4"], + ) + + g = g.consolidate_columns("person", ["prop_0", "prop_1", "prop_2"], "vprops_column") + g = g.consolidate_columns("knows", ["prop_2", "prop_3", "prop_4"], "eprops_column") + assert_graph_has_props( + g, + ["vprops_column", "prop_3", "prop_4"], + ["prop_0", "prop_1", "eprops_column"], + ) + assert_graph_not_has_props( + g, + ["prop_0", "prop_1", "prop_2"], + ["prop_2", "prop_3", "prop_4"], + ) + + def test_error_on_project(arrow_property_graph, ldbc_graph): graph = arrow_property_graph g2 = graph.project(vertices={"v0": []}, edges={"e0": []})