Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sideway information passing for shortest path extend #1502

Merged
merged 1 commit into from
Apr 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions src/include/optimizer/acc_hash_join_optimizer.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include "logical_operator_visitor.h"
#include "planner/logical_plan/logical_plan.h"

Expand All @@ -6,7 +8,7 @@ namespace optimizer {

// This optimizer enables the accumulated hash join algorithm introduced in the paper "Kuzu Graph
// Database Management System".
class AccHashJoinOptimizer : public LogicalOperatorVisitor {
class HashJoinSIPOptimizer : public LogicalOperatorVisitor {
public:
void rewrite(planner::LogicalPlan* plan);

Expand All @@ -22,16 +24,22 @@ class AccHashJoinOptimizer : public LogicalOperatorVisitor {

bool isProbeSideQualified(planner::LogicalOperator* probeRoot);

binder::expression_map<std::vector<planner::LogicalOperator*>> resolveScanNodesToApplySemiMask(
const binder::expression_vector& nodeIDCandidates,
const std::vector<planner::LogicalOperator*>& roots);

std::shared_ptr<planner::LogicalOperator> applySemiMasks(
const binder::expression_map<std::vector<planner::LogicalOperator*>>& nodeIDToScanNodes,
std::shared_ptr<planner::LogicalOperator> root);
void applyAccHashJoin(
const binder::expression_map<std::vector<planner::LogicalOperator*>>& nodeIDToScanNodes,
planner::LogicalOperator* op);
std::vector<planner::LogicalOperator*> resolveOperatorsToApplySemiMask(
const binder::Expression& nodeID, planner::LogicalOperator* root);
// Find all ScanNodeIDs under root that scan the given nodeID. Note that there might be multiple
// matching ScanNodeIDs because both node and rel table scans will trigger scanNodeIDs.
std::vector<planner::LogicalOperator*> resolveScanNodeIDsToApplySemiMask(
const binder::Expression& nodeID, planner::LogicalOperator* root);
// Find all ShortestPathExtend under root which extend to parameter nodeID. There will be at
// most one match because rel table is scanned exactly once.
std::vector<planner::LogicalOperator*> resolveShortestPathExtendToApplySemiMask(
const binder::Expression& nodeID, planner::LogicalOperator* root);

std::shared_ptr<planner::LogicalOperator> appendSemiMask(
std::shared_ptr<binder::Expression> nodeID, std::vector<planner::LogicalOperator*> ops,
std::shared_ptr<planner::LogicalOperator> child);
std::shared_ptr<planner::LogicalOperator> appendAccumulate(
std::shared_ptr<planner::LogicalOperator> child);
};

} // namespace optimizer
Expand Down
5 changes: 5 additions & 0 deletions src/include/optimizer/logical_operator_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,10 @@ class LogicalIndexScanNodeCollector : public LogicalOperatorCollector {
void visitIndexScanNode(planner::LogicalOperator* op) override { ops.push_back(op); }
};

// Collector specialization that records every operator handed to visitRecursiveExtend.
// `ops` is not declared in this class; presumably it is the result list inherited from
// LogicalOperatorCollector — confirm against the base class.
class LogicalRecursiveExtendCollector : public LogicalOperatorCollector {
protected:
    // Appends the visited operator to `ops`.
    void visitRecursiveExtend(planner::LogicalOperator* op) override {
        ops.push_back(op);
    }
};

} // namespace optimizer
} // namespace kuzu
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,32 @@

#include "base_logical_operator.h"
#include "binder/expression/node_expression.h"
#include "logical_scan_node.h"

namespace kuzu {
namespace planner {

// Logical plan operator of type SEMI_MASKER. It has a single child and exposes that child's
// schema unchanged (both schema computations simply copy child 0's schema).
// NOTE(review): this span comes from a rendered diff, so several declarations appear twice in a
// row (nodeID/scanNodes vs. node/ops). Presumably the nodeID/scanNodes lines are the removed side
// and the node/ops lines the added side — confirm against the real header before editing.
class LogicalSemiMasker : public LogicalOperator {
public:
// Constructor taking the node ID expression and the scan operators the mask applies to.
LogicalSemiMasker(std::shared_ptr<binder::Expression> nodeID,
std::vector<LogicalOperator*> scanNodes, std::shared_ptr<LogicalOperator> child)
// Constructor taking the node expression and a list of operators the mask applies to.
LogicalSemiMasker(std::shared_ptr<binder::NodeExpression> node,
std::vector<LogicalOperator*> ops, std::shared_ptr<LogicalOperator> child)
: LogicalOperator{LogicalOperatorType::SEMI_MASKER, std::move(child)},
nodeID{std::move(nodeID)}, scanNodes{std::move(scanNodes)} {}
node{std::move(node)}, ops{std::move(ops)} {}

// The semi masker does not alter the schema: both variants copy the schema of child 0.
inline void computeFactorizedSchema() override { copyChildSchema(0); }
inline void computeFlatSchema() override { copyChildSchema(0); }

// Printing shows the string form of the masked node ID / node expression.
inline std::string getExpressionsForPrinting() const override { return nodeID->toString(); }
inline std::string getExpressionsForPrinting() const override { return node->toString(); }

inline std::shared_ptr<binder::Expression> getNodeID() const { return nodeID; }
// Casts the first entry of scanNodes to LogicalScanNode and asks its node whether it is
// multi-labeled; this reads scanNodes[0] without checking that the vector is non-empty.
inline bool isMultiLabel() const {
return ((LogicalScanNode*)scanNodes[0])->getNode()->isMultiLabeled();
}
// Returns a copy of the vector of (non-owning) operator pointers.
inline std::vector<LogicalOperator*> getScanNodes() const { return scanNodes; }
inline std::shared_ptr<binder::NodeExpression> getNode() const { return node; }
// Returns a copy of the vector of (non-owning) operator pointers.
inline std::vector<LogicalOperator*> getOperators() const { return ops; }

// Creates a new LogicalSemiMasker over a copy of the child. The operator pointers are passed
// through as-is, so the copy refers to the same target operators as the original.
inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalSemiMasker>(nodeID, scanNodes, children[0]->copy());
return make_unique<LogicalSemiMasker>(node, ops, children[0]->copy());
}

private:
std::shared_ptr<binder::Expression> nodeID;
std::vector<LogicalOperator*> scanNodes;
std::shared_ptr<binder::NodeExpression> node;
std::vector<LogicalOperator*> ops;
};

} // namespace planner
Expand Down
143 changes: 143 additions & 0 deletions src/include/processor/operator/mask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#pragma once

#include <mutex>

#include "storage/store/node_table.h"

namespace kuzu {
namespace processor {

// Note: Classes in this file are NOT thread-safe.
struct MaskUtil {
    // Maps an offset to its morsel index by shifting it right by
    // common::DEFAULT_VECTOR_CAPACITY_LOG_2 bits.
    static inline common::offset_t getMorselIdx(common::offset_t offset) {
        const auto shiftedOffset = offset >> common::DEFAULT_VECTOR_CAPACITY_LOG_2;
        return shiftedOffset;
    }
};

// Owns a flat array of one-byte mask values, all of which start at 0.
struct MaskData {
    // Raw view of the owned buffer; valid for as long as this MaskData is alive.
    uint8_t* data;

    // Allocates `size` mask slots.
    // std::make_unique<T[]>(n) value-initializes the array, so every slot already holds 0 and no
    // separate zero-fill pass is needed.
    explicit MaskData(uint64_t size) {
        dataBuffer = std::make_unique<uint8_t[]>(size);
        data = dataBuffer.get();
    }

    // Overwrites the mask value stored at `pos`. `pos` is not bounds-checked.
    inline void setMask(uint64_t pos, uint8_t maskValue) { data[pos] = maskValue; }

    // Returns true iff the value stored at `pos` equals `trueMaskVal`. `pos` is not
    // bounds-checked.
    inline bool isMasked(uint64_t pos, uint8_t trueMaskVal) const {
        return data[pos] == trueMaskVal;
    }

private:
    std::unique_ptr<uint8_t[]> dataBuffer;
};

// MaskCollection represents multiple masks over the same domain with AND semantics.
// Each slot stores a counter: a slot counts as masked only when its counter equals numMasks, and
// a counter advances from k to k + 1 only through incrementMaskValue(offset, k).
class MaskCollection {
public:
    MaskCollection() : numMasks{0} {}

    // Allocates the underlying mask storage with maxOffset + 1 slots.
    // Calling init again after the storage exists is a no-op and keeps the existing mask values.
    inline void init(common::offset_t maxOffset) {
        if (maskData != nullptr) { // MaskCollection might be initialized repeatedly.
            return;
        }
        maskData = std::make_unique<MaskData>(maxOffset + 1);
    }

    // Returns true iff the slot's value equals the current number of masks.
    // Requires init() to have been called; maskData is dereferenced without a null check.
    inline bool isMasked(common::offset_t offset) const {
        return maskData->isMasked(offset, numMasks);
    }

    // Increment mask value for the given offset if its current mask value is equal to
    // the specified `currentMaskValue`; otherwise leave the slot untouched.
    inline void incrementMaskValue(common::offset_t offset, uint8_t currentMaskValue) {
        if (maskData->isMasked(offset, currentMaskValue)) {
            // The sum is computed as int; narrow it back explicitly to the stored width.
            maskData->setMask(offset, static_cast<uint8_t>(currentMaskValue + 1));
        }
    }

    inline uint8_t getNumMasks() const { return numMasks; }
    inline void incrementNumMasks() { numMasks++; }

private:
    std::unique_ptr<MaskData> maskData;
    uint8_t numMasks;
};

// Abstract interface for a semi mask attached to a node table.
// Holds a non-owning pointer to the node table; concrete subclasses decide how mask values are
// stored.
class NodeSemiMask {
public:
    explicit NodeSemiMask(storage::NodeTable* nodeTable) : nodeTable{nodeTable} {}
    // Virtual destructor so that deleting a subclass through a NodeSemiMask pointer is
    // well-defined.
    virtual ~NodeSemiMask() = default;

    // Prepares the mask storage for use under the given transaction.
    virtual void init(transaction::Transaction* trx) = 0;

    // Advances the mask value for `nodeOffset`, given the value it is expected to hold now.
    virtual void incrementMaskValue(common::offset_t nodeOffset, uint8_t currentMaskValue) = 0;

    virtual uint8_t getNumMasks() const = 0;
    virtual void incrementNumMasks() = 0;

    // A mask is enabled once at least one mask has been registered.
    inline bool isEnabled() const { return getNumMasks() > 0; }

protected:
    storage::NodeTable* nodeTable;
};

class NodeOffsetSemiMask : public NodeSemiMask {
public:
NodeOffsetSemiMask(storage::NodeTable* nodeTable) : NodeSemiMask{nodeTable} {
offsetMask = std::make_unique<MaskCollection>();
}

inline void init(transaction::Transaction* trx) override {
offsetMask->init(nodeTable->getMaxNodeOffset(trx) + 1);
}

inline void incrementMaskValue(common::offset_t nodeOffset, uint8_t currentMaskValue) override {
offsetMask->incrementMaskValue(nodeOffset, currentMaskValue);
}

inline uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); }
inline void incrementNumMasks() override { offsetMask->incrementNumMasks(); }

inline bool isNodeMasked(common::offset_t nodeOffset) {
return offsetMask->isMasked(nodeOffset);
}

private:
std::unique_ptr<MaskCollection> offsetMask;
};

class NodeOffsetAndMorselSemiMask : public NodeSemiMask {
public:
NodeOffsetAndMorselSemiMask(storage::NodeTable* nodeTable) : NodeSemiMask{nodeTable} {
offsetMask = std::make_unique<MaskCollection>();
morselMask = std::make_unique<MaskCollection>();
}

inline void init(transaction::Transaction* trx) override {
auto maxNodeOffset = nodeTable->getMaxNodeOffset(trx);
offsetMask->init(maxNodeOffset + 1);
morselMask->init(MaskUtil::getMorselIdx(maxNodeOffset) + 1);
}

// Note: blindly update mask does not parallelize well, so we minimize write by first checking
// if the mask is set to true (mask value is equal to the expected currentMaskValue) or not.
inline void incrementMaskValue(uint64_t nodeOffset, uint8_t currentMaskValue) override {
offsetMask->incrementMaskValue(nodeOffset, currentMaskValue);
morselMask->incrementMaskValue(MaskUtil::getMorselIdx(nodeOffset), currentMaskValue);
}

inline uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); }
inline void incrementNumMasks() override {
offsetMask->incrementNumMasks();
morselMask->incrementNumMasks();
}

inline bool isMorselMasked(common::offset_t morselIdx) {
return morselMask->isMasked(morselIdx);
}
inline bool isNodeMasked(common::offset_t nodeOffset) {
return offsetMask->isMasked(nodeOffset);
}

private:
std::unique_ptr<MaskCollection> offsetMask;
std::unique_ptr<MaskCollection> morselMask;
};

} // namespace processor
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ enum class PhysicalOperatorType : uint8_t {
RENAME_PROPERTY,
RENAME_TABLE,
RESULT_COLLECTOR,
SCAN_BFS_LEVEL,
SCAN_FRONTIER,
SCAN_NODE_ID,
SCAN_NODE_PROPERTY,
SCAN_REL_PROPERTY,
Expand Down
Loading