From 11e0bbf56ac57c67b5c1a873b0d46ab0cd86cb46 Mon Sep 17 00:00:00 2001 From: ziyi chen Date: Sun, 2 Jul 2023 22:07:52 -0400 Subject: [PATCH] Profile pipeline refactor --- src/binder/bind/bind_explain.cpp | 3 +- src/include/binder/bound_explain.h | 8 +++- src/include/common/types/types.h | 5 ++ src/include/main/plan_printer.h | 5 +- src/include/main/prepared_statement.h | 10 +++- src/include/main/query_summary.h | 22 ++------- src/include/parser/explain_statement.h | 10 ++-- .../logical_operator/logical_explain.h | 26 +++++++--- src/include/processor/mapper/plan_mapper.h | 4 ++ .../processor/operator/physical_operator.h | 1 + src/include/processor/operator/profile.h | 46 ++++++++++++++++++ src/main/connection.cpp | 12 ++--- src/main/plan_printer.cpp | 4 -- src/main/prepared_statement.cpp | 9 +++- src/main/query_summary.cpp | 22 ++------- src/parser/transformer.cpp | 9 ++-- src/planner/operator/logical_explain.cpp | 17 ++++++- src/planner/planner.cpp | 5 +- src/processor/mapper/map_explain.cpp | 47 +++++++++++++------ src/processor/mapper/plan_mapper.cpp | 21 ++++++--- src/processor/operator/CMakeLists.txt | 1 + src/processor/operator/physical_operator.cpp | 3 ++ src/processor/operator/profile.cpp | 35 ++++++++++++++ src/processor/processor.cpp | 3 +- test/c_api/query_result_test.cpp | 2 + test/include/test_runner/test_parser.h | 1 + test/main/csv_output_test.cpp | 1 + test/test_files/tinysnb/explain/explain.test | 16 +++++++ test/test_runner/test_runner.cpp | 2 + tools/benchmark/benchmark.cpp | 14 +----- tools/shell/embedded_shell.cpp | 9 ---- 31 files changed, 251 insertions(+), 122 deletions(-) create mode 100644 src/include/processor/operator/profile.h create mode 100644 src/processor/operator/profile.cpp diff --git a/src/binder/bind/bind_explain.cpp b/src/binder/bind/bind_explain.cpp index cd7b09822bf..a45aea2302c 100644 --- a/src/binder/bind/bind_explain.cpp +++ b/src/binder/bind/bind_explain.cpp @@ -8,7 +8,8 @@ namespace binder { std::unique_ptr Binder::bindExplain(const parser::Statement& statement) { auto& explainStatement = (parser::ExplainStatement&)statement; auto boundStatementToExplain = bind(*explainStatement.getStatementToExplain()); - return std::make_unique(std::move(boundStatementToExplain)); + return std::make_unique( + std::move(boundStatementToExplain), explainStatement.getExplainType()); } } // namespace binder diff --git a/src/include/binder/bound_explain.h b/src/include/binder/bound_explain.h index ee815d90f58..fd60a97fce6 100644 --- a/src/include/binder/bound_explain.h +++ b/src/include/binder/bound_explain.h @@ -7,15 +7,19 @@ namespace binder { class BoundExplain : public BoundStatement { public: - explicit BoundExplain(std::unique_ptr statementToExplain) + explicit BoundExplain( + std::unique_ptr statementToExplain, common::ExplainType explainType) : BoundStatement{common::StatementType::EXPLAIN, BoundStatementResult::createSingleStringColumnResult()}, - statementToExplain{std::move(statementToExplain)} {} + statementToExplain{std::move(statementToExplain)}, explainType{explainType} {} inline BoundStatement* getStatementToExplain() const { return statementToExplain.get(); } + inline common::ExplainType getExplainType() const { return explainType; } + private: std::unique_ptr statementToExplain; + common::ExplainType explainType; }; } // namespace binder diff --git a/src/include/common/types/types.h b/src/include/common/types/types.h index 37c3b34bd47..3cf427b5a43 100644 --- a/src/include/common/types/types.h +++ b/src/include/common/types/types.h @@ -37,6 +37,11 @@ constexpr struct_field_idx_t INVALID_STRUCT_FIELD_IDX = UINT64_MAX; using tuple_idx_t = uint64_t; constexpr uint32_t UNDEFINED_CAST_COST = UINT32_MAX; +enum class ExplainType : uint8_t { + PROFILE = 0, + PHYSICAL_PLAN = 1, +}; + // System representation for a variable-sized overflow value. struct overflow_value_t { // the size of the overflow buffer can be calculated as: diff --git a/src/include/main/plan_printer.h b/src/include/main/plan_printer.h index 7e828fbaccf..61e065d3022 100644 --- a/src/include/main/plan_printer.h +++ b/src/include/main/plan_printer.h @@ -88,7 +88,8 @@ class OpProfileTree { class PlanPrinter { public: - PlanPrinter(processor::PhysicalPlan* physicalPlan, std::unique_ptr profiler); + PlanPrinter(processor::PhysicalPlan* physicalPlan, common::Profiler* profiler) + : physicalPlan{physicalPlan}, profiler{profiler} {}; nlohmann::json printPlanToJson(); @@ -104,7 +105,7 @@ class PlanPrinter { private: processor::PhysicalPlan* physicalPlan; - std::unique_ptr profiler; + common::Profiler* profiler; }; } // namespace main diff --git a/src/include/main/prepared_statement.h b/src/include/main/prepared_statement.h index ce834397033..ddf86287de6 100644 --- a/src/include/main/prepared_statement.h +++ b/src/include/main/prepared_statement.h @@ -1,5 +1,10 @@ #pragma once +#include +#include +#include +#include + #include "common/api.h" #include "kuzu_fwd.h" #include "query_summary.h" @@ -43,10 +48,11 @@ class PreparedStatement { inline std::unordered_map> getParameterMap() { return parameterMap; } - ~PreparedStatement(); private: - common::StatementType statementType; + bool isProfile(); + +private: bool success = true; bool readOnly = false; std::string errMsg; diff --git a/src/include/main/query_summary.h b/src/include/main/query_summary.h index b52b0de4a39..d84cc7de11b 100644 --- a/src/include/main/query_summary.h +++ b/src/include/main/query_summary.h @@ -1,9 +1,7 @@ #pragma once #include "common/api.h" -#include "json_fwd.hpp" #include "kuzu_fwd.h" -#include "plan_printer.h" namespace kuzu { namespace main { @@ -13,9 +11,7 @@ namespace main { */ struct PreparedSummary { double compilingTime = 0; - // Only used for printing by shell. - bool isExplain = false; - bool isProfile = false; + common::StatementType statementType; }; /** @@ -34,29 +30,17 @@ class QuerySummary { * @return query execution time in milliseconds. */ KUZU_API double getExecutionTime() const; - bool getIsProfile() const; - std::ostringstream& getPlanAsOstream(); - /** - * @return physical plan for query in string format. - */ - KUZU_API std::string getPlan(); + void setPreparedSummary(PreparedSummary preparedSummary_); /** * @return true if the query is executed with EXPLAIN. */ - inline bool isExplain() const { return preparedSummary.isExplain; } - -private: - nlohmann::json& printPlanToJson(); + bool isExplain() const; private: double executionTime = 0; PreparedSummary preparedSummary; - // Remove these two field once we have refactored the profiler using the existing pipeline - // design. - std::unique_ptr planInJson; - std::ostringstream planInOstream; }; } // namespace main diff --git a/src/include/parser/explain_statement.h b/src/include/parser/explain_statement.h index d7b1bd7b9c2..39f3414d5ed 100644 --- a/src/include/parser/explain_statement.h +++ b/src/include/parser/explain_statement.h @@ -7,14 +7,18 @@ namespace parser { class ExplainStatement : public Statement { public: - explicit ExplainStatement(std::unique_ptr statementToExplain) - : Statement{common::StatementType::EXPLAIN}, statementToExplain{ - std::move(statementToExplain)} {} + explicit ExplainStatement( + std::unique_ptr statementToExplain, common::ExplainType explainType) + : Statement{common::StatementType::EXPLAIN}, + statementToExplain{std::move(statementToExplain)}, explainType{explainType} {} inline Statement* getStatementToExplain() const { return statementToExplain.get(); } + inline common::ExplainType getExplainType() const { return explainType; } + private: std::unique_ptr statementToExplain; + common::ExplainType explainType; }; } // namespace parser diff --git a/src/include/planner/logical_plan/logical_operator/logical_explain.h b/src/include/planner/logical_plan/logical_operator/logical_explain.h index 38b1b09a7ec..ce84efda3b5 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_explain.h +++ b/src/include/planner/logical_plan/logical_operator/logical_explain.h @@ -1,6 +1,7 @@ #pragma once #include "base_logical_operator.h" +#include "parser/explain_statement.h" namespace kuzu { namespace planner { @@ -8,10 +9,14 @@ namespace planner { class LogicalExplain : public LogicalOperator { public: LogicalExplain(std::shared_ptr child, - std::shared_ptr outputExpression) - : LogicalOperator{LogicalOperatorType::EXPLAIN, child}, outputExpression{ - std::move(outputExpression)} {} - + std::shared_ptr outputExpression, common::ExplainType explainType, + binder::expression_vector outputExpressionsToExplain) + : LogicalOperator{LogicalOperatorType::EXPLAIN, child}, outputExpression{std::move( + outputExpression)}, + explainType{explainType}, outputExpressionsToExplain{ + std::move(outputExpressionsToExplain)} {} + + void computeSchema(); void computeFactorizedSchema() override; void computeFlatSchema() override; @@ -21,12 +26,21 @@ class LogicalExplain : public LogicalOperator { inline std::string getExpressionsForPrinting() const override { return "Explain"; } + inline common::ExplainType getExplainType() const { return explainType; } + + inline binder::expression_vector getOutputExpressionsToExplain() const { + return outputExpressionsToExplain; + } + inline std::unique_ptr copy() override { - return std::make_unique(children[0], outputExpression); + return std::make_unique( + children[0], outputExpression, explainType, outputExpressionsToExplain); } -protected: +private: std::shared_ptr outputExpression; + common::ExplainType explainType; + binder::expression_vector outputExpressionsToExplain; }; } // namespace planner diff --git a/src/include/processor/mapper/plan_mapper.h b/src/include/processor/mapper/plan_mapper.h index 45e78768e4a..ed699fa37ea 100644 --- a/src/include/processor/mapper/plan_mapper.h +++ b/src/include/processor/mapper/plan_mapper.h @@ -139,6 +139,10 @@ class PlanMapper { static std::vector getExpressionsDataPos( const binder::expression_vector& expressions, const planner::Schema& schema); + std::unique_ptr appendResultCollectorIfNotCopy( + std::unique_ptr lastOperator, + binder::expression_vector expressionsToCollect, planner::Schema* schema); + public: storage::StorageManager& storageManager; storage::MemoryManager* memoryManager; diff --git a/src/include/processor/operator/physical_operator.h b/src/include/processor/operator/physical_operator.h index 382754b1594..d022f0ccce4 100644 --- a/src/include/processor/operator/physical_operator.h +++ b/src/include/processor/operator/physical_operator.h @@ -41,6 +41,7 @@ enum class PhysicalOperatorType : uint8_t { MULTIPLICITY_REDUCER, PATH_PROPERTY_PROBE, PROJECTION, + PROFILE, RECURSIVE_JOIN, RENAME_PROPERTY, RENAME_TABLE, diff --git a/src/include/processor/operator/profile.h b/src/include/processor/operator/profile.h new file mode 100644 index 00000000000..160f438479e --- /dev/null +++ b/src/include/processor/operator/profile.h @@ -0,0 +1,46 @@ +#pragma once + +#include "processor/operator/physical_operator.h" + +namespace kuzu { +namespace processor { + +struct ProfileSharedState { + std::unique_ptr plan; + + explicit ProfileSharedState(std::unique_ptr plan); +}; + +struct ProfileLocalState { + common::ValueVector* outputVector; + bool hasExecuted = false; +}; + +class Profile : public PhysicalOperator { +public: + Profile(DataPos outputPos, std::shared_ptr sharedState, + ProfileLocalState localState, uint32_t id, const std::string& paramsString, + std::unique_ptr child) + : PhysicalOperator{PhysicalOperatorType::PROFILE, std::move(child), id, paramsString}, + outputPos{outputPos}, sharedState{std::move(sharedState)}, localState{ + std::move(localState)} {} + + inline bool isSource() const override { return true; } + + void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; + + bool getNextTuplesInternal(ExecutionContext* context) override; + + std::unique_ptr clone() override { + return std::make_unique( + outputPos, sharedState, localState, id, paramsString, children[0]->clone()); + } + +private: + DataPos outputPos; + std::shared_ptr sharedState; + ProfileLocalState localState; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/main/connection.cpp b/src/main/connection.cpp index 85aa6c0ed3b..7220c4d4f6c 100644 --- a/src/main/connection.cpp +++ b/src/main/connection.cpp @@ -6,6 +6,7 @@ #include "main/database.h" #include "main/plan_printer.h" #include "optimizer/optimizer.h" +#include "parser/explain_statement.h" #include "parser/parser.h" #include "planner/logical_plan/logical_plan_util.h" #include "planner/planner.h" @@ -160,13 +161,10 @@ std::unique_ptr Connection::prepareNoLock( try { // parsing auto statement = Parser::parseQuery(query); - preparedStatement->preparedSummary.isProfile = statement->isProfile(); - preparedStatement->preparedSummary.isExplain = - statement->getStatementType() == StatementType::EXPLAIN; // binding auto binder = Binder(*database->catalog); auto boundStatement = binder.bind(*statement); - preparedStatement->statementType = boundStatement->getStatementType(); + preparedStatement->preparedSummary.statementType = boundStatement->getStatementType(); preparedStatement->readOnly = binder::StatementReadWriteAnalyzer().isReadOnly(*boundStatement); preparedStatement->parameterMap = binder.getParameterMap(); @@ -355,7 +353,7 @@ std::unique_ptr Connection::executeAndAutoCommitIfNecessaryNoLock( auto executionContext = std::make_unique(clientContext->numThreadsForExecution, profiler.get(), database->memoryManager.get(), database->bufferManager.get(), clientContext.get()); - profiler->enabled = preparedStatement->preparedSummary.isProfile; + profiler->enabled = preparedStatement->isProfile(); auto executingTimer = TimeMetric(true /* enable */); executingTimer.start(); std::shared_ptr resultFT; @@ -372,10 +370,6 @@ std::unique_ptr Connection::executeAndAutoCommitIfNecessaryNoLock( queryResult->initResultTableAndIterator(std::move(resultFT), preparedStatement->statementResult->getColumns(), preparedStatement->statementResult->getExpressionsToCollectPerColumn()); - auto planPrinter = std::make_unique(physicalPlan.get(), std::move(profiler)); - queryResult->querySummary->planInJson = - std::make_unique(planPrinter->printPlanToJson()); - queryResult->querySummary->planInOstream = planPrinter->printPlanToOstream(); return queryResult; } diff --git a/src/main/plan_printer.cpp b/src/main/plan_printer.cpp index ed4ba459cf6..8290ffe9fc1 100644 --- a/src/main/plan_printer.cpp +++ b/src/main/plan_printer.cpp @@ -297,10 +297,6 @@ uint32_t OpProfileTree::calculateRowHeight(uint32_t rowIdx) const { return height + 2; } -PlanPrinter::PlanPrinter( - processor::PhysicalPlan* physicalPlan, std::unique_ptr profiler) - : physicalPlan{physicalPlan}, profiler{std::move(profiler)} {} - nlohmann::json PlanPrinter::printPlanToJson() { return toJson(physicalPlan->lastOperator.get(), *profiler); } diff --git a/src/main/prepared_statement.cpp b/src/main/prepared_statement.cpp index 441ec445909..22e781d4d00 100644 --- a/src/main/prepared_statement.cpp +++ b/src/main/prepared_statement.cpp @@ -2,13 +2,14 @@ #include "binder/bound_statement_result.h" #include "common/statement_type.h" +#include "planner/logical_plan/logical_operator/logical_explain.h" #include "planner/logical_plan/logical_plan.h" namespace kuzu { namespace main { bool PreparedStatement::allowActiveTransaction() const { - return !common::StatementTypeUtils::isDDLOrCopyCSV(statementType); + return !common::StatementTypeUtils::isDDLOrCopyCSV(preparedSummary.statementType); } bool PreparedStatement::isSuccess() const { @@ -27,7 +28,11 @@ binder::expression_vector PreparedStatement::getExpressionsToCollect() { return statementResult->getExpressionsToCollect(); } -PreparedStatement::~PreparedStatement() = default; +bool PreparedStatement::isProfile() { + return preparedSummary.statementType == common::StatementType::EXPLAIN && + reinterpret_cast(logicalPlans[0]->getLastOperator().get()) + ->getExplainType() == common::ExplainType::PROFILE; +} } // namespace main } // namespace kuzu diff --git a/src/main/query_summary.cpp b/src/main/query_summary.cpp index bf92a236f95..40246b2dd74 100644 --- a/src/main/query_summary.cpp +++ b/src/main/query_summary.cpp @@ -1,6 +1,6 @@ #include "main/query_summary.h" -#include "json.hpp" +#include "common/statement_type.h" namespace kuzu { namespace main { @@ -13,25 +13,13 @@ double QuerySummary::getExecutionTime() const { return executionTime; } -bool QuerySummary::getIsProfile() const { - return preparedSummary.isProfile; -} - -std::ostringstream& QuerySummary::getPlanAsOstream() { - return planInOstream; -} - -nlohmann::json& QuerySummary::printPlanToJson() { - return *planInJson; -} - -std::string QuerySummary::getPlan() { - return planInOstream.str(); -} - void QuerySummary::setPreparedSummary(PreparedSummary preparedSummary_) { preparedSummary = preparedSummary_; } +bool QuerySummary::isExplain() const { + return preparedSummary.statementType == common::StatementType::EXPLAIN; +} + } // namespace main } // namespace kuzu diff --git a/src/parser/transformer.cpp b/src/parser/transformer.cpp index 438b3e1146e..e8c3fce7811 100644 --- a/src/parser/transformer.cpp +++ b/src/parser/transformer.cpp @@ -32,12 +32,9 @@ std::unique_ptr Transformer::transform() { auto statement = transformOcStatement(*root.oC_Statement()); if (root.oC_AnyCypherOption()) { auto cypherOption = root.oC_AnyCypherOption(); - if (cypherOption->oC_Explain()) { - return std::make_unique(std::move(statement)); - } - if (cypherOption->oC_Profile()) { - statement->enableProfile(); - } + auto explainType = cypherOption->oC_Explain() ? common::ExplainType::PHYSICAL_PLAN : + common::ExplainType::PROFILE; + return std::make_unique(std::move(statement), explainType); } return statement; } diff --git a/src/planner/operator/logical_explain.cpp b/src/planner/operator/logical_explain.cpp index 2f303ccccfd..bc338483b5d 100644 --- a/src/planner/operator/logical_explain.cpp +++ b/src/planner/operator/logical_explain.cpp @@ -3,14 +3,27 @@ namespace kuzu { namespace planner { +void LogicalExplain::computeSchema() { + switch (explainType) { + case common::ExplainType::PROFILE: + copyChildSchema(0); + break; + case common::ExplainType::PHYSICAL_PLAN: + createEmptySchema(); + break; + default: + throw common::NotImplementedException{"LogicalExplain::computeFlatSchema"}; + } +} + void LogicalExplain::computeFlatSchema() { - createEmptySchema(); + computeSchema(); schema->createGroup(); schema->insertToGroupAndScope(outputExpression, 0); } void LogicalExplain::computeFactorizedSchema() { - createEmptySchema(); + computeSchema(); auto groupPos = schema->createGroup(); schema->insertToGroupAndScope(outputExpression, groupPos); schema->setGroupAsSingleState(groupPos); diff --git a/src/planner/planner.cpp b/src/planner/planner.cpp index 3df81a66803..4515812be6f 100644 --- a/src/planner/planner.cpp +++ b/src/planner/planner.cpp @@ -200,8 +200,9 @@ std::unique_ptr Planner::planExplain(const Catalog& catalog, auto& explain = reinterpret_cast(statement); auto statementToExplain = explain.getStatementToExplain(); auto plan = getBestPlan(catalog, nodesStatistics, relsStatistics, *statementToExplain); - auto logicalExplain = make_shared( - plan->getLastOperator(), statement.getStatementResult()->getSingleExpressionToCollect()); + auto logicalExplain = make_shared(plan->getLastOperator(), + statement.getStatementResult()->getSingleExpressionToCollect(), explain.getExplainType(), + explain.getStatementToExplain()->getStatementResult()->getExpressionsToCollect()); plan->setLastOperator(std::move(logicalExplain)); return plan; } diff --git a/src/processor/mapper/map_explain.cpp b/src/processor/mapper/map_explain.cpp index 980e7b1ed45..99d7235518b 100644 --- a/src/processor/mapper/map_explain.cpp +++ b/src/processor/mapper/map_explain.cpp @@ -2,6 +2,7 @@ #include "main/plan_printer.h" #include "planner/logical_plan/logical_operator/logical_explain.h" #include "processor/mapper/plan_mapper.h" +#include "processor/operator/profile.h" #include "processor/operator/table_scan/factorized_table_scan.h" using namespace kuzu::planner; @@ -9,24 +10,40 @@ using namespace kuzu::planner; namespace kuzu { namespace processor { +static DataPos getOutputPos(LogicalExplain* logicalExplain) { + auto outSchema = logicalExplain->getSchema(); + auto outputExpression = logicalExplain->getOutputExpression(); + return DataPos(outSchema->getExpressionPos(*outputExpression)); +} + std::unique_ptr PlanMapper::mapLogicalExplainToPhysical( planner::LogicalOperator* logicalOperator) { auto logicalExplain = (LogicalExplain*)logicalOperator; - auto lastPhysicalOP = mapLogicalOperatorToPhysical(logicalExplain->getChild(0)); - auto physicalPlan = std::make_unique(std::move(lastPhysicalOP)); - auto planPrinter = std::make_unique( - physicalPlan.get(), std::make_unique()); - auto planInString = planPrinter->printPlanToOstream().str(); - auto factorizedTable = - FactorizedTableUtils::getFactorizedTableForOutputMsg(planInString, memoryManager); - auto ftSharedState = std::make_shared( - std::move(factorizedTable), common::DEFAULT_VECTOR_CAPACITY); - auto outSchema = logicalExplain->getSchema(); - auto outputExpression = logicalExplain->getOutputExpression(); - auto outputVectorPos = DataPos(outSchema->getExpressionPos(*outputExpression)); - return std::make_unique(std::vector{outputVectorPos}, - std::vector{0} /* colIndicesToScan */, ftSharedState, getOperatorID(), - logicalExplain->getExpressionsForPrinting()); + auto lastLogicalOP = logicalExplain->getChild(0); + auto lastPhysicalOP = mapLogicalOperatorToPhysical(lastLogicalOP); + lastPhysicalOP = appendResultCollectorIfNotCopy(std::move(lastPhysicalOP), + logicalExplain->getOutputExpressionsToExplain(), lastLogicalOP->getSchema()); + auto outputVectorPos = getOutputPos(logicalExplain); + auto physicalPlanToExplain = std::make_unique(std::move(lastPhysicalOP)); + if (logicalExplain->getExplainType() == common::ExplainType::PROFILE) { + auto childOP = physicalPlanToExplain->lastOperator->clone(); + auto sharedState = std::make_shared(std::move(physicalPlanToExplain)); + return std::make_unique(outputVectorPos, std::move(sharedState), + ProfileLocalState{}, getOperatorID(), logicalExplain->getExpressionsForPrinting(), + std::move(childOP)); + } else { + auto profiler = std::make_unique(); + auto planPrinter = + std::make_unique(physicalPlanToExplain.get(), profiler.get()); + auto explainStr = planPrinter->printPlanToOstream().str(); + auto factorizedTable = + FactorizedTableUtils::getFactorizedTableForOutputMsg(explainStr, memoryManager); + auto ftSharedState = std::make_shared( + std::move(factorizedTable), common::DEFAULT_VECTOR_CAPACITY); + return std::make_unique(std::vector{outputVectorPos}, + std::vector{0} /* colIndicesToScan */, ftSharedState, getOperatorID(), + logicalExplain->getExpressionsForPrinting()); + } } } // namespace processor diff --git a/src/processor/mapper/plan_mapper.cpp b/src/processor/mapper/plan_mapper.cpp index 31af2055708..720f4473f85 100644 --- a/src/processor/mapper/plan_mapper.cpp +++ b/src/processor/mapper/plan_mapper.cpp @@ -14,13 +14,8 @@ namespace processor { std::unique_ptr PlanMapper::mapLogicalPlanToPhysical( LogicalPlan* logicalPlan, const binder::expression_vector& expressionsToCollect) { auto lastOperator = mapLogicalOperatorToPhysical(logicalPlan->getLastOperator()); - // We have a special code path for executing copy rel and copy npy, so we don't need to append - // the resultCollector. - if (lastOperator->getOperatorType() != PhysicalOperatorType::COPY_REL && - lastOperator->getOperatorType() != PhysicalOperatorType::COPY_NPY) { - lastOperator = appendResultCollector( - expressionsToCollect, logicalPlan->getSchema(), std::move(lastOperator)); - } + lastOperator = appendResultCollectorIfNotCopy( + std::move(lastOperator), expressionsToCollect, logicalPlan->getSchema()); return make_unique(std::move(lastOperator)); } @@ -198,5 +193,17 @@ std::vector PlanMapper::getExpressionsDataPos( return result; } +std::unique_ptr PlanMapper::appendResultCollectorIfNotCopy( + std::unique_ptr lastOperator, binder::expression_vector expressionsToCollect, + Schema* schema) { + // We have a special code path for executing copy rel and copy npy, so we don't need to append + // the resultCollector. + if (lastOperator->getOperatorType() != PhysicalOperatorType::COPY_REL && + lastOperator->getOperatorType() != PhysicalOperatorType::COPY_NPY) { + lastOperator = appendResultCollector(expressionsToCollect, schema, std::move(lastOperator)); + } + return lastOperator; +} + } // namespace processor } // namespace kuzu diff --git a/src/processor/operator/CMakeLists.txt b/src/processor/operator/CMakeLists.txt index f9f2eb2e3ec..7efadf51bbe 100644 --- a/src/processor/operator/CMakeLists.txt +++ b/src/processor/operator/CMakeLists.txt @@ -21,6 +21,7 @@ add_library(kuzu_processor_operator multiplicity_reducer.cpp physical_operator.cpp projection.cpp + profile.cpp result_collector.cpp scan_node_id.cpp semi_masker.cpp diff --git a/src/processor/operator/physical_operator.cpp b/src/processor/operator/physical_operator.cpp index 13d3038d897..316309ce51d 100644 --- a/src/processor/operator/physical_operator.cpp +++ b/src/processor/operator/physical_operator.cpp @@ -173,6 +173,9 @@ std::string PhysicalOperatorUtils::operatorTypeToString(PhysicalOperatorType ope case PhysicalOperatorType::IN_QUERY_CALL: { return "IN_QUERY_CALL"; } + case PhysicalOperatorType::PROFILE: { + return "PROFILE"; + } default: throw common::NotImplementedException("physicalOperatorTypeToString()"); } diff --git a/src/processor/operator/profile.cpp b/src/processor/operator/profile.cpp new file mode 100644 index 00000000000..a6d48e781f3 --- /dev/null +++ b/src/processor/operator/profile.cpp @@ -0,0 +1,35 @@ +#include "processor/operator/profile.h" + +#include "main/plan_printer.h" +#include "processor/physical_plan.h" + +namespace kuzu { +namespace processor { + +ProfileSharedState::ProfileSharedState(std::unique_ptr plan) + : plan{std::move(plan)} {} + +void Profile::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { + localState.outputVector = resultSet->getValueVector(outputPos).get(); +} + +bool Profile::getNextTuplesInternal(ExecutionContext* context) { + if (localState.hasExecuted) { + return false; + } + localState.hasExecuted = true; + common::ku_string_t profileStr; + auto planPrinter = + std::make_unique(sharedState->plan.get(), context->profiler); + auto planInString = planPrinter->printPlanToOstream().str(); + common::StringVector::addString( + localState.outputVector, profileStr, planInString.c_str(), planInString.length()); + auto selVector = localState.outputVector->state->selVector; + selVector->selectedSize = 1; + localState.outputVector->setValue( + selVector->selectedPositions[0], profileStr); + return true; +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/processor.cpp b/src/processor/processor.cpp index 6997723c8e6..32cbb0a6867 100644 --- a/src/processor/processor.cpp +++ b/src/processor/processor.cpp @@ -78,7 +78,8 @@ void QueryProcessor::decomposePlanIntoTasks( case PhysicalOperatorType::CREATE_REL: case PhysicalOperatorType::DELETE_NODE: case PhysicalOperatorType::DELETE_REL: - case PhysicalOperatorType::STANDALONE_CALL: { + case PhysicalOperatorType::STANDALONE_CALL: + case PhysicalOperatorType::PROFILE: { parentTask->setSingleThreadedTask(); } break; default: diff --git a/test/c_api/query_result_test.cpp b/test/c_api/query_result_test.cpp index 06ca3ada59e..d829b3468e3 100644 --- a/test/c_api/query_result_test.cpp +++ b/test/c_api/query_result_test.cpp @@ -1,3 +1,5 @@ +#include + #include "c_api_test/c_api_test.h" using namespace kuzu::main; diff --git a/test/include/test_runner/test_parser.h b/test/include/test_runner/test_parser.h index ecc5122ac42..8e6f6d4a311 100644 --- a/test/include/test_runner/test_parser.h +++ b/test/include/test_runner/test_parser.h @@ -1,4 +1,5 @@ #include +#include #include #include "common/file_utils.h" diff --git a/test/main/csv_output_test.cpp b/test/main/csv_output_test.cpp index 2cf0993a364..a47bf5ef524 100644 --- a/test/main/csv_output_test.cpp +++ b/test/main/csv_output_test.cpp @@ -1,3 +1,4 @@ +#include #include #include "main_test_helper/main_test_helper.h" diff --git a/test/test_files/tinysnb/explain/explain.test b/test/test_files/tinysnb/explain/explain.test index 74c993da587..e8ce06b07e8 100644 --- a/test/test_files/tinysnb/explain/explain.test +++ b/test/test_files/tinysnb/explain/explain.test @@ -22,3 +22,19 @@ -LOG ExplainQuery -STATEMENT EXPLAIN MATCH (p:npytable) RETURN p.id ---- ok + +-LOG ProfileDDL +-STATEMENT Profile create node table npytable1 (id INT64,i32 INT32, PRIMARY KEY(id)); +---- ok + +-LOG ProfileCopy +-STATEMENT Profile copy npytable1 from ("${KUZU_ROOT_DIRECTORY}/dataset/npy-20k/id_int64.npy", "${KUZU_ROOT_DIRECTORY}/dataset/npy-20k/two_dim_float.npy") BY COLUMN; +---- ok + +-LOG ProfileCall +-STATEMENT Profile CALL threads=5 +---- ok + +-LOG ProfileQuery +-STATEMENT Profile MATCH (p:npytable) RETURN p.id +---- ok diff --git a/test/test_runner/test_runner.cpp b/test/test_runner/test_runner.cpp index 284489effde..4c2f268cdf0 100644 --- a/test/test_runner/test_runner.cpp +++ b/test/test_runner/test_runner.cpp @@ -1,5 +1,7 @@ #include "test_runner/test_runner.h" +#include + #include "common/string_utils.h" #include "spdlog/spdlog.h" diff --git a/tools/benchmark/benchmark.cpp b/tools/benchmark/benchmark.cpp index cc78050a8e2..fcad0d42adc 100644 --- a/tools/benchmark/benchmark.cpp +++ b/tools/benchmark/benchmark.cpp @@ -1,8 +1,8 @@ #include "benchmark.h" #include +#include -#include "json.hpp" #include "spdlog/spdlog.h" #include "test_helper.h" @@ -51,15 +51,11 @@ void Benchmark::logQueryInfo( void Benchmark::log(uint32_t runNum, QueryResult& queryResult) const { auto querySummary = queryResult.getQuerySummary(); auto actualOutput = testing::TestHelper::convertResultToString(queryResult); - std::string plan = "Plan: \n" + querySummary->printPlanToJson().dump(4); spdlog::info("Run number: {}", runNum); spdlog::info("Compiling time {}", querySummary->getCompilingTime()); spdlog::info("Execution time {}", querySummary->getExecutionTime()); verify(actualOutput); spdlog::info(""); - if (config.enableProfile) { - spdlog::info("{}", plan); - } if (!config.outputPath.empty()) { std::ofstream logFile(config.outputPath + "/" + name + "_log.txt", std::ios_base::app); logQueryInfo(logFile, runNum, actualOutput); @@ -67,14 +63,6 @@ void Benchmark::log(uint32_t runNum, QueryResult& queryResult) const { logFile << "Execution time: " << querySummary->getExecutionTime() << std::endl << std::endl; logFile.flush(); logFile.close(); - if (config.enableProfile) { - std::ofstream profileFile( - config.outputPath + "/" + name + "_profile.txt", std::ios_base::app); - logQueryInfo(profileFile, runNum, actualOutput); - profileFile << plan << std::endl << std::endl; - profileFile.flush(); - profileFile.close(); - } } } diff --git a/tools/shell/embedded_shell.cpp b/tools/shell/embedded_shell.cpp index 2e2619132e8..5c40fc09e3b 100644 --- a/tools/shell/embedded_shell.cpp +++ b/tools/shell/embedded_shell.cpp @@ -385,15 +385,6 @@ void EmbeddedShell::printExecutionResult(QueryResult& queryResult) const { } printf("Time: %.2fms (compiling), %.2fms (executing)\n", querySummary->getCompilingTime(), querySummary->getExecutionTime()); - - if (querySummary->getIsProfile()) { - // print plan with profiling metrics - printf("==============================================\n"); - printf("=============== Profiler Summary =============\n"); - printf("==============================================\n"); - printf(">> plan\n"); - printf("%s", querySummary->getPlanAsOstream().str().c_str()); - } } }