Skip to content

Commit

Permalink
refactor(interactive): Introduce call_proc as a new physical operat…
Browse files Browse the repository at this point in the history
…or and implement it (#4288)

Currently have too kinds graph queries. One is adhoc query, the other is
`call_proc` query. We provide two different route and APIs for these two
kind of queries, but it is possible to merge `call_proc` query into
adhoc query, as an operator.

In this PR, we Introduce a `CALL_PROCEDURE` operator into `PhysicalPlan`
proto. So, we could remove the `/v1/graph/{id}/query` from compiler
side. However, we still need the API in server side, for SDK to use.

TODO
- [x] Revisit the dependency introduce by `procedure_call.cc`, including
get app_base, and get session. Ideally, we shoud not only involve
interfaces of ReadTransaction. -> Negative
- [x] Make sure the introduce of StdStringValue is necessary. ->
Removed.
- [x] Finish all if-else check in `procedure_call.cc`.
- [x] We need to call the procedure for each record. Could we optimize?
-> Negative


User now could embed the `call_proc` in a cypher query, for example
```cypher
MATCH(p: person) with p.id as oid CALL k_neighbors("person", oid, 1) return label_name, vertex_oid;
```
where `k_neighbors` is a builtin procedure.

Fixes #4278

---------

Co-authored-by: shirly121 <yihe.zxl@alibaba-inc.com>
Co-authored-by: BingqingLyu <bingqing.lbq@alibaba-inc.com>
  • Loading branch information
3 people authored Oct 22, 2024
1 parent 542cb16 commit 6aa557c
Show file tree
Hide file tree
Showing 37 changed files with 814 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ jobs:
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
run: |
cd ${GITHUB_WORKSPACE}/flex/tests/hqps
bash hqps_robust_test.sh ${INTERACTIVE_WORKSPACE} ./interactive_config_test.yaml
bash hqps_robust_test.sh ${INTERACTIVE_WORKSPACE} ./interactive_config_test.yaml ./interactive_config_test_cbo.yaml
- name: Sample Query test
env:
Expand Down
5 changes: 2 additions & 3 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,8 @@ void GraphDB::Close() {
std::fill(app_factories_.begin(), app_factories_.end(), nullptr);
}

ReadTransaction GraphDB::GetReadTransaction() {
uint32_t ts = version_manager_.acquire_read_timestamp();
return {graph_, version_manager_, ts};
ReadTransaction GraphDB::GetReadTransaction(int thread_id) {
return contexts_[thread_id].session.GetReadTransaction();
}

InsertTransaction GraphDB::GetInsertTransaction(int thread_id) {
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class GraphDB {
*
* @return graph_dir The directory of graph data.
*/
ReadTransaction GetReadTransaction();
ReadTransaction GetReadTransaction(int thread_id = 0);

/** @brief Create a transaction to insert vertices and edges with a default
* allocator.
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace gs {

ReadTransaction GraphDBSession::GetReadTransaction() const {
uint32_t ts = db_.version_manager_.acquire_read_timestamp();
return ReadTransaction(db_.graph_, db_.version_manager_, ts);
return ReadTransaction(*this, db_.graph_, db_.version_manager_, ts);
}

InsertTransaction GraphDBSession::GetInsertTransaction() {
Expand Down
7 changes: 5 additions & 2 deletions flex/engines/graph_db/database/read_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

namespace gs {

ReadTransaction::ReadTransaction(const MutablePropertyFragment& graph,
ReadTransaction::ReadTransaction(const GraphDBSession& session,
const MutablePropertyFragment& graph,
VersionManager& vm, timestamp_t timestamp)
: graph_(graph), vm_(vm), timestamp_(timestamp) {}
: session_(session), graph_(graph), vm_(vm), timestamp_(timestamp) {}
ReadTransaction::~ReadTransaction() { release(); }

timestamp_t ReadTransaction::timestamp() const { return timestamp_; }
Expand Down Expand Up @@ -135,4 +136,6 @@ void ReadTransaction::release() {
}
}

const GraphDBSession& ReadTransaction::GetSession() const { return session_; }

} // namespace gs
7 changes: 6 additions & 1 deletion flex/engines/graph_db/database/read_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
namespace gs {

class MutablePropertyFragment;
class GraphDBSession;
class VersionManager;
template <typename EDATA_T>
class AdjListView {
Expand Down Expand Up @@ -276,7 +277,8 @@ class SingleImmutableGraphView<std::string_view> {

class ReadTransaction {
public:
ReadTransaction(const MutablePropertyFragment& graph, VersionManager& vm,
ReadTransaction(const GraphDBSession& session,
const MutablePropertyFragment& graph, VersionManager& vm,
timestamp_t timestamp);
~ReadTransaction();

Expand Down Expand Up @@ -429,9 +431,12 @@ class ReadTransaction {
return SingleImmutableGraphView<EDATA_T>(*csr);
}

const GraphDBSession& GetSession() const;

private:
void release();

const GraphDBSession& session_;
const MutablePropertyFragment& graph_;
VersionManager& vm_;
timestamp_t timestamp_;
Expand Down
5 changes: 5 additions & 0 deletions flex/engines/graph_db/runtime/adhoc/operators/operators.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ bl::result<Context> eval_join(const physical::Join& opr, Context&& ctx,

bl::result<Context> eval_limit(const algebra::Limit& opr, Context&& ctx);

bl::result<Context> eval_procedure_call(const std::vector<int32_t>& alias,
const physical::ProcedureCall& opr,
const ReadTransaction& txn,
Context&& ctx);

void eval_sink(const Context& ctx, const ReadTransaction& txn, Encoder& output);

} // namespace runtime
Expand Down
Loading

0 comments on commit 6aa557c

Please sign in to comment.