Skip to content

Commit

Permalink
Propogate error message about argument mismatch to Python client (#3044)
Browse files Browse the repository at this point in the history
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow authored Jul 21, 2023
1 parent 5f6f20e commit 185790e
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 73 deletions.
8 changes: 5 additions & 3 deletions analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,14 @@ bl::result<std::string> GrapeInstance::query(const rpc::GSParams& params,
app->Query(worker.get(), query_args, context_key, wrapper));
std::string context_type;
std::string context_schema;
if (ctx_wrapper != nullptr) {
if (ctx_wrapper == nullptr) {
RETURN_GS_ERROR(
vineyard::ErrorCode::kIllegalStateError,
"Query returns a null context wrapper without useful error message");
} else {
context_type = ctx_wrapper->context_type();
context_schema = ctx_wrapper->schema();
BOOST_LEAF_CHECK(object_manager_.PutObject(ctx_wrapper));
} else {
LOG(ERROR) << "Error occur when querying";
}
return toJson({{"context_type", context_type},
{"context_key", context_key},
Expand Down
26 changes: 12 additions & 14 deletions analytical_engine/frame/app_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,25 @@ __attribute__((visibility("hidden"))) std::nullptr_t DeleteWorker(
return nullptr;
}

__attribute__((visibility("hidden"))) std::nullptr_t Query(
__attribute__((visibility("hidden"))) bl::result<std::nullptr_t> Query(
void* worker_handler, const gs::rpc::QueryArgs& query_args,
const std::string& context_key,
std::shared_ptr<gs::IFragmentWrapper> frag_wrapper,
std::shared_ptr<gs::IContextWrapper>& ctx_wrapper,
bl::result<nullptr_t>& wrapper_error) {
std::shared_ptr<gs::IContextWrapper>& ctx_wrapper) {
auto worker = static_cast<worker_handler_t*>(worker_handler)->worker;
auto result = gs::AppInvoker<_APP_TYPE>::Query(worker, query_args);
if (!result) {
wrapper_error = std::move(result);
return nullptr;
if (result) {
if (!context_key.empty()) {
auto ctx = worker->GetContext();
ctx_wrapper = gs::CtxWrapperBuilder<typename _APP_TYPE::context_t>::build(
context_key, frag_wrapper, ctx);
}
}

if (!context_key.empty()) {
auto ctx = worker->GetContext();
ctx_wrapper = gs::CtxWrapperBuilder<typename _APP_TYPE::context_t>::build(
context_key, frag_wrapper, ctx);
}
return nullptr;
return result;
}

} // namespace detail

extern "C" {
void* CreateWorker(const std::shared_ptr<void>& fragment,
const grape::CommSpec& comm_spec,
Expand All @@ -125,6 +123,6 @@ void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args,
bl::result<nullptr_t>& wrapper_error) {
__FRAME_CATCH_AND_ASSIGN_GS_ERROR(
wrapper_error, detail::Query(worker_handler, query_args, context_key,
frag_wrapper, ctx_wrapper, wrapper_error));
frag_wrapper, ctx_wrapper));
}
} // extern "C"
24 changes: 10 additions & 14 deletions analytical_engine/frame/cython_app_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,25 +173,21 @@ __attribute__((visibility("hidden"))) std::nullptr_t DeleteWorker(
return nullptr;
}

__attribute__((visibility("hidden"))) std::nullptr_t Query(
__attribute__((visibility("hidden"))) bl::result<std::nullptr_t> Query(
void* worker_handler, const gs::rpc::QueryArgs& query_args,
const std::string& context_key,
std::shared_ptr<gs::IFragmentWrapper> frag_wrapper,
std::shared_ptr<gs::IContextWrapper>& ctx_wrapper,
bl::result<nullptr_t>& wrapper_error) {
std::shared_ptr<gs::IContextWrapper>& ctx_wrapper) {
auto worker = static_cast<worker_handler_t*>(worker_handler)->worker;
auto result = gs::AppInvoker<_APP_TYPE>::Query(worker, query_args);
if (!result) {
wrapper_error = std::move(result);
return nullptr;
if (result) {
if (!context_key.empty()) {
auto ctx = worker->GetContext();
ctx_wrapper = gs::CtxWrapperBuilder<typename _APP_TYPE::context_t>::build(
context_key, frag_wrapper, ctx);
}
}

if (!context_key.empty()) {
auto ctx = worker->GetContext();
ctx_wrapper = gs::CtxWrapperBuilder<typename _APP_TYPE::context_t>::build(
context_key, frag_wrapper, ctx);
}
return nullptr;
return result;
}

} // namespace detail
Expand All @@ -218,6 +214,6 @@ void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args,
bl::result<nullptr_t>& wrapper_error) {
__FRAME_CATCH_AND_ASSIGN_GS_ERROR(
wrapper_error, detail::Query(worker_handler, query_args, context_key,
frag_wrapper, ctx_wrapper, wrapper_error));
frag_wrapper, ctx_wrapper));
}
} // extern "C"
24 changes: 10 additions & 14 deletions analytical_engine/frame/cython_pie_app_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,25 +161,21 @@ __attribute__((visibility("hidden"))) std::nullptr_t DeleteWorker(
return nullptr;
}

__attribute__((visibility("hidden"))) std::nullptr_t Query(
__attribute__((visibility("hidden"))) bl::result<std::nullptr_t> Query(
void* worker_handler, const gs::rpc::QueryArgs& query_args,
const std::string& context_key,
std::shared_ptr<gs::IFragmentWrapper> frag_wrapper,
std::shared_ptr<gs::IContextWrapper>& ctx_wrapper,
bl::result<nullptr_t>& wrapper_error) {
std::shared_ptr<gs::IContextWrapper>& ctx_wrapper) {
auto worker = static_cast<worker_handler_t*>(worker_handler)->worker;
auto result = gs::AppInvoker<_APP_TYPE>::Query(worker, query_args);
if (!result) {
wrapper_error = std::move(result);
return nullptr;
if (result) {
if (!context_key.empty()) {
auto ctx = worker->GetContext();
ctx_wrapper = gs::CtxWrapperBuilder<typename _APP_TYPE::context_t>::build(
context_key, frag_wrapper, ctx);
}
}

if (!context_key.empty()) {
auto ctx = worker->GetContext();
ctx_wrapper = gs::CtxWrapperBuilder<typename _APP_TYPE::context_t>::build(
context_key, frag_wrapper, ctx);
}
return nullptr;
return result;
}

} // namespace detail
Expand All @@ -206,7 +202,7 @@ void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args,
bl::result<nullptr_t>& wrapper_error) {
__FRAME_CATCH_AND_ASSIGN_GS_ERROR(
wrapper_error, detail::Query(worker_handler, query_args, context_key,
frag_wrapper, ctx_wrapper, wrapper_error));
frag_wrapper, ctx_wrapper));
}

} // extern "C"
24 changes: 10 additions & 14 deletions analytical_engine/frame/flash_app_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,21 @@ __attribute__((visibility("hidden"))) std::nullptr_t DeleteWorker(
return nullptr;
}

__attribute__((visibility("hidden"))) std::nullptr_t Query(
__attribute__((visibility("hidden"))) bl::result<std::nullptr_t> Query(
void* worker_handler, const gs::rpc::QueryArgs& query_args,
const std::string& context_key,
std::shared_ptr<gs::IFragmentWrapper> frag_wrapper,
std::shared_ptr<gs::IContextWrapper>& ctx_wrapper,
bl::result<nullptr_t>& wrapper_error) {
std::shared_ptr<gs::IContextWrapper>& ctx_wrapper) {
auto worker = static_cast<worker_handler_t*>(worker_handler)->worker;
auto result = gs::FlashAppInvoker<_APP_TYPE>::Query(worker, query_args);
if (!result) {
wrapper_error = std::move(result);
return nullptr;
if (result) {
if (!context_key.empty()) {
auto ctx = worker->GetContext();
ctx_wrapper = gs::CtxWrapperBuilder<typename _APP_TYPE::context_t>::build(
context_key, frag_wrapper, ctx);
}
}

if (!context_key.empty()) {
auto ctx = worker->GetContext();
ctx_wrapper = gs::CtxWrapperBuilder<typename _APP_TYPE::context_t>::build(
context_key, frag_wrapper, ctx);
}
return nullptr;
return result;
}
} // namespace detail
extern "C" {
Expand All @@ -125,6 +121,6 @@ void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args,
bl::result<nullptr_t>& wrapper_error) {
__FRAME_CATCH_AND_ASSIGN_GS_ERROR(
wrapper_error, detail::Query(worker_handler, query_args, context_key,
frag_wrapper, ctx_wrapper, wrapper_error));
frag_wrapper, ctx_wrapper));
}
} // extern "C"
18 changes: 10 additions & 8 deletions analytical_engine/test/run_app.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,16 @@ void Run() {
using AppType = grape::WCC<GraphType>;
CreateAndQuery<GraphType, AppType>(comm_spec, efile, vfile, out_prefix,
FLAGS_datasource, fnum, spec);
} else if (name == "wcc_opt") {
using GraphType =
grape::ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T,
grape::LoadStrategy::kBothOutIn,
VertexMapType>;
using AppType = grape::WCCOpt<GraphType>;
CreateAndQuery<GraphType, AppType>(comm_spec, efile, vfile, out_prefix,
FLAGS_datasource, fnum, spec);
// skip to help CI runners to pass the compilation without OOM.
//
// } else if (name == "wcc_opt") {
// using GraphType =
// grape::ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T,
// grape::LoadStrategy::kBothOutIn,
// VertexMapType>;
// using AppType = grape::WCCOpt<GraphType>;
// CreateAndQuery<GraphType, AppType>(comm_spec, efile, vfile, out_prefix,
// FLAGS_datasource, fnum, spec);
} else if (name == "lcc_auto") {
using GraphType =
grape::ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T,
Expand Down
1 change: 1 addition & 0 deletions python/graphscope/analytical/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from graphscope.analytical.app.clustering import avg_clustering
from graphscope.analytical.app.clustering import clustering
from graphscope.analytical.app.clustering import lcc
from graphscope.analytical.app.custom import custom_analytical_algorithm

# fmt: off
from graphscope.analytical.app.degree_assortativity_coefficient import \
Expand Down
73 changes: 73 additions & 0 deletions python/graphscope/analytical/app/custom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging

from graphscope.framework.app import AppAssets
from graphscope.framework.app import not_compatible_for
from graphscope.framework.app import project_to_simple

__all__ = ["custom_analytical_algorithm"]

logger = logging.getLogger("graphscope")


@project_to_simple
def custom_analytical_algorithm(
graph, algorithm, *args, context="vertex_data", cmake_extra_options=None, **kwargs
):
"""A special application DAG node to running arbitrary supported algorithms.
Note that this is only for debugging/profiling usage for developers and
the application should be defined in .gs_conf.yaml in coordinator.
Args:
graph (:class:`graphscope.Graph`): A simple graph.
algorithm (:code:`str`): A predefined algorithm name,
e.g., `sssp`, `wcc`, `lcc`, etc.
context (:code:`str`): The context type of the algorithm,
e.g., `vertex_data`, etc. Defaults to `vertex_data`.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
:class:`graphscope.framework.context.ContextDAGNode`:
A context, evaluated in eager mode.
Examples:
.. code:: python
>>> import graphscope
>>> from graphscope.dataset import load_p2p_network
>>> sess = graphscope.session(cluster_type="hosts", mode="eager")
>>>
>>> g = load_p2p_network(sess)
>>>
>>> # project to a simple graph (if needed)
>>> pg = g.project(vertices={"host": ["id"]}, edges={"connect": ["dist"]})
>>>
>>> c = graphscope.custom_analytical_algorithm(pg, 'wcc')
>>>
>>> c = graphscope.custom_analytical_algorithm(pg, 'sssp', 6)
>>>
>>> sess.close()
"""
return AppAssets(
algo=algorithm, context=context, cmake_extra_options=cmake_extra_options
)(graph, *args, **kwargs)
6 changes: 3 additions & 3 deletions python/graphscope/analytical/app/sssp.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ def sssp(graph, src=0, weight=None):
"""
if not isinstance(graph, GraphDAGNode):
if graph.schema.edata_type == graph_def_pb2.NULLVALUE:
raise RuntimeError(
raise ValueError(
"The edge data is empty, and the edge data type should be integers or "
"floating point numbers to run SSSP. Suggest to use bfs() algorithm."
"floating point numbers to run SSSP. Use the `bfs()` algorithm instead."
)
if graph.schema.edata_type == graph_def_pb2.STRING:
raise RuntimeError(
raise ValueError(
"The edge data type is string, and the edge data type should be "
"integers or floating point numbers to run SSSP."
)
Expand Down
2 changes: 1 addition & 1 deletion python/graphscope/analytical/app/wcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from graphscope.framework.app import not_compatible_for
from graphscope.framework.app import project_to_simple

__all__ = ["wcc", "wcc_auto", "wcc_projected"]
__all__ = ["wcc", "wcc_opt", "wcc_auto", "wcc_projected"]

logger = logging.getLogger("graphscope")

Expand Down
10 changes: 8 additions & 2 deletions python/graphscope/tests/unittest/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from graphscope import avg_clustering
from graphscope import bfs
from graphscope import clustering # directed / undirected would call different app
from graphscope import custom_analytical_algorithm
from graphscope import degree_centrality
from graphscope import eigenvector_centrality
from graphscope import hits
Expand Down Expand Up @@ -248,12 +249,17 @@ def test_run_app_on_string_oid_graph(p2p_project_undirected_graph_string):
r1 = ctx.to_dataframe({"node": "v.id", "r": "r"})


@pytest.mark.skipif("FULL_TEST_SUITE" not in os.environ, reason="Run in nightly CI")
def test_error_on_run_app(projected_pg_no_edge_data):
# compile error: wrong type of edge data with sssp
with pytest.raises(graphscope.CompilationError):
with pytest.raises(ValueError):
sssp(projected_pg_no_edge_data, src=4)

with pytest.raises(
graphscope.AnalyticalEngineInternalError,
match="args_num >= query_args.args_size()",
):
custom_analytical_algorithm(projected_pg_no_edge_data, "wcc", 1, 2, 3)


def test_app_on_local_vm_graph(
p2p_property_graph_undirected_local_vm,
Expand Down

0 comments on commit 185790e

Please sign in to comment.