diff --git a/src/include/planner/join_order_enumerator.h b/src/include/planner/join_order_enumerator.h index 581752e2d3..9be0b30c31 100644 --- a/src/include/planner/join_order_enumerator.h +++ b/src/include/planner/join_order_enumerator.h @@ -86,10 +86,7 @@ class JoinOrderEnumerator { std::shared_ptr nbrNode, std::shared_ptr rel, common::RelDirection direction, const binder::expression_vector& properties, LogicalPlan& plan); - void appendVariableLengthExtend(std::shared_ptr boundNode, - std::shared_ptr nbrNode, std::shared_ptr rel, - common::RelDirection direction, LogicalPlan& plan); - void appendShortestPathExtend(std::shared_ptr boundNode, + void appendRecursiveExtend(std::shared_ptr boundNode, std::shared_ptr nbrNode, std::shared_ptr rel, common::RelDirection direction, LogicalPlan& plan); diff --git a/src/include/planner/logical_plan/logical_operator/logical_recursive_extend.h b/src/include/planner/logical_plan/logical_operator/logical_recursive_extend.h index 67488ea40f..9c81a40ea9 100644 --- a/src/include/planner/logical_plan/logical_operator/logical_recursive_extend.h +++ b/src/include/planner/logical_plan/logical_operator/logical_recursive_extend.h @@ -5,8 +5,6 @@ namespace kuzu { namespace planner { -// TODO(Xiyang): we should have a single LogicalRecursiveExtend once we migrate VariableLengthExtend -// to use the same infrastructure as shortest path. 
class LogicalRecursiveExtend : public BaseLogicalExtend { public: LogicalRecursiveExtend(std::shared_ptr boundNode, @@ -17,42 +15,11 @@ class LogicalRecursiveExtend : public BaseLogicalExtend { f_group_pos_set getGroupsPosToFlatten() override; - void computeFlatSchema() override; -}; - -class LogicalVariableLengthExtend : public LogicalRecursiveExtend { -public: - LogicalVariableLengthExtend(std::shared_ptr boundNode, - std::shared_ptr nbrNode, std::shared_ptr rel, - common::RelDirection direction, bool hasAtMostOneNbr, - std::shared_ptr child) - : LogicalRecursiveExtend{std::move(boundNode), std::move(nbrNode), std::move(rel), - direction, std::move(child)}, - hasAtMostOneNbr{hasAtMostOneNbr} {} - - void computeFactorizedSchema() override; - - inline std::unique_ptr copy() override { - return std::make_unique( - boundNode, nbrNode, rel, direction, hasAtMostOneNbr, children[0]->copy()); - } - -private: - bool hasAtMostOneNbr; -}; - -class LogicalShortestPathExtend : public LogicalRecursiveExtend { -public: - LogicalShortestPathExtend(std::shared_ptr boundNode, - std::shared_ptr nbrNode, std::shared_ptr rel, - common::RelDirection direction, std::shared_ptr child) - : LogicalRecursiveExtend{std::move(boundNode), std::move(nbrNode), std::move(rel), - direction, std::move(child)} {} - void computeFactorizedSchema() override; + void computeFlatSchema() override; inline std::unique_ptr copy() override { - return std::make_unique( + return std::make_unique( boundNode, nbrNode, rel, direction, children[0]->copy()); } }; diff --git a/src/include/processor/operator/mask.h b/src/include/processor/operator/mask.h index 93f22fd81d..f648673db0 100644 --- a/src/include/processor/operator/mask.h +++ b/src/include/processor/operator/mask.h @@ -36,6 +36,7 @@ class MaskCollection { MaskCollection() : numMasks{0} {} inline void init(common::offset_t maxOffset) { + std::unique_lock lck{mtx}; if (maskData != nullptr) { // MaskCollection might be initialized repeatedly. 
return; } @@ -55,6 +56,7 @@ class MaskCollection { inline void incrementNumMasks() { numMasks++; } private: + std::mutex mtx; std::unique_ptr maskData; uint8_t numMasks; }; diff --git a/src/include/processor/operator/recursive_extend/bfs_state.h b/src/include/processor/operator/recursive_extend/bfs_state.h new file mode 100644 index 0000000000..772cefe69a --- /dev/null +++ b/src/include/processor/operator/recursive_extend/bfs_state.h @@ -0,0 +1,186 @@ +#pragma once + +#include "processor/operator/mask.h" + +namespace kuzu { +namespace processor { + +enum VisitedState : uint8_t { + NOT_VISITED_DST = 0, + VISITED_DST = 1, + NOT_VISITED = 2, + VISITED = 3, +}; + +struct Frontier { + std::vector nodeOffsets; + + Frontier() = default; + virtual ~Frontier() = default; + inline virtual void resetState() { nodeOffsets.clear(); } + inline virtual uint64_t getMultiplicity(common::offset_t offset) { return 1; } +}; + +struct FrontierWithMultiplicity : public Frontier { + // Multiplicity stands for number of paths that can reach an offset. 
+    std::unordered_map offsetToMultiplicity;
+
+    FrontierWithMultiplicity() : Frontier() {}
+    inline void resetState() override {
+        Frontier::resetState();
+        offsetToMultiplicity.clear();
+    }
+    inline uint64_t getMultiplicity(common::offset_t offset) override {
+        assert(offsetToMultiplicity.contains(offset));
+        return offsetToMultiplicity.at(offset);
+    }
+    inline void addOffset(common::offset_t offset, uint64_t multiplicity) {
+        if (offsetToMultiplicity.contains(offset)) { // Already recorded: accumulate.
+            offsetToMultiplicity.at(offset) += multiplicity;
+        } else { // First time seen: record multiplicity and append to nodeOffsets.
+            offsetToMultiplicity.insert({offset, multiplicity});
+            nodeOffsets.push_back(offset);
+        }
+    }
+    inline bool contains(common::offset_t offset) const {
+        return offsetToMultiplicity.contains(offset);
+    }
+};
+
+struct BaseBFSMorsel {
+    // Static information
+    common::offset_t maxOffset;
+    uint8_t lowerBound;
+    uint8_t upperBound;
+    // Level state
+    uint8_t currentLevel;
+    uint64_t nextNodeIdxToExtend; // next node to extend from current frontier.
+    std::unique_ptr currentFrontier;
+    std::unique_ptr nextFrontier;
+    // Target information.
+    // Target dst nodes are populated from semi mask and are expected to have small size.
+    // TargetDstNodeOffsets is empty if no semi mask available. Note that at the end of BFS, we may
+    // not visit all target dst nodes because they may simply not connect to src.
+    uint64_t numTargetDstNodes;
+    std::vector targetDstNodeOffsets;
+
+    explicit BaseBFSMorsel(common::offset_t maxOffset, uint8_t lowerBound, uint8_t upperBound,
+        NodeOffsetSemiMask* semiMask)
+        : maxOffset{maxOffset}, lowerBound{lowerBound}, upperBound{upperBound}, currentLevel{0},
+          nextNodeIdxToExtend{0}, numTargetDstNodes{0} {
+        if (semiMask->isEnabled()) { // Scan [0, maxOffset]; counter shares maxOffset's type.
+            for (common::offset_t offset = 0; offset < maxOffset + 1; ++offset) {
+                if (semiMask->isNodeMasked(offset)) {
+                    targetDstNodeOffsets.push_back(offset);
+                }
+            }
+        }
+    }
+    virtual ~BaseBFSMorsel() = default;
+
+    // Get next node offset to extend from current level.
+    common::offset_t getNextNodeOffset();
+
+    virtual void resetState();
+    virtual bool isComplete() = 0;
+    virtual void markSrc(common::offset_t offset) = 0;
+    virtual void markVisited(common::offset_t offset, uint64_t multiplicity) = 0;
+    virtual void finalizeCurrentLevel() = 0;
+
+protected:
+    inline bool isCurrentFrontierEmpty() const { return currentFrontier->nodeOffsets.empty(); }
+    inline bool isUpperBoundReached() const { return currentLevel == upperBound; }
+    inline bool isAllDstTarget() const { return targetDstNodeOffsets.empty(); }
+    void moveNextLevelAsCurrentLevel();
+    virtual std::unique_ptr createFrontier() = 0;
+};
+
+struct ShortestPathBFSMorsel : public BaseBFSMorsel {
+    // Visited state (visitedNodes is a non-owning view into visitedNodesBuffer).
+    uint64_t numVisitedDstNodes;
+    uint8_t* visitedNodes;
+    // Results
+    std::vector dstNodeOffsets;
+    std::unordered_map dstNodeOffset2PathLength;
+
+    ShortestPathBFSMorsel(common::offset_t maxOffset, uint8_t lowerBound, uint8_t upperBound,
+        NodeOffsetSemiMask* semiMask)
+        : BaseBFSMorsel{maxOffset, lowerBound, upperBound, semiMask}, numVisitedDstNodes{0} {
+        currentFrontier = std::make_unique();
+        nextFrontier = std::make_unique();
+        visitedNodesBuffer = std::make_unique((maxOffset + 1) * sizeof(uint8_t));
+        visitedNodes = visitedNodesBuffer.get(); // Buffer is sized from maxOffset + 1.
+    }
+
+    inline bool isComplete() override { // Frontier empty, bound hit, or all dsts reached.
+        return isCurrentFrontierEmpty() || isUpperBoundReached() || isAllDstReached();
+    }
+    inline void resetState() override {
+        BaseBFSMorsel::resetState();
+        resetVisitedState();
+    }
+    void markSrc(common::offset_t offset) override;
+    void markVisited(common::offset_t offset, uint64_t multiplicity) override;
+    inline void finalizeCurrentLevel() override { moveNextLevelAsCurrentLevel(); }
+
+private:
+    inline bool isAllDstReached() const { return numVisitedDstNodes == numTargetDstNodes; }
+    void resetVisitedState();
+    inline std::unique_ptr createFrontier() override {
+        return std::make_unique();
+    }
+
+private:
+    std::unique_ptr visitedNodesBuffer;
+};
+
+struct
VariableLengthBFSMorsel : public BaseBFSMorsel {
+    // Results
+    std::vector dstNodeOffsets;
+    std::unordered_map dstNodeOffset2NumPath;
+
+    explicit VariableLengthBFSMorsel(common::offset_t maxOffset, uint8_t lowerBound,
+        uint8_t upperBound, NodeOffsetSemiMask* semiMask)
+        : BaseBFSMorsel{maxOffset, lowerBound, upperBound, semiMask} {
+        currentFrontier = std::make_unique();
+        nextFrontier = std::make_unique();
+    }
+
+    inline void resetState() override {
+        BaseBFSMorsel::resetState();
+        resetNumPath();
+    }
+    inline bool isComplete() override { return isCurrentFrontierEmpty() || isUpperBoundReached(); }
+    inline void markSrc(common::offset_t offset) override { // Src enters with multiplicity 1.
+        static_cast<FrontierWithMultiplicity&>(*currentFrontier).addOffset(offset, 1);
+    }
+    inline void markVisited(common::offset_t offset, uint64_t multiplicity) override {
+        static_cast<FrontierWithMultiplicity&>(*nextFrontier).addOffset(offset, multiplicity);
+    }
+    inline void finalizeCurrentLevel() override { // Advance a level, then fold it into results.
+        moveNextLevelAsCurrentLevel();
+        updateNumPathFromCurrentFrontier();
+    }
+
+private:
+    inline void resetNumPath() {
+        dstNodeOffsets.clear();
+        dstNodeOffset2NumPath.clear();
+        numTargetDstNodes = isAllDstTarget() ?
maxOffset + 1 : targetDstNodeOffsets.size(); + } + inline void updateNumPath(common::offset_t offset, uint64_t numPath) { + if (!dstNodeOffset2NumPath.contains(offset)) { + dstNodeOffsets.push_back(offset); + dstNodeOffset2NumPath.insert({offset, numPath}); + } else { + dstNodeOffset2NumPath.at(offset) += numPath; + } + } + void updateNumPathFromCurrentFrontier(); + inline std::unique_ptr createFrontier() override { + return std::make_unique(); + } +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/var_length_extend/recursive_join.h b/src/include/processor/operator/recursive_extend/recursive_join.h similarity index 58% rename from src/include/processor/operator/var_length_extend/recursive_join.h rename to src/include/processor/operator/recursive_extend/recursive_join.h index 361451d15d..c293c06228 100644 --- a/src/include/processor/operator/var_length_extend/recursive_join.h +++ b/src/include/processor/operator/recursive_extend/recursive_join.h @@ -37,16 +37,6 @@ class ScanFrontier : public PhysicalOperator { bool hasExecuted; }; -struct BFSScanState { - common::offset_t currentOffset; - size_t numScanned; - - inline void resetState() { - currentOffset = 0; - numScanned = 0; - } -}; - struct RecursiveJoinSharedState { std::shared_ptr inputFTableSharedState; std::unique_ptr semiMask; @@ -58,37 +48,38 @@ struct RecursiveJoinSharedState { } }; -class RecursiveJoin : public PhysicalOperator { +class BaseRecursiveJoin : public PhysicalOperator { public: - RecursiveJoin(uint8_t upperBound, storage::NodeTable* nodeTable, + BaseRecursiveJoin(uint8_t lowerBound, uint8_t upperBound, storage::NodeTable* nodeTable, std::shared_ptr sharedState, std::vector vectorsToScanPos, std::vector colIndicesToScan, const DataPos& srcNodeIDVectorPos, const DataPos& dstNodeIDVectorPos, - const DataPos& distanceVectorPos, std::unique_ptr child, uint32_t id, - const std::string& paramsString, std::unique_ptr root) + const DataPos& 
tmpDstNodeIDVectorPos, std::unique_ptr child, uint32_t id, + const std::string& paramsString, std::unique_ptr recursiveRoot) : PhysicalOperator{PhysicalOperatorType::RECURSIVE_JOIN, std::move(child), id, paramsString}, - upperBound{upperBound}, nodeTable{nodeTable}, sharedState{std::move(sharedState)}, - vectorsToScanPos{std::move(vectorsToScanPos)}, colIndicesToScan{std::move( - colIndicesToScan)}, - srcNodeIDVectorPos{srcNodeIDVectorPos}, dstNodeIDVectorPos{dstNodeIDVectorPos}, - distanceVectorPos{distanceVectorPos}, root{std::move(root)}, bfsScanState{} {} + lowerBound{lowerBound}, upperBound{upperBound}, nodeTable{nodeTable}, + sharedState{std::move(sharedState)}, vectorsToScanPos{std::move(vectorsToScanPos)}, + colIndicesToScan{std::move(colIndicesToScan)}, srcNodeIDVectorPos{srcNodeIDVectorPos}, + dstNodeIDVectorPos{dstNodeIDVectorPos}, tmpDstNodeIDVectorPos{tmpDstNodeIDVectorPos}, + recursiveRoot{std::move(recursiveRoot)}, outputCursor{0} {} - RecursiveJoin(uint8_t upperBound, storage::NodeTable* nodeTable, + BaseRecursiveJoin(uint8_t lowerBound, uint8_t upperBound, storage::NodeTable* nodeTable, std::shared_ptr sharedState, std::vector vectorsToScanPos, std::vector colIndicesToScan, const DataPos& srcNodeIDVectorPos, const DataPos& dstNodeIDVectorPos, - const DataPos& distanceVectorPos, uint32_t id, const std::string& paramsString, - std::unique_ptr root) + const DataPos& tmpDstNodeIDVectorPos, uint32_t id, const std::string& paramsString, + std::unique_ptr recursiveRoot) : PhysicalOperator{PhysicalOperatorType::RECURSIVE_JOIN, id, paramsString}, - upperBound{upperBound}, nodeTable{nodeTable}, sharedState{std::move(sharedState)}, - vectorsToScanPos{std::move(vectorsToScanPos)}, colIndicesToScan{std::move( - colIndicesToScan)}, - srcNodeIDVectorPos{srcNodeIDVectorPos}, dstNodeIDVectorPos{dstNodeIDVectorPos}, - distanceVectorPos{distanceVectorPos}, root{std::move(root)}, bfsScanState{} {} + lowerBound{lowerBound}, upperBound{upperBound}, nodeTable{nodeTable}, 
+ sharedState{std::move(sharedState)}, vectorsToScanPos{std::move(vectorsToScanPos)}, + colIndicesToScan{std::move(colIndicesToScan)}, srcNodeIDVectorPos{srcNodeIDVectorPos}, + dstNodeIDVectorPos{dstNodeIDVectorPos}, tmpDstNodeIDVectorPos{tmpDstNodeIDVectorPos}, + recursiveRoot{std::move(recursiveRoot)}, outputCursor{0} {} + + virtual ~BaseRecursiveJoin() = default; static inline DataPos getTmpSrcNodeVectorPos() { return DataPos{0, 0}; } - static inline DataPos getTmpDstNodeVectorPos() { return DataPos{1, 0}; } inline NodeSemiMask* getSemiMask() const { return sharedState->semiMask.get(); } @@ -96,50 +87,44 @@ class RecursiveJoin : public PhysicalOperator { bool getNextTuplesInternal(ExecutionContext* context) override; - std::unique_ptr clone() override { - return std::make_unique(upperBound, nodeTable, sharedState, vectorsToScanPos, - colIndicesToScan, srcNodeIDVectorPos, dstNodeIDVectorPos, distanceVectorPos, id, - paramsString, root->clone()); - } + std::unique_ptr clone() override = 0; private: - static std::unique_ptr getLocalResultSet(); + std::unique_ptr getLocalResultSet(); void initLocalRecursivePlan(ExecutionContext* context); + + virtual bool scanOutput() = 0; + // Compute BFS for a given src node. - bool computeBFS(ExecutionContext* context); - // Mark un-visited node as visited. 
- void updateVisitedState(); + void computeBFS(ExecutionContext* context); - void scanDstNodes(size_t sizeToScan); + void updateVisitedNodes(uint64_t multiplicity); -private: +protected: + uint8_t lowerBound; uint8_t upperBound; - // TODO:remove storage::NodeTable* nodeTable; std::shared_ptr sharedState; std::vector vectorsToScanPos; std::vector colIndicesToScan; DataPos srcNodeIDVectorPos; DataPos dstNodeIDVectorPos; - DataPos distanceVectorPos; + DataPos tmpDstNodeIDVectorPos; // Local recursive plan std::unique_ptr localResultSet; - std::unique_ptr root; + std::unique_ptr recursiveRoot; ScanFrontier* scanBFSLevel; - std::unique_ptr bfsMorsel; - - common::offset_t maxNodeOffset; + // Vectors std::vector vectorsToScan; std::shared_ptr srcNodeIDVector; std::shared_ptr dstNodeIDVector; - std::shared_ptr distanceVector; - // vector for temporary recursive join result. - std::shared_ptr tmpDstNodeIDVector; + std::shared_ptr tmpDstNodeIDVector; // temporary recursive join result. - BFSScanState bfsScanState; + std::unique_ptr bfsMorsel; + size_t outputCursor; }; } // namespace processor diff --git a/src/include/processor/operator/recursive_extend/shortest_path_recursive_join.h b/src/include/processor/operator/recursive_extend/shortest_path_recursive_join.h new file mode 100644 index 0000000000..c796f82687 --- /dev/null +++ b/src/include/processor/operator/recursive_extend/shortest_path_recursive_join.h @@ -0,0 +1,53 @@ +#pragma once + +#include "recursive_join.h" + +namespace kuzu { +namespace processor { + +class ShortestPathRecursiveJoin : public BaseRecursiveJoin { +public: + ShortestPathRecursiveJoin(uint8_t lowerBound, uint8_t upperBound, storage::NodeTable* nodeTable, + std::shared_ptr sharedState, + std::vector vectorsToScanPos, std::vector colIndicesToScan, + const DataPos& srcNodeIDVectorPos, const DataPos& dstNodeIDVectorPos, + const DataPos& distanceVectorPos, const DataPos& tmpDstNodeIDVectorPos, + std::unique_ptr child, uint32_t id, const std::string& 
paramsString, + std::unique_ptr recursiveRoot) + : BaseRecursiveJoin{lowerBound, upperBound, nodeTable, std::move(sharedState), + std::move(vectorsToScanPos), std::move(colIndicesToScan), srcNodeIDVectorPos, + dstNodeIDVectorPos, tmpDstNodeIDVectorPos, std::move(child), id, paramsString, + std::move(recursiveRoot)}, + distanceVectorPos{distanceVectorPos} {} + + ShortestPathRecursiveJoin(uint8_t lowerBound, uint8_t upperBound, storage::NodeTable* nodeTable, + std::shared_ptr sharedState, + std::vector vectorsToScanPos, std::vector colIndicesToScan, + const DataPos& srcNodeIDVectorPos, const DataPos& dstNodeIDVectorPos, + const DataPos& distanceVectorPos, const DataPos& tmpDstNodeIDVectorPos, uint32_t id, + const std::string& paramsString, std::unique_ptr recursiveRoot) + : BaseRecursiveJoin{lowerBound, upperBound, nodeTable, std::move(sharedState), + std::move(vectorsToScanPos), std::move(colIndicesToScan), srcNodeIDVectorPos, + dstNodeIDVectorPos, tmpDstNodeIDVectorPos, id, paramsString, + std::move(recursiveRoot)}, + distanceVectorPos{distanceVectorPos} {} + + void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) override; + + inline std::unique_ptr clone() override { + return std::make_unique(lowerBound, upperBound, nodeTable, + sharedState, vectorsToScanPos, colIndicesToScan, srcNodeIDVectorPos, dstNodeIDVectorPos, + distanceVectorPos, tmpDstNodeIDVectorPos, id, paramsString, recursiveRoot->clone()); + } + +private: + bool scanOutput() override; + +private: + DataPos distanceVectorPos; + + std::shared_ptr distanceVector; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/recursive_extend/variable_length_recursive_join.h b/src/include/processor/operator/recursive_extend/variable_length_recursive_join.h new file mode 100644 index 0000000000..1943c6076b --- /dev/null +++ b/src/include/processor/operator/recursive_extend/variable_length_recursive_join.h @@ -0,0 +1,45 @@ +#pragma once + +#include 
"recursive_join.h" + +namespace kuzu { +namespace processor { + +class VariableLengthRecursiveJoin : public BaseRecursiveJoin { +public: + VariableLengthRecursiveJoin(uint8_t lowerBound, uint8_t upperBound, + storage::NodeTable* nodeTable, std::shared_ptr sharedState, + std::vector vectorsToScanPos, std::vector colIndicesToScan, + const DataPos& srcNodeIDVectorPos, const DataPos& dstNodeIDVectorPos, + const DataPos& tmpDstNodeIDVectorPos, std::unique_ptr child, uint32_t id, + const std::string& paramsString, std::unique_ptr recursiveRoot) + : BaseRecursiveJoin{lowerBound, upperBound, nodeTable, std::move(sharedState), + std::move(vectorsToScanPos), std::move(colIndicesToScan), srcNodeIDVectorPos, + dstNodeIDVectorPos, tmpDstNodeIDVectorPos, std::move(child), id, paramsString, + std::move(recursiveRoot)} {} + + VariableLengthRecursiveJoin(uint8_t lowerBound, uint8_t upperBound, + storage::NodeTable* nodeTable, std::shared_ptr sharedState, + std::vector vectorsToScanPos, std::vector colIndicesToScan, + const DataPos& srcNodeIDVectorPos, const DataPos& dstNodeIDVectorPos, + const DataPos& tmpDstNodeIDVectorPos, uint32_t id, const std::string& paramsString, + std::unique_ptr recursiveRoot) + : BaseRecursiveJoin{lowerBound, upperBound, nodeTable, std::move(sharedState), + std::move(vectorsToScanPos), std::move(colIndicesToScan), srcNodeIDVectorPos, + dstNodeIDVectorPos, tmpDstNodeIDVectorPos, id, paramsString, + std::move(recursiveRoot)} {} + + void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) override; + + inline std::unique_ptr clone() override { + return std::make_unique(lowerBound, upperBound, nodeTable, + sharedState, vectorsToScanPos, colIndicesToScan, srcNodeIDVectorPos, dstNodeIDVectorPos, + tmpDstNodeIDVectorPos, id, paramsString, recursiveRoot->clone()); + } + +private: + bool scanOutput() override; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/var_length_extend/bfs_state.h 
b/src/include/processor/operator/var_length_extend/bfs_state.h deleted file mode 100644 index 71f83b165c..0000000000 --- a/src/include/processor/operator/var_length_extend/bfs_state.h +++ /dev/null @@ -1,67 +0,0 @@ -#pragma once - -#include "processor/operator/mask.h" - -namespace kuzu { -namespace processor { - -enum VisitedState : uint8_t { - NOT_VISITED_DST = 0, - VISITED_DST = 1, - NOT_VISITED = 2, - VISITED = 3, -}; - -struct Frontier { - std::vector nodeOffsets; - - inline void resetState() { nodeOffsets.clear(); } - inline uint64_t size() const { return nodeOffsets.size(); } -}; - -struct BFSMorsel { - // Level state - uint8_t currentLevel; - uint64_t nextNodeIdxToExtend; // next node to extend from current frontier. - std::unique_ptr currentFrontier; - std::unique_ptr nextFrontier; - - // Visited state - uint64_t numDstNodes; - uint64_t numVisitedDstNodes; - uint8_t* visitedNodes; - uint8_t* distance; - - // Offset of src node. - common::offset_t srcOffset; - // Maximum offset of dst nodes. - common::offset_t maxOffset; - uint8_t upperBound; - - BFSMorsel(common::offset_t maxOffset_, uint8_t upperBound_); - - // Reset state for a new src node. - void resetState(NodeOffsetSemiMask* semiMask); - // If BFS has completed. - bool isComplete(); - // Mark src as visited. - void markSrc(common::offset_t offset); - // UnMark src as NOT visited to avoid outputting src which has length 0 path and thus should be - // omitted. - void unmarkSrc(); - // Mark node as a dst visited. - void markVisitedDst(common::offset_t offset); - // Mark node as visited. - void markVisited(common::offset_t offset); - - // Get next node offset to extend from current level. 
- common::offset_t getNextNodeOffset(); - void moveNextLevelAsCurrentLevel(); - -private: - std::unique_ptr visitedNodesBuffer; - std::unique_ptr distanceBuffer; -}; - -} // namespace processor -} // namespace kuzu diff --git a/src/include/processor/operator/var_length_extend/var_length_adj_list_extend.h b/src/include/processor/operator/var_length_extend/var_length_adj_list_extend.h deleted file mode 100644 index 3daaaaad05..0000000000 --- a/src/include/processor/operator/var_length_extend/var_length_adj_list_extend.h +++ /dev/null @@ -1,48 +0,0 @@ -#pragma once - -#include "processor/operator/var_length_extend/var_length_extend.h" -#include "storage/storage_structure/lists/lists.h" - -namespace kuzu { -namespace processor { - -struct AdjListExtendDFSLevelInfo : DFSLevelInfo { - AdjListExtendDFSLevelInfo(uint8_t level, ExecutionContext& context); - - void reset(uint64_t parent); - - uint64_t parent; - uint64_t childrenIdx; - std::unique_ptr listSyncState; - std::unique_ptr listHandle; -}; - -class VarLengthAdjListExtend : public VarLengthExtend { -public: - VarLengthAdjListExtend(const DataPos& boundNodeDataPos, const DataPos& nbrNodeDataPos, - storage::BaseColumnOrList* adjLists, uint8_t lowerBound, uint8_t upperBound, - std::unique_ptr child, uint32_t id, const std::string& paramsString) - : VarLengthExtend(PhysicalOperatorType::VAR_LENGTH_ADJ_LIST_EXTEND, boundNodeDataPos, - nbrNodeDataPos, adjLists, lowerBound, upperBound, std::move(child), id, - paramsString) {} - - void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; - - bool getNextTuplesInternal(ExecutionContext* context) override; - - std::unique_ptr clone() override { - return std::make_unique(boundNodeDataPos, nbrNodeDataPos, storage, - lowerBound, upperBound, children[0]->clone(), id, paramsString); - } - -private: - // This function resets the dfsLevelInfo at level and adds the dfsLevelInfo to the - // dfsStack if the parent has adjacent nodes. 
The function returns true if the - // parent has adjacent nodes, otherwise returns false. - bool addDFSLevelToStackIfParentExtends(uint64_t parent, uint8_t level); - - bool getNextBatchOfNbrNodes(std::shared_ptr& dfsLevel) const; -}; - -} // namespace processor -} // namespace kuzu diff --git a/src/include/processor/operator/var_length_extend/var_length_column_extend.h b/src/include/processor/operator/var_length_extend/var_length_column_extend.h deleted file mode 100644 index 5b7187363c..0000000000 --- a/src/include/processor/operator/var_length_extend/var_length_column_extend.h +++ /dev/null @@ -1,47 +0,0 @@ -#pragma once - -#include "processor/operator/physical_operator.h" -#include "processor/operator/var_length_extend/var_length_adj_list_extend.h" -#include "processor/result/result_set.h" -#include "storage/storage_structure/column.h" -#include "storage/storage_structure/lists/lists.h" - -namespace kuzu { -namespace processor { - -struct ColumnExtendDFSLevelInfo : DFSLevelInfo { - ColumnExtendDFSLevelInfo(uint8_t level, ExecutionContext& context) - : DFSLevelInfo(level, context), hasBeenExtended{false} {} - - void reset(); - - bool hasBeenExtended; -}; - -class VarLengthColumnExtend : public VarLengthExtend { -public: - VarLengthColumnExtend(const DataPos& boundNodeDataPos, const DataPos& nbrNodeDataPos, - storage::BaseColumnOrList* storage, uint8_t lowerBound, uint8_t upperBound, - std::unique_ptr child, uint32_t id, const std::string& paramsString) - : VarLengthExtend(PhysicalOperatorType::VAR_LENGTH_COLUMN_EXTEND, boundNodeDataPos, - nbrNodeDataPos, storage, lowerBound, upperBound, std::move(child), id, paramsString) { - } - - void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; - - bool getNextTuplesInternal(ExecutionContext* context) override; - - std::unique_ptr clone() override { - return make_unique(boundNodeDataPos, nbrNodeDataPos, storage, - lowerBound, upperBound, children[0]->clone(), id, paramsString); - } - 
-private: - // This function resets the dfsLevelInfo at level and adds the dfsLevelInfo to the - // dfsStack if the parent has adjacent nodes. The function returns true if the - // parent has adjacent nodes, otherwise returns false. - bool addDFSLevelToStackIfParentExtends(common::ValueVector* parentValueVector, uint8_t level); -}; - -} // namespace processor -} // namespace kuzu diff --git a/src/include/processor/operator/var_length_extend/var_length_extend.h b/src/include/processor/operator/var_length_extend/var_length_extend.h deleted file mode 100644 index bfae16ad72..0000000000 --- a/src/include/processor/operator/var_length_extend/var_length_extend.h +++ /dev/null @@ -1,50 +0,0 @@ -#pragma once - -#include "processor/operator/physical_operator.h" -#include "processor/result/result_set.h" -#include "storage/storage_structure/lists/lists.h" - -namespace kuzu { -namespace processor { - -struct DFSLevelInfo { - DFSLevelInfo(uint8_t level, ExecutionContext& context) - : level{level}, hasBeenOutput{false}, children{std::make_unique( - common::INTERNAL_ID, context.memoryManager)} {}; - const uint8_t level; - bool hasBeenOutput; - std::unique_ptr children; -}; - -class VarLengthExtend : public PhysicalOperator { -public: - VarLengthExtend(PhysicalOperatorType operatorType, const DataPos& boundNodeDataPos, - const DataPos& nbrNodeDataPos, storage::BaseColumnOrList* storage, uint8_t lowerBound, - uint8_t upperBound, std::unique_ptr child, uint32_t id, - const std::string& paramsString) - : PhysicalOperator{operatorType, std::move(child), id, paramsString}, - boundNodeDataPos{boundNodeDataPos}, nbrNodeDataPos{nbrNodeDataPos}, storage{storage}, - lowerBound{lowerBound}, upperBound{upperBound} { - dfsLevelInfos.resize(upperBound); - } - - void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; - -protected: - DataPos boundNodeDataPos; - DataPos nbrNodeDataPos; - storage::BaseColumnOrList* storage; - uint8_t lowerBound; - uint8_t upperBound; 
- common::ValueVector* boundNodeValueVector; - common::ValueVector* nbrNodeValueVector; - std::stack> dfsStack; - // The VarLenExtend has the invariant that at any point in time, there will be one DSFLevelInfo - // in the dfsStack for each level. dfsLevelInfos is a pool of DFSLevelInfos that holds - // upperBound many of them (which is the maximum we need at any point in time). This allows us - // to avoid creating many DFSLevelInfos as we perform the dfs. - std::vector> dfsLevelInfos; -}; - -} // namespace processor -} // namespace kuzu diff --git a/src/planner/join_order_enumerator.cpp b/src/planner/join_order_enumerator.cpp index cb1a7adbc6..e06e34b16d 100644 --- a/src/planner/join_order_enumerator.cpp +++ b/src/planner/join_order_enumerator.cpp @@ -181,11 +181,9 @@ void JoinOrderEnumerator::appendExtendAndFilter(std::shared_ptr r auto properties = queryPlanner->getPropertiesForRel(*rel); appendNonRecursiveExtend(boundNode, nbrNode, rel, direction, properties, plan); } break; - case common::QueryRelType::VARIABLE_LENGTH: { - appendVariableLengthExtend(boundNode, nbrNode, rel, direction, plan); - } break; + case common::QueryRelType::VARIABLE_LENGTH: case common::QueryRelType::SHORTEST: { - appendShortestPathExtend(boundNode, nbrNode, rel, direction, plan); + appendRecursiveExtend(boundNode, nbrNode, rel, direction, plan); } break; default: throw common::NotImplementedException("appendExtendAndFilter()"); @@ -483,29 +481,11 @@ void JoinOrderEnumerator::appendNonRecursiveExtend(std::shared_ptr boundNode, - std::shared_ptr nbrNode, std::shared_ptr rel, - common::RelDirection direction, kuzu::planner::LogicalPlan& plan) { - auto hasAtMostOneNbr = extendHasAtMostOneNbrGuarantee(*rel, *boundNode, direction, catalog); - auto extend = make_shared( - boundNode, nbrNode, rel, direction, hasAtMostOneNbr, plan.getLastOperator()); - queryPlanner->appendFlattens(extend->getGroupsPosToFlatten(), plan); - extend->setChild(0, plan.getLastOperator()); - 
extend->computeFactorizedSchema(); - auto extensionRate = queryPlanner->cardinalityEstimator->getExtensionRate(*rel, *boundNode); - plan.setCost(CostModel::computeRecursiveExtendCost(rel->getUpperBound(), extensionRate, plan)); - if (!hasAtMostOneNbr) { - auto group = extend->getSchema()->getGroup(nbrNode->getInternalIDProperty()); - group->setMultiplier(extensionRate); - } - plan.setLastOperator(std::move(extend)); -} - -void JoinOrderEnumerator::appendShortestPathExtend(std::shared_ptr boundNode, +void JoinOrderEnumerator::appendRecursiveExtend(std::shared_ptr boundNode, std::shared_ptr nbrNode, std::shared_ptr rel, - common::RelDirection direction, kuzu::planner::LogicalPlan& plan) { + common::RelDirection direction, LogicalPlan& plan) { auto hasAtMostOneNbr = extendHasAtMostOneNbrGuarantee(*rel, *boundNode, direction, catalog); - auto extend = std::make_shared( + auto extend = std::make_shared( boundNode, nbrNode, rel, direction, plan.getLastOperator()); queryPlanner->appendFlattens(extend->getGroupsPosToFlatten(), plan); extend->setChild(0, plan.getLastOperator()); diff --git a/src/planner/operator/logical_recursive_extend.cpp b/src/planner/operator/logical_recursive_extend.cpp index a7950cdd46..c140d9278c 100644 --- a/src/planner/operator/logical_recursive_extend.cpp +++ b/src/planner/operator/logical_recursive_extend.cpp @@ -23,26 +23,15 @@ void LogicalRecursiveExtend::computeFlatSchema() { } } -void LogicalVariableLengthExtend::computeFactorizedSchema() { - copyChildSchema(0); - auto boundGroupPos = schema->getGroupPos(boundNode->getInternalIDPropertyName()); - uint32_t nbrGroupPos = 0u; - if (hasAtMostOneNbr) { - nbrGroupPos = boundGroupPos; - } else { - assert(schema->getGroup(boundGroupPos)->isFlat()); - nbrGroupPos = schema->createGroup(); - } - schema->insertToGroupAndScope(nbrNode->getInternalIDProperty(), nbrGroupPos); -} - -void LogicalShortestPathExtend::computeFactorizedSchema() { +void LogicalRecursiveExtend::computeFactorizedSchema() { 
createEmptySchema(); auto childSchema = children[0]->getSchema(); SinkOperatorUtil::recomputeSchema(*childSchema, childSchema->getExpressionsInScope(), *schema); auto nbrGroupPos = schema->createGroup(); schema->insertToGroupAndScope(nbrNode->getInternalIDProperty(), nbrGroupPos); - schema->insertToGroupAndScope(rel->getInternalLengthProperty(), nbrGroupPos); + if (rel->getRelType() == common::QueryRelType::SHORTEST) { + schema->insertToGroupAndScope(rel->getInternalLengthProperty(), nbrGroupPos); + } } } // namespace planner diff --git a/src/processor/mapper/map_extend.cpp b/src/processor/mapper/map_extend.cpp index ec11375f09..7ddd134ab8 100644 --- a/src/processor/mapper/map_extend.cpp +++ b/src/processor/mapper/map_extend.cpp @@ -1,12 +1,11 @@ #include "planner/logical_plan/logical_operator/logical_extend.h" #include "planner/logical_plan/logical_operator/logical_recursive_extend.h" #include "processor/mapper/plan_mapper.h" +#include "processor/operator/recursive_extend/shortest_path_recursive_join.h" +#include "processor/operator/recursive_extend/variable_length_recursive_join.h" #include "processor/operator/scan/generic_scan_rel_tables.h" #include "processor/operator/scan/scan_rel_table_columns.h" #include "processor/operator/scan/scan_rel_table_lists.h" -#include "processor/operator/var_length_extend/recursive_join.h" -#include "processor/operator/var_length_extend/var_length_adj_list_extend.h" -#include "processor/operator/var_length_extend/var_length_column_extend.h" using namespace kuzu::binder; using namespace kuzu::common; @@ -121,62 +120,60 @@ std::unique_ptr PlanMapper::mapLogicalRecursiveExtendToPhysica DataPos(outSchema->getExpressionPos(*nbrNode->getInternalIDProperty())); auto& relsStore = storageManager.getRelsStore(); auto relTableID = rel->getSingleTableID(); - if (rel->getRelType() == common::QueryRelType::VARIABLE_LENGTH) { - if (relsStore.isSingleMultiplicityInDirection(direction, relTableID)) { - auto adjColumn = 
relsStore.getAdjColumn(direction, relTableID); - return make_unique(inNodeIDVectorPos, outNodeIDVectorPos, - adjColumn, rel->getLowerBound(), rel->getUpperBound(), std::move(prevOperator), - getOperatorID(), extend->getExpressionsForPrinting()); - } else { - auto adjList = relsStore.getAdjLists(direction, relTableID); - return make_unique(inNodeIDVectorPos, outNodeIDVectorPos, - adjList, rel->getLowerBound(), rel->getUpperBound(), std::move(prevOperator), - getOperatorID(), extend->getExpressionsForPrinting()); - } + auto expressions = inSchema->getExpressionsInScope(); + auto resultCollector = appendResultCollector(expressions, *inSchema, std::move(prevOperator)); + auto sharedInputFTable = resultCollector->getSharedState(); + std::vector outDataPoses; + std::vector colIndicesToScan; + for (auto i = 0u; i < expressions.size(); ++i) { + outDataPoses.emplace_back(outSchema->getExpressionPos(*expressions[i])); + colIndicesToScan.push_back(i); + } + auto& nodeStore = storageManager.getNodesStore(); + auto nodeTable = nodeStore.getNodeTable(boundNode->getSingleTableID()); + std::string emptyParamString; + // TODO(Xiyang): this hard code is fine as long as we use just 2 vectors as the intermediate + // state for recursive join. Though ideally we should construct them from schema. 
+ auto tmpSrcNodePos = BaseRecursiveJoin::getTmpSrcNodeVectorPos(); + auto scanFrontier = + std::make_unique(tmpSrcNodePos, getOperatorID(), emptyParamString); + std::unique_ptr scanRelTable; + std::vector emptyPropertyIDs; + DataPos tmpDstNodePos{0, 0}; + if (relsStore.isSingleMultiplicityInDirection(direction, relTableID)) { + tmpDstNodePos = DataPos{0, 1}; + scanRelTable = make_unique( + relsStore.getRelTable(relTableID)->getDirectedTableData(direction), emptyPropertyIDs, + tmpSrcNodePos, std::vector{tmpDstNodePos}, std::move(scanFrontier), + getOperatorID(), emptyParamString); } else { - assert(rel->getRelType() == common::QueryRelType::SHORTEST); - auto expressions = inSchema->getExpressionsInScope(); - auto resultCollector = - appendResultCollector(expressions, *inSchema, std::move(prevOperator)); - auto sharedInputFTable = resultCollector->getSharedState(); - std::vector outDataPoses; - std::vector colIndicesToScan; - for (auto i = 0u; i < expressions.size(); ++i) { - outDataPoses.emplace_back(outSchema->getExpressionPos(*expressions[i])); - colIndicesToScan.push_back(i); - } - auto upperBound = rel->getUpperBound(); - auto& nodeStore = storageManager.getNodesStore(); - auto nodeTable = nodeStore.getNodeTable(boundNode->getSingleTableID()); + tmpDstNodePos = DataPos{1, 0}; /* list extend: dst IDs go in the second data chunk, which is the layout getLocalResultSet() selects when dataChunkPos + 1 == 2 */ + scanRelTable = make_unique( + relsStore.getRelTable(relTableID)->getDirectedTableData(direction), emptyPropertyIDs, + tmpSrcNodePos, std::vector{tmpDstNodePos}, std::move(scanFrontier), + getOperatorID(), emptyParamString); + } + auto sharedState = std::make_shared(sharedInputFTable, nodeTable); + switch (rel->getRelType()) { + case common::QueryRelType::SHORTEST: { auto distanceVectorPos = DataPos{outSchema->getExpressionPos(*rel->getInternalLengthProperty())}; - std::string emptyParamString; - // TODO(Xiyang): this hard code is fine as long as we use just 2 vectors as the intermediate - // state for recursive join. Though ideally we should construct them from schema.
- auto tmpSrcNodePos = RecursiveJoin::getTmpSrcNodeVectorPos(); - auto tmpDstNodePos = RecursiveJoin::getTmpDstNodeVectorPos(); - auto scanFrontier = - std::make_unique(tmpSrcNodePos, getOperatorID(), emptyParamString); - std::unique_ptr scanRelTable; - std::vector emptyPropertyIDs; - if (relsStore.isSingleMultiplicityInDirection(direction, relTableID)) { - scanRelTable = make_unique( - relsStore.getRelTable(relTableID)->getDirectedTableData(direction), - emptyPropertyIDs, tmpSrcNodePos, std::vector{tmpDstNodePos}, - std::move(scanFrontier), getOperatorID(), emptyParamString); - } else { - scanRelTable = make_unique( - relsStore.getRelTable(relTableID)->getDirectedTableData(direction), - emptyPropertyIDs, tmpSrcNodePos, std::vector{tmpDstNodePos}, - std::move(scanFrontier), getOperatorID(), emptyParamString); - } - auto sharedState = std::make_shared(sharedInputFTable, nodeTable); - return std::make_unique(upperBound, nodeTable, sharedState, outDataPoses, - colIndicesToScan, inNodeIDVectorPos, outNodeIDVectorPos, distanceVectorPos, + return std::make_unique(rel->getLowerBound(), + rel->getUpperBound(), nodeTable, sharedState, outDataPoses, colIndicesToScan, + inNodeIDVectorPos, outNodeIDVectorPos, distanceVectorPos, tmpDstNodePos, std::move(resultCollector), getOperatorID(), extend->getExpressionsForPrinting(), std::move(scanRelTable)); } + case common::QueryRelType::VARIABLE_LENGTH: { + return std::make_unique(rel->getLowerBound(), + rel->getUpperBound(), nodeTable, sharedState, outDataPoses, colIndicesToScan, + inNodeIDVectorPos, outNodeIDVectorPos, tmpDstNodePos, std::move(resultCollector), + getOperatorID(), extend->getExpressionsForPrinting(), std::move(scanRelTable)); + } + default: + throw common::NotImplementedException("PlanMapper::mapLogicalRecursiveExtendToPhysical"); + } } } // namespace processor diff --git a/src/processor/mapper/map_semi_masker.cpp b/src/processor/mapper/map_semi_masker.cpp index 689735297e..428d1b3789 100644 --- 
a/src/processor/mapper/map_semi_masker.cpp +++ b/src/processor/mapper/map_semi_masker.cpp @@ -1,8 +1,8 @@ #include "planner/logical_plan/logical_operator/logical_semi_masker.h" #include "processor/mapper/plan_mapper.h" +#include "processor/operator/recursive_extend/recursive_join.h" #include "processor/operator/scan_node_id.h" #include "processor/operator/semi_masker.h" -#include "processor/operator/var_length_extend/recursive_join.h" using namespace kuzu::planner; @@ -32,7 +32,7 @@ std::unique_ptr PlanMapper::mapLogicalSemiMaskerToPhysical( } } break; case PhysicalOperatorType::RECURSIVE_JOIN: { - auto recursiveJoin = (RecursiveJoin*)physicalOp; + auto recursiveJoin = (BaseRecursiveJoin*)physicalOp; assert(!node->isMultiLabeled()); auto tableID = node->getSingleTableID(); masksPerTable.at(tableID).emplace_back(recursiveJoin->getSemiMask(), 0); diff --git a/src/processor/operator/CMakeLists.txt b/src/processor/operator/CMakeLists.txt index db1b81b91f..181fbbf60c 100644 --- a/src/processor/operator/CMakeLists.txt +++ b/src/processor/operator/CMakeLists.txt @@ -7,7 +7,7 @@ add_subdirectory(order_by) add_subdirectory(scan) add_subdirectory(table_scan) add_subdirectory(update) -add_subdirectory(var_length_extend) +add_subdirectory(recursive_extend) add_library(kuzu_processor_operator OBJECT diff --git a/src/processor/operator/var_length_extend/CMakeLists.txt b/src/processor/operator/recursive_extend/CMakeLists.txt similarity index 70% rename from src/processor/operator/var_length_extend/CMakeLists.txt rename to src/processor/operator/recursive_extend/CMakeLists.txt index b1c3153885..33a6e4b0d7 100644 --- a/src/processor/operator/var_length_extend/CMakeLists.txt +++ b/src/processor/operator/recursive_extend/CMakeLists.txt @@ -2,9 +2,8 @@ add_library(kuzu_processor_operator_ver_length_extend OBJECT bfs_state.cpp recursive_join.cpp - var_length_adj_list_extend.cpp - var_length_column_extend.cpp - var_length_extend.cpp) + shortest_path_recursive_join.cpp + 
variable_length_recursive_join.cpp) set(ALL_OBJECT_FILES ${ALL_OBJECT_FILES} $ diff --git a/src/processor/operator/recursive_extend/bfs_state.cpp b/src/processor/operator/recursive_extend/bfs_state.cpp new file mode 100644 index 0000000000..e9cb050850 --- /dev/null +++ b/src/processor/operator/recursive_extend/bfs_state.cpp @@ -0,0 +1,89 @@ +#include "processor/operator/recursive_extend/bfs_state.h" + +namespace kuzu { +namespace processor { + +common::offset_t BaseBFSMorsel::getNextNodeOffset() { + if (nextNodeIdxToExtend == currentFrontier->nodeOffsets.size()) { + return common::INVALID_NODE_OFFSET; + } + return currentFrontier->nodeOffsets[nextNodeIdxToExtend++]; +} + +void BaseBFSMorsel::moveNextLevelAsCurrentLevel() { + currentFrontier = std::move(nextFrontier); + currentLevel++; + nextNodeIdxToExtend = 0; + nextFrontier = createFrontier(); + if (currentLevel < upperBound) { // No need to sort if we are not extending further. + std::sort(currentFrontier->nodeOffsets.begin(), currentFrontier->nodeOffsets.end()); + } +} + +void BaseBFSMorsel::resetState() { + currentLevel = 0; + nextNodeIdxToExtend = 0; + currentFrontier->resetState(); + nextFrontier->resetState(); + numTargetDstNodes = 0; +} + +void ShortestPathBFSMorsel::markSrc(common::offset_t offset) { + if (visitedNodes[offset] == NOT_VISITED_DST) { + visitedNodes[offset] = VISITED_DST; + numVisitedDstNodes++; + } + currentFrontier->nodeOffsets.push_back(offset); +} + +void ShortestPathBFSMorsel::markVisited(common::offset_t offset, uint64_t multiplicity) { + if (visitedNodes[offset] == NOT_VISITED_DST) { + visitedNodes[offset] = VISITED_DST; + dstNodeOffsets.push_back(offset); + dstNodeOffset2PathLength.insert({offset, currentLevel + 1}); + numVisitedDstNodes++; + nextFrontier->nodeOffsets.push_back(offset); + } else if (visitedNodes[offset] == NOT_VISITED) { + visitedNodes[offset] = VISITED; + nextFrontier->nodeOffsets.push_back(offset); + } +} + +void ShortestPathBFSMorsel::resetVisitedState() { +
numVisitedDstNodes = 0; + dstNodeOffsets.clear(); + dstNodeOffset2PathLength.clear(); + if (!isAllDstTarget()) { + std::fill(visitedNodes, visitedNodes + maxOffset + 1, (uint8_t)VisitedState::NOT_VISITED); + for (auto offset : targetDstNodeOffsets) { + visitedNodes[offset] = VisitedState::NOT_VISITED_DST; + } + numTargetDstNodes = targetDstNodeOffsets.size(); + } else { + std::fill( + visitedNodes, visitedNodes + maxOffset + 1, (uint8_t)VisitedState::NOT_VISITED_DST); + numTargetDstNodes = maxOffset + 1; + } +} + +void VariableLengthBFSMorsel::updateNumPathFromCurrentFrontier() { + if (currentLevel < lowerBound) { + return; + } + if (!isAllDstTarget() && numTargetDstNodes < currentFrontier->nodeOffsets.size()) { + // Target is smaller than current frontier size. Loop through target instead of current + // frontier. + for (auto offset : targetDstNodeOffsets) { + if (((FrontierWithMultiplicity&)*currentFrontier).contains(offset)) { + updateNumPath(offset, currentFrontier->getMultiplicity(offset)); + } + } + } else { + for (auto offset : currentFrontier->nodeOffsets) { + updateNumPath(offset, currentFrontier->getMultiplicity(offset)); + } + } +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/recursive_extend/recursive_join.cpp b/src/processor/operator/recursive_extend/recursive_join.cpp new file mode 100644 index 0000000000..a8e767dcb7 --- /dev/null +++ b/src/processor/operator/recursive_extend/recursive_join.cpp @@ -0,0 +1,127 @@ +#include "processor/operator/recursive_extend/recursive_join.h" + +namespace kuzu { +namespace processor { + +bool ScanFrontier::getNextTuplesInternal(ExecutionContext* context) { + if (!hasExecuted) { + hasExecuted = true; + return true; + } + return false; +} + +void BaseRecursiveJoin::initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) { + for (auto& dataPos : vectorsToScanPos) { + vectorsToScan.push_back(resultSet->getValueVector(dataPos).get()); + } + srcNodeIDVector = 
resultSet->getValueVector(srcNodeIDVectorPos); + dstNodeIDVector = resultSet->getValueVector(dstNodeIDVectorPos); + initLocalRecursivePlan(context); + outputCursor = 0; +} + +bool BaseRecursiveJoin::getNextTuplesInternal(ExecutionContext* context) { + // There are two high level steps. + // + // (1) BFS Computation phase: Grab a new source to do a BFS and compute an entire BFS starting + // from a single source; + // + // (2) Outputting phase: Once a BFS from a single source finishes, we output the results in + // pieces of vectors to the parent operator. + // + // These 2 steps are repeated iteratively until all sources to do a BFS are exhausted. The first + // if statement checks if we are in the outputting phase and if so, scans a vector to output and + // returns true. Otherwise, we compute a new BFS. + while (true) { + if (scanOutput()) { // Phase 2 + return true; + } + auto inputFTableMorsel = sharedState->inputFTableSharedState->getMorsel(1 /* morselSize */); + if (inputFTableMorsel->numTuples == 0) { // All src have been exhausted. + return false; + } + sharedState->inputFTableSharedState->getTable()->scan(vectorsToScan, + inputFTableMorsel->startTupleIdx, inputFTableMorsel->numTuples, colIndicesToScan); + bfsMorsel->resetState(); + computeBFS(context); // Phase 1 + outputCursor = 0; // Reset cursor for result scanning. + } +} + +void BaseRecursiveJoin::computeBFS(ExecutionContext* context) { + auto nodeID = srcNodeIDVector->getValue( + srcNodeIDVector->state->selVector->selectedPositions[0]); + bfsMorsel->markSrc(nodeID.offset); + while (!bfsMorsel->isComplete()) { + auto nodeOffset = bfsMorsel->getNextNodeOffset(); + if (nodeOffset != common::INVALID_NODE_OFFSET) { + auto multiplicity = bfsMorsel->currentFrontier->getMultiplicity(nodeOffset); + // Found a starting node from current frontier. + scanBFSLevel->setNodeID(common::nodeID_t{nodeOffset, nodeTable->getTableID()}); + while (recursiveRoot->getNextTuple(context)) { // Exhaust recursive plan. 
+ updateVisitedNodes(multiplicity); + } + } else { + // Otherwise move to the next frontier. + bfsMorsel->finalizeCurrentLevel(); + } + } +} + +void BaseRecursiveJoin::updateVisitedNodes(uint64_t multiplicity) { + for (auto i = 0u; i < tmpDstNodeIDVector->state->selVector->selectedSize; ++i) { + auto pos = tmpDstNodeIDVector->state->selVector->selectedPositions[i]; + auto nodeID = tmpDstNodeIDVector->getValue(pos); + bfsMorsel->markVisited(nodeID.offset, multiplicity); + } +} + +// ResultSet for list extend, i.e. 2 data chunks each with 1 vector. +static std::unique_ptr populateResultSetWithTwoDataChunks() { + auto resultSet = std::make_unique(2); + auto dataChunk0 = std::make_shared(1); + dataChunk0->state = common::DataChunkState::getSingleValueDataChunkState(); + dataChunk0->insert(0, std::make_shared(common::INTERNAL_ID, nullptr)); + auto dataChunk1 = std::make_shared(1); + dataChunk1->insert(0, std::make_shared(common::INTERNAL_ID, nullptr)); + resultSet->insert(0, std::move(dataChunk0)); + resultSet->insert(1, std::move(dataChunk1)); + return resultSet; +} + +// ResultSet for column extend, i.e. 1 data chunk with 2 vectors. 
+static std::unique_ptr populateResultSetWithOneDataChunk() { + auto resultSet = std::make_unique(1); + auto dataChunk0 = std::make_shared(2); + dataChunk0->state = common::DataChunkState::getSingleValueDataChunkState(); + dataChunk0->insert(0, std::make_shared(common::INTERNAL_ID, nullptr)); + dataChunk0->insert(1, std::make_shared(common::INTERNAL_ID, nullptr)); + resultSet->insert(0, std::move(dataChunk0)); + return resultSet; +} + +std::unique_ptr BaseRecursiveJoin::getLocalResultSet() { + auto numDataChunks = tmpDstNodeIDVectorPos.dataChunkPos + 1; + if (numDataChunks == 2) { + return populateResultSetWithTwoDataChunks(); + } else { + assert(tmpDstNodeIDVectorPos.dataChunkPos == 0); + return populateResultSetWithOneDataChunk(); + } +} + +void BaseRecursiveJoin::initLocalRecursivePlan(ExecutionContext* context) { + auto op = recursiveRoot.get(); + while (!op->isSource()) { + assert(op->getNumChildren() == 1); + op = op->getChild(0); + } + scanBFSLevel = (ScanFrontier*)op; + localResultSet = getLocalResultSet(); + tmpDstNodeIDVector = localResultSet->getValueVector(tmpDstNodeIDVectorPos); + recursiveRoot->initLocalState(localResultSet.get(), context); +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/recursive_extend/shortest_path_recursive_join.cpp b/src/processor/operator/recursive_extend/shortest_path_recursive_join.cpp new file mode 100644 index 0000000000..ba0f3ac789 --- /dev/null +++ b/src/processor/operator/recursive_extend/shortest_path_recursive_join.cpp @@ -0,0 +1,36 @@ +#include "processor/operator/recursive_extend/shortest_path_recursive_join.h" + +namespace kuzu { +namespace processor { + +void ShortestPathRecursiveJoin::initLocalStateInternal( + ResultSet* resultSet_, ExecutionContext* context) { + BaseRecursiveJoin::initLocalStateInternal(resultSet_, context); + distanceVector = resultSet->getValueVector(distanceVectorPos); + auto maxNodeOffset = nodeTable->getMaxNodeOffset(transaction); + bfsMorsel = 
std::make_unique( + maxNodeOffset, lowerBound, upperBound, sharedState->semiMask.get()); + bfsMorsel->resetState(); +} + +bool ShortestPathRecursiveJoin::scanOutput() { + auto morsel = (ShortestPathBFSMorsel*)bfsMorsel.get(); + if (outputCursor == morsel->dstNodeOffsets.size()) { + return false; + } + auto vectorSize = 0u; + while (vectorSize != common::DEFAULT_VECTOR_CAPACITY && + outputCursor < morsel->dstNodeOffsets.size()) { + auto offset = morsel->dstNodeOffsets[outputCursor]; + dstNodeIDVector->setValue( + vectorSize, common::nodeID_t{offset, nodeTable->getTableID()}); + distanceVector->setValue(vectorSize, morsel->dstNodeOffset2PathLength.at(offset)); + vectorSize++; + outputCursor++; + } + dstNodeIDVector->state->initOriginalAndSelectedSize(vectorSize); + return true; +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/recursive_extend/variable_length_recursive_join.cpp b/src/processor/operator/recursive_extend/variable_length_recursive_join.cpp new file mode 100644 index 0000000000..40a2b5412d --- /dev/null +++ b/src/processor/operator/recursive_extend/variable_length_recursive_join.cpp @@ -0,0 +1,40 @@ +#include "processor/operator/recursive_extend/variable_length_recursive_join.h" + +namespace kuzu { +namespace processor { + +void VariableLengthRecursiveJoin::initLocalStateInternal( + ResultSet* resultSet_, ExecutionContext* context) { + BaseRecursiveJoin::initLocalStateInternal(resultSet_, context); + auto maxNodeOffset = nodeTable->getMaxNodeOffset(transaction); + bfsMorsel = std::make_unique( + maxNodeOffset, lowerBound, upperBound, sharedState->semiMask.get()); + bfsMorsel->resetState(); +} + +bool VariableLengthRecursiveJoin::scanOutput() { + auto morsel = (VariableLengthBFSMorsel*)bfsMorsel.get(); + if (outputCursor == morsel->dstNodeOffsets.size()) { + return false; + } + auto vectorSize = 0u; + while (vectorSize != common::DEFAULT_VECTOR_CAPACITY && + outputCursor < morsel->dstNodeOffsets.size()) { + auto 
offset = morsel->dstNodeOffsets[outputCursor]; + auto& numPath = morsel->dstNodeOffset2NumPath.at(offset); + auto numPathFitInCurrentVector = + std::min(common::DEFAULT_VECTOR_CAPACITY - vectorSize, numPath); + for (auto i = 0u; i < numPathFitInCurrentVector; ++i) { + dstNodeIDVector->setValue( + vectorSize, common::nodeID_t{offset, nodeTable->getTableID()}); + vectorSize++; + } + numPath -= numPathFitInCurrentVector; + outputCursor += numPath == 0; // No more path to scan under current cursor + } + dstNodeIDVector->state->initOriginalAndSelectedSize(vectorSize); + return true; +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/var_length_extend/bfs_state.cpp b/src/processor/operator/var_length_extend/bfs_state.cpp deleted file mode 100644 index 2681499bf9..0000000000 --- a/src/processor/operator/var_length_extend/bfs_state.cpp +++ /dev/null @@ -1,102 +0,0 @@ -#include "processor/operator/var_length_extend/bfs_state.h" - -namespace kuzu { -namespace processor { - -BFSMorsel::BFSMorsel(common::offset_t maxOffset_, uint8_t upperBound_) { - maxOffset = maxOffset_; - upperBound = upperBound_; - currentFrontier = std::make_unique(); - nextFrontier = std::make_unique(); - visitedNodesBuffer = std::make_unique(maxOffset + 1 * sizeof(uint8_t)); - visitedNodes = visitedNodesBuffer.get(); - distanceBuffer = std::make_unique(maxOffset + 1 * sizeof(uint8_t)); - distance = distanceBuffer.get(); -} - -void BFSMorsel::resetState(NodeOffsetSemiMask* semiMask) { - currentLevel = 0; - nextNodeIdxToExtend = 0; - currentFrontier->resetState(); - nextFrontier->resetState(); - numDstNodes = 0; - numVisitedDstNodes = 0; - if (semiMask->isEnabled()) { - for (auto offset = 0u; offset < maxOffset + 1; ++offset) { - if (semiMask->isNodeMasked(offset)) { - visitedNodes[offset] = VisitedState::NOT_VISITED_DST; - numDstNodes++; - } else { - visitedNodes[offset] = VisitedState::NOT_VISITED; - } - } - } else { - std::fill( - visitedNodes, visitedNodes + 
maxOffset + 1, (uint8_t)VisitedState::NOT_VISITED_DST); - numDstNodes = maxOffset + 1; - } -} - -bool BFSMorsel::isComplete() { - if (currentFrontier->size() == 0) { // no more to extend. - return true; - } - if (currentLevel == upperBound) { // upper limit reached. - return true; - } - if (numVisitedDstNodes == numDstNodes) { // all destinations have been reached. - return true; - } - return false; -} - -void BFSMorsel::markSrc(common::offset_t offset) { - if (visitedNodes[offset] == NOT_VISITED_DST) { - visitedNodes[offset] = VISITED_DST; - distance[offset] = 0; - numVisitedDstNodes++; - } - currentFrontier->nodeOffsets.push_back(offset); - srcOffset = offset; -} - -void BFSMorsel::unmarkSrc() { - if (visitedNodes[srcOffset] == VISITED_DST) { - visitedNodes[srcOffset] = NOT_VISITED_DST; - numVisitedDstNodes--; - } -} - -void BFSMorsel::markVisitedDst(common::offset_t offset) { - assert(visitedNodes[offset] == NOT_VISITED_DST); - visitedNodes[offset] = VISITED_DST; - distance[offset] = currentLevel + 1; - numVisitedDstNodes++; - nextFrontier->nodeOffsets.push_back(offset); -} - -void BFSMorsel::markVisited(common::offset_t offset) { - assert(visitedNodes[offset] == NOT_VISITED); - visitedNodes[offset] = VISITED; - nextFrontier->nodeOffsets.push_back(offset); -} - -common::offset_t BFSMorsel::getNextNodeOffset() { - if (nextNodeIdxToExtend == currentFrontier->size()) { - return common::INVALID_NODE_OFFSET; - } - return currentFrontier->nodeOffsets[nextNodeIdxToExtend++]; -} - -void BFSMorsel::moveNextLevelAsCurrentLevel() { - currentFrontier = std::move(nextFrontier); - currentLevel++; - nextNodeIdxToExtend = 0; - nextFrontier = std::make_unique(); - if (currentLevel == upperBound) { // No need to sort if we are not extending further. 
- std::sort(currentFrontier->nodeOffsets.begin(), currentFrontier->nodeOffsets.end()); - } -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/var_length_extend/recursive_join.cpp b/src/processor/operator/var_length_extend/recursive_join.cpp deleted file mode 100644 index 2b72df6234..0000000000 --- a/src/processor/operator/var_length_extend/recursive_join.cpp +++ /dev/null @@ -1,141 +0,0 @@ -#include "processor/operator/var_length_extend/recursive_join.h" - -namespace kuzu { -namespace processor { - -bool ScanFrontier::getNextTuplesInternal(ExecutionContext* context) { - if (!hasExecuted) { - hasExecuted = true; - return true; - } - return false; -} - -void RecursiveJoin::initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) { - maxNodeOffset = nodeTable->getMaxNodeOffset(transaction); - for (auto& dataPos : vectorsToScanPos) { - vectorsToScan.push_back(resultSet->getValueVector(dataPos).get()); - } - srcNodeIDVector = resultSet->getValueVector(srcNodeIDVectorPos); - dstNodeIDVector = resultSet->getValueVector(dstNodeIDVectorPos); - distanceVector = resultSet->getValueVector(distanceVectorPos); - bfsMorsel = std::make_unique(maxNodeOffset, upperBound); - bfsMorsel->resetState(sharedState->semiMask.get()); - initLocalRecursivePlan(context); - bfsScanState.resetState(); -} - -// There are two high level steps. -// -// (1) BFS Computation phase: Grab a new source to do a BFS and compute an entire BFS starting from -// a single source; -// -// (2) Outputting phase: Once a computation finishes, we output the results in pieces of vectors to -// the parent operator. -// -// These 2 steps are repeated iteratively until all sources to do a recursive computation are -// exhausted. The first if statement checks if we are in the outputting phase and if so, scans a -// vector to output and returns true. Otherwise, we compute a new BFS. 
-bool RecursiveJoin::getNextTuplesInternal(ExecutionContext* context) { - while (true) { - if (bfsScanState.numScanned < bfsMorsel->numVisitedDstNodes) { // Phase 2 - auto numToScan = std::min(common::DEFAULT_VECTOR_CAPACITY, - bfsMorsel->numVisitedDstNodes - bfsScanState.numScanned); - scanDstNodes(numToScan); - bfsScanState.numScanned += numToScan; - return true; - } - if (!computeBFS(context)) { // Phase 1 - return false; - } - } -} - -bool RecursiveJoin::computeBFS(ExecutionContext* context) { - auto inputFTableMorsel = sharedState->inputFTableSharedState->getMorsel(1); - if (inputFTableMorsel->numTuples == 0) { - return false; - } - sharedState->inputFTableSharedState->getTable()->scan(vectorsToScan, - inputFTableMorsel->startTupleIdx, inputFTableMorsel->numTuples, colIndicesToScan); - bfsMorsel->resetState(sharedState->semiMask.get()); - auto nodeID = srcNodeIDVector->getValue( - srcNodeIDVector->state->selVector->selectedPositions[0]); - bfsMorsel->markSrc(nodeID.offset); - while (!bfsMorsel->isComplete()) { - auto nodeOffset = bfsMorsel->getNextNodeOffset(); - if (nodeOffset != common::INVALID_NODE_OFFSET) { - // Found a starting node from current frontier. - scanBFSLevel->setNodeID(common::nodeID_t{nodeOffset, nodeTable->getTableID()}); - while (root->getNextTuple(context)) { // Exhaust recursive plan. - updateVisitedState(); - } - } else { - // Otherwise move to the next frontier. 
- bfsMorsel->moveNextLevelAsCurrentLevel(); - } - } - bfsMorsel->unmarkSrc(); - bfsScanState.resetState(); - return true; -} - -void RecursiveJoin::updateVisitedState() { - auto visitedNodes = bfsMorsel->visitedNodes; - for (auto i = 0u; i < tmpDstNodeIDVector->state->selVector->selectedSize; ++i) { - auto pos = tmpDstNodeIDVector->state->selVector->selectedPositions[i]; - auto nodeID = tmpDstNodeIDVector->getValue(pos); - if (visitedNodes[nodeID.offset] == VisitedState::NOT_VISITED_DST) { - bfsMorsel->markVisitedDst(nodeID.offset); - } else if (visitedNodes[nodeID.offset] == VisitedState::NOT_VISITED) { - bfsMorsel->markVisited(nodeID.offset); - } - } -} - -std::unique_ptr RecursiveJoin::getLocalResultSet() { - // Create two datachunks each with 1 nodeID value vector. - // DataChunk 1 has flat state and contains the temporary src node ID vector for recursive join. - // DataChunk 2 has unFlat state and contains the temporary dst node ID vector for recursive - // join. - auto resultSet = std::make_unique(2); - auto dataChunk0 = std::make_shared(1); - dataChunk0->state = common::DataChunkState::getSingleValueDataChunkState(); - dataChunk0->insert(0, std::make_shared(common::INTERNAL_ID, nullptr)); - auto dataChunk1 = std::make_shared(1); - dataChunk1->insert(0, std::make_shared(common::INTERNAL_ID, nullptr)); - resultSet->insert(0, std::move(dataChunk0)); - resultSet->insert(1, std::move(dataChunk1)); - return resultSet; -} - -void RecursiveJoin::initLocalRecursivePlan(ExecutionContext* context) { - auto op = root.get(); - while (!op->isSource()) { - assert(op->getNumChildren() == 1); - op = op->getChild(0); - } - scanBFSLevel = (ScanFrontier*)op; - localResultSet = getLocalResultSet(); - tmpDstNodeIDVector = localResultSet->getValueVector(getTmpDstNodeVectorPos()); - root->initLocalState(localResultSet.get(), context); -} - -void RecursiveJoin::scanDstNodes(size_t sizeToScan) { - auto size = 0; - while (sizeToScan != size) { - if 
(bfsMorsel->visitedNodes[bfsScanState.currentOffset] != VISITED_DST) { - bfsScanState.currentOffset++; - continue; - } - dstNodeIDVector->setValue( - size, common::nodeID_t{bfsScanState.currentOffset, nodeTable->getTableID()}); - distanceVector->setValue(size, bfsMorsel->distance[bfsScanState.currentOffset]); - size++; - bfsScanState.currentOffset++; - } - dstNodeIDVector->state->initOriginalAndSelectedSize(sizeToScan); -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/var_length_extend/var_length_adj_list_extend.cpp b/src/processor/operator/var_length_extend/var_length_adj_list_extend.cpp deleted file mode 100644 index 5465555401..0000000000 --- a/src/processor/operator/var_length_extend/var_length_adj_list_extend.cpp +++ /dev/null @@ -1,107 +0,0 @@ -#include "processor/operator/var_length_extend/var_length_adj_list_extend.h" - -#include "common/types/types.h" - -using namespace kuzu::common; -using namespace kuzu::storage; - -namespace kuzu { -namespace processor { - -AdjListExtendDFSLevelInfo::AdjListExtendDFSLevelInfo(uint8_t level, ExecutionContext& context) - : DFSLevelInfo{level, context}, parent{0}, childrenIdx{0} { - // The children ValueVector inside the DFSLevelInfo struct is not tied to a DataChunk. - // Because we use AdjLists to read data into the children valueVector, and AdjLists requires a - // DataChunkState to write how many nodes it has read, we create a new DataChunkState and assign - // it to children. - children->state = std::make_shared(); - listSyncState = std::make_unique(); - listHandle = std::make_unique(*listSyncState); -} - -void AdjListExtendDFSLevelInfo::reset(uint64_t parent_) { - // We do not explicitly reset the largeListHandle because when we call - // largeListHandle->hasMoreToRead() inside getNextBatchOfNbrNodes() and that returns false, - // largeListHandle->hasMoreToRead() resets its listSyncState. 
- this->parent = parent_; - this->childrenIdx = 0; - this->hasBeenOutput = false; -} - -void VarLengthAdjListExtend::initLocalStateInternal( - ResultSet* resultSet, ExecutionContext* context) { - VarLengthExtend::initLocalStateInternal(resultSet, context); - for (uint8_t i = 0; i < upperBound; i++) { - dfsLevelInfos[i] = std::make_shared(i + 1, *context); - } -} - -bool VarLengthAdjListExtend::getNextTuplesInternal(ExecutionContext* context) { - while (true) { - while (!dfsStack.empty()) { - auto dfsLevelInfo = static_pointer_cast(dfsStack.top()); - if (dfsLevelInfo->level >= lowerBound && dfsLevelInfo->level <= upperBound && - !dfsLevelInfo->hasBeenOutput) { - // It is impossible for the children to have a null value, so we don't need - // to copy the null mask to the nbrNodeValueVector. - memcpy(nbrNodeValueVector->getData(), dfsLevelInfo->children->getData(), - dfsLevelInfo->children->state->selVector->selectedSize * - Types::getDataTypeSize(dfsLevelInfo->children->dataType)); - nbrNodeValueVector->state->selVector->selectedSize = - dfsLevelInfo->children->state->selVector->selectedSize; - dfsLevelInfo->hasBeenOutput = true; - return true; - } else if (dfsLevelInfo->childrenIdx < - dfsLevelInfo->children->state->selVector->selectedSize && - dfsLevelInfo->level != upperBound) { - addDFSLevelToStackIfParentExtends( - dfsLevelInfo->children->readNodeOffset( - dfsLevelInfo->children->state->selVector - ->selectedPositions[dfsLevelInfo->childrenIdx]), - dfsLevelInfo->level + 1); - dfsLevelInfo->childrenIdx++; - } else if (getNextBatchOfNbrNodes(dfsLevelInfo)) { - dfsLevelInfo->childrenIdx = 0; - dfsLevelInfo->hasBeenOutput = false; - } else { - dfsStack.pop(); - } - } - uint64_t curIdx; - do { - if (!children[0]->getNextTuple(context)) { - return false; - } - curIdx = boundNodeValueVector->state->selVector->selectedPositions[0]; - } while (boundNodeValueVector->isNull(curIdx) || - !addDFSLevelToStackIfParentExtends( - boundNodeValueVector->readNodeOffset(curIdx), 1 
/* level */)); - } -} - -bool VarLengthAdjListExtend::addDFSLevelToStackIfParentExtends(uint64_t parent, uint8_t level) { - auto dfsLevelInfo = static_pointer_cast(dfsLevelInfos[level - 1]); - dfsLevelInfo->reset(parent); - ((AdjLists*)storage) - ->initListReadingState(parent, *dfsLevelInfo->listHandle, transaction->getType()); - ((AdjLists*)storage) - ->readValues(transaction, dfsLevelInfo->children.get(), *dfsLevelInfo->listHandle); - if (dfsLevelInfo->children->state->selVector->selectedSize != 0) { - dfsStack.emplace(std::move(dfsLevelInfo)); - return true; - } - return false; -} - -bool VarLengthAdjListExtend::getNextBatchOfNbrNodes( - std::shared_ptr& dfsLevel) const { - if (dfsLevel->listHandle->hasMoreAndSwitchSourceIfNecessary()) { - ((AdjLists*)storage) - ->readValues(transaction, dfsLevel->children.get(), *dfsLevel->listHandle); - return true; - } - return false; -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/var_length_extend/var_length_column_extend.cpp b/src/processor/operator/var_length_extend/var_length_column_extend.cpp deleted file mode 100644 index 56e24d94f0..0000000000 --- a/src/processor/operator/var_length_extend/var_length_column_extend.cpp +++ /dev/null @@ -1,84 +0,0 @@ -#include "processor/operator/var_length_extend/var_length_column_extend.h" - -#include "common/types/types.h" - -using namespace kuzu::common; -using namespace kuzu::storage; - -namespace kuzu { -namespace processor { - -void ColumnExtendDFSLevelInfo::reset() { - this->hasBeenExtended = false; - this->hasBeenOutput = false; -} - -void VarLengthColumnExtend::initLocalStateInternal( - ResultSet* resultSet, ExecutionContext* context) { - VarLengthExtend::initLocalStateInternal(resultSet, context); - for (uint8_t i = 0; i < upperBound; i++) { - auto dfsLevelInfo = std::make_shared(i + 1, *context); - // Since we use boundNodeValueVector as the input valueVector and dfsLevelInfo->children as - // the output valueVector to the 
Column::readValues(), and this function requires that the - // input and output valueVector are in the same dataChunk. As a result, we need to put the - // dfsLevelInfo->children to the boundNodeValueVector's dataChunk. - // We can't add the dfsLevelInfo->children to the boundNodeValueVector's dataChunk in the - // constructor, because the boundNodeValueVector hasn't been initialized in the constructor. - dfsLevelInfo->children->state = boundNodeValueVector->state; - dfsLevelInfos[i] = std::move(dfsLevelInfo); - } -} - -bool VarLengthColumnExtend::getNextTuplesInternal(ExecutionContext* context) { - // This general loop structure and how we fetch more data from the child operator after the - // while(true) loop block is almost the same as that in VarLengthAdjListExtend but there are - // several differences (e.g., we have one less else if branch here), so we are not refactoring. - while (true) { - while (!dfsStack.empty()) { - auto dfsLevelInfo = static_pointer_cast(dfsStack.top()); - if (dfsLevelInfo->level >= lowerBound && dfsLevelInfo->level <= upperBound && - !dfsLevelInfo->hasBeenOutput) { - // It is impossible for the children to have a null value, so we don't need - // to copy the null mask to the nbrNodeValueVector. 
- auto elementSize = Types::getDataTypeSize(dfsLevelInfo->children->dataType); - memcpy(nbrNodeValueVector->getData() + - elementSize * nbrNodeValueVector->state->selVector->selectedPositions[0], - dfsLevelInfo->children->getData() + - elementSize * - dfsLevelInfo->children->state->selVector->selectedPositions[0], - elementSize); - dfsLevelInfo->hasBeenOutput = true; - return true; - } else if (!dfsLevelInfo->hasBeenExtended && dfsLevelInfo->level != upperBound) { - addDFSLevelToStackIfParentExtends( - dfsLevelInfo->children.get(), dfsLevelInfo->level + 1); - dfsLevelInfo->hasBeenExtended = true; - } else { - dfsStack.pop(); - } - } - do { - if (!children[0]->getNextTuple(context)) { - return false; - } - } while (boundNodeValueVector->isNull( - boundNodeValueVector->state->selVector->selectedPositions[0]) || - !addDFSLevelToStackIfParentExtends(boundNodeValueVector, 1 /* level */)); - } -} - -bool VarLengthColumnExtend::addDFSLevelToStackIfParentExtends( - common::ValueVector* parentValueVector, uint8_t level) { - auto dfsLevelInfo = static_pointer_cast(dfsLevelInfos[level - 1]); - dfsLevelInfo->reset(); - ((Column*)storage)->read(transaction, parentValueVector, dfsLevelInfo->children.get()); - if (!dfsLevelInfo->children->isNull( - parentValueVector->state->selVector->selectedPositions[0])) { - dfsStack.emplace(std::move(dfsLevelInfo)); - return true; - } - return false; -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/var_length_extend/var_length_extend.cpp b/src/processor/operator/var_length_extend/var_length_extend.cpp deleted file mode 100644 index e29d24fe8e..0000000000 --- a/src/processor/operator/var_length_extend/var_length_extend.cpp +++ /dev/null @@ -1,12 +0,0 @@ -#include "processor/operator/var_length_extend/var_length_extend.h" - -namespace kuzu { -namespace processor { - -void VarLengthExtend::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { - boundNodeValueVector = 
resultSet->getValueVector(boundNodeDataPos).get(); - nbrNodeValueVector = resultSet->getValueVector(nbrNodeDataPos).get(); -} - -} // namespace processor -} // namespace kuzu diff --git a/test/runner/CMakeLists.txt b/test/runner/CMakeLists.txt index d71091eb2c..87fd7ab4e1 100644 --- a/test/runner/CMakeLists.txt +++ b/test/runner/CMakeLists.txt @@ -10,4 +10,3 @@ add_kuzu_test(e2e_update_node_test e2e_update_node_test.cpp) add_kuzu_test(e2e_update_rel_test e2e_update_rel_test.cpp) add_kuzu_test(e2e_delete_rel_test e2e_delete_rel_test.cpp) add_kuzu_test(e2e_create_rel_test e2e_create_rel_test.cpp) -add_kuzu_test(e2e_shortest_path_test e2e_shortest_path_test.cpp) \ No newline at end of file diff --git a/test/runner/e2e_shortest_path_test.cpp b/test/runner/e2e_shortest_path_test.cpp deleted file mode 100644 index 09375d6d22..0000000000 --- a/test/runner/e2e_shortest_path_test.cpp +++ /dev/null @@ -1,35 +0,0 @@ -/*#include "graph_test/graph_test.h" - -using ::testing::Test; -using namespace kuzu::testing; - -class SingleSourceShortestPathTest : public DBTest { -public: - std::string getInputDir() override { - return TestHelper::appendKuzuRootPath("dataset/shortest-path-tests/"); - } -}; - -TEST_F(SingleSourceShortestPathTest, BFS_SSSP) { - runTest(TestHelper::appendKuzuRootPath("test/test_files/shortest_path/bfs_sssp.test")); - runTest(TestHelper::appendKuzuRootPath("test/test_files/shortest_path/bfs_sssp_large.test")); -} - -TEST_F(SingleSourceShortestPathTest, SSSP_ExceptionTests) { - auto result = conn->query("MATCH p = (a:person)-[:knows*]->(b:person) WHERE a.fName = " - "'Alice' RETURN a.fName, b.fName, p.length"); - ASSERT_STREQ("Binder exception: Binding query pattern to path variable is only supported for " - "Shortest Path queries.", - result->getErrorMessage().c_str()); - - result = conn->query("MATCH p = (a:person) RETURN a"); - ASSERT_STREQ("Binder exception: Binding path to a single node is not supported.", - result->getErrorMessage().c_str()); - - result = 
conn->query("MATCH p = (a:person)-[r:knows* SHORTEST]->(b:person) WHERE a.fName = " - "'Alice' RETURN a.fName, b.fName, p.length"); - ASSERT_STREQ( - "Binder exception: Rel variable for Shortest path expression queries are not allowed.", - result->getErrorMessage().c_str()); -} -*/