diff --git a/analytical_engine/core/grape_instance.cc b/analytical_engine/core/grape_instance.cc index 0e1e1c5b5087..c39c03b39a09 100644 --- a/analytical_engine/core/grape_instance.cc +++ b/analytical_engine/core/grape_instance.cc @@ -318,12 +318,14 @@ bl::result GrapeInstance::query(const rpc::GSParams& params, app->Query(worker.get(), query_args, context_key, wrapper)); std::string context_type; std::string context_schema; - if (ctx_wrapper != nullptr) { + if (ctx_wrapper == nullptr) { + RETURN_GS_ERROR( + vineyard::ErrorCode::kIllegalStateError, + "Query returns a null context wrapper without useful error message"); + } else { context_type = ctx_wrapper->context_type(); context_schema = ctx_wrapper->schema(); BOOST_LEAF_CHECK(object_manager_.PutObject(ctx_wrapper)); - } else { - LOG(ERROR) << "Error occur when querying"; } return toJson({{"context_type", context_type}, {"context_key", context_key}, diff --git a/analytical_engine/frame/app_frame.cc b/analytical_engine/frame/app_frame.cc index 14edae6db9f5..39bf7ccd975b 100644 --- a/analytical_engine/frame/app_frame.cc +++ b/analytical_engine/frame/app_frame.cc @@ -82,27 +82,25 @@ __attribute__((visibility("hidden"))) std::nullptr_t DeleteWorker( return nullptr; } -__attribute__((visibility("hidden"))) std::nullptr_t Query( +__attribute__((visibility("hidden"))) bl::result Query( void* worker_handler, const gs::rpc::QueryArgs& query_args, const std::string& context_key, std::shared_ptr frag_wrapper, - std::shared_ptr& ctx_wrapper, - bl::result& wrapper_error) { + std::shared_ptr& ctx_wrapper) { auto worker = static_cast(worker_handler)->worker; auto result = gs::AppInvoker<_APP_TYPE>::Query(worker, query_args); - if (!result) { - wrapper_error = std::move(result); - return nullptr; + if (result) { + if (!context_key.empty()) { + auto ctx = worker->GetContext(); + ctx_wrapper = gs::CtxWrapperBuilder::build( + context_key, frag_wrapper, ctx); + } } - - if (!context_key.empty()) { - auto ctx = worker->GetContext(); - ctx_wrapper = gs::CtxWrapperBuilder::build( - context_key, frag_wrapper, ctx); - } - return nullptr; + return result; } + } // namespace detail + extern "C" { void* CreateWorker(const std::shared_ptr& fragment, const grape::CommSpec& comm_spec, @@ -125,6 +123,6 @@ void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args, bl::result& wrapper_error) { __FRAME_CATCH_AND_ASSIGN_GS_ERROR( wrapper_error, detail::Query(worker_handler, query_args, context_key, - frag_wrapper, ctx_wrapper, wrapper_error)); + frag_wrapper, ctx_wrapper)); } } // extern "C" diff --git a/analytical_engine/frame/cython_app_frame.cc b/analytical_engine/frame/cython_app_frame.cc index 71914faff1ea..bd9223b9c81c 100644 --- a/analytical_engine/frame/cython_app_frame.cc +++ b/analytical_engine/frame/cython_app_frame.cc @@ -173,25 +173,21 @@ __attribute__((visibility("hidden"))) std::nullptr_t DeleteWorker( return nullptr; } -__attribute__((visibility("hidden"))) std::nullptr_t Query( +__attribute__((visibility("hidden"))) bl::result Query( void* worker_handler, const gs::rpc::QueryArgs& query_args, const std::string& context_key, std::shared_ptr frag_wrapper, - std::shared_ptr& ctx_wrapper, - bl::result& wrapper_error) { + std::shared_ptr& ctx_wrapper) { auto worker = static_cast(worker_handler)->worker; auto result = gs::AppInvoker<_APP_TYPE>::Query(worker, query_args); - if (!result) { - wrapper_error = std::move(result); - return nullptr; + if (result) { + if (!context_key.empty()) { + auto ctx = worker->GetContext(); + ctx_wrapper = gs::CtxWrapperBuilder::build( + context_key, frag_wrapper, ctx); + } } - - if (!context_key.empty()) { - auto ctx = worker->GetContext(); - ctx_wrapper = gs::CtxWrapperBuilder::build( - context_key, frag_wrapper, ctx); - } - return nullptr; + return result; } } // namespace detail @@ -218,6 +214,6 @@ void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args, bl::result& wrapper_error) { __FRAME_CATCH_AND_ASSIGN_GS_ERROR( wrapper_error, detail::Query(worker_handler, query_args, context_key, - frag_wrapper, ctx_wrapper, wrapper_error)); + frag_wrapper, ctx_wrapper)); } } // extern "C" diff --git a/analytical_engine/frame/cython_pie_app_frame.cc b/analytical_engine/frame/cython_pie_app_frame.cc index 2771d98eeff5..5da38b7764dd 100644 --- a/analytical_engine/frame/cython_pie_app_frame.cc +++ b/analytical_engine/frame/cython_pie_app_frame.cc @@ -161,25 +161,21 @@ __attribute__((visibility("hidden"))) std::nullptr_t DeleteWorker( return nullptr; } -__attribute__((visibility("hidden"))) std::nullptr_t Query( +__attribute__((visibility("hidden"))) bl::result Query( void* worker_handler, const gs::rpc::QueryArgs& query_args, const std::string& context_key, std::shared_ptr frag_wrapper, - std::shared_ptr& ctx_wrapper, - bl::result& wrapper_error) { + std::shared_ptr& ctx_wrapper) { auto worker = static_cast(worker_handler)->worker; auto result = gs::AppInvoker<_APP_TYPE>::Query(worker, query_args); - if (!result) { - wrapper_error = std::move(result); - return nullptr; + if (result) { + if (!context_key.empty()) { + auto ctx = worker->GetContext(); + ctx_wrapper = gs::CtxWrapperBuilder::build( + context_key, frag_wrapper, ctx); + } } - - if (!context_key.empty()) { - auto ctx = worker->GetContext(); - ctx_wrapper = gs::CtxWrapperBuilder::build( - context_key, frag_wrapper, ctx); - } - return nullptr; + return result; } } // namespace detail @@ -206,7 +202,7 @@ void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args, bl::result& wrapper_error) { __FRAME_CATCH_AND_ASSIGN_GS_ERROR( wrapper_error, detail::Query(worker_handler, query_args, context_key, - frag_wrapper, ctx_wrapper, wrapper_error)); + frag_wrapper, ctx_wrapper)); } } // extern "C" diff --git a/analytical_engine/frame/flash_app_frame.cc b/analytical_engine/frame/flash_app_frame.cc index eeedaf659963..63c01c7a12df 100644 --- a/analytical_engine/frame/flash_app_frame.cc +++ b/analytical_engine/frame/flash_app_frame.cc @@ -82,25 +82,21 @@ __attribute__((visibility("hidden"))) std::nullptr_t DeleteWorker( return nullptr; } -__attribute__((visibility("hidden"))) std::nullptr_t Query( +__attribute__((visibility("hidden"))) bl::result Query( void* worker_handler, const gs::rpc::QueryArgs& query_args, const std::string& context_key, std::shared_ptr frag_wrapper, - std::shared_ptr& ctx_wrapper, - bl::result& wrapper_error) { + std::shared_ptr& ctx_wrapper) { auto worker = static_cast(worker_handler)->worker; auto result = gs::FlashAppInvoker<_APP_TYPE>::Query(worker, query_args); - if (!result) { - wrapper_error = std::move(result); - return nullptr; + if (result) { + if (!context_key.empty()) { + auto ctx = worker->GetContext(); + ctx_wrapper = gs::CtxWrapperBuilder::build( + context_key, frag_wrapper, ctx); + } } - - if (!context_key.empty()) { - auto ctx = worker->GetContext(); - ctx_wrapper = gs::CtxWrapperBuilder::build( - context_key, frag_wrapper, ctx); - } - return nullptr; + return result; } } // namespace detail extern "C" { @@ -125,6 +121,6 @@ void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args, bl::result& wrapper_error) { __FRAME_CATCH_AND_ASSIGN_GS_ERROR( wrapper_error, detail::Query(worker_handler, query_args, context_key, - frag_wrapper, ctx_wrapper, wrapper_error)); + frag_wrapper, ctx_wrapper)); } } // extern "C" diff --git a/analytical_engine/test/run_app.h b/analytical_engine/test/run_app.h index 292f3aba84d1..07147d7fc326 100644 --- a/analytical_engine/test/run_app.h +++ b/analytical_engine/test/run_app.h @@ -286,14 +286,16 @@ void Run() { using AppType = grape::WCC; CreateAndQuery(comm_spec, efile, vfile, out_prefix, FLAGS_datasource, fnum, spec); - } else if (name == "wcc_opt") { - using GraphType = - grape::ImmutableEdgecutFragment; - using AppType = grape::WCCOpt; - CreateAndQuery(comm_spec, efile, vfile, out_prefix, - FLAGS_datasource, fnum, spec); + // skip to help CI runners to pass the compilation without OOM. + // + // } else if (name == "wcc_opt") { + // using GraphType = + // grape::ImmutableEdgecutFragment; + // using AppType = grape::WCCOpt; + // CreateAndQuery(comm_spec, efile, vfile, out_prefix, + // FLAGS_datasource, fnum, spec); } else if (name == "lcc_auto") { using GraphType = grape::ImmutableEdgecutFragment>> import graphscope + >>> from graphscope.dataset import load_p2p_network + >>> sess = graphscope.session(cluster_type="hosts", mode="eager") + >>> + >>> g = load_p2p_network(sess) + >>> + >>> # project to a simple graph (if needed) + >>> pg = g.project(vertices={"host": ["id"]}, edges={"connect": ["dist"]}) + >>> + >>> c = graphscope.custom_analytical_algorithm(pg, 'wcc') + >>> + >>> c = graphscope.custom_analytical_algorithm(pg, 'sssp', 6) + >>> + >>> sess.close() + """ + return AppAssets( + algo=algorithm, context=context, cmake_extra_options=cmake_extra_options + )(graph, *args, **kwargs) diff --git a/python/graphscope/analytical/app/sssp.py b/python/graphscope/analytical/app/sssp.py index 41ff686b38e9..d09cf930497a 100644 --- a/python/graphscope/analytical/app/sssp.py +++ b/python/graphscope/analytical/app/sssp.py @@ -63,12 +63,12 @@ def sssp(graph, src=0, weight=None): """ if not isinstance(graph, GraphDAGNode): if graph.schema.edata_type == graph_def_pb2.NULLVALUE: - raise RuntimeError( + raise ValueError( "The edge data is empty, and the edge data type should be integers or " - "floating point numbers to run SSSP. Suggest to use bfs() algorithm." + "floating point numbers to run SSSP. Use the `bfs()` algorithm instead." ) if graph.schema.edata_type == graph_def_pb2.STRING: - raise RuntimeError( + raise ValueError( "The edge data type is string, and the edge data type should be " "integers or floating point numbers to run SSSP." ) diff --git a/python/graphscope/analytical/app/wcc.py b/python/graphscope/analytical/app/wcc.py index 347d1da55716..59a7793f4505 100644 --- a/python/graphscope/analytical/app/wcc.py +++ b/python/graphscope/analytical/app/wcc.py @@ -22,7 +22,7 @@ from graphscope.framework.app import not_compatible_for from graphscope.framework.app import project_to_simple -__all__ = ["wcc", "wcc_auto", "wcc_projected"] +__all__ = ["wcc", "wcc_opt", "wcc_auto", "wcc_projected"] logger = logging.getLogger("graphscope") diff --git a/python/graphscope/tests/unittest/test_app.py b/python/graphscope/tests/unittest/test_app.py index de833f4d9838..92d56695c360 100644 --- a/python/graphscope/tests/unittest/test_app.py +++ b/python/graphscope/tests/unittest/test_app.py @@ -27,6 +27,7 @@ from graphscope import avg_clustering from graphscope import bfs from graphscope import clustering # directed / undirected would call different app +from graphscope import custom_analytical_algorithm from graphscope import degree_centrality from graphscope import eigenvector_centrality from graphscope import hits @@ -248,12 +249,17 @@ def test_run_app_on_string_oid_graph(p2p_project_undirected_graph_string): r1 = ctx.to_dataframe({"node": "v.id", "r": "r"}) -@pytest.mark.skipif("FULL_TEST_SUITE" not in os.environ, reason="Run in nightly CI") def test_error_on_run_app(projected_pg_no_edge_data): # compile error: wrong type of edge data with sssp - with pytest.raises(graphscope.CompilationError): + with pytest.raises(ValueError): sssp(projected_pg_no_edge_data, src=4) + with pytest.raises( + graphscope.AnalyticalEngineInternalError, + match="args_num >= query_args.args_size()", + ): + custom_analytical_algorithm(projected_pg_no_edge_data, "wcc", 1, 2, 3) + def test_app_on_local_vm_graph( p2p_property_graph_undirected_local_vm,