Skip to content

Commit

Permalink
add recursive join no track path optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed May 18, 2023
1 parent d8d5152 commit fa8dcd6
Show file tree
Hide file tree
Showing 18 changed files with 470 additions and 267 deletions.
2 changes: 2 additions & 0 deletions src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class ListAuxiliaryBuffer : public AuxiliaryBuffer {

list_entry_t addList(uint64_t listSize);

// Sets the tracked `size` back to zero. Nothing else is modified by this call.
inline void resetSize() {
    size = 0;
}

private:
uint64_t capacity;
uint64_t size;
Expand Down
4 changes: 4 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ class ListVector {
return reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())
->addList(listSize);
}
// Resets the size of the list auxiliary buffer attached to `vector`.
// `vector` is expected to hold VAR_LIST data; this is checked with an assert only.
static inline void resetListAuxiliaryBuffer(ValueVector* vector) {
    assert(vector->dataType.typeID == VAR_LIST);
    auto* listBuffer = reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
    listBuffer->resetSize();
}
};

class StructVector {
Expand Down
4 changes: 3 additions & 1 deletion src/include/optimizer/projection_push_down_optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class ProjectionPushDownOptimizer : public LogicalOperatorVisitor {
private:
void visitOperator(planner::LogicalOperator* op);

void visitRecursiveExtend(planner::LogicalOperator* op) override;
void visitAccumulate(planner::LogicalOperator* op) override;
void visitFilter(planner::LogicalOperator* op) override;
void visitHashJoin(planner::LogicalOperator* op) override;
Expand All @@ -33,7 +34,7 @@ class ProjectionPushDownOptimizer : public LogicalOperatorVisitor {
void visitDeleteNode(planner::LogicalOperator* op) override;
void visitDeleteRel(planner::LogicalOperator* op) override;

void collectPropertiesInUse(std::shared_ptr<binder::Expression> expression);
void collectExpressionsInUse(std::shared_ptr<binder::Expression> expression);

binder::expression_vector pruneExpressions(const binder::expression_vector& expressions);

Expand All @@ -42,6 +43,7 @@ class ProjectionPushDownOptimizer : public LogicalOperatorVisitor {

private:
binder::expression_set propertiesInUse;
binder::expression_set variablesInUse;
};

} // namespace optimizer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,30 @@ class LogicalRecursiveExtend : public BaseLogicalExtend {
// Convenience constructor: delegates to the overload that takes an explicit trackPath_ flag,
// passing `true` so that path tracking is enabled unless the caller asks otherwise.
LogicalRecursiveExtend(std::shared_ptr<binder::NodeExpression> boundNode,
std::shared_ptr<binder::NodeExpression> nbrNode, std::shared_ptr<binder::RelExpression> rel,
common::ExtendDirection direction, std::shared_ptr<LogicalOperator> child)
: LogicalRecursiveExtend{std::move(boundNode), std::move(nbrNode), std::move(rel),
direction, true /* trackPath */, std::move(child)} {}
LogicalRecursiveExtend(std::shared_ptr<binder::NodeExpression> boundNode,
std::shared_ptr<binder::NodeExpression> nbrNode, std::shared_ptr<binder::RelExpression> rel,
common::ExtendDirection direction, bool trackPath_, std::shared_ptr<LogicalOperator> child)
: BaseLogicalExtend{LogicalOperatorType::RECURSIVE_EXTEND, std::move(boundNode),
std::move(nbrNode), std::move(rel), direction, std::move(child)} {}
std::move(nbrNode), std::move(rel), direction, std::move(child)},
trackPath_{trackPath_} {}

f_group_pos_set getGroupsPosToFlatten() override;

void computeFactorizedSchema() override;
void computeFlatSchema() override;

// Turns path tracking off for this operator by clearing the trackPath_ flag.
inline void disableTrackPath() {
    trackPath_ = false;
}
inline bool trackPath() { return trackPath_; }

inline std::unique_ptr<LogicalOperator> copy() override {
return std::make_unique<LogicalRecursiveExtend>(
boundNode, nbrNode, rel, direction, children[0]->copy());
boundNode, nbrNode, rel, direction, trackPath_, children[0]->copy());
}

private:
bool trackPath_;
};

} // namespace planner
Expand Down
125 changes: 81 additions & 44 deletions src/include/processor/operator/recursive_extend/bfs_state.h
Original file line number Diff line number Diff line change
@@ -1,37 +1,12 @@
#pragma once

#include "frontier.h"
#include "processor/operator/mask.h"

namespace kuzu {
namespace processor {

enum VisitedState : uint8_t {
NOT_VISITED_DST = 0,
VISITED_DST = 1,
NOT_VISITED = 2,
VISITED = 3,
};

struct Frontier {
std::vector<common::offset_t> nodeOffsets;
std::unordered_map<common::offset_t, std::vector<common::offset_t>> bwdEdges;

Frontier() = default;
inline virtual void resetState() {
nodeOffsets.clear();
bwdEdges.clear();
}
inline void addEdge(common::offset_t boundOffset, common::offset_t nbrOffset) {
if (!bwdEdges.contains(nbrOffset)) {
nodeOffsets.push_back(nbrOffset);
bwdEdges.insert({nbrOffset, std::vector<common::offset_t>{}});
}
bwdEdges.at(nbrOffset).push_back(boundOffset);
}
};

struct BaseBFSMorsel {
friend struct FixedLengthPathScanner;
// Static information
common::offset_t maxOffset;
uint8_t lowerBound;
Expand Down Expand Up @@ -69,12 +44,16 @@ struct BaseBFSMorsel {

virtual void resetState();
virtual bool isComplete() = 0;

virtual void markSrc(common::offset_t offset) = 0;
virtual void markVisited(common::offset_t boundOffset, common::offset_t nbrOffset) = 0;
virtual void markVisited(
common::offset_t boundOffset, common::offset_t nbrOffset, uint64_t multiplicity) = 0;

inline void finalizeCurrentLevel() { moveNextLevelAsCurrentLevel(); }

protected:
inline uint64_t getNumNodes() const { return maxOffset + 1; }

inline bool isAllDstTarget() const { return targetDstNodeOffsets.empty(); }
inline bool isCurrentFrontierEmpty() const { return currentFrontier->nodeOffsets.empty(); }
inline bool isUpperBoundReached() const { return currentLevel == upperBound; }
Expand All @@ -90,51 +69,109 @@ struct BaseBFSMorsel {
void moveNextLevelAsCurrentLevel();
};

struct ShortestPathBFSMorsel : public BaseBFSMorsel {
// Per-node visit state, stored as one uint8_t per node in ShortestPathMorsel::visitedNodes.
// The *_DST values are used for nodes that count as destinations (every node when no explicit
// target set is given); the plain values are used for all other nodes.
enum VisitedState : uint8_t {
// Destination node that has not been reached yet.
NOT_VISITED_DST = 0,
// Destination node that has been reached (counted in numVisitedDstNodes).
VISITED_DST = 1,
// Non-destination node that has not been reached yet.
NOT_VISITED = 2,
// Non-destination node that has been reached.
VISITED = 3,
};

template<bool trackPath>
struct ShortestPathMorsel : public BaseBFSMorsel {
// Visited state
uint64_t numVisitedDstNodes;
uint8_t* visitedNodes;

ShortestPathBFSMorsel(common::offset_t maxOffset, uint8_t lowerBound, uint8_t upperBound,
ShortestPathMorsel(common::offset_t maxOffset, uint8_t lowerBound, uint8_t upperBound,
NodeOffsetSemiMask* semiMask)
: BaseBFSMorsel{maxOffset, lowerBound, upperBound, semiMask}, numVisitedDstNodes{0} {
visitedNodesBuffer = std::make_unique<uint8_t[]>(getNumNodes() * sizeof(uint8_t));
visitedNodes = visitedNodesBuffer.get();
}
~ShortestPathMorsel() override = default;

inline bool isComplete() override {
inline bool isComplete() final {
return isCurrentFrontierEmpty() || isUpperBoundReached() || isAllDstReached();
}
inline void resetState() override {
inline void resetState() final {
BaseBFSMorsel::resetState();
resetVisitedState();
}
void markSrc(common::offset_t offset) override;
void markVisited(common::offset_t boundOffset, common::offset_t nbrOffset) override;

private:
// Registers `offset` as the BFS source: if it is a not-yet-reached destination it is marked
// as reached and counted, and in every case it is added to the current frontier.
inline void markSrc(common::offset_t offset) final {
    auto& srcState = visitedNodes[offset];
    const bool srcIsUnreachedDst = (srcState == NOT_VISITED_DST);
    if (srcIsUnreachedDst) {
        srcState = VISITED_DST;
        ++numVisitedDstNodes;
    }
    currentFrontier->addNode(offset);
}

// Records that `nbrOffset` was reached from `boundOffset`.
// Only the first visit of a node has any effect: the node's state is flipped to its
// "visited" counterpart (counting it if it is a destination) and the node is added to the
// next frontier, with its backward edge when trackPath is true, otherwise as a bare node.
// `multiplicity` is not used by this implementation.
inline void markVisited(
    common::offset_t boundOffset, common::offset_t nbrOffset, uint64_t multiplicity) override {
    auto& nbrState = visitedNodes[nbrOffset];
    switch (nbrState) {
    case NOT_VISITED_DST:
        nbrState = VISITED_DST;
        ++numVisitedDstNodes;
        break;
    case NOT_VISITED:
        nbrState = VISITED;
        break;
    default:
        // Already marked as visited: nothing to record.
        return;
    }
    if constexpr (trackPath) {
        nextFrontier->addEdge(boundOffset, nbrOffset);
    } else {
        nextFrontier->addNode(nbrOffset);
    }
}

protected:
inline bool isAllDstReached() const { return numVisitedDstNodes == numTargetDstNodes; }
void resetVisitedState();
// Re-initialises the visited array and the destination counters.
// When every node is a destination, all entries become NOT_VISITED_DST and the target count
// is the number of nodes; otherwise all entries become NOT_VISITED except the explicit
// targets, which become NOT_VISITED_DST, and the target count is the size of the target set.
inline void resetVisitedState() {
    numVisitedDstNodes = 0;
    if (isAllDstTarget()) {
        std::fill(
            visitedNodes, visitedNodes + getNumNodes(), (uint8_t)VisitedState::NOT_VISITED_DST);
        numTargetDstNodes = getNumNodes();
        return;
    }
    std::fill(visitedNodes, visitedNodes + getNumNodes(), (uint8_t)VisitedState::NOT_VISITED);
    for (auto dstOffset : targetDstNodeOffsets) {
        visitedNodes[dstOffset] = VisitedState::NOT_VISITED_DST;
    }
    numTargetDstNodes = targetDstNodeOffsets.size();
}

private:
std::unique_ptr<uint8_t[]> visitedNodesBuffer;
};

struct VariableLengthBFSMorsel : public BaseBFSMorsel {
explicit VariableLengthBFSMorsel(common::offset_t maxOffset, uint8_t lowerBound,
uint8_t upperBound, NodeOffsetSemiMask* semiMask)
template<bool trackPath>
struct VariableLengthMorsel : public BaseBFSMorsel {
VariableLengthMorsel(common::offset_t maxOffset, uint8_t lowerBound, uint8_t upperBound,
NodeOffsetSemiMask* semiMask)
: BaseBFSMorsel{maxOffset, lowerBound, upperBound, semiMask} {}
~VariableLengthMorsel() override = default;

inline void resetState() override {
inline void resetState() final {
BaseBFSMorsel::resetState();
numTargetDstNodes = isAllDstTarget() ? getNumNodes() : targetDstNodeOffsets.size();
}
inline bool isComplete() override { return isCurrentFrontierEmpty() || isUpperBoundReached(); }
inline void markSrc(common::offset_t offset) override {
currentFrontier->nodeOffsets.push_back(offset);
inline bool isComplete() final { return isCurrentFrontierEmpty() || isUpperBoundReached(); }

// Registers `offset` as the BFS source by adding it to the current frontier with a
// multiplicity of one.
inline void markSrc(common::offset_t offset) final {
    constexpr uint64_t srcMultiplicity = 1;
    currentFrontier->addNodeWithMultiplicity(offset, srcMultiplicity);
}
inline void markVisited(common::offset_t boundOffset, common::offset_t nbrOffset) override {
nextFrontier->addEdge(boundOffset, nbrOffset);

// Records that `nbrOffset` was reached from `boundOffset`.
// Without path tracking only the node and the given multiplicity are accumulated in the next
// frontier; with path tracking the backward edge is stored instead and `multiplicity` is
// not used.
inline void markVisited(
    common::offset_t boundOffset, common::offset_t nbrOffset, uint64_t multiplicity) final {
    if constexpr (!trackPath) {
        nextFrontier->addNodeWithMultiplicity(nbrOffset, multiplicity);
    } else {
        nextFrontier->addEdge(boundOffset, nbrOffset);
    }
}
};

Expand Down
56 changes: 56 additions & 0 deletions src/include/processor/operator/recursive_extend/frontier.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include <unordered_map>

#include "common/types/types_include.h"

namespace kuzu {
namespace processor {

/*
* A Frontier can store dst node offsets, their multiplicities and their bwd edges. Note that not
* every BFS computation needs to track all of this information.
*
* Computation | Information tracked
* Shortest path track path | nodeOffsets & bwdEdges
* Shortest path NOT track path | nodeOffsets
* Var length track path | nodeOffsets & bwdEdges
* Var length NOT track path | nodeOffsets & offsetToMultiplicity
*/
struct Frontier {
std::vector<common::offset_t> nodeOffsets;
std::unordered_map<common::offset_t, std::vector<common::offset_t>> bwdEdges;
std::unordered_map<common::offset_t, uint64_t> offsetToMultiplicity;

inline void resetState() {
nodeOffsets.clear();
bwdEdges.clear();
offsetToMultiplicity.clear();
}

inline void addNode(common::offset_t offset) { nodeOffsets.push_back(offset); }

inline void addEdge(common::offset_t boundOffset, common::offset_t nbrOffset) {
if (!bwdEdges.contains(nbrOffset)) {
nodeOffsets.push_back(nbrOffset);
bwdEdges.insert({nbrOffset, std::vector<common::offset_t>{}});
}
bwdEdges.at(nbrOffset).push_back(boundOffset);
}

inline void addNodeWithMultiplicity(common::offset_t offset, uint64_t multiplicity) {
if (offsetToMultiplicity.contains(offset)) {
offsetToMultiplicity.at(offset) += multiplicity;
} else {
offsetToMultiplicity.insert({offset, multiplicity});
nodeOffsets.push_back(offset);
}
}

inline uint64_t getMultiplicity(common::offset_t offset) const {
return offsetToMultiplicity.empty() ? 1 : offsetToMultiplicity.at(offset);
}
};

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit fa8dcd6

Please sign in to comment.