From 8d12790753a8d200e63da06714087634a111ef1a Mon Sep 17 00:00:00 2001 From: xiyang Date: Wed, 21 Jun 2023 14:51:51 -0400 Subject: [PATCH] add path semi mask --- src/include/common/constants.h | 2 +- .../optimizer/acc_hash_join_optimizer.h | 10 +- .../optimizer/logical_operator_visitor.h | 6 + .../projection_push_down_optimizer.h | 2 +- src/include/planner/join_order_enumerator.h | 4 +- .../logical_operator/base_logical_extend.h | 4 +- .../logical_operator/base_logical_operator.h | 1 + .../logical_operator/logical_extend.h | 3 +- .../logical_recursive_extend.h | 42 +++++-- .../logical_operator/logical_semi_masker.h | 20 ++- src/include/processor/mapper/plan_mapper.h | 4 +- .../recursive_extend/recursive_join.h | 2 +- src/include/processor/operator/semi_masker.h | 118 ++++++++++++++---- src/optimizer/acc_hash_join_optimizer.cpp | 59 +++++++-- src/optimizer/logical_operator_visitor.cpp | 6 + src/optimizer/optimizer.cpp | 6 +- .../projection_push_down_optimizer.cpp | 13 +- src/planner/join_order/CMakeLists.txt | 1 + src/planner/join_order/append_extend.cpp | 42 ++++--- src/planner/join_order/append_join.cpp | 88 +++++++++++++ src/planner/join_order_enumerator.cpp | 77 ------------ .../operator/base_logical_operator.cpp | 3 + src/processor/mapper/CMakeLists.txt | 1 + src/processor/mapper/map_acc_hash_join.cpp | 2 +- src/processor/mapper/map_hash_join.cpp | 2 +- src/processor/mapper/map_intersect.cpp | 2 +- .../mapper/map_path_property_probe.cpp | 93 ++++++++++++++ src/processor/mapper/map_recursive_extend.cpp | 81 +----------- src/processor/mapper/map_semi_masker.cpp | 52 ++++---- src/processor/mapper/plan_mapper.cpp | 3 + src/processor/operator/semi_masker.cpp | 75 ++++++++--- 31 files changed, 540 insertions(+), 284 deletions(-) create mode 100644 src/planner/join_order/append_join.cpp create mode 100644 src/processor/mapper/map_path_property_probe.cpp diff --git a/src/include/common/constants.h b/src/include/common/constants.h index 09661dfea5..24cd654504 100644 --- a/src/include/common/constants.h +++ b/src/include/common/constants.h @@ -143,7 +143,7 @@ struct PlannerKnobs { static constexpr uint64_t BUILD_PENALTY = 2; // Avoid doing probe to build SIP if we have to accumulate a probe side that is much bigger than // build side. Also avoid doing build to probe SIP if probe side is not much bigger than build. - static constexpr uint64_t ACC_HJ_PROBE_BUILD_RATIO = 5; + static constexpr uint64_t SIP_RATIO = 5; }; struct ClientContextConstants { diff --git a/src/include/optimizer/acc_hash_join_optimizer.h b/src/include/optimizer/acc_hash_join_optimizer.h index 826b694beb..b6f7db76dc 100644 --- a/src/include/optimizer/acc_hash_join_optimizer.h +++ b/src/include/optimizer/acc_hash_join_optimizer.h @@ -22,6 +22,8 @@ class HashJoinSIPOptimizer : public LogicalOperatorVisitor { void visitIntersect(planner::LogicalOperator* op) override; + void visitPathPropertyProbe(planner::LogicalOperator* op) override; + bool isProbeSideQualified(planner::LogicalOperator* probeRoot); std::vector resolveOperatorsToApplySemiMask( @@ -35,8 +37,12 @@ class HashJoinSIPOptimizer : public LogicalOperatorVisitor { std::vector resolveShortestPathExtendToApplySemiMask( const binder::Expression& nodeID, planner::LogicalOperator* root); - std::shared_ptr appendSemiMask( - std::vector ops, + std::shared_ptr appendNodeSemiMasker( + std::vector opsToApplySemiMask, + std::shared_ptr child); + std::shared_ptr appendPathSemiMasker( + std::shared_ptr pathExpression, + std::vector opsToApplySemiMask, std::shared_ptr child); std::shared_ptr appendAccumulate( std::shared_ptr child); diff --git a/src/include/optimizer/logical_operator_visitor.h b/src/include/optimizer/logical_operator_visitor.h index f475a2eb79..fc88bcf8d7 100644 --- a/src/include/optimizer/logical_operator_visitor.h +++ b/src/include/optimizer/logical_operator_visitor.h @@ -45,6 +45,12 @@ class LogicalOperatorVisitor { return op; } + virtual void visitPathPropertyProbe(planner::LogicalOperator* op) {} + virtual std::shared_ptr visitPathPropertyProbeReplace( + std::shared_ptr op) { + return op; + } + virtual void visitHashJoin(planner::LogicalOperator* op) {} virtual std::shared_ptr visitHashJoinReplace( std::shared_ptr op) { diff --git a/src/include/optimizer/projection_push_down_optimizer.h b/src/include/optimizer/projection_push_down_optimizer.h index 9a7e93b200..d8a3485f98 100644 --- a/src/include/optimizer/projection_push_down_optimizer.h +++ b/src/include/optimizer/projection_push_down_optimizer.h @@ -19,7 +19,7 @@ class ProjectionPushDownOptimizer : public LogicalOperatorVisitor { private: void visitOperator(planner::LogicalOperator* op); - void visitRecursiveExtend(planner::LogicalOperator* op) override; + void visitPathPropertyProbe(planner::LogicalOperator* op) override; void visitExtend(planner::LogicalOperator* op) override; void visitAccumulate(planner::LogicalOperator* op) override; void visitFilter(planner::LogicalOperator* op) override; diff --git a/src/include/planner/join_order_enumerator.h b/src/include/planner/join_order_enumerator.h index 9895c76988..38478dca36 100644 --- a/src/include/planner/join_order_enumerator.h +++ b/src/include/planner/join_order_enumerator.h @@ -93,9 +93,9 @@ class JoinOrderEnumerator { void createRecursivePlan(std::shared_ptr boundNode, std::shared_ptr recursiveNode, std::shared_ptr recursiveRel, ExtendDirection direction, LogicalPlan& plan); - void createRecursiveNodePropertyScanPlan( + void createPathNodePropertyScanPlan( std::shared_ptr recursiveNode, LogicalPlan& plan); - void createRecursiveRelPropertyScanPlan(std::shared_ptr recursiveNode, + void createPathRelPropertyScanPlan(std::shared_ptr recursiveNode, std::shared_ptr nbrNode, std::shared_ptr recursiveRel, ExtendDirection direction, LogicalPlan& plan); diff --git a/src/include/planner/logical_plan/logical_operator/base_logical_extend.h b/src/include/planner/logical_plan/logical_operator/base_logical_extend.h index 9c7e68d4d5..15017f88c0 100644 --- a/src/include/planner/logical_plan/logical_operator/base_logical_extend.h +++ b/src/include/planner/logical_plan/logical_operator/base_logical_extend.h @@ -12,8 +12,8 @@ class BaseLogicalExtend : public LogicalOperator { BaseLogicalExtend(LogicalOperatorType operatorType, std::shared_ptr boundNode, std::shared_ptr nbrNode, std::shared_ptr rel, - ExtendDirection direction, std::vector> children) - : LogicalOperator{operatorType, std::move(children)}, boundNode{std::move(boundNode)}, + ExtendDirection direction, std::shared_ptr child) + : LogicalOperator{operatorType, std::move(child)}, boundNode{std::move(boundNode)}, nbrNode{std::move(nbrNode)}, rel{std::move(rel)}, direction{direction} {} inline std::shared_ptr getBoundNode() const { return boundNode; } diff --git a/src/include/planner/logical_plan/logical_operator/base_logical_operator.h b/src/include/planner/logical_plan/logical_operator/base_logical_operator.h index ae9aafea0e..1a912edf03 100644 --- a/src/include/planner/logical_plan/logical_operator/base_logical_operator.h +++ b/src/include/planner/logical_plan/logical_operator/base_logical_operator.h @@ -30,6 +30,7 @@ enum class LogicalOperatorType : uint8_t { LIMIT, MULTIPLICITY_REDUCER, ORDER_BY, + PATH_PROPERTY_PROBE, PROJECTION, RECURSIVE_EXTEND, RENAME_TABLE, diff --git a/src/include/planner/logical_plan/logical_operator/logical_extend.h b/src/include/planner/logical_plan/logical_operator/logical_extend.h index 6a72d041b5..0478aff89b 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_extend.h +++ b/src/include/planner/logical_plan/logical_operator/logical_extend.h @@ -12,8 +12,7 @@ class LogicalExtend : public BaseLogicalExtend { ExtendDirection direction, binder::expression_vector properties, bool hasAtMostOneNbr, std::shared_ptr child) : BaseLogicalExtend{LogicalOperatorType::EXTEND, std::move(boundNode), std::move(nbrNode), - std::move(rel), direction, - std::vector>{std::move(child)}}, + std::move(rel), direction, std::move(child)}, properties{std::move(properties)}, hasAtMostOneNbr{hasAtMostOneNbr} {} f_group_pos_set getGroupsPosToFlatten() override; diff --git a/src/include/planner/logical_plan/logical_operator/logical_recursive_extend.h b/src/include/planner/logical_plan/logical_operator/logical_recursive_extend.h index 59b0b4001d..579c56312f 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_recursive_extend.h +++ b/src/include/planner/logical_plan/logical_operator/logical_recursive_extend.h @@ -2,6 +2,7 @@ #include "base_logical_extend.h" #include "recursive_join_type.h" +#include "side_way_info_passing.h" namespace kuzu { namespace planner { @@ -11,10 +12,9 @@ class LogicalRecursiveExtend : public BaseLogicalExtend { LogicalRecursiveExtend(std::shared_ptr boundNode, std::shared_ptr nbrNode, std::shared_ptr rel, ExtendDirection direction, RecursiveJoinType joinType, - std::vector> children, - std::shared_ptr recursiveChild) + std::shared_ptr child, std::shared_ptr recursiveChild) : BaseLogicalExtend{LogicalOperatorType::RECURSIVE_EXTEND, std::move(boundNode), - std::move(nbrNode), std::move(rel), direction, std::move(children)}, + std::move(nbrNode), std::move(rel), direction, std::move(child)}, joinType{joinType}, recursiveChild{std::move(recursiveChild)} {} f_group_pos_set getGroupsPosToFlatten() override; @@ -27,13 +27,8 @@ class LogicalRecursiveExtend : public BaseLogicalExtend { inline std::shared_ptr getRecursiveChild() const { return recursiveChild; } inline std::unique_ptr copy() override { - std::vector> copiedChildren; - copiedChildren.reserve(children.size()); - for (auto& child : children) { - copiedChildren.push_back(child->copy()); - } return std::make_unique(boundNode, nbrNode, rel, direction, - joinType, std::move(copiedChildren), recursiveChild->copy()); + joinType, children[0]->copy(), recursiveChild->copy()); } private: @@ -41,6 +36,35 @@ class LogicalRecursiveExtend : public BaseLogicalExtend { std::shared_ptr recursiveChild; }; +class LogicalPathPropertyProbe : public LogicalOperator { +public: + LogicalPathPropertyProbe(std::shared_ptr recursiveRel, + std::shared_ptr probeChild, std::shared_ptr nodeChild, + std::shared_ptr relChild) + : LogicalOperator{LogicalOperatorType::PATH_PROPERTY_PROBE, + std::vector>{ + std::move(probeChild), std::move(nodeChild), std::move(relChild)}}, + recursiveRel{std::move(recursiveRel)}, sip{SidewaysInfoPassing::NONE} {} + + void computeFactorizedSchema() override { copyChildSchema(0); } + void computeFlatSchema() override { copyChildSchema(0); } + + std::string getExpressionsForPrinting() const override { return recursiveRel->toString(); } + + inline std::shared_ptr getRel() const { return recursiveRel; } + inline void setSIP(SidewaysInfoPassing sip_) { sip = sip_; } + inline SidewaysInfoPassing getSIP() const { return sip; } + + inline std::unique_ptr copy() override { + return std::make_unique( + recursiveRel, children[0]->copy(), children[1]->copy(), children[2]->copy()); + } + +private: + std::shared_ptr recursiveRel; + SidewaysInfoPassing sip; +}; + class LogicalScanFrontier : public LogicalOperator { public: LogicalScanFrontier(std::shared_ptr node) diff --git a/src/include/planner/logical_plan/logical_operator/logical_semi_masker.h b/src/include/planner/logical_plan/logical_operator/logical_semi_masker.h index faba5c71d5..0c28bd1503 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_semi_masker.h +++ b/src/include/planner/logical_plan/logical_operator/logical_semi_masker.h @@ -6,26 +6,36 @@ namespace kuzu { namespace planner { +enum class SemiMaskType : uint8_t { + NODE = 0, + PATH = 1, +}; + class LogicalSemiMasker : public LogicalOperator { public: - LogicalSemiMasker(std::shared_ptr node, - std::vector ops, std::shared_ptr child) - : LogicalOperator{LogicalOperatorType::SEMI_MASKER, std::move(child)}, - node{std::move(node)}, ops{std::move(ops)} {} + LogicalSemiMasker(SemiMaskType type, std::shared_ptr key, + std::shared_ptr node, std::vector ops, + std::shared_ptr child) + : LogicalOperator{LogicalOperatorType::SEMI_MASKER, std::move(child)}, type{type}, + key{std::move(key)}, node{std::move(node)}, ops{std::move(ops)} {} inline void computeFactorizedSchema() override { copyChildSchema(0); } inline void computeFlatSchema() override { copyChildSchema(0); } inline std::string getExpressionsForPrinting() const override { return node->toString(); } + inline SemiMaskType getType() const { return type; } + inline std::shared_ptr getKey() const { return key; } inline std::shared_ptr getNode() const { return node; } inline std::vector getOperators() const { return ops; } inline std::unique_ptr copy() override { - return make_unique(node, ops, children[0]->copy()); + throw common::RuntimeException("LogicalSemiMasker::copy() should not be called."); } private: + SemiMaskType type; + std::shared_ptr key; std::shared_ptr node; std::vector ops; }; diff --git a/src/include/processor/mapper/plan_mapper.h b/src/include/processor/mapper/plan_mapper.h index 9a6eaa3f14..79f8e6bb69 100644 --- a/src/include/processor/mapper/plan_mapper.h +++ b/src/include/processor/mapper/plan_mapper.h @@ -42,6 +42,8 @@ class PlanMapper { planner::LogicalOperator* logicalOperator); std::unique_ptr mapLogicalRecursiveExtendToPhysical( planner::LogicalOperator* logicalOperator); + std::unique_ptr mapLogicalPathPropertyProbeToPhysical( + planner::LogicalOperator* logicalOperator); std::unique_ptr mapLogicalFlattenToPhysical( planner::LogicalOperator* logicalOperator); std::unique_ptr mapLogicalFilterToPhysical( @@ -126,7 +128,7 @@ class PlanMapper { planner::Schema* outSchema, std::unique_ptr prevOperator, const std::string& paramsString); - static void mapAccHashJoin(PhysicalOperator* probe); + static void mapSIPJoin(PhysicalOperator* probe); static std::vector getExpressionsDataPos( const binder::expression_vector& expressions, const planner::Schema& schema); diff --git a/src/include/processor/operator/recursive_extend/recursive_join.h b/src/include/processor/operator/recursive_extend/recursive_join.h index 99f66a21fb..8c24b98a14 100644 --- a/src/include/processor/operator/recursive_extend/recursive_join.h +++ b/src/include/processor/operator/recursive_extend/recursive_join.h @@ -58,7 +58,7 @@ struct RecursiveJoinVectors { common::ValueVector* srcNodeIDVector = nullptr; common::ValueVector* dstNodeIDVector = nullptr; common::ValueVector* pathLengthVector = nullptr; - common::ValueVector* pathVector = nullptr; // STRUCT(LIST(STRUCT), LIST(INTERNAL_ID)) + common::ValueVector* pathVector = nullptr; // STRUCT(LIST(NODE), LIST(REL)) common::ValueVector* pathNodesVector = nullptr; // LIST(STRUCT) common::ValueVector* pathNodesIDDataVector = nullptr; // INTERNAL_ID common::ValueVector* pathRelsVector = nullptr; // LIST(STRUCT) diff --git a/src/include/processor/operator/semi_masker.h b/src/include/processor/operator/semi_masker.h index 8104fc4aca..39fcb495af 100644 --- a/src/include/processor/operator/semi_masker.h +++ b/src/include/processor/operator/semi_masker.h @@ -6,59 +6,125 @@ namespace kuzu { namespace processor { -// Multiple maskers can point to the same SemiMask, thus we associate each masker with an idx -// to indicate the execution sequence of its pipeline. Also, the maskerIdx is used as a flag to -// indicate if a value in the mask is masked or not, as each masker will increment the selected -// value in the mask by 1. More details are described in NodeTableSemiMask. -using mask_with_idx = std::pair; +class BaseSemiMasker; + +class SemiMaskerInfo { + friend class BaseSemiMasker; + +public: + // Multiple maskers can point to the same SemiMask, thus we associate each masker with an idx + // to indicate the execution sequence of its pipeline. Also, the maskerIdx is used as a flag to + // indicate if a value in the mask is masked or not, as each masker will increment the selected + // value in the mask by 1. More details are described in NodeTableSemiMask. + using mask_with_idx = std::pair; + + SemiMaskerInfo(const DataPos& keyPos, + std::unordered_map> masksPerTable) + : keyPos{keyPos}, masksPerTable{std::move(masksPerTable)} {} + SemiMaskerInfo(const SemiMaskerInfo& other) + : keyPos{other.keyPos}, masksPerTable{other.masksPerTable} {} + + inline const std::vector& getSingleTableMasks() const { + assert(masksPerTable.size() == 1); + return masksPerTable.begin()->second; + } + + inline const std::vector& getTableMasks(common::table_id_t tableID) const { + assert(masksPerTable.contains(tableID)); + return masksPerTable.at(tableID); + } + + inline std::unique_ptr copy() const { + return std::make_unique(*this); + } + +private: + DataPos keyPos; + std::unordered_map> masksPerTable; +}; class BaseSemiMasker : public PhysicalOperator { protected: - BaseSemiMasker(const DataPos& keyDataPos, - std::unordered_map> masksPerTable, - std::unique_ptr child, uint32_t id, const std::string& paramsString) + BaseSemiMasker(std::unique_ptr info, std::unique_ptr child, + uint32_t id, const std::string& paramsString) : PhysicalOperator{PhysicalOperatorType::SEMI_MASKER, std::move(child), id, paramsString}, - keyDataPos{keyDataPos}, masksPerTable{std::move(masksPerTable)} {} + info{std::move(info)} {} void initGlobalStateInternal(ExecutionContext* context) override; void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; protected: - DataPos keyDataPos; - std::unordered_map> masksPerTable; - std::shared_ptr keyValueVector; + std::unique_ptr info; + common::ValueVector* keyVector; }; class SingleTableSemiMasker : public BaseSemiMasker { public: - SingleTableSemiMasker(const DataPos& keyDataPos, - std::unordered_map> masksPerTable, + SingleTableSemiMasker(std::unique_ptr info, std::unique_ptr child, uint32_t id, const std::string& paramsString) - : BaseSemiMasker{keyDataPos, std::move(masksPerTable), std::move(child), id, paramsString} { - } + : BaseSemiMasker{std::move(info), std::move(child), id, paramsString} {} - bool getNextTuplesInternal(ExecutionContext* context) override; + bool getNextTuplesInternal(ExecutionContext* context) final; - inline std::unique_ptr clone() override { + inline std::unique_ptr clone() final { return std::make_unique( - keyDataPos, masksPerTable, children[0]->clone(), id, paramsString); + info->copy(), children[0]->clone(), id, paramsString); } }; class MultiTableSemiMasker : public BaseSemiMasker { public: - MultiTableSemiMasker(const DataPos& keyDataPos, - std::unordered_map> masksPerTable, + MultiTableSemiMasker(std::unique_ptr info, std::unique_ptr child, uint32_t id, const std::string& paramsString) - : BaseSemiMasker{keyDataPos, std::move(masksPerTable), std::move(child), id, paramsString} { - } + : BaseSemiMasker{std::move(info), std::move(child), id, paramsString} {} - bool getNextTuplesInternal(ExecutionContext* context) override; + bool getNextTuplesInternal(ExecutionContext* context) final; - inline std::unique_ptr clone() override { + inline std::unique_ptr clone() final { return std::make_unique( - keyDataPos, masksPerTable, children[0]->clone(), id, paramsString); + info->copy(), children[0]->clone(), id, paramsString); + } +}; + +class PathSemiMasker : public BaseSemiMasker { +protected: + PathSemiMasker(std::unique_ptr info, std::unique_ptr child, + uint32_t id, const std::string& paramsString) + : BaseSemiMasker{std::move(info), std::move(child), id, paramsString} {} + + void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final; + +protected: + common::ValueVector* pathNodesVector; + common::ValueVector* pathNodesIDDataVector; +}; + +class PathSingleTableSemiMasker : public PathSemiMasker { +public: + PathSingleTableSemiMasker(std::unique_ptr info, + std::unique_ptr child, uint32_t id, const std::string& paramsString) + : PathSemiMasker{std::move(info), std::move(child), id, paramsString} {} + + bool getNextTuplesInternal(ExecutionContext* context) final; + + inline std::unique_ptr clone() final { + return std::make_unique( + info->copy(), children[0]->clone(), id, paramsString); + } +}; + +class PathMultipleTableSemiMasker : public PathSemiMasker { +public: + PathMultipleTableSemiMasker(std::unique_ptr info, + std::unique_ptr child, uint32_t id, const std::string& paramsString) + : PathSemiMasker{std::move(info), std::move(child), id, paramsString} {} + + bool getNextTuplesInternal(ExecutionContext* context) final; + + inline std::unique_ptr clone() final { + return std::make_unique( + info->copy(), children[0]->clone(), id, paramsString); } }; diff --git a/src/optimizer/acc_hash_join_optimizer.cpp b/src/optimizer/acc_hash_join_optimizer.cpp index 5401e436cd..8e14beb36c 100644 --- a/src/optimizer/acc_hash_join_optimizer.cpp +++ b/src/optimizer/acc_hash_join_optimizer.cpp @@ -46,7 +46,7 @@ bool HashJoinSIPOptimizer::tryProbeToBuildHJSIP(planner::LogicalOperator* op) { for (auto& nodeID : hashJoin->getJoinNodeIDs()) { auto ops = resolveOperatorsToApplySemiMask(*nodeID, buildRoot.get()); if (!ops.empty()) { - probeRoot = appendSemiMask(ops, probeRoot); + probeRoot = appendNodeSemiMasker(ops, probeRoot); hasSemiMaskApplied = true; } } @@ -85,7 +85,7 @@ bool HashJoinSIPOptimizer::tryBuildToProbeHJSIP(planner::LogicalOperator* op) { for (auto& nodeID : hashJoin->getJoinNodeIDs()) { auto ops = resolveOperatorsToApplySemiMask(*nodeID, probeRoot.get()); if (!ops.empty()) { - buildRoot = appendSemiMask(ops, buildRoot); + buildRoot = appendNodeSemiMasker(ops, buildRoot); } } hashJoin->setSIP(planner::SidewaysInfoPassing::BUILD_TO_PROBE); @@ -112,7 +112,7 @@ void HashJoinSIPOptimizer::visitIntersect(planner::LogicalOperator* op) { } } if (!ops.empty()) { - probeRoot = appendSemiMask(ops, probeRoot); + probeRoot = appendNodeSemiMasker(ops, probeRoot); hasSemiMaskApplied = true; } } @@ -123,6 +123,32 @@ void HashJoinSIPOptimizer::visitIntersect(planner::LogicalOperator* op) { intersect->setChild(0, appendAccumulate(probeRoot)); } +void HashJoinSIPOptimizer::visitPathPropertyProbe(planner::LogicalOperator* op) { + auto pathPropertyProbe = (LogicalPathPropertyProbe*)op; + if (pathPropertyProbe->getSIP() == planner::SidewaysInfoPassing::PROHIBIT_PROBE_TO_BUILD) { + return; + } + if (pathPropertyProbe->getNumChildren() == 1) { // No path being tracked. + return; + } + auto recursiveRel = pathPropertyProbe->getRel(); + auto nodeID = recursiveRel->getRecursiveInfo()->node->getInternalIDProperty(); + std::vector opsToApplySemiMask; + for (auto op_ : + resolveOperatorsToApplySemiMask(*nodeID, pathPropertyProbe->getChild(1).get())) { + opsToApplySemiMask.push_back(op_); + } + for (auto op_ : + resolveOperatorsToApplySemiMask(*nodeID, pathPropertyProbe->getChild(2).get())) { + opsToApplySemiMask.push_back(op_); + } + assert(opsToApplySemiMask.size() == 2); + auto semiMask = + appendPathSemiMasker(recursiveRel, opsToApplySemiMask, pathPropertyProbe->getChild(0)); + pathPropertyProbe->setSIP(planner::SidewaysInfoPassing::PROBE_TO_BUILD); + pathPropertyProbe->setChild(0, appendAccumulate(semiMask)); +} + // Probe side is qualified if it is selective. bool HashJoinSIPOptimizer::isProbeSideQualified(planner::LogicalOperator* probeRoot) { if (probeRoot->getOperatorType() == LogicalOperatorType::ACCUMULATE) { @@ -177,10 +203,11 @@ HashJoinSIPOptimizer::resolveShortestPathExtendToApplySemiMask( return result; } -std::shared_ptr HashJoinSIPOptimizer::appendSemiMask( - std::vector ops, std::shared_ptr child) { - assert(!ops.empty()); - auto op = ops[0]; +std::shared_ptr HashJoinSIPOptimizer::appendNodeSemiMasker( + std::vector opsToApplySemiMask, + std::shared_ptr child) { + assert(!opsToApplySemiMask.empty()); + auto op = opsToApplySemiMask[0]; std::shared_ptr node; switch (op->getOperatorType()) { case LogicalOperatorType::SCAN_NODE: { @@ -194,7 +221,23 @@ std::shared_ptr HashJoinSIPOptimizer::appendSemiMask( default: throw common::NotImplementedException("HashJoinSIPOptimizer::appendSemiMask"); } - auto semiMasker = std::make_shared(node, ops, child); + auto semiMasker = std::make_shared( + SemiMaskType::NODE, node->getInternalIDProperty(), node, opsToApplySemiMask, child); + semiMasker->computeFlatSchema(); + return semiMasker; +} + +std::shared_ptr HashJoinSIPOptimizer::appendPathSemiMasker( + std::shared_ptr pathExpression, + std::vector opsToApplySemiMask, + std::shared_ptr child) { + assert(!opsToApplySemiMask.empty()); + auto op = opsToApplySemiMask[0]; + assert(op->getOperatorType() == planner::LogicalOperatorType::SCAN_NODE); + auto scanNode = (LogicalScanNode*)op; + auto node = scanNode->getNode(); + auto semiMasker = std::make_shared( + SemiMaskType::PATH, pathExpression, node, opsToApplySemiMask, child); semiMasker->computeFlatSchema(); return semiMasker; } diff --git a/src/optimizer/logical_operator_visitor.cpp b/src/optimizer/logical_operator_visitor.cpp index 56b9012044..b07a04ea79 100644 --- a/src/optimizer/logical_operator_visitor.cpp +++ b/src/optimizer/logical_operator_visitor.cpp @@ -22,6 +22,9 @@ void LogicalOperatorVisitor::visitOperatorSwitch(planner::LogicalOperator* op) { case LogicalOperatorType::RECURSIVE_EXTEND: { visitRecursiveExtend(op); } break; + case LogicalOperatorType::PATH_PROPERTY_PROBE: { + visitPathPropertyProbe(op); + } break; case LogicalOperatorType::HASH_JOIN: { visitHashJoin(op); } break; @@ -99,6 +102,9 @@ std::shared_ptr LogicalOperatorVisitor::visitOperatorR case LogicalOperatorType::RECURSIVE_EXTEND: { return visitRecursiveExtendReplace(op); } + case LogicalOperatorType::PATH_PROPERTY_PROBE: { + return visitPathPropertyProbeReplace(op); + } case LogicalOperatorType::HASH_JOIN: { return visitHashJoinReplace(op); } diff --git a/src/optimizer/optimizer.cpp b/src/optimizer/optimizer.cpp index 3a626fee38..7e0054b977 100644 --- a/src/optimizer/optimizer.cpp +++ b/src/optimizer/optimizer.cpp @@ -22,13 +22,13 @@ void Optimizer::optimize(planner::LogicalPlan* plan) { auto filterPushDownOptimizer = FilterPushDownOptimizer(); filterPushDownOptimizer.rewrite(plan); + auto projectionPushDownOptimizer = ProjectionPushDownOptimizer(); + projectionPushDownOptimizer.rewrite(plan); + // HashJoinSIPOptimizer should be applied after optimizers that manipulate hash join. auto hashJoinSIPOptimizer = HashJoinSIPOptimizer(); hashJoinSIPOptimizer.rewrite(plan); - auto projectionPushDownOptimizer = ProjectionPushDownOptimizer(); - projectionPushDownOptimizer.rewrite(plan); - auto factorizationRewriter = FactorizationRewriter(); factorizationRewriter.rewrite(plan); diff --git a/src/optimizer/projection_push_down_optimizer.cpp b/src/optimizer/projection_push_down_optimizer.cpp index 235c98bb49..2a9ff9e8a6 100644 --- a/src/optimizer/projection_push_down_optimizer.cpp +++ b/src/optimizer/projection_push_down_optimizer.cpp @@ -37,17 +37,20 @@ void ProjectionPushDownOptimizer::visitOperator(LogicalOperator* op) { op->computeFlatSchema(); } -void ProjectionPushDownOptimizer::visitRecursiveExtend(LogicalOperator* op) { - auto recursiveExtend = (LogicalRecursiveExtend*)op; +void ProjectionPushDownOptimizer::visitPathPropertyProbe(planner::LogicalOperator* op) { + auto pathPropertyProbe = (LogicalPathPropertyProbe*)op; + assert( + pathPropertyProbe->getChild(0)->getOperatorType() == LogicalOperatorType::RECURSIVE_EXTEND); + auto recursiveExtend = (LogicalRecursiveExtend*)pathPropertyProbe->getChild(0).get(); auto boundNodeID = recursiveExtend->getBoundNode()->getInternalIDProperty(); collectExpressionsInUse(boundNodeID); auto rel = recursiveExtend->getRel(); auto recursiveInfo = rel->getRecursiveInfo(); if (!variablesInUse.contains(rel)) { recursiveExtend->setJoinType(planner::RecursiveJoinType::TRACK_NONE); - // Remove build size - recursiveExtend->setChildren( - std::vector>{recursiveExtend->getChild(0)}); + // TODO(Xiyang): we should remove pathPropertyProbe if we don't need to track path + pathPropertyProbe->setChildren( + std::vector>{pathPropertyProbe->getChild(0)}); } else { // Pre-append projection to rel property build. expression_vector properties; diff --git a/src/planner/join_order/CMakeLists.txt b/src/planner/join_order/CMakeLists.txt index e3e90cee47..fcc43fc4ce 100644 --- a/src/planner/join_order/CMakeLists.txt +++ b/src/planner/join_order/CMakeLists.txt @@ -1,6 +1,7 @@ add_library(kuzu_planner_join_order OBJECT append_extend.cpp + append_join.cpp cardinality_estimator.cpp cost_model.cpp join_order_util.cpp) diff --git a/src/planner/join_order/append_extend.cpp b/src/planner/join_order/append_extend.cpp index 47d169b9c9..fbdfac7136 100644 --- a/src/planner/join_order/append_extend.cpp +++ b/src/planner/join_order/append_extend.cpp @@ -50,34 +50,42 @@ void JoinOrderEnumerator::appendRecursiveExtend(std::shared_ptr ExtendDirection direction, LogicalPlan& plan) { auto recursiveInfo = rel->getRecursiveInfo(); queryPlanner->appendAccumulate(plan); - std::vector> children; - children.push_back(plan.getLastOperator()); - // Recursive node property scan plan - auto recursiveNodePropertyScanPlan = std::make_unique(); - createRecursiveNodePropertyScanPlan(recursiveInfo->node, *recursiveNodePropertyScanPlan); - children.push_back(recursiveNodePropertyScanPlan->getLastOperator()); - // Recursive rel property scan plan - auto recursiveRelPropertyScanPlan = std::make_unique(); - createRecursiveRelPropertyScanPlan( - recursiveInfo->node, nbrNode, recursiveInfo->rel, direction, *recursiveRelPropertyScanPlan); - children.push_back(recursiveRelPropertyScanPlan->getLastOperator()); - // Recursive plan + // Create recursive plan auto recursivePlan = std::make_unique(); createRecursivePlan( boundNode, recursiveInfo->node, recursiveInfo->rel, direction, *recursivePlan); + // Create recursive extend auto extend = std::make_shared(boundNode, nbrNode, rel, direction, - RecursiveJoinType::TRACK_PATH, std::move(children), recursivePlan->getLastOperator()); + RecursiveJoinType::TRACK_PATH, plan.getLastOperator(), recursivePlan->getLastOperator()); queryPlanner->appendFlattens(extend->getGroupsPosToFlatten(), plan); extend->setChild(0, plan.getLastOperator()); extend->computeFactorizedSchema(); + // Create path node property scan plan + auto pathNodePropertyScanPlan = std::make_unique(); + createPathNodePropertyScanPlan(recursiveInfo->node, *pathNodePropertyScanPlan); + // Create path rel property scan plan + auto pathRelPropertyScanPlan = std::make_unique(); + createPathRelPropertyScanPlan( + recursiveInfo->node, nbrNode, recursiveInfo->rel, direction, *pathRelPropertyScanPlan); + // Create path property probe + auto pathPropertyProbe = std::make_shared(rel, extend, + pathNodePropertyScanPlan->getLastOperator(), pathRelPropertyScanPlan->getLastOperator()); + pathPropertyProbe->computeFactorizedSchema(); + // Check for sip + auto ratio = plan.getCardinality() / pathRelPropertyScanPlan->getCardinality(); + if (ratio > common::PlannerKnobs::SIP_RATIO) { + pathPropertyProbe->setSIP(SidewaysInfoPassing::PROHIBIT_PROBE_TO_BUILD); + } + // Update cost auto extensionRate = queryPlanner->cardinalityEstimator->getExtensionRate(*rel, *boundNode); plan.setCost(CostModel::computeRecursiveExtendCost(rel->getUpperBound(), extensionRate, plan)); + // Update cardinality auto hasAtMostOneNbr = extendHasAtMostOneNbrGuarantee(*rel, *boundNode, direction, catalog); if (!hasAtMostOneNbr) { - auto group = extend->getSchema()->getGroup(nbrNode->getInternalIDProperty()); + auto group = pathPropertyProbe->getSchema()->getGroup(nbrNode->getInternalIDProperty()); group->setMultiplier(extensionRate); } - plan.setLastOperator(std::move(extend)); + plan.setLastOperator(std::move(pathPropertyProbe)); } void JoinOrderEnumerator::createRecursivePlan(std::shared_ptr boundNode, @@ -90,7 +98,7 @@ void JoinOrderEnumerator::createRecursivePlan(std::shared_ptr bo appendNonRecursiveExtend(boundNode, recursiveNode, recursiveRel, direction, properties, plan); } -void JoinOrderEnumerator::createRecursiveNodePropertyScanPlan( +void JoinOrderEnumerator::createPathNodePropertyScanPlan( std::shared_ptr recursiveNode, LogicalPlan& plan) { appendScanNodeID(recursiveNode, plan); expression_vector properties; @@ -100,7 +108,7 @@ void JoinOrderEnumerator::createRecursiveNodePropertyScanPlan( queryPlanner->appendScanNodePropIfNecessary(properties, recursiveNode, plan); } -void JoinOrderEnumerator::createRecursiveRelPropertyScanPlan( +void JoinOrderEnumerator::createPathRelPropertyScanPlan( std::shared_ptr recursiveNode, std::shared_ptr nbrNode, std::shared_ptr recursiveRel, ExtendDirection direction, LogicalPlan& plan) { appendScanNodeID(recursiveNode, plan); diff --git a/src/planner/join_order/append_join.cpp b/src/planner/join_order/append_join.cpp new file mode 100644 index 0000000000..4ff05cdf0e --- /dev/null +++ b/src/planner/join_order/append_join.cpp @@ -0,0 +1,88 @@ +#include "planner/join_order/cost_model.h" +#include "planner/join_order_enumerator.h" +#include "planner/logical_plan/logical_operator/logical_hash_join.h" +#include "planner/logical_plan/logical_operator/logical_intersect.h" +#include "planner/query_planner.h" + +using namespace kuzu::common; + +namespace kuzu { +namespace planner { + +void JoinOrderEnumerator::appendHashJoin(const expression_vector& joinNodeIDs, JoinType joinType, + LogicalPlan& probePlan, LogicalPlan& buildPlan) { + auto hashJoin = make_shared( + joinNodeIDs, joinType, probePlan.getLastOperator(), buildPlan.getLastOperator()); + // Apply flattening to probe side + auto groupsPosToFlattenOnProbeSide = hashJoin->getGroupsPosToFlattenOnProbeSide(); + queryPlanner->appendFlattens(groupsPosToFlattenOnProbeSide, probePlan); + hashJoin->setChild(0, probePlan.getLastOperator()); + // Apply flattening to build side + queryPlanner->appendFlattens(hashJoin->getGroupsPosToFlattenOnBuildSide(), buildPlan); + hashJoin->setChild(1, buildPlan.getLastOperator()); + hashJoin->computeFactorizedSchema(); + // Check for sip + auto ratio = probePlan.getCardinality() / buildPlan.getCardinality(); + if (ratio > common::PlannerKnobs::SIP_RATIO) { + hashJoin->setSIP(SidewaysInfoPassing::PROHIBIT_PROBE_TO_BUILD); + } else { + hashJoin->setSIP(SidewaysInfoPassing::PROHIBIT_BUILD_TO_PROBE); + } + // Update cost + probePlan.setCost(CostModel::computeHashJoinCost(joinNodeIDs, probePlan, buildPlan)); + // Update cardinality + probePlan.setCardinality( + queryPlanner->cardinalityEstimator->estimateHashJoin(joinNodeIDs, probePlan, buildPlan)); + probePlan.setLastOperator(std::move(hashJoin)); +} + +void JoinOrderEnumerator::appendMarkJoin(const expression_vector& joinNodeIDs, + const std::shared_ptr& mark, LogicalPlan& probePlan, LogicalPlan& buildPlan) { + auto hashJoin = make_shared( + joinNodeIDs, mark, probePlan.getLastOperator(), buildPlan.getLastOperator()); + // Apply flattening to probe side + queryPlanner->appendFlattens(hashJoin->getGroupsPosToFlattenOnProbeSide(), probePlan); + hashJoin->setChild(0, probePlan.getLastOperator()); + // Apply flattening to build side + queryPlanner->appendFlattens(hashJoin->getGroupsPosToFlattenOnBuildSide(), buildPlan); + hashJoin->setChild(1, buildPlan.getLastOperator()); + hashJoin->computeFactorizedSchema(); + // update cost. Mark join does not change cardinality. + probePlan.setCost(CostModel::computeMarkJoinCost(joinNodeIDs, probePlan, buildPlan)); + probePlan.setLastOperator(std::move(hashJoin)); +} + +void JoinOrderEnumerator::appendIntersect(const std::shared_ptr& intersectNodeID, + binder::expression_vector& boundNodeIDs, LogicalPlan& probePlan, + std::vector>& buildPlans) { + assert(boundNodeIDs.size() == buildPlans.size()); + std::vector> buildChildren; + binder::expression_vector keyNodeIDs; + for (auto i = 0u; i < buildPlans.size(); ++i) { + keyNodeIDs.push_back(boundNodeIDs[i]); + buildChildren.push_back(buildPlans[i]->getLastOperator()); + } + auto intersect = make_shared(intersectNodeID, std::move(keyNodeIDs), + probePlan.getLastOperator(), std::move(buildChildren)); + queryPlanner->appendFlattens(intersect->getGroupsPosToFlattenOnProbeSide(), probePlan); + intersect->setChild(0, probePlan.getLastOperator()); + for (auto i = 0u; i < buildPlans.size(); ++i) { + queryPlanner->appendFlattens( + intersect->getGroupsPosToFlattenOnBuildSide(i), *buildPlans[i]); + intersect->setChild(i + 1, buildPlans[i]->getLastOperator()); + auto ratio = probePlan.getCardinality() / buildPlans[i]->getCardinality(); + if (ratio > common::PlannerKnobs::SIP_RATIO) { + intersect->setSIP(SidewaysInfoPassing::PROHIBIT_PROBE_TO_BUILD); + } + } + intersect->computeFactorizedSchema(); + // update cost + probePlan.setCost(CostModel::computeIntersectCost(probePlan, buildPlans)); + // update cardinality + probePlan.setCardinality( + queryPlanner->cardinalityEstimator->estimateIntersect(boundNodeIDs, probePlan, buildPlans)); + probePlan.setLastOperator(std::move(intersect)); +} + +} // namespace planner +} // namespace kuzu diff --git a/src/planner/join_order_enumerator.cpp b/src/planner/join_order_enumerator.cpp index 0c2aa07292..3ec05433b2 100644 --- a/src/planner/join_order_enumerator.cpp +++ b/src/planner/join_order_enumerator.cpp @@ -3,10 +3,7 @@ #include "planner/join_order/cost_model.h" #include "planner/logical_plan/logical_operator/logical_cross_product.h" #include "planner/logical_plan/logical_operator/logical_ftable_scan.h" -#include "planner/logical_plan/logical_operator/logical_hash_join.h" -#include "planner/logical_plan/logical_operator/logical_intersect.h" #include "planner/logical_plan/logical_operator/logical_scan_node.h" -#include "planner/projection_planner.h" #include "planner/query_planner.h" using namespace kuzu::common; @@ -471,80 +468,6 @@ void JoinOrderEnumerator::planJoin(const expression_vector& joinNodeIDs, JoinTyp } } -void JoinOrderEnumerator::appendHashJoin(const expression_vector& joinNodeIDs, JoinType joinType, - LogicalPlan& probePlan, LogicalPlan& buildPlan) { - auto hashJoin = make_shared( - joinNodeIDs, joinType, probePlan.getLastOperator(), buildPlan.getLastOperator()); - // Apply flattening to probe side - auto groupsPosToFlattenOnProbeSide = hashJoin->getGroupsPosToFlattenOnProbeSide(); - queryPlanner->appendFlattens(groupsPosToFlattenOnProbeSide, probePlan); - hashJoin->setChild(0, probePlan.getLastOperator()); - // Apply flattening to build side - queryPlanner->appendFlattens(hashJoin->getGroupsPosToFlattenOnBuildSide(), buildPlan); - hashJoin->setChild(1, buildPlan.getLastOperator()); - hashJoin->computeFactorizedSchema(); - auto ratio = probePlan.getCardinality() / buildPlan.getCardinality(); - if (ratio > common::PlannerKnobs::ACC_HJ_PROBE_BUILD_RATIO) { - hashJoin->setSIP(SidewaysInfoPassing::PROHIBIT_PROBE_TO_BUILD); - } else { - hashJoin->setSIP(SidewaysInfoPassing::PROHIBIT_BUILD_TO_PROBE); - } - // update cost - probePlan.setCost(CostModel::computeHashJoinCost(joinNodeIDs, probePlan, buildPlan)); - // update cardinality - probePlan.setCardinality( - queryPlanner->cardinalityEstimator->estimateHashJoin(joinNodeIDs, probePlan, buildPlan)); - probePlan.setLastOperator(std::move(hashJoin)); -} - -void JoinOrderEnumerator::appendMarkJoin(const expression_vector& joinNodeIDs, - const std::shared_ptr& mark, LogicalPlan& probePlan, LogicalPlan& buildPlan) { - auto hashJoin = make_shared( - joinNodeIDs, mark, probePlan.getLastOperator(), buildPlan.getLastOperator()); - // Apply flattening to probe side - queryPlanner->appendFlattens(hashJoin->getGroupsPosToFlattenOnProbeSide(), probePlan); - hashJoin->setChild(0, probePlan.getLastOperator()); - // Apply flattening to build side - queryPlanner->appendFlattens(hashJoin->getGroupsPosToFlattenOnBuildSide(), buildPlan); - hashJoin->setChild(1, buildPlan.getLastOperator()); - hashJoin->computeFactorizedSchema(); - // update cost. Mark join does not change cardinality. - probePlan.setCost(CostModel::computeMarkJoinCost(joinNodeIDs, probePlan, buildPlan)); - probePlan.setLastOperator(std::move(hashJoin)); -} - -void JoinOrderEnumerator::appendIntersect(const std::shared_ptr& intersectNodeID, - binder::expression_vector& boundNodeIDs, LogicalPlan& probePlan, - std::vector>& buildPlans) { - assert(boundNodeIDs.size() == buildPlans.size()); - std::vector> buildChildren; - binder::expression_vector keyNodeIDs; - for (auto i = 0u; i < buildPlans.size(); ++i) { - keyNodeIDs.push_back(boundNodeIDs[i]); - buildChildren.push_back(buildPlans[i]->getLastOperator()); - } - auto intersect = make_shared(intersectNodeID, std::move(keyNodeIDs), - probePlan.getLastOperator(), std::move(buildChildren)); - queryPlanner->appendFlattens(intersect->getGroupsPosToFlattenOnProbeSide(), probePlan); - intersect->setChild(0, probePlan.getLastOperator()); - for (auto i = 0u; i < buildPlans.size(); ++i) { - queryPlanner->appendFlattens( - intersect->getGroupsPosToFlattenOnBuildSide(i), *buildPlans[i]); - intersect->setChild(i + 1, buildPlans[i]->getLastOperator()); - if (probePlan.getCardinality() / buildPlans[i]->getCardinality() > - common::PlannerKnobs::ACC_HJ_PROBE_BUILD_RATIO) { - intersect->setSIP(SidewaysInfoPassing::PROHIBIT_PROBE_TO_BUILD); - } - } - intersect->computeFactorizedSchema(); - // update cost - probePlan.setCost(CostModel::computeIntersectCost(probePlan, buildPlans)); - // update cardinality - probePlan.setCardinality( - queryPlanner->cardinalityEstimator->estimateIntersect(boundNodeIDs, probePlan, buildPlans)); - probePlan.setLastOperator(std::move(intersect)); -} - void JoinOrderEnumerator::appendCrossProduct(LogicalPlan& probePlan, LogicalPlan& buildPlan) { auto crossProduct = make_shared(probePlan.getLastOperator(), buildPlan.getLastOperator()); diff --git a/src/planner/operator/base_logical_operator.cpp b/src/planner/operator/base_logical_operator.cpp index dd56ba7fdd..d45b500da9 100644 --- a/src/planner/operator/base_logical_operator.cpp +++ b/src/planner/operator/base_logical_operator.cpp @@ -79,6 +79,9 @@ std::string LogicalOperatorUtils::logicalOperatorTypeToString(LogicalOperatorTyp case LogicalOperatorType::ORDER_BY: { return "ORDER_BY"; } + case LogicalOperatorType::PATH_PROPERTY_PROBE: { + return "PATH_PROPERTY_PROBE"; + } case LogicalOperatorType::PROJECTION: { return "PROJECTION"; } diff --git a/src/processor/mapper/CMakeLists.txt b/src/processor/mapper/CMakeLists.txt index b3e62e3774..964566a8d5 100644 --- a/src/processor/mapper/CMakeLists.txt +++ b/src/processor/mapper/CMakeLists.txt @@ -18,6 +18,7 @@ add_library(kuzu_processor_mapper map_limit.cpp map_multiplicity_reducer.cpp map_order_by.cpp + map_path_property_probe.cpp map_projection.cpp map_recursive_extend.cpp map_scan_frontier.cpp diff --git a/src/processor/mapper/map_acc_hash_join.cpp b/src/processor/mapper/map_acc_hash_join.cpp index 06ad9d2f57..95a0de5c35 100644 --- a/src/processor/mapper/map_acc_hash_join.cpp +++ b/src/processor/mapper/map_acc_hash_join.cpp @@ -15,7 +15,7 @@ static FactorizedTableScan* getTableScanForAccHashJoin(PhysicalOperator* probe) return (FactorizedTableScan*)op; } -void PlanMapper::mapAccHashJoin(kuzu::processor::PhysicalOperator* probe) { +void PlanMapper::mapSIPJoin(kuzu::processor::PhysicalOperator* probe) { auto tableScan = getTableScanForAccHashJoin(probe); auto resultCollector = tableScan->moveUnaryChild(); probe->addChild(std::move(resultCollector)); diff --git a/src/processor/mapper/map_hash_join.cpp b/src/processor/mapper/map_hash_join.cpp index 52d23ddba1..a724237553 100644 --- a/src/processor/mapper/map_hash_join.cpp +++ b/src/processor/mapper/map_hash_join.cpp @@ -96,7 +96,7 @@ std::unique_ptr PlanMapper::mapLogicalHashJoinToPhysical( hashJoin->requireFlatProbeKeys(), probeDataInfo, std::move(probeSidePrevOperator), std::move(hashJoinBuild), getOperatorID(), paramsString); if (hashJoin->getSIP() == planner::SidewaysInfoPassing::PROBE_TO_BUILD) { - mapAccHashJoin(hashJoinProbe.get()); + mapSIPJoin(hashJoinProbe.get()); } return hashJoinProbe; } diff --git a/src/processor/mapper/map_intersect.cpp b/src/processor/mapper/map_intersect.cpp index 16b8a7c688..234e06b433 100644 --- a/src/processor/mapper/map_intersect.cpp +++ b/src/processor/mapper/map_intersect.cpp @@ -50,7 +50,7 @@ std::unique_ptr PlanMapper::mapLogicalIntersectToPhysical( auto intersect = make_unique(outputDataPos, intersectDataInfos, sharedStates, std::move(children), getOperatorID(), logicalIntersect->getExpressionsForPrinting()); if (logicalIntersect->getSIP() == SidewaysInfoPassing::PROBE_TO_BUILD) { - mapAccHashJoin(intersect.get()); + mapSIPJoin(intersect.get()); } return intersect; } diff --git a/src/processor/mapper/map_path_property_probe.cpp b/src/processor/mapper/map_path_property_probe.cpp new file mode 100644 index 0000000000..45f492a08b --- /dev/null +++ b/src/processor/mapper/map_path_property_probe.cpp @@ -0,0 +1,93 @@ +#include "common/string_utils.h" +#include "planner/logical_plan/logical_operator/logical_recursive_extend.h" +#include "processor/mapper/plan_mapper.h" +#include "processor/operator/hash_join/hash_join_build.h" +#include "processor/operator/recursive_extend/path_property_probe.h" + +using namespace kuzu::binder; +using namespace kuzu::planner; + +namespace kuzu { +namespace processor { + +static std::vector getColIdxToScan( + const expression_vector& payloads, uint32_t numKeys, const common::LogicalType& structType) { + std::unordered_map propertyNameToColumnIdx; + for (auto i = 0u; i < payloads.size(); ++i) { + assert(payloads[i]->expressionType == common::PROPERTY); + auto propertyName = ((PropertyExpression*)payloads[i].get())->getPropertyName(); + common::StringUtils::toUpper(propertyName); + propertyNameToColumnIdx.insert({propertyName, i + numKeys}); + } + auto nodeStructFields = common::StructType::getFields(&structType); + std::vector colIndicesToScan; + for (auto i = 1u; i < nodeStructFields.size(); ++i) { + auto field = nodeStructFields[i]; + colIndicesToScan.push_back(propertyNameToColumnIdx.at(field->getName())); + } + return colIndicesToScan; +} + +std::unique_ptr PlanMapper::mapLogicalPathPropertyProbeToPhysical( + planner::LogicalOperator* logicalOperator) { + auto logicalProbe = (LogicalPathPropertyProbe*)logicalOperator; + if (logicalProbe->getNumChildren() == 1) { + return mapLogicalOperatorToPhysical(logicalProbe->getChild(0)); + } + auto rel = logicalProbe->getRel(); + auto recursiveInfo = rel->getRecursiveInfo(); + // Map build node property + auto nodeBuildPrevOperator = mapLogicalOperatorToPhysical(logicalProbe->getChild(1)); + auto nodeBuildSchema = logicalProbe->getChild(1)->getSchema(); + auto nodeKeys = expression_vector{recursiveInfo->node->getInternalIDProperty()}; + auto nodePayloads = + ExpressionUtil::excludeExpressions(nodeBuildSchema->getExpressionsInScope(), nodeKeys); + auto nodeBuildInfo = createHashBuildInfo(*nodeBuildSchema, nodeKeys, nodePayloads); + auto nodeHashTable = std::make_unique( + *memoryManager, nodeBuildInfo->getNumKeys(), nodeBuildInfo->getTableSchema()->copy()); + auto nodeBuildSharedState = std::make_shared(std::move(nodeHashTable)); + auto nodeBuild = make_unique( + std::make_unique(nodeBuildSchema), nodeBuildSharedState, + std::move(nodeBuildInfo), std::move(nodeBuildPrevOperator), getOperatorID(), ""); + // Map build rel property + auto relBuildPrvOperator = mapLogicalOperatorToPhysical(logicalProbe->getChild(2)); + auto relBuildSchema = logicalProbe->getChild(2)->getSchema(); + auto relKeys = expression_vector{recursiveInfo->rel->getInternalIDProperty()}; + auto relPayloads = + ExpressionUtil::excludeExpressions(relBuildSchema->getExpressionsInScope(), relKeys); + auto relBuildInfo = createHashBuildInfo(*relBuildSchema, relKeys, relPayloads); + auto relHashTable = std::make_unique( + *memoryManager, relBuildInfo->getNumKeys(), relBuildInfo->getTableSchema()->copy()); + auto relBuildSharedState = std::make_shared(std::move(relHashTable)); + auto relBuild = std::make_unique( + std::make_unique(relBuildSchema), relBuildSharedState, + std::move(relBuildInfo), std::move(relBuildPrvOperator), getOperatorID(), ""); + // Map child + auto prevOperator = mapLogicalOperatorToPhysical(logicalOperator->getChild(0)); + // Map probe + auto relDataType = rel->getDataType(); + auto nodesField = common::StructType::getField(&relDataType, common::InternalKeyword::NODES); + auto nodeStructType = common::VarListType::getChildType(nodesField->getType()); + auto nodeColIndicesToScan = getColIdxToScan(nodePayloads, nodeKeys.size(), *nodeStructType); + auto relsField = common::StructType::getField(&relDataType, common::InternalKeyword::RELS); + auto relStructType = common::VarListType::getChildType(relsField->getType()); + auto relColIndicesToScan = getColIdxToScan(relPayloads, relKeys.size(), *relStructType); + auto pathPos = DataPos{logicalProbe->getSchema()->getExpressionPos(*rel)}; + auto pathProbeInfo = std::make_unique( + pathPos, std::move(nodeColIndicesToScan), std::move(relColIndicesToScan)); + auto pathProbeSharedState = + std::make_shared(nodeBuildSharedState, relBuildSharedState); + std::vector> children; + children.push_back(std::move(prevOperator)); + children.push_back(std::move(nodeBuild)); + children.push_back(std::move(relBuild)); + auto pathPropertyProbe = std::make_unique( + std::move(pathProbeInfo), pathProbeSharedState, std::move(children), getOperatorID(), ""); + if (logicalProbe->getSIP() == planner::SidewaysInfoPassing::PROBE_TO_BUILD) { + mapSIPJoin(pathPropertyProbe.get()); + } + return pathPropertyProbe; +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/mapper/map_recursive_extend.cpp b/src/processor/mapper/map_recursive_extend.cpp index b6ba71038e..311d57404f 100644 --- a/src/processor/mapper/map_recursive_extend.cpp +++ b/src/processor/mapper/map_recursive_extend.cpp @@ -1,10 +1,6 @@ -#include "common/string_utils.h" #include "planner/logical_plan/logical_operator/logical_recursive_extend.h" #include "processor/mapper/plan_mapper.h" -#include "processor/operator/hash_join/hash_join_build.h" -#include "processor/operator/recursive_extend/path_property_probe.h" #include "processor/operator/recursive_extend/recursive_join.h" -#include "processor/operator/table_scan/factorized_table_scan.h" using namespace kuzu::binder; using namespace kuzu::planner; @@ -22,24 +18,6 @@ static std::shared_ptr createSharedState( return std::make_shared(std::move(semiMasks)); } -static std::vector getColIdxToScan( - const expression_vector& payloads, uint32_t numKeys, const common::LogicalType& structType) { - std::unordered_map propertyNameToColumnIdx; - for (auto i = 0u; i < payloads.size(); ++i) { - assert(payloads[i]->expressionType == common::PROPERTY); - auto propertyName = ((PropertyExpression*)payloads[i].get())->getPropertyName(); - common::StringUtils::toUpper(propertyName); - propertyNameToColumnIdx.insert({propertyName, i + numKeys}); - } - auto nodeStructFields = common::StructType::getFields(&structType); - std::vector colIndicesToScan; - for (auto i = 1u; i < nodeStructFields.size(); ++i) { - auto field = nodeStructFields[i]; - colIndicesToScan.push_back(propertyNameToColumnIdx.at(field->getName())); - } - return colIndicesToScan; -} - std::unique_ptr PlanMapper::mapLogicalRecursiveExtendToPhysical( planner::LogicalOperator* logicalOperator) { auto extend = (LogicalRecursiveExtend*)logicalOperator; @@ -58,8 +36,6 @@ std::unique_ptr PlanMapper::mapLogicalRecursiveExtendToPhysica recursivePlanSchema->getExpressionPos(*recursiveInfo->node->getInternalIDProperty())); auto recursiveEdgeIDPos = DataPos( recursivePlanSchema->getExpressionPos(*recursiveInfo->rel->getInternalIDProperty())); - // Map child plan - auto prevOperator = mapLogicalOperatorToPhysical(logicalOperator->getChild(0)); // Generate RecursiveJoin auto outSchema = extend->getSchema(); auto inSchema = extend->getChild(0)->getSchema(); @@ -74,64 +50,11 @@ std::unique_ptr PlanMapper::mapLogicalRecursiveExtendToPhysica auto dataInfo = std::make_unique(boundNodeIDPos, nbrNodeIDPos, nbrNode->getTableIDsSet(), lengthPos, std::move(recursivePlanResultSetDescriptor), recursiveDstNodeIDPos, recursiveInfo->node->getTableIDsSet(), recursiveEdgeIDPos, pathPos); - auto recursiveJoin = std::make_unique(rel->getLowerBound(), rel->getUpperBound(), + auto prevOperator = mapLogicalOperatorToPhysical(logicalOperator->getChild(0)); + return std::make_unique(rel->getLowerBound(), rel->getUpperBound(), rel->getRelType(), extend->getJoinType(), sharedState, std::move(dataInfo), std::move(prevOperator), getOperatorID(), extend->getExpressionsForPrinting(), std::move(recursiveRoot)); - switch (extend->getJoinType()) { - case planner::RecursiveJoinType::TRACK_PATH: { - // Map build node property - auto nodeBuildPrevOperator = mapLogicalOperatorToPhysical(extend->getChild(1)); - auto nodeBuildSchema = extend->getChild(1)->getSchema(); - auto nodeKeys = expression_vector{recursiveInfo->node->getInternalIDProperty()}; - auto nodePayloads = - ExpressionUtil::excludeExpressions(nodeBuildSchema->getExpressionsInScope(), nodeKeys); - auto nodeBuildInfo = createHashBuildInfo(*nodeBuildSchema, nodeKeys, nodePayloads); - auto nodeHashTable = std::make_unique( - *memoryManager, nodeBuildInfo->getNumKeys(), nodeBuildInfo->getTableSchema()->copy()); - auto nodeBuildSharedState = std::make_shared(std::move(nodeHashTable)); - auto nodeBuild = make_unique( - std::make_unique(nodeBuildSchema), nodeBuildSharedState, - std::move(nodeBuildInfo), std::move(nodeBuildPrevOperator), getOperatorID(), ""); - // Map build rel property - auto relBuildPrvOperator = mapLogicalOperatorToPhysical(extend->getChild(2)); - auto relBuildSchema = extend->getChild(2)->getSchema(); - auto relKeys = expression_vector{recursiveInfo->rel->getInternalIDProperty()}; - auto relPayloads = - ExpressionUtil::excludeExpressions(relBuildSchema->getExpressionsInScope(), relKeys); - auto relBuildInfo = createHashBuildInfo(*relBuildSchema, relKeys, relPayloads); - auto relHashTable = std::make_unique( - *memoryManager, relBuildInfo->getNumKeys(), relBuildInfo->getTableSchema()->copy()); - auto relBuildSharedState = std::make_shared(std::move(relHashTable)); - auto relBuild = std::make_unique( - std::make_unique(relBuildSchema), relBuildSharedState, - std::move(relBuildInfo), std::move(relBuildPrvOperator), getOperatorID(), ""); - // Map probe - auto relDataType = rel->getDataType(); - auto nodesField = - common::StructType::getField(&relDataType, common::InternalKeyword::NODES); - auto nodeStructType = common::VarListType::getChildType(nodesField->getType()); - auto nodeColIndicesToScan = getColIdxToScan(nodePayloads, nodeKeys.size(), *nodeStructType); - auto relsField = common::StructType::getField(&relDataType, common::InternalKeyword::RELS); - auto relStructType = common::VarListType::getChildType(relsField->getType()); - auto relColIndicesToScan = getColIdxToScan(relPayloads, relKeys.size(), *relStructType); - auto pathProbeInfo = std::make_unique( - pathPos, std::move(nodeColIndicesToScan), std::move(relColIndicesToScan)); - auto pathProbeSharedState = std::make_shared( - nodeBuildSharedState, relBuildSharedState); - std::vector> children; - children.push_back(std::move(recursiveJoin)); - children.push_back(std::move(nodeBuild)); - children.push_back(std::move(relBuild)); - return std::make_unique(std::move(pathProbeInfo), pathProbeSharedState, - std::move(children), getOperatorID(), ""); - } - case planner::RecursiveJoinType::TRACK_NONE: { - return recursiveJoin; - } - default: - throw common::NotImplementedException("PlanMapper::mapLogicalRecursiveExtendToPhysical"); - } } } // namespace processor diff --git a/src/processor/mapper/map_semi_masker.cpp b/src/processor/mapper/map_semi_masker.cpp index fb2c78e8e0..d379b8afd2 100644 --- a/src/processor/mapper/map_semi_masker.cpp +++ b/src/processor/mapper/map_semi_masker.cpp @@ -11,16 +11,16 @@ namespace processor { std::unique_ptr PlanMapper::mapLogicalSemiMaskerToPhysical( LogicalOperator* logicalOperator) { - auto logicalSemiMasker = (LogicalSemiMasker*)logicalOperator; - auto inSchema = logicalSemiMasker->getChild(0)->getSchema(); + auto semiMasker = (LogicalSemiMasker*)logicalOperator; + auto inSchema = semiMasker->getChild(0)->getSchema(); auto prevOperator = mapLogicalOperatorToPhysical(logicalOperator->getChild(0)); - auto node = logicalSemiMasker->getNode(); - auto keyDataPos = DataPos(inSchema->getExpressionPos(*node->getInternalIDProperty())); - std::unordered_map> masksPerTable; + auto node = semiMasker->getNode(); + std::unordered_map> + masksPerTable; for (auto tableID : node->getTableIDs()) { - masksPerTable.insert({tableID, std::vector{}}); + masksPerTable.insert({tableID, std::vector{}}); } - for (auto& op : logicalSemiMasker->getOperators()) { + for (auto& op : semiMasker->getOperators()) { auto physicalOp = logicalOpToPhysicalOpMap.at(op); switch (physicalOp->getOperatorType()) { case PhysicalOperatorType::SCAN_NODE_ID: { @@ -32,13 +32,6 @@ std::unique_ptr PlanMapper::mapLogicalSemiMaskerToPhysical( tableState->getSemiMask(), 0 /* initial mask idx */); } } break; - case PhysicalOperatorType::PATH_PROPERTY_PROBE: { - auto recursiveJoin = (RecursiveJoin*)physicalOp->getChild(0); - for (auto& semiMask : recursiveJoin->getSharedState()->semiMasks) { - auto tableID = semiMask->getNodeTable()->getTableID(); - masksPerTable.at(tableID).emplace_back(semiMask.get(), 0 /* initial mask idx */); - } - } break; case PhysicalOperatorType::RECURSIVE_JOIN: { auto recursiveJoin = (RecursiveJoin*)physicalOp; for (auto& semiMask : recursiveJoin->getSharedState()->semiMasks) { @@ -50,14 +43,29 @@ std::unique_ptr PlanMapper::mapLogicalSemiMaskerToPhysical( throw common::NotImplementedException("PlanMapper::mapLogicalSemiMaskerToPhysical"); } } - if (node->isMultiLabeled()) { - return std::make_unique(keyDataPos, std::move(masksPerTable), - std::move(prevOperator), getOperatorID(), - logicalSemiMasker->getExpressionsForPrinting()); - } else { - return std::make_unique(keyDataPos, std::move(masksPerTable), - std::move(prevOperator), getOperatorID(), - logicalSemiMasker->getExpressionsForPrinting()); + auto keyPos = DataPos(inSchema->getExpressionPos(*semiMasker->getKey())); + auto info = std::make_unique(keyPos, std::move(masksPerTable)); + switch (semiMasker->getType()) { + case planner::SemiMaskType::NODE: { + if (node->isMultiLabeled()) { + return std::make_unique(std::move(info), std::move(prevOperator), + getOperatorID(), semiMasker->getExpressionsForPrinting()); + } else { + return std::make_unique(std::move(info), std::move(prevOperator), + getOperatorID(), semiMasker->getExpressionsForPrinting()); + } + } + case planner::SemiMaskType::PATH: { + if (node->isMultiLabeled()) { + return std::make_unique(std::move(info), + std::move(prevOperator), getOperatorID(), semiMasker->getExpressionsForPrinting()); + } else { + return std::make_unique(std::move(info), + std::move(prevOperator), getOperatorID(), semiMasker->getExpressionsForPrinting()); + } + } + default: + throw common::NotImplementedException("PlanMapper::mapLogicalSemiMaskerToPhysical"); } } diff --git a/src/processor/mapper/plan_mapper.cpp b/src/processor/mapper/plan_mapper.cpp index bb134b2442..31ee05269f 100644 --- a/src/processor/mapper/plan_mapper.cpp +++ b/src/processor/mapper/plan_mapper.cpp @@ -47,6 +47,9 @@ std::unique_ptr PlanMapper::mapLogicalOperatorToPhysical( case LogicalOperatorType::RECURSIVE_EXTEND: { physicalOperator = mapLogicalRecursiveExtendToPhysical(logicalOperator.get()); } break; + case LogicalOperatorType::PATH_PROPERTY_PROBE: { + physicalOperator = mapLogicalPathPropertyProbeToPhysical(logicalOperator.get()); + } break; case LogicalOperatorType::FLATTEN: { physicalOperator = mapLogicalFlattenToPhysical(logicalOperator.get()); } break; diff --git a/src/processor/operator/semi_masker.cpp b/src/processor/operator/semi_masker.cpp index fe7004fcc6..0c635954ce 100644 --- a/src/processor/operator/semi_masker.cpp +++ b/src/processor/operator/semi_masker.cpp @@ -6,7 +6,7 @@ namespace kuzu { namespace processor { void BaseSemiMasker::initGlobalStateInternal(ExecutionContext* context) { - for (auto& [table, masks] : masksPerTable) { + for (auto& [table, masks] : info->masksPerTable) { for (auto& maskWithIdx : masks) { auto maskIdx = maskWithIdx.first->getNumMasks(); assert(maskIdx < UINT8_MAX); @@ -17,9 +17,8 @@ void BaseSemiMasker::initGlobalStateInternal(ExecutionContext* context) { } void BaseSemiMasker::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { - keyValueVector = resultSet->getValueVector(keyDataPos); - assert(keyValueVector->dataType.getLogicalTypeID() == LogicalTypeID::INTERNAL_ID); - for (auto& [table, masks] : masksPerTable) { + keyVector = resultSet->getValueVector(info->keyPos).get(); + for (auto& [table, masks] : info->masksPerTable) { for (auto& maskWithIdx : masks) { maskWithIdx.first->init(transaction); } @@ -30,16 +29,15 @@ bool SingleTableSemiMasker::getNextTuplesInternal(ExecutionContext* context) { if (!children[0]->getNextTuple(context)) { return false; } - auto numValues = - keyValueVector->state->isFlat() ? 1 : keyValueVector->state->selVector->selectedSize; - for (auto i = 0u; i < numValues; i++) { - auto pos = keyValueVector->state->selVector->selectedPositions[i]; - auto nodeID = keyValueVector->getValue(pos); - for (auto [mask, maskerIdx] : masksPerTable.begin()->second) { + auto selVector = keyVector->state->selVector.get(); + for (auto i = 0u; i < selVector->selectedSize; i++) { + auto pos = selVector->selectedPositions[i]; + auto nodeID = keyVector->getValue(pos); + for (auto& [mask, maskerIdx] : info->getSingleTableMasks()) { mask->incrementMaskValue(nodeID.offset, maskerIdx); } } - metrics->numOutputTuple.increase(numValues); + metrics->numOutputTuple.increase(selVector->selectedSize); return true; } @@ -47,16 +45,57 @@ bool MultiTableSemiMasker::getNextTuplesInternal(ExecutionContext* context) { if (!children[0]->getNextTuple(context)) { return false; } - auto numValues = - keyValueVector->state->isFlat() ? 1 : keyValueVector->state->selVector->selectedSize; - for (auto i = 0u; i < numValues; i++) { - auto pos = keyValueVector->state->selVector->selectedPositions[i]; - auto nodeID = keyValueVector->getValue(pos); - for (auto [mask, maskerIdx] : masksPerTable.at(nodeID.tableID)) { + auto selVector = keyVector->state->selVector.get(); + for (auto i = 0u; i < selVector->selectedSize; i++) { + auto pos = selVector->selectedPositions[i]; + auto nodeID = keyVector->getValue(pos); + for (auto& [mask, maskerIdx] : info->getTableMasks(nodeID.tableID)) { mask->incrementMaskValue(nodeID.offset, maskerIdx); } } - metrics->numOutputTuple.increase(numValues); + metrics->numOutputTuple.increase(selVector->selectedSize); + return true; +} + +void PathSemiMasker::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { + BaseSemiMasker::initLocalStateInternal(resultSet, context); + auto pathNodesFieldIdx = + common::StructType::getFieldIdx(&keyVector->dataType, InternalKeyword::NODES); + pathNodesVector = StructVector::getFieldVector(keyVector, pathNodesFieldIdx).get(); + auto pathNodesDataVector = ListVector::getDataVector(pathNodesVector); + auto pathNodesIDFieldIdx = + StructType::getFieldIdx(&pathNodesDataVector->dataType, InternalKeyword::ID); + pathNodesIDDataVector = + StructVector::getFieldVector(pathNodesDataVector, pathNodesIDFieldIdx).get(); +} + +bool PathSingleTableSemiMasker::getNextTuplesInternal(ExecutionContext* context) { + if (!children[0]->getNextTuple(context)) { + return false; + } + auto size = ListVector::getDataVectorSize(pathNodesVector); + for (auto i = 0u; i < size; ++i) { + auto nodeID = pathNodesIDDataVector->getValue(i); + for (auto& [mask, maskerIdx] : info->getSingleTableMasks()) { + mask->incrementMaskValue(nodeID.offset, maskerIdx); + } + } + metrics->numOutputTuple.increase(size); + return true; +} + +bool PathMultipleTableSemiMasker::getNextTuplesInternal(ExecutionContext* context) { + if (!children[0]->getNextTuple(context)) { + return false; + } + auto size = ListVector::getDataVectorSize(pathNodesVector); + for (auto i = 0u; i < size; ++i) { + auto nodeID = pathNodesIDDataVector->getValue(i); + for (auto& [mask, maskerIdx] : info->getTableMasks(nodeID.tableID)) { + mask->incrementMaskValue(nodeID.offset, maskerIdx); + } + } + metrics->numOutputTuple.increase(size); return true; }