Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recursive join no path tracking optimizer #1547

Merged
merged 1 commit into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class ListAuxiliaryBuffer : public AuxiliaryBuffer {

list_entry_t addList(uint64_t listSize);

inline void resetSize() { size = 0; }

private:
uint64_t capacity;
uint64_t size;
Expand Down
4 changes: 4 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ class ListVector {
return reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())
->addList(listSize);
}
static inline void resetListAuxiliaryBuffer(ValueVector* vector) {
assert(vector->dataType.typeID == VAR_LIST);
reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())->resetSize();
}
};

class StructVector {
Expand Down
4 changes: 3 additions & 1 deletion src/include/optimizer/projection_push_down_optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class ProjectionPushDownOptimizer : public LogicalOperatorVisitor {
private:
void visitOperator(planner::LogicalOperator* op);

void visitRecursiveExtend(planner::LogicalOperator* op) override;
void visitAccumulate(planner::LogicalOperator* op) override;
void visitFilter(planner::LogicalOperator* op) override;
void visitHashJoin(planner::LogicalOperator* op) override;
Expand All @@ -33,7 +34,7 @@ class ProjectionPushDownOptimizer : public LogicalOperatorVisitor {
void visitDeleteNode(planner::LogicalOperator* op) override;
void visitDeleteRel(planner::LogicalOperator* op) override;

void collectPropertiesInUse(std::shared_ptr<binder::Expression> expression);
void collectExpressionsInUse(std::shared_ptr<binder::Expression> expression);

binder::expression_vector pruneExpressions(const binder::expression_vector& expressions);

Expand All @@ -42,6 +43,7 @@ class ProjectionPushDownOptimizer : public LogicalOperatorVisitor {

private:
binder::expression_set propertiesInUse;
binder::expression_set variablesInUse;
};

} // namespace optimizer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,30 @@ class LogicalRecursiveExtend : public BaseLogicalExtend {
LogicalRecursiveExtend(std::shared_ptr<binder::NodeExpression> boundNode,
std::shared_ptr<binder::NodeExpression> nbrNode, std::shared_ptr<binder::RelExpression> rel,
common::ExtendDirection direction, std::shared_ptr<LogicalOperator> child)
: LogicalRecursiveExtend{std::move(boundNode), std::move(nbrNode), std::move(rel),
direction, true /* trackPath */, std::move(child)} {}
LogicalRecursiveExtend(std::shared_ptr<binder::NodeExpression> boundNode,
std::shared_ptr<binder::NodeExpression> nbrNode, std::shared_ptr<binder::RelExpression> rel,
common::ExtendDirection direction, bool trackPath_, std::shared_ptr<LogicalOperator> child)
: BaseLogicalExtend{LogicalOperatorType::RECURSIVE_EXTEND, std::move(boundNode),
std::move(nbrNode), std::move(rel), direction, std::move(child)} {}
std::move(nbrNode), std::move(rel), direction, std::move(child)},
trackPath_{trackPath_} {}

f_group_pos_set getGroupsPosToFlatten() override;

void computeFactorizedSchema() override;
void computeFlatSchema() override;

inline void disableTrackPath() { trackPath_ = false; }
inline bool trackPath() { return trackPath_; }

inline std::unique_ptr<LogicalOperator> copy() override {
return std::make_unique<LogicalRecursiveExtend>(
boundNode, nbrNode, rel, direction, children[0]->copy());
boundNode, nbrNode, rel, direction, trackPath_, children[0]->copy());
}

private:
bool trackPath_;
};

} // namespace planner
Expand Down
125 changes: 81 additions & 44 deletions src/include/processor/operator/recursive_extend/bfs_state.h
Original file line number Diff line number Diff line change
@@ -1,37 +1,12 @@
#pragma once

#include "frontier.h"
#include "processor/operator/mask.h"

namespace kuzu {
namespace processor {

enum VisitedState : uint8_t {
NOT_VISITED_DST = 0,
VISITED_DST = 1,
NOT_VISITED = 2,
VISITED = 3,
};

struct Frontier {
std::vector<common::offset_t> nodeOffsets;
std::unordered_map<common::offset_t, std::vector<common::offset_t>> bwdEdges;

Frontier() = default;
inline virtual void resetState() {
nodeOffsets.clear();
bwdEdges.clear();
}
inline void addEdge(common::offset_t boundOffset, common::offset_t nbrOffset) {
if (!bwdEdges.contains(nbrOffset)) {
nodeOffsets.push_back(nbrOffset);
bwdEdges.insert({nbrOffset, std::vector<common::offset_t>{}});
}
bwdEdges.at(nbrOffset).push_back(boundOffset);
}
};

struct BaseBFSMorsel {
friend struct FixedLengthPathScanner;
// Static information
common::offset_t maxOffset;
uint8_t lowerBound;
Expand Down Expand Up @@ -69,12 +44,16 @@ struct BaseBFSMorsel {

virtual void resetState();
virtual bool isComplete() = 0;

virtual void markSrc(common::offset_t offset) = 0;
virtual void markVisited(common::offset_t boundOffset, common::offset_t nbrOffset) = 0;
virtual void markVisited(
common::offset_t boundOffset, common::offset_t nbrOffset, uint64_t multiplicity) = 0;

inline void finalizeCurrentLevel() { moveNextLevelAsCurrentLevel(); }

protected:
inline uint64_t getNumNodes() const { return maxOffset + 1; }

inline bool isAllDstTarget() const { return targetDstNodeOffsets.empty(); }
inline bool isCurrentFrontierEmpty() const { return currentFrontier->nodeOffsets.empty(); }
inline bool isUpperBoundReached() const { return currentLevel == upperBound; }
Expand All @@ -90,51 +69,109 @@ struct BaseBFSMorsel {
void moveNextLevelAsCurrentLevel();
};

struct ShortestPathBFSMorsel : public BaseBFSMorsel {
enum VisitedState : uint8_t {
NOT_VISITED_DST = 0,
VISITED_DST = 1,
NOT_VISITED = 2,
VISITED = 3,
};

template<bool trackPath>
struct ShortestPathMorsel : public BaseBFSMorsel {
// Visited state
uint64_t numVisitedDstNodes;
uint8_t* visitedNodes;

ShortestPathBFSMorsel(common::offset_t maxOffset, uint8_t lowerBound, uint8_t upperBound,
ShortestPathMorsel(common::offset_t maxOffset, uint8_t lowerBound, uint8_t upperBound,
NodeOffsetSemiMask* semiMask)
: BaseBFSMorsel{maxOffset, lowerBound, upperBound, semiMask}, numVisitedDstNodes{0} {
visitedNodesBuffer = std::make_unique<uint8_t[]>(getNumNodes() * sizeof(uint8_t));
visitedNodes = visitedNodesBuffer.get();
}
~ShortestPathMorsel() override = default;

inline bool isComplete() override {
inline bool isComplete() final {
return isCurrentFrontierEmpty() || isUpperBoundReached() || isAllDstReached();
}
inline void resetState() override {
inline void resetState() final {
BaseBFSMorsel::resetState();
resetVisitedState();
}
void markSrc(common::offset_t offset) override;
void markVisited(common::offset_t boundOffset, common::offset_t nbrOffset) override;

private:
inline void markSrc(common::offset_t offset) final {
if (visitedNodes[offset] == NOT_VISITED_DST) {
visitedNodes[offset] = VISITED_DST;
numVisitedDstNodes++;
}
currentFrontier->addNode(offset);
}

inline void markVisited(
common::offset_t boundOffset, common::offset_t nbrOffset, uint64_t multiplicity) override {
if (visitedNodes[nbrOffset] == NOT_VISITED_DST) {
visitedNodes[nbrOffset] = VISITED_DST;
numVisitedDstNodes++;
if constexpr (trackPath) {
nextFrontier->addEdge(boundOffset, nbrOffset);
} else {
nextFrontier->addNode(nbrOffset);
}
} else if (visitedNodes[nbrOffset] == NOT_VISITED) {
visitedNodes[nbrOffset] = VISITED;
if constexpr (trackPath) {
nextFrontier->addEdge(boundOffset, nbrOffset);
} else {
nextFrontier->addNode(nbrOffset);
}
}
}

protected:
inline bool isAllDstReached() const { return numVisitedDstNodes == numTargetDstNodes; }
void resetVisitedState();
inline void resetVisitedState() {
numVisitedDstNodes = 0;
if (!isAllDstTarget()) {
std::fill(
visitedNodes, visitedNodes + getNumNodes(), (uint8_t)VisitedState::NOT_VISITED);
for (auto offset : targetDstNodeOffsets) {
visitedNodes[offset] = VisitedState::NOT_VISITED_DST;
}
numTargetDstNodes = targetDstNodeOffsets.size();
} else {
std::fill(
visitedNodes, visitedNodes + getNumNodes(), (uint8_t)VisitedState::NOT_VISITED_DST);
numTargetDstNodes = getNumNodes();
}
}

private:
std::unique_ptr<uint8_t[]> visitedNodesBuffer;
};

struct VariableLengthBFSMorsel : public BaseBFSMorsel {
explicit VariableLengthBFSMorsel(common::offset_t maxOffset, uint8_t lowerBound,
uint8_t upperBound, NodeOffsetSemiMask* semiMask)
template<bool trackPath>
struct VariableLengthMorsel : public BaseBFSMorsel {
VariableLengthMorsel(common::offset_t maxOffset, uint8_t lowerBound, uint8_t upperBound,
NodeOffsetSemiMask* semiMask)
: BaseBFSMorsel{maxOffset, lowerBound, upperBound, semiMask} {}
~VariableLengthMorsel() override = default;

inline void resetState() override {
inline void resetState() final {
BaseBFSMorsel::resetState();
numTargetDstNodes = isAllDstTarget() ? getNumNodes() : targetDstNodeOffsets.size();
}
inline bool isComplete() override { return isCurrentFrontierEmpty() || isUpperBoundReached(); }
inline void markSrc(common::offset_t offset) override {
currentFrontier->nodeOffsets.push_back(offset);
inline bool isComplete() final { return isCurrentFrontierEmpty() || isUpperBoundReached(); }

inline void markSrc(common::offset_t offset) final {
currentFrontier->addNodeWithMultiplicity(offset, 1 /* multiplicity */);
}
inline void markVisited(common::offset_t boundOffset, common::offset_t nbrOffset) override {
nextFrontier->addEdge(boundOffset, nbrOffset);

inline void markVisited(
common::offset_t boundOffset, common::offset_t nbrOffset, uint64_t multiplicity) final {
if constexpr (trackPath) {
nextFrontier->addEdge(boundOffset, nbrOffset);
} else {
nextFrontier->addNodeWithMultiplicity(nbrOffset, multiplicity);
}
}
};

Expand Down
56 changes: 56 additions & 0 deletions src/include/processor/operator/recursive_extend/frontier.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include <unordered_map>

#include "common/types/types_include.h"

namespace kuzu {
namespace processor {

/*
* A Frontier can stores dst node offsets, its multiplicity and its bwd edges. Note that we don't
* need to track all information in BFS computation.
*
* Computation | Information tracked
* Shortest path track path | nodeOffsets & bwdEdges
* Shortest path NOT track path | nodeOffsets
* Var length track path | nodeOffsets & bwdEdges
* Var length NOT track path | nodeOffsets & offsetToMultiplicity
*/
struct Frontier {
std::vector<common::offset_t> nodeOffsets;
std::unordered_map<common::offset_t, std::vector<common::offset_t>> bwdEdges;
std::unordered_map<common::offset_t, uint64_t> offsetToMultiplicity;

inline void resetState() {
nodeOffsets.clear();
bwdEdges.clear();
offsetToMultiplicity.clear();
}

inline void addNode(common::offset_t offset) { nodeOffsets.push_back(offset); }

inline void addEdge(common::offset_t boundOffset, common::offset_t nbrOffset) {
if (!bwdEdges.contains(nbrOffset)) {
nodeOffsets.push_back(nbrOffset);
bwdEdges.insert({nbrOffset, std::vector<common::offset_t>{}});
}
bwdEdges.at(nbrOffset).push_back(boundOffset);
}

inline void addNodeWithMultiplicity(common::offset_t offset, uint64_t multiplicity) {
if (offsetToMultiplicity.contains(offset)) {
offsetToMultiplicity.at(offset) += multiplicity;
} else {
offsetToMultiplicity.insert({offset, multiplicity});
nodeOffsets.push_back(offset);
}
}

inline uint64_t getMultiplicity(common::offset_t offset) const {
return offsetToMultiplicity.empty() ? 1 : offsetToMultiplicity.at(offset);
}
};

} // namespace processor
} // namespace kuzu
Loading