Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(interactive): Template the runtime module #4293

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions flex/bin/adhoc_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ void load_params(const std::string& filename,
}

gs::runtime::Context eval_plan(
const physical::PhysicalPlan& plan, gs::ReadTransaction& txn,
const physical::PhysicalPlan& plan,
gs::runtime::GraphInterface<gs::ReadTransaction>& graph_interface,
const std::map<std::string, std::string>& params) {
gs::runtime::Context ctx;

{
ctx = bl::try_handle_all(
[&plan, &txn, &params]() {
return gs::runtime::runtime_eval(plan, txn, params);
[&plan, &graph_interface, &params]() {
return gs::runtime::runtime_eval(plan, graph_interface, params);
},
[&ctx](const gs::Status& err) {
LOG(FATAL) << "Error in execution: " << err.error_message();
Expand Down Expand Up @@ -170,6 +172,7 @@ int main(int argc, char** argv) {
std::string req_file = vm["query-file"].as<std::string>();
std::string query = read_pb(req_file);
auto txn = db.GetReadTransaction();
gs::runtime::GraphInterface<gs::ReadTransaction> graph_interface(txn);
std::vector<std::map<std::string, std::string>> map;
load_params(vm["params_file"].as<std::string>(), map);
size_t params_num = map.size();
Expand All @@ -185,29 +188,29 @@ int main(int argc, char** argv) {
double t1 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = eval_plan(pb, txn, m);
auto ctx = eval_plan(pb, graph_interface, m);
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
gs::runtime::eval_sink(ctx, graph_interface, output);
}
t1 += grape::GetCurrentTime();

double t2 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = eval_plan(pb, txn, m);
auto ctx = eval_plan(pb, graph_interface, m);
outputs[i].clear();
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
gs::runtime::eval_sink(ctx, graph_interface, output);
}
t2 += grape::GetCurrentTime();

double t3 = -grape::GetCurrentTime();
for (int i = 0; i < query_num; ++i) {
auto& m = map[i % params_num];
auto ctx = eval_plan(pb, txn, m);
auto ctx = eval_plan(pb, graph_interface, m);
outputs[i].clear();
gs::Encoder output(outputs[i]);
gs::runtime::eval_sink(ctx, txn, output);
gs::runtime::eval_sink(ctx, graph_interface, output);
}
t3 += grape::GetCurrentTime();

Expand Down
4 changes: 2 additions & 2 deletions flex/engines/graph_db/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
add_subdirectory(runtime)
file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/app/builtin/*.cc")
Expand All @@ -7,7 +6,7 @@ add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})

target_include_directories(flex_graph_db PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>)
target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(flex_graph_db flex_plan_proto runtime_adhoc)
target_link_libraries(flex_graph_db flex_plan_proto)
install_flex_target(flex_graph_db)

install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
Expand All @@ -32,3 +31,4 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/adhoc_app.h
DESTINATION include/flex/engines/graph_db/app)



4 changes: 3 additions & 1 deletion flex/engines/graph_db/app/adhoc_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

#include "flex/engines/graph_db/app/adhoc_app.h"
#include "flex/engines/graph_db/runtime/adhoc/graph_interface.h"
#include "flex/engines/graph_db/runtime/adhoc/operators/operators.h"
#include "flex/engines/graph_db/runtime/adhoc/runtime.h"
#include "flex/proto_generated_gie/physical.pb.h"
Expand All @@ -26,7 +27,8 @@ namespace gs {

bool AdhocReadApp::Query(const GraphDBSession& graph, Decoder& input,
Encoder& output) {
auto txn = graph.GetReadTransaction();
auto read_txn = graph.GetReadTransaction();
gs::runtime::GraphInterface<ReadTransaction> txn(read_txn);

std::string_view plan_str = input.get_bytes();
physical::PhysicalPlan plan;
Expand Down
3 changes: 2 additions & 1 deletion flex/engines/graph_db/database/read_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class MutablePropertyFragment;
class VersionManager;
template <typename EDATA_T>
class AdjListView {
public:
class nbr_iterator {
using const_nbr_t = typename MutableNbrSlice<EDATA_T>::const_nbr_t;
using const_nbr_ptr_t = typename MutableNbrSlice<EDATA_T>::const_nbr_ptr_t;
Expand Down Expand Up @@ -68,7 +69,6 @@ class AdjListView {
timestamp_t timestamp_;
};

public:
using slice_t = MutableNbrSlice<EDATA_T>;

AdjListView(const slice_t& slice, timestamp_t timestamp)
Expand Down Expand Up @@ -276,6 +276,7 @@ class SingleImmutableGraphView<std::string_view> {

class ReadTransaction {
public:
using vertex_index_t = vid_t;
ReadTransaction(const MutablePropertyFragment& graph, VersionManager& vm,
timestamp_t timestamp);
~ReadTransaction();
Expand Down
13 changes: 1 addition & 12 deletions flex/engines/graph_db/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@
file(GLOB_RECURSE COMMON_SOURCES "common/*.cc")
add_library(runtime_common SHARED ${COMMON_SOURCES})
target_link_libraries(runtime_common ${Boost_LIBRARIES} flex_utils flex_plan_proto)
install_flex_target(runtime_common)

file(GLOB_RECURSE ADHOC_SOURCES "adhoc/*.cc")
add_library(runtime_adhoc SHARED ${ADHOC_SOURCES})
target_link_libraries(runtime_adhoc runtime_common)
install_flex_target(runtime_adhoc)


install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
DESTINATION include/flex/engines/graph_db
DESTINATION include/flex/engines/graph_db/runtime
FILES_MATCHING
PATTERN "*.h"
)
58 changes: 0 additions & 58 deletions flex/engines/graph_db/runtime/adhoc/expr.cc

This file was deleted.

32 changes: 23 additions & 9 deletions flex/engines/graph_db/runtime/adhoc/expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,36 @@ namespace gs {

namespace runtime {

template <typename GRAPH_IMPL>
class Expr {
public:
Expr(const ReadTransaction& txn, const Context& ctx,
Expr(const GraphInterface<GRAPH_IMPL>& txn, const Context& ctx,
const std::map<std::string, std::string>& params,
const common::Expression& expr, VarType var_type);
const common::Expression& expr, VarType var_type) {
expr_ = parse_expression(txn, ctx, params, expr, var_type);
}

RTAny eval_path(size_t idx) const;
RTAny eval_vertex(label_t label, vid_t v, size_t idx) const;
RTAny eval_path(size_t idx) const {
RTAny ret = expr_->eval_path(idx);
return ret;
}
RTAny eval_vertex(label_t label, vid_t v, size_t idx) const {
return expr_->eval_vertex(label, v, idx);
}
RTAny eval_edge(const LabelTriplet& label, vid_t src, vid_t dst,
const Any& data, size_t idx) const;
RTAny eval_path(size_t idx, int) const;
RTAny eval_vertex(label_t label, vid_t v, size_t idx, int) const;
const Any& data, size_t idx) const {
return expr_->eval_edge(label, src, dst, data, idx);
}
RTAny eval_path(size_t idx, int) const { return expr_->eval_path(idx, 0); }
RTAny eval_vertex(label_t label, vid_t v, size_t idx, int) const {
return expr_->eval_vertex(label, v, idx, 0);
}
RTAny eval_edge(const LabelTriplet& label, vid_t src, vid_t dst,
const Any& data, size_t idx, int) const;
const Any& data, size_t idx, int) const {
return expr_->eval_edge(label, src, dst, data, idx, 0);
}

RTAnyType type() const;
RTAnyType type() const { return expr_->type(); }

std::shared_ptr<IContextColumnBuilder> builder() const {
return expr_->builder();
Expand Down
Loading
Loading