diff --git a/src/include/optimizer/acc_hash_join_optimizer.h b/src/include/optimizer/acc_hash_join_optimizer.h index 5c82aeaaca..1d4c3e4935 100644 --- a/src/include/optimizer/acc_hash_join_optimizer.h +++ b/src/include/optimizer/acc_hash_join_optimizer.h @@ -1,3 +1,5 @@ +#pragma once + #include "logical_operator_visitor.h" #include "planner/logical_plan/logical_plan.h" @@ -6,7 +8,7 @@ namespace optimizer { // This optimizer enables the Accumulated hash join algorithm as introduced in paper "Kuzu Graph // Database Management System". -class AccHashJoinOptimizer : public LogicalOperatorVisitor { +class HashJoinSIPOptimizer : public LogicalOperatorVisitor { public: void rewrite(planner::LogicalPlan* plan); @@ -22,16 +24,22 @@ class AccHashJoinOptimizer : public LogicalOperatorVisitor { bool isProbeSideQualified(planner::LogicalOperator* probeRoot); - binder::expression_map> resolveScanNodesToApplySemiMask( - const binder::expression_vector& nodeIDCandidates, - const std::vector& roots); - - std::shared_ptr applySemiMasks( - const binder::expression_map>& nodeIDToScanNodes, - std::shared_ptr root); - void applyAccHashJoin( - const binder::expression_map>& nodeIDToScanNodes, - planner::LogicalOperator* op); + std::vector resolveOperatorsToApplySemiMask( + const binder::Expression& nodeID, planner::LogicalOperator* root); + // Find all ScanNodeIDs under root which scans parameter nodeID. Note that there might be + // multiple ScanNodeIDs matches because both node and rel table scans will trigger scanNodeIDs. + std::vector resolveScanNodeIDsToApplySemiMask( + const binder::Expression& nodeID, planner::LogicalOperator* root); + // Find all ShortestPathExtend under root which extend to parameter nodeID. There will be at + // most one match because rel table is scanned exactly once. + std::vector resolveShortestPathExtendToApplySemiMask( + const binder::Expression& nodeID, planner::LogicalOperator* root); + + std::shared_ptr appendSemiMask( + std::shared_ptr nodeID, std::vector ops, + std::shared_ptr child); + std::shared_ptr appendAccumulate( + std::shared_ptr child); }; } // namespace optimizer diff --git a/src/include/optimizer/logical_operator_collector.h b/src/include/optimizer/logical_operator_collector.h index 29b2e21437..9057514d27 100644 --- a/src/include/optimizer/logical_operator_collector.h +++ b/src/include/optimizer/logical_operator_collector.h @@ -36,5 +36,10 @@ class LogicalIndexScanNodeCollector : public LogicalOperatorCollector { void visitIndexScanNode(planner::LogicalOperator* op) override { ops.push_back(op); } }; +class LogicalRecursiveExtendCollector : public LogicalOperatorCollector { +protected: + void visitRecursiveExtend(planner::LogicalOperator* op) override { ops.push_back(op); } +}; + } // namespace optimizer } // namespace kuzu diff --git a/src/include/planner/logical_plan/logical_operator/logical_semi_masker.h b/src/include/planner/logical_plan/logical_operator/logical_semi_masker.h index 62c352745a..faba5c71d5 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_semi_masker.h +++ b/src/include/planner/logical_plan/logical_operator/logical_semi_masker.h @@ -2,36 +2,32 @@ #include "base_logical_operator.h" #include "binder/expression/node_expression.h" -#include "logical_scan_node.h" namespace kuzu { namespace planner { class LogicalSemiMasker : public LogicalOperator { public: - LogicalSemiMasker(std::shared_ptr nodeID, - std::vector scanNodes, std::shared_ptr child) + LogicalSemiMasker(std::shared_ptr node, + std::vector ops, std::shared_ptr child) : LogicalOperator{LogicalOperatorType::SEMI_MASKER, std::move(child)}, - nodeID{std::move(nodeID)}, scanNodes{std::move(scanNodes)} {} + node{std::move(node)}, ops{std::move(ops)} {} inline void computeFactorizedSchema() override { copyChildSchema(0); } inline void computeFlatSchema() override { copyChildSchema(0); } - inline std::string getExpressionsForPrinting() const override { return nodeID->toString(); } + inline std::string getExpressionsForPrinting() const override { return node->toString(); } - inline std::shared_ptr getNodeID() const { return nodeID; } - inline bool isMultiLabel() const { - return ((LogicalScanNode*)scanNodes[0])->getNode()->isMultiLabeled(); - } - inline std::vector getScanNodes() const { return scanNodes; } + inline std::shared_ptr getNode() const { return node; } + inline std::vector getOperators() const { return ops; } inline std::unique_ptr copy() override { - return make_unique(nodeID, scanNodes, children[0]->copy()); + return make_unique(node, ops, children[0]->copy()); } private: - std::shared_ptr nodeID; - std::vector scanNodes; + std::shared_ptr node; + std::vector ops; }; } // namespace planner diff --git a/src/include/processor/operator/mask.h b/src/include/processor/operator/mask.h new file mode 100644 index 0000000000..93f22fd81d --- /dev/null +++ b/src/include/processor/operator/mask.h @@ -0,0 +1,143 @@ +#pragma once + +#include + +#include "storage/store/node_table.h" + +namespace kuzu { +namespace processor { + +// Note: Classes in this file are NOT thread-safe. +struct MaskUtil { + static inline common::offset_t getMorselIdx(common::offset_t offset) { + return offset >> common::DEFAULT_VECTOR_CAPACITY_LOG_2; + } +}; + +struct MaskData { + uint8_t* data; + + explicit MaskData(uint64_t size) { + dataBuffer = std::make_unique(size); + data = dataBuffer.get(); + std::fill(data, data + size, 0); + } + + inline void setMask(uint64_t pos, uint8_t maskValue) { data[pos] = maskValue; } + inline bool isMasked(uint64_t pos, uint8_t trueMaskVal) { return data[pos] == trueMaskVal; } + +private: + std::unique_ptr dataBuffer; +}; + +// MaskCollection represents multiple mask on the same domain with AND semantic. +class MaskCollection { +public: + MaskCollection() : numMasks{0} {} + + inline void init(common::offset_t maxOffset) { + if (maskData != nullptr) { // MaskCollection might be initialized repeatedly. + return; + } + maskData = std::make_unique(maxOffset + 1); + } + + inline bool isMasked(common::offset_t offset) { return maskData->isMasked(offset, numMasks); } + // Increment mask value for the given nodeOffset if its current mask value is equal to + // the specified `currentMaskValue`. + inline void incrementMaskValue(common::offset_t offset, uint8_t currentMaskValue) { + if (maskData->isMasked(offset, currentMaskValue)) { + maskData->setMask(offset, currentMaskValue + 1); + } + } + + inline uint8_t getNumMasks() const { return numMasks; } + inline void incrementNumMasks() { numMasks++; } + +private: + std::unique_ptr maskData; + uint8_t numMasks; +}; + +class NodeSemiMask { +public: + NodeSemiMask(storage::NodeTable* nodeTable) : nodeTable{nodeTable} {} + + virtual void init(transaction::Transaction* trx) = 0; + + virtual void incrementMaskValue(common::offset_t nodeOffset, uint8_t currentMaskValue) = 0; + + virtual uint8_t getNumMasks() const = 0; + virtual void incrementNumMasks() = 0; + + inline bool isEnabled() { return getNumMasks() > 0; } + +protected: + storage::NodeTable* nodeTable; +}; + +class NodeOffsetSemiMask : public NodeSemiMask { +public: + NodeOffsetSemiMask(storage::NodeTable* nodeTable) : NodeSemiMask{nodeTable} { + offsetMask = std::make_unique(); + } + + inline void init(transaction::Transaction* trx) override { + offsetMask->init(nodeTable->getMaxNodeOffset(trx) + 1); + } + + inline void incrementMaskValue(common::offset_t nodeOffset, uint8_t currentMaskValue) override { + offsetMask->incrementMaskValue(nodeOffset, currentMaskValue); + } + + inline uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); } + inline void incrementNumMasks() override { offsetMask->incrementNumMasks(); } + + inline bool isNodeMasked(common::offset_t nodeOffset) { + return offsetMask->isMasked(nodeOffset); + } + +private: + std::unique_ptr offsetMask; +}; + +class NodeOffsetAndMorselSemiMask : public NodeSemiMask { +public: + NodeOffsetAndMorselSemiMask(storage::NodeTable* nodeTable) : NodeSemiMask{nodeTable} { + offsetMask = std::make_unique(); + morselMask = std::make_unique(); + } + + inline void init(transaction::Transaction* trx) override { + auto maxNodeOffset = nodeTable->getMaxNodeOffset(trx); + offsetMask->init(maxNodeOffset + 1); + morselMask->init(MaskUtil::getMorselIdx(maxNodeOffset) + 1); + } + + // Note: blindly update mask does not parallelize well, so we minimize write by first checking + // if the mask is set to true (mask value is equal to the expected currentMaskValue) or not. + inline void incrementMaskValue(uint64_t nodeOffset, uint8_t currentMaskValue) override { + offsetMask->incrementMaskValue(nodeOffset, currentMaskValue); + morselMask->incrementMaskValue(MaskUtil::getMorselIdx(nodeOffset), currentMaskValue); + } + + inline uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); } + inline void incrementNumMasks() override { + offsetMask->incrementNumMasks(); + morselMask->incrementNumMasks(); + } + + inline bool isMorselMasked(common::offset_t morselIdx) { + return morselMask->isMasked(morselIdx); + } + inline bool isNodeMasked(common::offset_t nodeOffset) { + return offsetMask->isMasked(nodeOffset); + } + +private: + std::unique_ptr offsetMask; + std::unique_ptr morselMask; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/physical_operator.h b/src/include/processor/operator/physical_operator.h index a5acbcd5ab..3b6def335c 100644 --- a/src/include/processor/operator/physical_operator.h +++ b/src/include/processor/operator/physical_operator.h @@ -39,7 +39,7 @@ enum class PhysicalOperatorType : uint8_t { RENAME_PROPERTY, RENAME_TABLE, RESULT_COLLECTOR, - SCAN_BFS_LEVEL, + SCAN_FRONTIER, SCAN_NODE_ID, SCAN_NODE_PROPERTY, SCAN_REL_PROPERTY, diff --git a/src/include/processor/operator/scan_node_id.h b/src/include/processor/operator/scan_node_id.h index 4c6eefc75b..a6859e54b3 100644 --- a/src/include/processor/operator/scan_node_id.h +++ b/src/include/processor/operator/scan_node_id.h @@ -1,101 +1,36 @@ #pragma once -#include - +#include "mask.h" #include "processor/operator/physical_operator.h" -#include "storage/store/node_table.h" namespace kuzu { namespace processor { -// Note: This class is not thread-safe. -struct Mask { -public: - explicit Mask(uint64_t size) { - data = std::make_unique(size); - std::fill(data.get(), data.get() + size, 0); - } - - inline void setMask(uint64_t pos, uint8_t maskValue) { data[pos] = maskValue; } - inline bool isMasked(uint64_t pos, uint8_t trueMaskVal) { return data[pos] == trueMaskVal; } - -private: - std::unique_ptr data; -}; - -// Note: This class is not thread-safe. -struct NodeTableSemiMask { -public: - NodeTableSemiMask() : numMaskers{0} {} - - inline void initializeMaskData(common::offset_t maxNodeOffset, common::offset_t maxMorselIdx) { - if (nodeMask != nullptr) { - // Multiple semi mask might be applied to the same sacn and thus initialize repeatedly. - return; - } - assert(morselMask == nullptr && maxNodeOffset != common::INVALID_NODE_OFFSET); - nodeMask = std::make_unique(maxNodeOffset + 1); - morselMask = std::make_unique(maxMorselIdx + 1); - } - - inline bool isMorselMasked(uint64_t morselIdx) { - return morselMask->isMasked(morselIdx, numMaskers); - } - inline bool isNodeMasked(uint64_t nodeOffset) { - return nodeMask->isMasked(nodeOffset, numMaskers); - } - - // Increment mask value for the given nodeOffset if its current mask value is equal to - // the specified `currentMaskValue`. - void incrementMaskValue(uint64_t nodeOffset, uint8_t currentMaskValue); - - inline uint8_t getNumMaskers() const { return numMaskers; } - inline void incrementNumMaskers() { numMaskers++; } - -private: - std::unique_ptr nodeMask; - std::unique_ptr morselMask; - uint8_t numMaskers; -}; - // Note: This class is not thread-safe. It relies on its caller to correctly synchronize its state. -class NodeTableState { +class NodeTableScanState { public: - explicit NodeTableState(storage::NodeTable* table) + explicit NodeTableScanState(storage::NodeTable* table) : table{table}, maxNodeOffset{common::INVALID_NODE_OFFSET}, maxMorselIdx{UINT64_MAX}, - currentNodeOffset{0} { - semiMask = std::make_unique(); - } + currentNodeOffset{0}, semiMask{std::make_unique(table)} {} inline storage::NodeTable* getTable() { return table; } inline void initializeMaxOffset(transaction::Transaction* transaction) { - if (maxNodeOffset != common::INVALID_NODE_OFFSET) { - // We might initialize twice because semi mask (which is on a different pipeline that - // execute beforehand) will also try to initialize. - return; - } maxNodeOffset = table->getMaxNodeOffset(transaction); - maxMorselIdx = maxNodeOffset >> common::DEFAULT_VECTOR_CAPACITY_LOG_2; + maxMorselIdx = MaskUtil::getMorselIdx(maxNodeOffset); } - inline void initSemiMask(transaction::Transaction* transaction) { - initializeMaxOffset(transaction); - semiMask->initializeMaskData(maxNodeOffset, maxMorselIdx); - } - inline bool isSemiMaskEnabled() { return semiMask->getNumMaskers() > 0; } - inline NodeTableSemiMask* getSemiMask() { return semiMask.get(); } - inline uint8_t getNumMaskers() const { return semiMask->getNumMaskers(); } - inline void incrementNumMaskers() { semiMask->incrementNumMaskers(); } + inline bool isSemiMaskEnabled() { return semiMask->isEnabled(); } + inline NodeOffsetAndMorselSemiMask* getSemiMask() { return semiMask.get(); } std::pair getNextRangeToRead(); private: storage::NodeTable* table; - uint64_t maxNodeOffset; - uint64_t maxMorselIdx; - uint64_t currentNodeOffset; - std::unique_ptr semiMask; + common::offset_t maxNodeOffset; + common::offset_t maxMorselIdx; + common::offset_t currentNodeOffset; + std::unique_ptr semiMask; }; class ScanNodeIDSharedState { @@ -103,22 +38,24 @@ class ScanNodeIDSharedState { ScanNodeIDSharedState() : currentStateIdx{0} {}; inline void addTableState(storage::NodeTable* table) { - tableStates.push_back(std::make_unique(table)); + tableStates.push_back(std::make_unique(table)); } inline uint32_t getNumTableStates() const { return tableStates.size(); } - inline NodeTableState* getTableState(uint32_t idx) const { return tableStates[idx].get(); } + inline NodeTableScanState* getTableState(uint32_t idx) const { return tableStates[idx].get(); } inline void initialize(transaction::Transaction* transaction) { + auto numMask = tableStates[0]->getSemiMask()->getNumMasks(); for (auto& tableState : tableStates) { + assert(tableState->getSemiMask()->getNumMasks() == numMask); tableState->initializeMaxOffset(transaction); } } - std::tuple getNextRangeToRead(); + std::tuple getNextRangeToRead(); private: std::mutex mtx; - std::vector> tableStates; + std::vector> tableStates; uint32_t currentStateIdx; }; @@ -147,7 +84,7 @@ class ScanNodeID : public PhysicalOperator { } void setSelVector( - NodeTableState* tableState, common::offset_t startOffset, common::offset_t endOffset); + NodeTableScanState* tableState, common::offset_t startOffset, common::offset_t endOffset); private: DataPos outDataPos; diff --git a/src/include/processor/operator/semi_masker.h b/src/include/processor/operator/semi_masker.h index 60e5c93e82..8104fc4aca 100644 --- a/src/include/processor/operator/semi_masker.h +++ b/src/include/processor/operator/semi_masker.h @@ -1,7 +1,7 @@ #pragma once +#include "processor/operator/mask.h" #include "processor/operator/physical_operator.h" -#include "processor/operator/scan_node_id.h" namespace kuzu { namespace processor { @@ -10,63 +10,56 @@ namespace processor { // to indicate the execution sequence of its pipeline. Also, the maskerIdx is used as a flag to // indicate if a value in the mask is masked or not, as each masker will increment the selected // value in the mask by 1. More details are described in NodeTableSemiMask. -using mask_and_idx_pair = std::pair; +using mask_with_idx = std::pair; class BaseSemiMasker : public PhysicalOperator { protected: - BaseSemiMasker(const DataPos& keyDataPos, std::vector scanStates, + BaseSemiMasker(const DataPos& keyDataPos, + std::unordered_map> masksPerTable, std::unique_ptr child, uint32_t id, const std::string& paramsString) : PhysicalOperator{PhysicalOperatorType::SEMI_MASKER, std::move(child), id, paramsString}, - keyDataPos{keyDataPos}, scanStates{std::move(scanStates)} {} + keyDataPos{keyDataPos}, masksPerTable{std::move(masksPerTable)} {} + + void initGlobalStateInternal(ExecutionContext* context) override; void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; protected: DataPos keyDataPos; - std::vector scanStates; + std::unordered_map> masksPerTable; std::shared_ptr keyValueVector; }; class SingleTableSemiMasker : public BaseSemiMasker { public: - SingleTableSemiMasker(const DataPos& keyDataPos, std::vector scanStates, + SingleTableSemiMasker(const DataPos& keyDataPos, + std::unordered_map> masksPerTable, std::unique_ptr child, uint32_t id, const std::string& paramsString) - : BaseSemiMasker{keyDataPos, std::move(scanStates), std::move(child), id, paramsString} {} - - void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override; + : BaseSemiMasker{keyDataPos, std::move(masksPerTable), std::move(child), id, paramsString} { + } bool getNextTuplesInternal(ExecutionContext* context) override; inline std::unique_ptr clone() override { - auto result = std::make_unique( - keyDataPos, scanStates, children[0]->clone(), id, paramsString); - result->maskPerScan = maskPerScan; - return result; + return std::make_unique( + keyDataPos, masksPerTable, children[0]->clone(), id, paramsString); } - -private: - std::vector maskPerScan; }; class MultiTableSemiMasker : public BaseSemiMasker { public: - MultiTableSemiMasker(const DataPos& keyDataPos, std::vector scanStates, + MultiTableSemiMasker(const DataPos& keyDataPos, + std::unordered_map> masksPerTable, std::unique_ptr child, uint32_t id, const std::string& paramsString) - : BaseSemiMasker{keyDataPos, std::move(scanStates), std::move(child), id, paramsString} {} - - void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override; + : BaseSemiMasker{keyDataPos, std::move(masksPerTable), std::move(child), id, paramsString} { + } bool getNextTuplesInternal(ExecutionContext* context) override; inline std::unique_ptr clone() override { - auto result = std::make_unique( - keyDataPos, scanStates, children[0]->clone(), id, paramsString); - result->maskerPerLabelPerScan = maskerPerLabelPerScan; - return result; + return std::make_unique( + keyDataPos, masksPerTable, children[0]->clone(), id, paramsString); } - -private: - std::vector> maskerPerLabelPerScan; }; } // namespace processor diff --git a/src/include/processor/operator/var_length_extend/bfs_state.h b/src/include/processor/operator/var_length_extend/bfs_state.h index 315845d0b6..71f83b165c 100644 --- a/src/include/processor/operator/var_length_extend/bfs_state.h +++ b/src/include/processor/operator/var_length_extend/bfs_state.h @@ -1,13 +1,15 @@ #pragma once -#include "processor/operator/result_collector.h" +#include "processor/operator/mask.h" namespace kuzu { namespace processor { enum VisitedState : uint8_t { - NOT_VISITED = 0, - VISITED = 1, + NOT_VISITED_DST = 0, + VISITED_DST = 1, + NOT_VISITED = 2, + VISITED = 3, }; struct Frontier { @@ -25,7 +27,8 @@ struct BFSMorsel { std::unique_ptr nextFrontier; // Visited state - uint64_t numVisitedNodes; + uint64_t numDstNodes; + uint64_t numVisitedDstNodes; uint8_t* visitedNodes; uint8_t* distance; @@ -38,7 +41,7 @@ struct BFSMorsel { BFSMorsel(common::offset_t maxOffset_, uint8_t upperBound_); // Reset state for a new src node. - void resetState(); + void resetState(NodeOffsetSemiMask* semiMask); // If BFS has completed. bool isComplete(); // Mark src as visited. @@ -46,6 +49,8 @@ struct BFSMorsel { // UnMark src as NOT visited to avoid outputting src which has length 0 path and thus should be // omitted. void unmarkSrc(); + // Mark node as a dst visited. + void markVisitedDst(common::offset_t offset); // Mark node as visited. void markVisited(common::offset_t offset); diff --git a/src/include/processor/operator/var_length_extend/recursive_join.h b/src/include/processor/operator/var_length_extend/recursive_join.h index a150410139..361451d15d 100644 --- a/src/include/processor/operator/var_length_extend/recursive_join.h +++ b/src/include/processor/operator/var_length_extend/recursive_join.h @@ -2,6 +2,7 @@ #include "bfs_state.h" #include "processor/operator/physical_operator.h" +#include "processor/operator/result_collector.h" #include "storage/store/node_table.h" namespace kuzu { @@ -46,18 +47,41 @@ struct BFSScanState { } }; +struct RecursiveJoinSharedState { + std::shared_ptr inputFTableSharedState; + std::unique_ptr semiMask; + + RecursiveJoinSharedState( + std::shared_ptr inputFTableSharedState, storage::NodeTable* nodeTable) + : inputFTableSharedState{std::move(inputFTableSharedState)} { + semiMask = std::make_unique(nodeTable); + } +}; + class RecursiveJoin : public PhysicalOperator { public: RecursiveJoin(uint8_t upperBound, storage::NodeTable* nodeTable, - std::shared_ptr inputFTableSharedState, + std::shared_ptr sharedState, std::vector vectorsToScanPos, std::vector colIndicesToScan, const DataPos& srcNodeIDVectorPos, const DataPos& dstNodeIDVectorPos, const DataPos& distanceVectorPos, std::unique_ptr child, uint32_t id, const std::string& paramsString, std::unique_ptr root) - : PhysicalOperator{PhysicalOperatorType::SCAN_BFS_LEVEL, std::move(child), id, + : PhysicalOperator{PhysicalOperatorType::RECURSIVE_JOIN, std::move(child), id, paramsString}, - upperBound{upperBound}, nodeTable{nodeTable}, inputFTableSharedState{std::move( - inputFTableSharedState)}, + upperBound{upperBound}, nodeTable{nodeTable}, sharedState{std::move(sharedState)}, + vectorsToScanPos{std::move(vectorsToScanPos)}, colIndicesToScan{std::move( + colIndicesToScan)}, + srcNodeIDVectorPos{srcNodeIDVectorPos}, dstNodeIDVectorPos{dstNodeIDVectorPos}, + distanceVectorPos{distanceVectorPos}, root{std::move(root)}, bfsScanState{} {} + + RecursiveJoin(uint8_t upperBound, storage::NodeTable* nodeTable, + std::shared_ptr sharedState, + std::vector vectorsToScanPos, std::vector colIndicesToScan, + const DataPos& srcNodeIDVectorPos, const DataPos& dstNodeIDVectorPos, + const DataPos& distanceVectorPos, uint32_t id, const std::string& paramsString, + std::unique_ptr root) + : PhysicalOperator{PhysicalOperatorType::RECURSIVE_JOIN, id, paramsString}, + upperBound{upperBound}, nodeTable{nodeTable}, sharedState{std::move(sharedState)}, vectorsToScanPos{std::move(vectorsToScanPos)}, colIndicesToScan{std::move( colIndicesToScan)}, srcNodeIDVectorPos{srcNodeIDVectorPos}, dstNodeIDVectorPos{dstNodeIDVectorPos}, @@ -66,14 +90,16 @@ class RecursiveJoin : public PhysicalOperator { static inline DataPos getTmpSrcNodeVectorPos() { return DataPos{0, 0}; } static inline DataPos getTmpDstNodeVectorPos() { return DataPos{1, 0}; } + inline NodeSemiMask* getSemiMask() const { return sharedState->semiMask.get(); } + void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) override; bool getNextTuplesInternal(ExecutionContext* context) override; std::unique_ptr clone() override { - return std::make_unique(upperBound, nodeTable, inputFTableSharedState, - vectorsToScanPos, colIndicesToScan, srcNodeIDVectorPos, dstNodeIDVectorPos, - distanceVectorPos, children[0]->clone(), id, paramsString, root->clone()); + return std::make_unique(upperBound, nodeTable, sharedState, vectorsToScanPos, + colIndicesToScan, srcNodeIDVectorPos, dstNodeIDVectorPos, distanceVectorPos, id, + paramsString, root->clone()); } private: @@ -89,8 +115,9 @@ class RecursiveJoin : public PhysicalOperator { private: uint8_t upperBound; + // TODO:remove storage::NodeTable* nodeTable; - std::shared_ptr inputFTableSharedState; + std::shared_ptr sharedState; std::vector vectorsToScanPos; std::vector colIndicesToScan; DataPos srcNodeIDVectorPos; diff --git a/src/optimizer/acc_hash_join_optimizer.cpp b/src/optimizer/acc_hash_join_optimizer.cpp index c3e8e4ed68..7a989a51cf 100644 --- a/src/optimizer/acc_hash_join_optimizer.cpp +++ b/src/optimizer/acc_hash_join_optimizer.cpp @@ -4,6 +4,7 @@ #include "planner/logical_plan/logical_operator/logical_accumulate.h" #include "planner/logical_plan/logical_operator/logical_hash_join.h" #include "planner/logical_plan/logical_operator/logical_intersect.h" +#include "planner/logical_plan/logical_operator/logical_recursive_extend.h" #include "planner/logical_plan/logical_operator/logical_scan_node.h" #include "planner/logical_plan/logical_operator/logical_semi_masker.h" @@ -12,11 +13,11 @@ using namespace kuzu::planner; namespace kuzu { namespace optimizer { -void AccHashJoinOptimizer::rewrite(planner::LogicalPlan* plan) { +void HashJoinSIPOptimizer::rewrite(planner::LogicalPlan* plan) { visitOperator(plan->getLastOperator().get()); } -void AccHashJoinOptimizer::visitOperator(planner::LogicalOperator* op) { +void HashJoinSIPOptimizer::visitOperator(planner::LogicalOperator* op) { // bottom up traversal for (auto i = 0u; i < op->getNumChildren(); ++i) { visitOperator(op->getChild(i).get()); @@ -24,14 +25,14 @@ void AccHashJoinOptimizer::visitOperator(planner::LogicalOperator* op) { visitOperatorSwitch(op); } -void AccHashJoinOptimizer::visitHashJoin(planner::LogicalOperator* op) { - if (tryProbeToBuildHJSIP(op)) { // Try probe to build SIP first. +void HashJoinSIPOptimizer::visitHashJoin(planner::LogicalOperator* op) { + if (tryBuildToProbeHJSIP(op)) { // Try build to probe SIP first. return; } - tryBuildToProbeHJSIP(op); + tryProbeToBuildHJSIP(op); } -bool AccHashJoinOptimizer::tryProbeToBuildHJSIP(planner::LogicalOperator* op) { +bool HashJoinSIPOptimizer::tryProbeToBuildHJSIP(planner::LogicalOperator* op) { auto hashJoin = (LogicalHashJoin*)op; if (hashJoin->getSIP() == planner::SidewaysInfoPassing::PROHIBIT_PROBE_TO_BUILD) { return false; @@ -39,15 +40,21 @@ bool AccHashJoinOptimizer::tryProbeToBuildHJSIP(planner::LogicalOperator* op) { if (!isProbeSideQualified(op->getChild(0).get())) { return false; } - std::vector buildRoots; - buildRoots.push_back(hashJoin->getChild(1).get()); - auto scanNodes = resolveScanNodesToApplySemiMask(hashJoin->getJoinNodeIDs(), buildRoots); - if (scanNodes.empty()) { + auto probeRoot = hashJoin->getChild(0); + auto buildRoot = hashJoin->getChild(1); + auto hasSemiMaskApplied = false; + for (auto& nodeID : hashJoin->getJoinNodeIDs()) { + auto ops = resolveOperatorsToApplySemiMask(*nodeID, buildRoot.get()); + if (!ops.empty()) { + probeRoot = appendSemiMask(nodeID, ops, probeRoot); + hasSemiMaskApplied = true; + } + } + if (!hasSemiMaskApplied) { return false; } - // apply accumulated hash join hashJoin->setSIP(SidewaysInfoPassing::PROBE_TO_BUILD); - applyAccHashJoin(scanNodes, op); + hashJoin->setChild(0, appendAccumulate(probeRoot)); return true; } @@ -62,7 +69,7 @@ static bool subPlanContainsFilter(LogicalOperator* root) { return true; } -bool AccHashJoinOptimizer::tryBuildToProbeHJSIP(planner::LogicalOperator* op) { +bool HashJoinSIPOptimizer::tryBuildToProbeHJSIP(planner::LogicalOperator* op) { auto hashJoin = (LogicalHashJoin*)op; if (hashJoin->getSIP() == planner::SidewaysInfoPassing::PROHIBIT_BUILD_TO_PROBE) { return false; @@ -73,18 +80,20 @@ bool AccHashJoinOptimizer::tryBuildToProbeHJSIP(planner::LogicalOperator* op) { if (!subPlanContainsFilter(hashJoin->getChild(1).get())) { return false; } - std::vector roots; - roots.push_back(hashJoin->getChild(0).get()); - auto scanNodes = resolveScanNodesToApplySemiMask(hashJoin->getJoinNodeIDs(), roots); - if (scanNodes.empty()) { - return false; + auto probeRoot = hashJoin->getChild(0); + auto buildRoot = hashJoin->getChild(1); + for (auto& nodeID : hashJoin->getJoinNodeIDs()) { + auto ops = resolveOperatorsToApplySemiMask(*nodeID, probeRoot.get()); + if (!ops.empty()) { + buildRoot = appendSemiMask(nodeID, ops, buildRoot); + } } hashJoin->setSIP(planner::SidewaysInfoPassing::BUILD_TO_PROBE); - hashJoin->setChild(1, applySemiMasks(scanNodes, op->getChild(1))); + hashJoin->setChild(1, buildRoot); return true; } -void AccHashJoinOptimizer::visitIntersect(planner::LogicalOperator* op) { +void HashJoinSIPOptimizer::visitIntersect(planner::LogicalOperator* op) { auto intersect = (LogicalIntersect*)op; if (intersect->getSIP() == planner::SidewaysInfoPassing::PROHIBIT_PROBE_TO_BUILD) { return; @@ -92,20 +101,30 @@ void AccHashJoinOptimizer::visitIntersect(planner::LogicalOperator* op) { if (!isProbeSideQualified(op->getChild(0).get())) { return; } - std::vector buildRoots; - for (auto i = 1; i < intersect->getNumChildren(); ++i) { - buildRoots.push_back(intersect->getChild(i).get()); + auto probeRoot = intersect->getChild(0); + auto hasSemiMaskApplied = false; + for (auto& nodeID : intersect->getKeyNodeIDs()) { + std::vector ops; + for (auto i = 1; i < intersect->getNumChildren(); ++i) { + auto buildRoot = intersect->getChild(1); + for (auto& op_ : resolveOperatorsToApplySemiMask(*nodeID, buildRoot.get())) { + ops.push_back(op_); + } + } + if (!ops.empty()) { + probeRoot = appendSemiMask(nodeID, ops, probeRoot); + hasSemiMaskApplied = true; + } } - auto scanNodes = resolveScanNodesToApplySemiMask(intersect->getKeyNodeIDs(), buildRoots); - if (scanNodes.empty()) { + if (!hasSemiMaskApplied) { return; } intersect->setSIP(SidewaysInfoPassing::PROBE_TO_BUILD); - applyAccHashJoin(scanNodes, op); + intersect->setChild(0, appendAccumulate(probeRoot)); } // Probe side is qualified if it is selective. -bool AccHashJoinOptimizer::isProbeSideQualified(planner::LogicalOperator* probeRoot) { +bool HashJoinSIPOptimizer::isProbeSideQualified(planner::LogicalOperator* probeRoot) { if (probeRoot->getOperatorType() == LogicalOperatorType::ACCUMULATE) { // No Acc hash join if probe side has already been accumulated. This can be solved. return false; @@ -114,54 +133,80 @@ bool AccHashJoinOptimizer::isProbeSideQualified(planner::LogicalOperator* probeR return subPlanContainsFilter(probeRoot); } -binder::expression_map> -AccHashJoinOptimizer::resolveScanNodesToApplySemiMask( - const binder::expression_vector& nodeIDCandidates, - const std::vector& roots) { - binder::expression_map> nodeIDToScanOperatorsMap; - for (auto& root : roots) { - auto scanNodesCollector = LogicalScanNodeCollector(); - scanNodesCollector.collect(root); - for (auto& op : scanNodesCollector.getOperators()) { - auto scanNode = (LogicalScanNode*)op; - auto nodeID = scanNode->getNode()->getInternalIDProperty(); - if (!nodeIDToScanOperatorsMap.contains(nodeID)) { - nodeIDToScanOperatorsMap.insert({nodeID, std::vector{}}); - } - nodeIDToScanOperatorsMap.at(nodeID).push_back(op); - } +std::vector HashJoinSIPOptimizer::resolveOperatorsToApplySemiMask( + const binder::Expression& nodeID, planner::LogicalOperator* root) { + std::vector result; + for (auto& op : resolveScanNodeIDsToApplySemiMask(nodeID, root)) { + result.push_back(op); } - // Match node ID candidate with scanNode operators. - binder::expression_map> result; - for (auto& nodeID : nodeIDCandidates) { - if (!nodeIDToScanOperatorsMap.contains(nodeID)) { - // No scan on the build side to push semi mask to. - continue; + for (auto& op : resolveShortestPathExtendToApplySemiMask(nodeID, root)) { + result.push_back(op); + } + return result; +} + +std::vector HashJoinSIPOptimizer::resolveScanNodeIDsToApplySemiMask( + const binder::Expression& nodeID, planner::LogicalOperator* root) { + std::vector result; + auto scanNodesCollector = LogicalScanNodeCollector(); + scanNodesCollector.collect(root); + for (auto& op : scanNodesCollector.getOperators()) { + auto scanNode = (LogicalScanNode*)op; + auto node = scanNode->getNode(); + if (nodeID.getUniqueName() == node->getInternalIDProperty()->getUniqueName()) { + result.push_back(op); } - result.insert({nodeID, nodeIDToScanOperatorsMap.at(nodeID)}); } return result; } -std::shared_ptr AccHashJoinOptimizer::applySemiMasks( - const binder::expression_map>& nodeIDToScanNodes, - std::shared_ptr root) { - auto currentRoot = root; - for (auto& [nodeID, scanNodes] : nodeIDToScanNodes) { - auto semiMasker = std::make_shared(nodeID, scanNodes, currentRoot); - semiMasker->computeFlatSchema(); - currentRoot = semiMasker; +std::vector +HashJoinSIPOptimizer::resolveShortestPathExtendToApplySemiMask( + const binder::Expression& nodeID, planner::LogicalOperator* root) { + std::vector result; + auto recursiveJoinCollector = LogicalRecursiveExtendCollector(); + recursiveJoinCollector.collect(root); + for (auto& op : recursiveJoinCollector.getOperators()) { + auto recursiveJoin = (LogicalRecursiveExtend*)op; + if (recursiveJoin->getRel()->getRelType() == common::QueryRelType::SHORTEST) { + auto node = recursiveJoin->getNbrNode(); + if (nodeID.getUniqueName() == node->getInternalIDProperty()->getUniqueName()) { + result.push_back(op); + return result; + } + } } - return currentRoot; + return result; +} + +std::shared_ptr HashJoinSIPOptimizer::appendSemiMask( + std::shared_ptr nodeID, std::vector ops, + std::shared_ptr child) { + assert(!ops.empty()); + auto op = ops[0]; + std::shared_ptr node; + switch (op->getOperatorType()) { + case LogicalOperatorType::SCAN_NODE: { + auto scanNode = (LogicalScanNode*)op; + node = scanNode->getNode(); + } break; + case LogicalOperatorType::RECURSIVE_EXTEND: { + auto recursiveExtend = (LogicalRecursiveExtend*)op; + node = recursiveExtend->getNbrNode(); + } break; + default: + throw common::NotImplementedException("HashJoinSIPOptimizer::appendSemiMask"); + } + auto semiMasker = std::make_shared(node, ops, child); + semiMasker->computeFlatSchema(); + return semiMasker; } -void AccHashJoinOptimizer::applyAccHashJoin( - const binder::expression_map>& nodeIDToScanNodes, - planner::LogicalOperator* op) { - auto currentRoot = applySemiMasks(nodeIDToScanNodes, op->getChild(0)); - auto accumulate = std::make_shared(std::move(currentRoot)); +std::shared_ptr HashJoinSIPOptimizer::appendAccumulate( + std::shared_ptr child) { + auto accumulate = std::make_shared(std::move(child)); accumulate->computeFlatSchema(); - op->setChild(0, std::move(accumulate)); + return accumulate; } } // namespace optimizer diff --git a/src/optimizer/optimizer.cpp b/src/optimizer/optimizer.cpp index 3aa46e62e2..f85d49b54b 100644 --- a/src/optimizer/optimizer.cpp +++ b/src/optimizer/optimizer.cpp @@ -22,8 +22,8 @@ void Optimizer::optimize(planner::LogicalPlan* plan) { filterPushDownOptimizer.rewrite(plan); // ASP optimizer should be applied after optimizers that manipulate hash join. - auto accHashJoinOptimizer = AccHashJoinOptimizer(); - accHashJoinOptimizer.rewrite(plan); + auto hashJoinSIPOptimizer = HashJoinSIPOptimizer(); + hashJoinSIPOptimizer.rewrite(plan); auto projectionPushDownOptimizer = ProjectionPushDownOptimizer(); projectionPushDownOptimizer.rewrite(plan); diff --git a/src/processor/mapper/CMakeLists.txt b/src/processor/mapper/CMakeLists.txt index 34435bd431..4a1ddb4658 100644 --- a/src/processor/mapper/CMakeLists.txt +++ b/src/processor/mapper/CMakeLists.txt @@ -21,6 +21,7 @@ add_library(kuzu_processor_mapper map_projection.cpp map_scan_node.cpp map_scan_node_property.cpp + map_semi_masker.cpp map_set.cpp map_skip.cpp map_union.cpp diff --git a/src/processor/mapper/map_acc_hash_join.cpp b/src/processor/mapper/map_acc_hash_join.cpp index 03487d0b6b..06ad9d2f57 100644 --- a/src/processor/mapper/map_acc_hash_join.cpp +++ b/src/processor/mapper/map_acc_hash_join.cpp @@ -1,7 +1,4 @@ -#include "planner/logical_plan/logical_operator/logical_semi_masker.h" #include "processor/mapper/plan_mapper.h" -#include "processor/operator/scan_node_id.h" -#include "processor/operator/semi_masker.h" #include "processor/operator/table_scan/factorized_table_scan.h" using namespace kuzu::planner; @@ -24,27 +21,5 @@ void PlanMapper::mapAccHashJoin(kuzu::processor::PhysicalOperator* probe) { probe->addChild(std::move(resultCollector)); } -std::unique_ptr PlanMapper::mapLogicalSemiMaskerToPhysical( - LogicalOperator* logicalOperator) { - auto logicalSemiMasker = (LogicalSemiMasker*)logicalOperator; - auto inSchema = logicalSemiMasker->getChild(0)->getSchema(); - auto prevOperator = mapLogicalOperatorToPhysical(logicalOperator->getChild(0)); - std::vector scanStates; - for (auto& op : logicalSemiMasker->getScanNodes()) { - auto physicalScanNode = (ScanNodeID*)logicalOpToPhysicalOpMap.at(op); - scanStates.push_back(physicalScanNode->getSharedState()); - } - auto keyDataPos = DataPos(inSchema->getExpressionPos(*logicalSemiMasker->getNodeID())); - if (logicalSemiMasker->isMultiLabel()) { - return std::make_unique(keyDataPos, std::move(scanStates), - std::move(prevOperator), getOperatorID(), - logicalSemiMasker->getExpressionsForPrinting()); - } else { - return std::make_unique(keyDataPos, std::move(scanStates), - std::move(prevOperator), getOperatorID(), - logicalSemiMasker->getExpressionsForPrinting()); - } -} - } // namespace processor } // namespace kuzu diff --git a/src/processor/mapper/map_extend.cpp b/src/processor/mapper/map_extend.cpp index 1f0105a0ef..ec11375f09 100644 --- a/src/processor/mapper/map_extend.cpp +++ b/src/processor/mapper/map_extend.cpp @@ -171,10 +171,11 @@ std::unique_ptr PlanMapper::mapLogicalRecursiveExtendToPhysica emptyPropertyIDs, tmpSrcNodePos, std::vector{tmpDstNodePos}, std::move(scanFrontier), getOperatorID(), emptyParamString); } - return std::make_unique(upperBound, nodeTable, sharedInputFTable, - outDataPoses, colIndicesToScan, inNodeIDVectorPos, outNodeIDVectorPos, - distanceVectorPos, std::move(resultCollector), getOperatorID(), - extend->getExpressionsForPrinting(), std::move(scanRelTable)); + auto sharedState = std::make_shared(sharedInputFTable, nodeTable); + return std::make_unique(upperBound, nodeTable, sharedState, outDataPoses, + colIndicesToScan, inNodeIDVectorPos, outNodeIDVectorPos, distanceVectorPos, + std::move(resultCollector), getOperatorID(), extend->getExpressionsForPrinting(), + std::move(scanRelTable)); } } diff --git a/src/processor/mapper/map_semi_masker.cpp b/src/processor/mapper/map_semi_masker.cpp new file mode 100644 index 0000000000..689735297e --- /dev/null +++ b/src/processor/mapper/map_semi_masker.cpp @@ -0,0 +1,56 @@ +#include "planner/logical_plan/logical_operator/logical_semi_masker.h" +#include "processor/mapper/plan_mapper.h" +#include "processor/operator/scan_node_id.h" +#include "processor/operator/semi_masker.h" +#include "processor/operator/var_length_extend/recursive_join.h" + +using namespace kuzu::planner; + +namespace kuzu { +namespace processor { + +std::unique_ptr PlanMapper::mapLogicalSemiMaskerToPhysical( + LogicalOperator* logicalOperator) { + auto logicalSemiMasker = (LogicalSemiMasker*)logicalOperator; + auto inSchema = logicalSemiMasker->getChild(0)->getSchema(); + auto prevOperator = mapLogicalOperatorToPhysical(logicalOperator->getChild(0)); + auto node = logicalSemiMasker->getNode(); + auto keyDataPos = DataPos(inSchema->getExpressionPos(*node->getInternalIDProperty())); + std::unordered_map> masksPerTable; + for (auto tableID : node->getTableIDs()) { + masksPerTable.insert({tableID, std::vector{}}); + } + for (auto& op : logicalSemiMasker->getOperators()) { + auto physicalOp = logicalOpToPhysicalOpMap.at(op); + switch (physicalOp->getOperatorType()) { + case PhysicalOperatorType::SCAN_NODE_ID: { + auto scanNodeID = (ScanNodeID*)physicalOp; + for (auto i = 0u; i < scanNodeID->getSharedState()->getNumTableStates(); ++i) { + auto tableState = scanNodeID->getSharedState()->getTableState(i); + auto tableID = tableState->getTable()->getTableID(); + masksPerTable.at(tableID).emplace_back(tableState->getSemiMask(), 0); + } + } break; + case PhysicalOperatorType::RECURSIVE_JOIN: { + auto recursiveJoin = (RecursiveJoin*)physicalOp; + assert(!node->isMultiLabeled()); + auto tableID = node->getSingleTableID(); + masksPerTable.at(tableID).emplace_back(recursiveJoin->getSemiMask(), 0); + } break; + default: + throw common::NotImplementedException("PlanMapper::mapLogicalSemiMaskerToPhysical"); + } + } + if (node->isMultiLabeled()) { + return std::make_unique(keyDataPos, std::move(masksPerTable), + std::move(prevOperator), getOperatorID(), + logicalSemiMasker->getExpressionsForPrinting()); + } else { + return std::make_unique(keyDataPos, std::move(masksPerTable), + std::move(prevOperator), getOperatorID(), + logicalSemiMasker->getExpressionsForPrinting()); + } +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/physical_operator.cpp b/src/processor/operator/physical_operator.cpp index 800a0735ab..9a338d8cc8 100644 --- a/src/processor/operator/physical_operator.cpp +++ b/src/processor/operator/physical_operator.cpp @@ -101,8 +101,8 @@ std::string PhysicalOperatorUtils::operatorTypeToString(PhysicalOperatorType ope case PhysicalOperatorType::RESULT_COLLECTOR: { return "RESULT_COLLECTOR"; } - case PhysicalOperatorType::SCAN_BFS_LEVEL: { - return "SCAN_BFS_LEVEL"; + case PhysicalOperatorType::SCAN_FRONTIER: { + return "SCAN_FRONTIER"; } case PhysicalOperatorType::SCAN_NODE_ID: { return "SCAN_NODE_ID"; diff --git a/src/processor/operator/scan_node_id.cpp b/src/processor/operator/scan_node_id.cpp index 5e0785a999..ff15fae30c 100644 --- a/src/processor/operator/scan_node_id.cpp +++ b/src/processor/operator/scan_node_id.cpp @@ -5,25 +5,13 @@ using namespace kuzu::common; namespace kuzu { namespace processor { -// Note: blindly update mask does not parallelize well, so we minimize write by first checking -// if the mask is set to true (mask value is equal to the expected currentMaskValue) or not. -void NodeTableSemiMask::incrementMaskValue(uint64_t nodeOffset, uint8_t currentMaskValue) { - if (nodeMask->isMasked(nodeOffset, currentMaskValue)) { - nodeMask->setMask(nodeOffset, currentMaskValue + 1); - } - auto morselIdx = nodeOffset >> DEFAULT_VECTOR_CAPACITY_LOG_2; - if (morselMask->isMasked(morselIdx, currentMaskValue)) { - morselMask->setMask(morselIdx, currentMaskValue + 1); - } -} - -std::pair NodeTableState::getNextRangeToRead() { +std::pair NodeTableScanState::getNextRangeToRead() { // Note: we use maxNodeOffset=UINT64_MAX to represent an empty table. if (currentNodeOffset > maxNodeOffset || maxNodeOffset == INVALID_NODE_OFFSET) { return std::make_pair(currentNodeOffset, currentNodeOffset); } if (isSemiMaskEnabled()) { - auto currentMorselIdx = currentNodeOffset >> DEFAULT_VECTOR_CAPACITY_LOG_2; + auto currentMorselIdx = MaskUtil::getMorselIdx(currentNodeOffset); assert(currentNodeOffset % DEFAULT_VECTOR_CAPACITY == 0); while (currentMorselIdx <= maxMorselIdx && !semiMask->isMorselMasked(currentMorselIdx)) { currentMorselIdx++; @@ -36,7 +24,7 @@ std::pair NodeTableState::getNextRangeToRead() { return std::make_pair(startOffset, startOffset + range); } -std::tuple ScanNodeIDSharedState::getNextRangeToRead() { +std::tuple ScanNodeIDSharedState::getNextRangeToRead() { std::unique_lock lck{mtx}; if (currentStateIdx == tableStates.size()) { return std::make_tuple(nullptr, INVALID_NODE_OFFSET, INVALID_NODE_OFFSET); @@ -80,7 +68,7 @@ bool ScanNodeID::getNextTuplesInternal(ExecutionContext* context) { } void ScanNodeID::setSelVector( - NodeTableState* tableState, offset_t startOffset, offset_t endOffset) { + NodeTableScanState* tableState, offset_t startOffset, offset_t endOffset) { if (tableState->isSemiMaskEnabled()) { outValueVector->state->selVector->resetSelectorToValuePosBuffer(); // Fill selected positions based on node mask for nodes between the given startOffset and diff --git a/src/processor/operator/semi_masker.cpp b/src/processor/operator/semi_masker.cpp index 4470452fc6..9f690c28c0 100644 --- a/src/processor/operator/semi_masker.cpp +++ b/src/processor/operator/semi_masker.cpp @@ -5,25 +5,24 @@ using namespace kuzu::common; namespace kuzu { namespace processor { +void BaseSemiMasker::initGlobalStateInternal(ExecutionContext* context) { + for (auto& [table, masks] : masksPerTable) { + for (auto& maskWithIdx : masks) { + auto maskIdx = maskWithIdx.first->getNumMasks(); + assert(maskIdx < UINT8_MAX); + maskWithIdx.first->incrementNumMasks(); + maskWithIdx.second = maskIdx; + } + } +} + void BaseSemiMasker::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { keyValueVector = resultSet->getValueVector(keyDataPos); assert(keyValueVector->dataType.typeID == INTERNAL_ID); -} - -static mask_and_idx_pair initSemiMaskForTableState( - NodeTableState* tableState, transaction::Transaction* trx) { - tableState->initSemiMask(trx); - auto maskerIdx = tableState->getNumMaskers(); - assert(maskerIdx < UINT8_MAX); - tableState->incrementNumMaskers(); - return std::make_pair(tableState->getSemiMask(), maskerIdx); -} - -void SingleTableSemiMasker::initGlobalStateInternal(kuzu::processor::ExecutionContext* context) { - for (auto& scanState : scanStates) { - assert(scanState->getNumTableStates() == 1); - auto tableState = scanState->getTableState(0); - maskPerScan.push_back(initSemiMaskForTableState(tableState, context->transaction)); + for (auto& [table, masks] : masksPerTable) { + for (auto& maskWithIdx : masks) { + maskWithIdx.first->init(transaction); + } } } @@ -33,10 +32,10 @@ bool SingleTableSemiMasker::getNextTuplesInternal(ExecutionContext* context) { } auto numValues = keyValueVector->state->isFlat() ? 1 : keyValueVector->state->selVector->selectedSize; - for (auto [mask, maskerIdx] : maskPerScan) { - for (auto i = 0u; i < numValues; i++) { - auto pos = keyValueVector->state->selVector->selectedPositions[i]; - auto nodeID = keyValueVector->getValue(pos); + for (auto i = 0u; i < numValues; i++) { + auto pos = keyValueVector->state->selVector->selectedPositions[i]; + auto nodeID = keyValueVector->getValue(pos); + for (auto [mask, maskerIdx] : masksPerTable.begin()->second) { mask->incrementMaskValue(nodeID.offset, maskerIdx); } } @@ -44,30 +43,16 @@ bool SingleTableSemiMasker::getNextTuplesInternal(ExecutionContext* context) { return true; } -void MultiTableSemiMasker::initGlobalStateInternal(kuzu::processor::ExecutionContext* context) { - for (auto& scanState : scanStates) { - assert(scanState->getNumTableStates() > 1); - std::unordered_map maskerPerLabel; - for (auto i = 0u; i < scanState->getNumTableStates(); ++i) { - auto tableState = scanState->getTableState(i); - maskerPerLabel.insert({tableState->getTable()->getTableID(), - initSemiMaskForTableState(tableState, context->transaction)}); - } - maskerPerLabelPerScan.push_back(std::move(maskerPerLabel)); - } -} - bool MultiTableSemiMasker::getNextTuplesInternal(ExecutionContext* context) { if (!children[0]->getNextTuple(context)) { return false; } auto numValues = keyValueVector->state->isFlat() ? 1 : keyValueVector->state->selVector->selectedSize; - for (auto& maskerPerLabel : maskerPerLabelPerScan) { - for (auto i = 0u; i < numValues; i++) { - auto pos = keyValueVector->state->selVector->selectedPositions[i]; - auto nodeID = keyValueVector->getValue(pos); - auto [mask, maskerIdx] = maskerPerLabel.at(nodeID.tableID); + for (auto i = 0u; i < numValues; i++) { + auto pos = keyValueVector->state->selVector->selectedPositions[i]; + auto nodeID = keyValueVector->getValue(pos); + for (auto [mask, maskerIdx] : masksPerTable.at(nodeID.tableID)) { mask->incrementMaskValue(nodeID.offset, maskerIdx); } } diff --git a/src/processor/operator/var_length_extend/bfs_state.cpp b/src/processor/operator/var_length_extend/bfs_state.cpp index 2b754259d9..2681499bf9 100644 --- a/src/processor/operator/var_length_extend/bfs_state.cpp +++ b/src/processor/operator/var_length_extend/bfs_state.cpp @@ -12,16 +12,29 @@ BFSMorsel::BFSMorsel(common::offset_t maxOffset_, uint8_t upperBound_) { visitedNodes = visitedNodesBuffer.get(); distanceBuffer = std::make_unique(maxOffset + 1 * sizeof(uint8_t)); distance = distanceBuffer.get(); - resetState(); } -void BFSMorsel::resetState() { +void BFSMorsel::resetState(NodeOffsetSemiMask* semiMask) { currentLevel = 0; nextNodeIdxToExtend = 0; currentFrontier->resetState(); nextFrontier->resetState(); - numVisitedNodes = 0; - std::fill(visitedNodes, visitedNodes + maxOffset + 1, (uint8_t)VisitedState::NOT_VISITED); + numDstNodes = 0; + numVisitedDstNodes = 0; + if (semiMask->isEnabled()) { + for (auto offset = 0u; offset < maxOffset + 1; ++offset) { + if (semiMask->isNodeMasked(offset)) { + visitedNodes[offset] = VisitedState::NOT_VISITED_DST; + numDstNodes++; + } else { + visitedNodes[offset] = VisitedState::NOT_VISITED; + } + } + } else { + std::fill( + visitedNodes, visitedNodes + maxOffset + 1, (uint8_t)VisitedState::NOT_VISITED_DST); + numDstNodes = maxOffset + 1; + } } bool BFSMorsel::isComplete() { @@ -31,30 +44,40 @@ bool BFSMorsel::isComplete() { if (currentLevel == upperBound) { // upper limit reached. return true; } - if (numVisitedNodes == maxOffset) { // all destinations have been reached. + if (numVisitedDstNodes == numDstNodes) { // all destinations have been reached. return true; } return false; } void BFSMorsel::markSrc(common::offset_t offset) { - visitedNodes[offset] = VISITED; - distance[offset] = 0; - numVisitedNodes++; + if (visitedNodes[offset] == NOT_VISITED_DST) { + visitedNodes[offset] = VISITED_DST; + distance[offset] = 0; + numVisitedDstNodes++; + } currentFrontier->nodeOffsets.push_back(offset); srcOffset = offset; } void BFSMorsel::unmarkSrc() { - visitedNodes[srcOffset] = NOT_VISITED; - numVisitedNodes--; + if (visitedNodes[srcOffset] == VISITED_DST) { + visitedNodes[srcOffset] = NOT_VISITED_DST; + numVisitedDstNodes--; + } +} + +void BFSMorsel::markVisitedDst(common::offset_t offset) { + assert(visitedNodes[offset] == NOT_VISITED_DST); + visitedNodes[offset] = VISITED_DST; + distance[offset] = currentLevel + 1; + numVisitedDstNodes++; + nextFrontier->nodeOffsets.push_back(offset); } void BFSMorsel::markVisited(common::offset_t offset) { assert(visitedNodes[offset] == NOT_VISITED); visitedNodes[offset] = VISITED; - distance[offset] = currentLevel + 1; - numVisitedNodes++; nextFrontier->nodeOffsets.push_back(offset); } diff --git a/src/processor/operator/var_length_extend/recursive_join.cpp b/src/processor/operator/var_length_extend/recursive_join.cpp index cec027d381..2b72df6234 100644 --- a/src/processor/operator/var_length_extend/recursive_join.cpp +++ b/src/processor/operator/var_length_extend/recursive_join.cpp @@ -20,6 +20,7 @@ void RecursiveJoin::initLocalStateInternal(ResultSet* resultSet_, ExecutionConte dstNodeIDVector = resultSet->getValueVector(dstNodeIDVectorPos); distanceVector = resultSet->getValueVector(distanceVectorPos); bfsMorsel = std::make_unique(maxNodeOffset, upperBound); + bfsMorsel->resetState(sharedState->semiMask.get()); initLocalRecursivePlan(context); bfsScanState.resetState(); } @@ -37,9 +38,9 @@ void RecursiveJoin::initLocalStateInternal(ResultSet* resultSet_, ExecutionConte // vector to output and returns true. Otherwise, we compute a new BFS. bool RecursiveJoin::getNextTuplesInternal(ExecutionContext* context) { while (true) { - if (bfsScanState.numScanned < bfsMorsel->numVisitedNodes) { // Phase 2 + if (bfsScanState.numScanned < bfsMorsel->numVisitedDstNodes) { // Phase 2 auto numToScan = std::min(common::DEFAULT_VECTOR_CAPACITY, - bfsMorsel->numVisitedNodes - bfsScanState.numScanned); + bfsMorsel->numVisitedDstNodes - bfsScanState.numScanned); scanDstNodes(numToScan); bfsScanState.numScanned += numToScan; return true; @@ -51,13 +52,13 @@ bool RecursiveJoin::getNextTuplesInternal(ExecutionContext* context) { } bool RecursiveJoin::computeBFS(ExecutionContext* context) { - auto inputFTableMorsel = inputFTableSharedState->getMorsel(1); + auto inputFTableMorsel = sharedState->inputFTableSharedState->getMorsel(1); if (inputFTableMorsel->numTuples == 0) { return false; } - inputFTableSharedState->getTable()->scan(vectorsToScan, inputFTableMorsel->startTupleIdx, - inputFTableMorsel->numTuples, colIndicesToScan); - bfsMorsel->resetState(); + sharedState->inputFTableSharedState->getTable()->scan(vectorsToScan, + inputFTableMorsel->startTupleIdx, inputFTableMorsel->numTuples, colIndicesToScan); + bfsMorsel->resetState(sharedState->semiMask.get()); auto nodeID = srcNodeIDVector->getValue( srcNodeIDVector->state->selVector->selectedPositions[0]); bfsMorsel->markSrc(nodeID.offset); @@ -84,7 +85,9 @@ void RecursiveJoin::updateVisitedState() { for (auto i = 0u; i < tmpDstNodeIDVector->state->selVector->selectedSize; ++i) { auto pos = tmpDstNodeIDVector->state->selVector->selectedPositions[i]; auto nodeID = tmpDstNodeIDVector->getValue(pos); - if (visitedNodes[nodeID.offset] == VisitedState::NOT_VISITED) { + if (visitedNodes[nodeID.offset] == VisitedState::NOT_VISITED_DST) { + bfsMorsel->markVisitedDst(nodeID.offset); + } else if (visitedNodes[nodeID.offset] == VisitedState::NOT_VISITED) { bfsMorsel->markVisited(nodeID.offset); } } @@ -121,7 +124,7 @@ void RecursiveJoin::initLocalRecursivePlan(ExecutionContext* context) { void RecursiveJoin::scanDstNodes(size_t sizeToScan) { auto size = 0; while (sizeToScan != size) { - if (bfsMorsel->visitedNodes[bfsScanState.currentOffset] == NOT_VISITED) { + if (bfsMorsel->visitedNodes[bfsScanState.currentOffset] != VISITED_DST) { bfsScanState.currentOffset++; continue; } diff --git a/test/test_files/shortest_path/bfs_sssp.test b/test/test_files/shortest_path/bfs_sssp.test index 817e60faf2..bf3a8160a8 100644 --- a/test/test_files/shortest_path/bfs_sssp.test +++ b/test/test_files/shortest_path/bfs_sssp.test @@ -1,16 +1,3 @@ --NAME SSPWithExtend --QUERY MATCH (c:person)<-[:knows* SHORTEST 1..30]-(a:person)-[r:knows* SHORTEST 1..30]->(b:person), (b)-[:knows]->(c) WHERE a.fName = 'Alice' AND b.ID < 6 AND c.ID > 5 RETURN a.fName, b.fName, c.fName ----- 1 -Alice|Bob|Elizabeth - --NAME MultiPart --QUERY MATCH (c:person)<-[:knows]-(a:person) WHERE a.fName = 'Alice' WITH a MATCH (a)-[r:knows* SHORTEST 1..30]->(b:person) WHERE b.ID > 6 RETURN b.fName, r._length, COUNT(*) ----- 4 -Elizabeth|2|3 -Farooq|3|3 -Greg|3|3 -Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|3|3 - -NAME SingleSourceAllDestinationsSSP -QUERY MATCH (a:person)-[r:knows* SHORTEST 1..30]->(b:person) WHERE a.fName = 'Alice' RETURN a.fName, b.fName, r._length ---- 7 @@ -79,3 +66,15 @@ Farooq|Dan|2 Farooq|Elizabeth|1 Farooq|Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|1 +-NAME SSPWithExtend +-QUERY MATCH (c:person)<-[:knows* SHORTEST 1..30]-(a:person)-[r:knows* SHORTEST 1..30]->(b:person), (b)-[:knows]->(c) WHERE a.fName = 'Alice' AND b.ID < 6 AND c.ID > 5 RETURN a.fName, b.fName, c.fName +---- 1 +Alice|Bob|Elizabeth + +-NAME MultiPart +-QUERY MATCH (c:person)<-[:knows]-(a:person) WHERE a.fName = 'Alice' WITH a MATCH (a)-[r:knows* SHORTEST 1..30]->(b:person) WHERE b.ID > 6 RETURN b.fName, r._length, COUNT(*) +---- 4 +Elizabeth|2|3 +Farooq|3|3 +Greg|3|3 +Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|3|3