Skip to content

Commit

Permalink
Add sip for shortest path
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Apr 29, 2023
1 parent fdd77ba commit 77e0463
Show file tree
Hide file tree
Showing 21 changed files with 521 additions and 331 deletions.
30 changes: 19 additions & 11 deletions src/include/optimizer/acc_hash_join_optimizer.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include "logical_operator_visitor.h"
#include "planner/logical_plan/logical_plan.h"

Expand All @@ -6,7 +8,7 @@ namespace optimizer {

// This optimizer enables the Accumulated hash join algorithm as introduced in paper "Kuzu Graph
// Database Management System".
class AccHashJoinOptimizer : public LogicalOperatorVisitor {
class HashJoinSIPOptimizer : public LogicalOperatorVisitor {
public:
void rewrite(planner::LogicalPlan* plan);

Expand All @@ -22,16 +24,22 @@ class AccHashJoinOptimizer : public LogicalOperatorVisitor {

bool isProbeSideQualified(planner::LogicalOperator* probeRoot);

binder::expression_map<std::vector<planner::LogicalOperator*>> resolveScanNodesToApplySemiMask(
const binder::expression_vector& nodeIDCandidates,
const std::vector<planner::LogicalOperator*>& roots);

std::shared_ptr<planner::LogicalOperator> applySemiMasks(
const binder::expression_map<std::vector<planner::LogicalOperator*>>& nodeIDToScanNodes,
std::shared_ptr<planner::LogicalOperator> root);
void applyAccHashJoin(
const binder::expression_map<std::vector<planner::LogicalOperator*>>& nodeIDToScanNodes,
planner::LogicalOperator* op);
std::vector<planner::LogicalOperator*> resolveOperatorsToApplySemiMask(
const binder::Expression& nodeID, planner::LogicalOperator* root);
// Find all ScanNodeIDs under root that scan the given nodeID. Note that there might be
// multiple matching ScanNodeIDs because both node table and rel table scans trigger ScanNodeIDs.
std::vector<planner::LogicalOperator*> resolveScanNodeIDsToApplySemiMask(
const binder::Expression& nodeID, planner::LogicalOperator* root);
// Find all ShortestPathExtend operators under root that extend to the given nodeID. There will
// be at most one match because the rel table is scanned exactly once.
std::vector<planner::LogicalOperator*> resolveShortestPathExtendToApplySemiMask(
const binder::Expression& nodeID, planner::LogicalOperator* root);

std::shared_ptr<planner::LogicalOperator> appendSemiMask(
std::shared_ptr<binder::Expression> nodeID, std::vector<planner::LogicalOperator*> ops,
std::shared_ptr<planner::LogicalOperator> child);
std::shared_ptr<planner::LogicalOperator> appendAccumulate(
std::shared_ptr<planner::LogicalOperator> child);
};

} // namespace optimizer
Expand Down
5 changes: 5 additions & 0 deletions src/include/optimizer/logical_operator_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,10 @@ class LogicalIndexScanNodeCollector : public LogicalOperatorCollector {
void visitIndexScanNode(planner::LogicalOperator* op) override { ops.push_back(op); }
};

// Collector specialisation whose recursive-extend visit hook records the visited operator.
// NOTE(review): `ops` is not declared in this class; presumably it is the result container
// provided by LogicalOperatorCollector -- confirm against the base class.
class LogicalRecursiveExtendCollector : public LogicalOperatorCollector {
protected:
    // Append the visited operator pointer to `ops`. The pointer is stored as-is; no ownership
    // is taken here.
    void visitRecursiveExtend(planner::LogicalOperator* op) override {
        ops.push_back(op);
    }
};

} // namespace optimizer
} // namespace kuzu
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,32 @@

#include "base_logical_operator.h"
#include "binder/expression/node_expression.h"
#include "logical_scan_node.h"

namespace kuzu {
namespace planner {

class LogicalSemiMasker : public LogicalOperator {
public:
LogicalSemiMasker(std::shared_ptr<binder::Expression> nodeID,
std::vector<LogicalOperator*> scanNodes, std::shared_ptr<LogicalOperator> child)
LogicalSemiMasker(std::shared_ptr<binder::NodeExpression> node,
std::vector<LogicalOperator*> ops, std::shared_ptr<LogicalOperator> child)
: LogicalOperator{LogicalOperatorType::SEMI_MASKER, std::move(child)},
nodeID{std::move(nodeID)}, scanNodes{std::move(scanNodes)} {}
node{std::move(node)}, ops{std::move(ops)} {}

inline void computeFactorizedSchema() override { copyChildSchema(0); }
inline void computeFlatSchema() override { copyChildSchema(0); }

inline std::string getExpressionsForPrinting() const override { return nodeID->toString(); }
inline std::string getExpressionsForPrinting() const override { return node->toString(); }

inline std::shared_ptr<binder::Expression> getNodeID() const { return nodeID; }
inline bool isMultiLabel() const {
return ((LogicalScanNode*)scanNodes[0])->getNode()->isMultiLabeled();
}
inline std::vector<LogicalOperator*> getScanNodes() const { return scanNodes; }
inline std::shared_ptr<binder::NodeExpression> getNode() const { return node; }
inline std::vector<LogicalOperator*> getOperators() const { return ops; }

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalSemiMasker>(nodeID, scanNodes, children[0]->copy());
return make_unique<LogicalSemiMasker>(node, ops, children[0]->copy());
}

private:
std::shared_ptr<binder::Expression> nodeID;
std::vector<LogicalOperator*> scanNodes;
std::shared_ptr<binder::NodeExpression> node;
std::vector<LogicalOperator*> ops;
};

} // namespace planner
Expand Down
143 changes: 143 additions & 0 deletions src/include/processor/operator/mask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#pragma once

#include <mutex>

#include "storage/store/node_table.h"

namespace kuzu {
namespace processor {

// Note: Classes in this file are NOT thread-safe.

// Stateless helper used by NodeOffsetAndMorselSemiMask to translate node offsets into morsel
// indices.
struct MaskUtil {
    // Return the morsel index for `offset`, computed by shifting the offset right by
    // common::DEFAULT_VECTOR_CAPACITY_LOG_2 bits.
    // NOTE(review): presumably equivalent to dividing by the default vector capacity -- confirm
    // against the definition of the constant.
    static common::offset_t getMorselIdx(common::offset_t offset) {
        const common::offset_t morselIdx = offset >> common::DEFAULT_VECTOR_CAPACITY_LOG_2;
        return morselIdx;
    }
};

// Owns a fixed-size byte array holding one mask value per position.
// All positions start at 0. Positions are NOT bounds-checked: callers must pass pos < size.
struct MaskData {
    // Raw view of the buffer owned by `dataBuffer`. Kept public for direct access by callers.
    uint8_t* data;

    // Allocate `size` mask slots, all initialised to 0.
    // std::make_unique<uint8_t[]>(size) value-initialises the array, so every byte is already
    // zero and no separate fill pass is required.
    explicit MaskData(uint64_t size)
        : data{nullptr}, dataBuffer{std::make_unique<uint8_t[]>(size)} {
        // `data` is declared before `dataBuffer`, so it can only be pointed at the buffer after
        // the buffer has been constructed.
        data = dataBuffer.get();
    }

    // Store `maskValue` at position `pos`.
    inline void setMask(uint64_t pos, uint8_t maskValue) { data[pos] = maskValue; }

    // Return true iff the value stored at `pos` equals `trueMaskVal`. Read-only, hence const.
    inline bool isMasked(uint64_t pos, uint8_t trueMaskVal) const {
        return data[pos] == trueMaskVal;
    }

private:
    // Owning storage behind `data`; released automatically on destruction.
    std::unique_ptr<uint8_t[]> dataBuffer;
};

// MaskCollection represents multiple mask on the same domain with AND semantic.
class MaskCollection {
public:
MaskCollection() : numMasks{0} {}

inline void init(common::offset_t maxOffset) {
if (maskData != nullptr) { // MaskCollection might be initialized repeatedly.
return;
}
maskData = std::make_unique<MaskData>(maxOffset + 1);
}

inline bool isMasked(common::offset_t offset) { return maskData->isMasked(offset, numMasks); }
// Increment mask value for the given nodeOffset if its current mask value is equal to
// the specified `currentMaskValue`.
inline void incrementMaskValue(common::offset_t offset, uint8_t currentMaskValue) {
if (maskData->isMasked(offset, currentMaskValue)) {
maskData->setMask(offset, currentMaskValue + 1);
}
}

inline uint8_t getNumMasks() const { return numMasks; }
inline void incrementNumMasks() { numMasks++; }

private:
std::unique_ptr<MaskData> maskData;
uint8_t numMasks;
};

// Abstract interface for a semi mask over the nodes of one node table.
// Concrete implementations in this file are NodeOffsetSemiMask and NodeOffsetAndMorselSemiMask.
class NodeSemiMask {
public:
    // `nodeTable` is stored as a non-owning pointer; this class never deletes it.
    explicit NodeSemiMask(storage::NodeTable* nodeTable) : nodeTable{nodeTable} {}

    // Virtual so that a derived mask can be destroyed safely through a NodeSemiMask pointer.
    virtual ~NodeSemiMask() = default;

    // Prepare the mask storage for use under transaction `trx`. The implementations in this
    // file size their storage from nodeTable->getMaxNodeOffset(trx).
    virtual void init(transaction::Transaction* trx) = 0;

    // Advance the mask value recorded for `nodeOffset` given the expected `currentMaskValue`.
    // The implementations in this file only advance it when the stored value equals
    // `currentMaskValue`.
    virtual void incrementMaskValue(common::offset_t nodeOffset, uint8_t currentMaskValue) = 0;

    // Number of masks registered so far.
    virtual uint8_t getNumMasks() const = 0;
    // Register one more mask.
    virtual void incrementNumMasks() = 0;

    // A semi mask is enabled once at least one mask has been registered.
    inline bool isEnabled() const { return getNumMasks() > 0; }

protected:
    // Non-owning pointer to the node table this mask covers.
    storage::NodeTable* nodeTable;
};

class NodeOffsetSemiMask : public NodeSemiMask {
public:
NodeOffsetSemiMask(storage::NodeTable* nodeTable) : NodeSemiMask{nodeTable} {
offsetMask = std::make_unique<MaskCollection>();
}

inline void init(transaction::Transaction* trx) override {
offsetMask->init(nodeTable->getMaxNodeOffset(trx) + 1);
}

inline void incrementMaskValue(common::offset_t nodeOffset, uint8_t currentMaskValue) override {
offsetMask->incrementMaskValue(nodeOffset, currentMaskValue);
}

inline uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); }
inline void incrementNumMasks() override { offsetMask->incrementNumMasks(); }

inline bool isNodeMasked(common::offset_t nodeOffset) {
return offsetMask->isMasked(nodeOffset);
}

private:
std::unique_ptr<MaskCollection> offsetMask;
};

class NodeOffsetAndMorselSemiMask : public NodeSemiMask {
public:
NodeOffsetAndMorselSemiMask(storage::NodeTable* nodeTable) : NodeSemiMask{nodeTable} {
offsetMask = std::make_unique<MaskCollection>();
morselMask = std::make_unique<MaskCollection>();
}

inline void init(transaction::Transaction* trx) override {
auto maxNodeOffset = nodeTable->getMaxNodeOffset(trx);
offsetMask->init(maxNodeOffset + 1);
morselMask->init(MaskUtil::getMorselIdx(maxNodeOffset) + 1);
}

// Note: blindly update mask does not parallelize well, so we minimize write by first checking
// if the mask is set to true (mask value is equal to the expected currentMaskValue) or not.
inline void incrementMaskValue(uint64_t nodeOffset, uint8_t currentMaskValue) override {
offsetMask->incrementMaskValue(nodeOffset, currentMaskValue);
morselMask->incrementMaskValue(MaskUtil::getMorselIdx(nodeOffset), currentMaskValue);
}

inline uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); }
inline void incrementNumMasks() override {
offsetMask->incrementNumMasks();
morselMask->incrementNumMasks();
}

inline bool isMorselMasked(common::offset_t morselIdx) {
return morselMask->isMasked(morselIdx);
}
inline bool isNodeMasked(common::offset_t nodeOffset) {
return offsetMask->isMasked(nodeOffset);
}

private:
std::unique_ptr<MaskCollection> offsetMask;
std::unique_ptr<MaskCollection> morselMask;
};

} // namespace processor
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ enum class PhysicalOperatorType : uint8_t {
RENAME_PROPERTY,
RENAME_TABLE,
RESULT_COLLECTOR,
SCAN_BFS_LEVEL,
SCAN_FRONTIER,
SCAN_NODE_ID,
SCAN_NODE_PROPERTY,
SCAN_REL_PROPERTY,
Expand Down
Loading

0 comments on commit 77e0463

Please sign in to comment.