Skip to content

Commit

Permalink
add path semi mask
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Jun 22, 2023
1 parent 527f5e9 commit 8d12790
Show file tree
Hide file tree
Showing 31 changed files with 540 additions and 284 deletions.
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ struct PlannerKnobs {
static constexpr uint64_t BUILD_PENALTY = 2;
// Avoid doing probe to build SIP if we have to accumulate a probe side that is much bigger than
// build side. Also avoid doing build to probe SIP if probe side is not much bigger than build.
static constexpr uint64_t ACC_HJ_PROBE_BUILD_RATIO = 5;
static constexpr uint64_t SIP_RATIO = 5;
};

struct ClientContextConstants {
Expand Down
10 changes: 8 additions & 2 deletions src/include/optimizer/acc_hash_join_optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class HashJoinSIPOptimizer : public LogicalOperatorVisitor {

void visitIntersect(planner::LogicalOperator* op) override;

void visitPathPropertyProbe(planner::LogicalOperator* op) override;

bool isProbeSideQualified(planner::LogicalOperator* probeRoot);

std::vector<planner::LogicalOperator*> resolveOperatorsToApplySemiMask(
Expand All @@ -35,8 +37,12 @@ class HashJoinSIPOptimizer : public LogicalOperatorVisitor {
std::vector<planner::LogicalOperator*> resolveShortestPathExtendToApplySemiMask(
const binder::Expression& nodeID, planner::LogicalOperator* root);

std::shared_ptr<planner::LogicalOperator> appendSemiMask(
std::vector<planner::LogicalOperator*> ops,
std::shared_ptr<planner::LogicalOperator> appendNodeSemiMasker(
std::vector<planner::LogicalOperator*> opsToApplySemiMask,
std::shared_ptr<planner::LogicalOperator> child);
std::shared_ptr<planner::LogicalOperator> appendPathSemiMasker(
std::shared_ptr<binder::Expression> pathExpression,
std::vector<planner::LogicalOperator*> opsToApplySemiMask,
std::shared_ptr<planner::LogicalOperator> child);
std::shared_ptr<planner::LogicalOperator> appendAccumulate(
std::shared_ptr<planner::LogicalOperator> child);
Expand Down
6 changes: 6 additions & 0 deletions src/include/optimizer/logical_operator_visitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ class LogicalOperatorVisitor {
return op;
}

virtual void visitPathPropertyProbe(planner::LogicalOperator* op) {}
virtual std::shared_ptr<planner::LogicalOperator> visitPathPropertyProbeReplace(
std::shared_ptr<planner::LogicalOperator> op) {
return op;
}

virtual void visitHashJoin(planner::LogicalOperator* op) {}
virtual std::shared_ptr<planner::LogicalOperator> visitHashJoinReplace(
std::shared_ptr<planner::LogicalOperator> op) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/optimizer/projection_push_down_optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ProjectionPushDownOptimizer : public LogicalOperatorVisitor {
private:
void visitOperator(planner::LogicalOperator* op);

void visitRecursiveExtend(planner::LogicalOperator* op) override;
void visitPathPropertyProbe(planner::LogicalOperator* op) override;
void visitExtend(planner::LogicalOperator* op) override;
void visitAccumulate(planner::LogicalOperator* op) override;
void visitFilter(planner::LogicalOperator* op) override;
Expand Down
4 changes: 2 additions & 2 deletions src/include/planner/join_order_enumerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ class JoinOrderEnumerator {
void createRecursivePlan(std::shared_ptr<NodeExpression> boundNode,
std::shared_ptr<NodeExpression> recursiveNode, std::shared_ptr<RelExpression> recursiveRel,
ExtendDirection direction, LogicalPlan& plan);
void createRecursiveNodePropertyScanPlan(
void createPathNodePropertyScanPlan(
std::shared_ptr<NodeExpression> recursiveNode, LogicalPlan& plan);
void createRecursiveRelPropertyScanPlan(std::shared_ptr<NodeExpression> recursiveNode,
void createPathRelPropertyScanPlan(std::shared_ptr<NodeExpression> recursiveNode,
std::shared_ptr<NodeExpression> nbrNode, std::shared_ptr<RelExpression> recursiveRel,
ExtendDirection direction, LogicalPlan& plan);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class BaseLogicalExtend : public LogicalOperator {
BaseLogicalExtend(LogicalOperatorType operatorType,
std::shared_ptr<binder::NodeExpression> boundNode,
std::shared_ptr<binder::NodeExpression> nbrNode, std::shared_ptr<binder::RelExpression> rel,
ExtendDirection direction, std::vector<std::shared_ptr<LogicalOperator>> children)
: LogicalOperator{operatorType, std::move(children)}, boundNode{std::move(boundNode)},
ExtendDirection direction, std::shared_ptr<LogicalOperator> child)
: LogicalOperator{operatorType, std::move(child)}, boundNode{std::move(boundNode)},
nbrNode{std::move(nbrNode)}, rel{std::move(rel)}, direction{direction} {}

inline std::shared_ptr<binder::NodeExpression> getBoundNode() const { return boundNode; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enum class LogicalOperatorType : uint8_t {
LIMIT,
MULTIPLICITY_REDUCER,
ORDER_BY,
PATH_PROPERTY_PROBE,
PROJECTION,
RECURSIVE_EXTEND,
RENAME_TABLE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ class LogicalExtend : public BaseLogicalExtend {
ExtendDirection direction, binder::expression_vector properties, bool hasAtMostOneNbr,
std::shared_ptr<LogicalOperator> child)
: BaseLogicalExtend{LogicalOperatorType::EXTEND, std::move(boundNode), std::move(nbrNode),
std::move(rel), direction,
std::vector<std::shared_ptr<LogicalOperator>>{std::move(child)}},
std::move(rel), direction, std::move(child)},
properties{std::move(properties)}, hasAtMostOneNbr{hasAtMostOneNbr} {}

f_group_pos_set getGroupsPosToFlatten() override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "base_logical_extend.h"
#include "recursive_join_type.h"
#include "side_way_info_passing.h"

namespace kuzu {
namespace planner {
Expand All @@ -11,10 +12,9 @@ class LogicalRecursiveExtend : public BaseLogicalExtend {
LogicalRecursiveExtend(std::shared_ptr<binder::NodeExpression> boundNode,
std::shared_ptr<binder::NodeExpression> nbrNode, std::shared_ptr<binder::RelExpression> rel,
ExtendDirection direction, RecursiveJoinType joinType,
std::vector<std::shared_ptr<LogicalOperator>> children,
std::shared_ptr<LogicalOperator> recursiveChild)
std::shared_ptr<LogicalOperator> child, std::shared_ptr<LogicalOperator> recursiveChild)
: BaseLogicalExtend{LogicalOperatorType::RECURSIVE_EXTEND, std::move(boundNode),
std::move(nbrNode), std::move(rel), direction, std::move(children)},
std::move(nbrNode), std::move(rel), direction, std::move(child)},
joinType{joinType}, recursiveChild{std::move(recursiveChild)} {}

f_group_pos_set getGroupsPosToFlatten() override;
Expand All @@ -27,20 +27,44 @@ class LogicalRecursiveExtend : public BaseLogicalExtend {
inline std::shared_ptr<LogicalOperator> getRecursiveChild() const { return recursiveChild; }

inline std::unique_ptr<LogicalOperator> copy() override {
std::vector<std::shared_ptr<LogicalOperator>> copiedChildren;
copiedChildren.reserve(children.size());
for (auto& child : children) {
copiedChildren.push_back(child->copy());
}
return std::make_unique<LogicalRecursiveExtend>(boundNode, nbrNode, rel, direction,
joinType, std::move(copiedChildren), recursiveChild->copy());
joinType, children[0]->copy(), recursiveChild->copy());
}

private:
RecursiveJoinType joinType;
std::shared_ptr<LogicalOperator> recursiveChild;
};

class LogicalPathPropertyProbe : public LogicalOperator {
public:
LogicalPathPropertyProbe(std::shared_ptr<binder::RelExpression> recursiveRel,
std::shared_ptr<LogicalOperator> probeChild, std::shared_ptr<LogicalOperator> nodeChild,
std::shared_ptr<LogicalOperator> relChild)
: LogicalOperator{LogicalOperatorType::PATH_PROPERTY_PROBE,
std::vector<std::shared_ptr<LogicalOperator>>{
std::move(probeChild), std::move(nodeChild), std::move(relChild)}},
recursiveRel{std::move(recursiveRel)}, sip{SidewaysInfoPassing::NONE} {}

void computeFactorizedSchema() override { copyChildSchema(0); }
void computeFlatSchema() override { copyChildSchema(0); }

std::string getExpressionsForPrinting() const override { return recursiveRel->toString(); }

inline std::shared_ptr<binder::RelExpression> getRel() const { return recursiveRel; }
inline void setSIP(SidewaysInfoPassing sip_) { sip = sip_; }
inline SidewaysInfoPassing getSIP() const { return sip; }

inline std::unique_ptr<LogicalOperator> copy() override {
return std::make_unique<LogicalPathPropertyProbe>(
recursiveRel, children[0]->copy(), children[1]->copy(), children[2]->copy());
}

private:
std::shared_ptr<binder::RelExpression> recursiveRel;
SidewaysInfoPassing sip;
};

class LogicalScanFrontier : public LogicalOperator {
public:
LogicalScanFrontier(std::shared_ptr<binder::NodeExpression> node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,36 @@
namespace kuzu {
namespace planner {

enum class SemiMaskType : uint8_t {
NODE = 0,
PATH = 1,
};

class LogicalSemiMasker : public LogicalOperator {
public:
LogicalSemiMasker(std::shared_ptr<binder::NodeExpression> node,
std::vector<LogicalOperator*> ops, std::shared_ptr<LogicalOperator> child)
: LogicalOperator{LogicalOperatorType::SEMI_MASKER, std::move(child)},
node{std::move(node)}, ops{std::move(ops)} {}
LogicalSemiMasker(SemiMaskType type, std::shared_ptr<binder::Expression> key,
std::shared_ptr<binder::NodeExpression> node, std::vector<LogicalOperator*> ops,
std::shared_ptr<LogicalOperator> child)
: LogicalOperator{LogicalOperatorType::SEMI_MASKER, std::move(child)}, type{type},
key{std::move(key)}, node{std::move(node)}, ops{std::move(ops)} {}

inline void computeFactorizedSchema() override { copyChildSchema(0); }
inline void computeFlatSchema() override { copyChildSchema(0); }

inline std::string getExpressionsForPrinting() const override { return node->toString(); }

inline SemiMaskType getType() const { return type; }
inline std::shared_ptr<binder::Expression> getKey() const { return key; }
inline std::shared_ptr<binder::NodeExpression> getNode() const { return node; }
inline std::vector<LogicalOperator*> getOperators() const { return ops; }

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalSemiMasker>(node, ops, children[0]->copy());
throw common::RuntimeException("LogicalSemiMasker::copy() should not be called.");

Check warning on line 33 in src/include/planner/logical_plan/logical_operator/logical_semi_masker.h

View check run for this annotation

Codecov / codecov/patch

src/include/planner/logical_plan/logical_operator/logical_semi_masker.h#L33

Added line #L33 was not covered by tests
}

private:
SemiMaskType type;
std::shared_ptr<binder::Expression> key;
std::shared_ptr<binder::NodeExpression> node;
std::vector<LogicalOperator*> ops;
};
Expand Down
4 changes: 3 additions & 1 deletion src/include/processor/mapper/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class PlanMapper {
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalRecursiveExtendToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalPathPropertyProbeToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalFlattenToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalFilterToPhysical(
Expand Down Expand Up @@ -126,7 +128,7 @@ class PlanMapper {
planner::Schema* outSchema, std::unique_ptr<PhysicalOperator> prevOperator,
const std::string& paramsString);

static void mapAccHashJoin(PhysicalOperator* probe);
static void mapSIPJoin(PhysicalOperator* probe);

static std::vector<DataPos> getExpressionsDataPos(
const binder::expression_vector& expressions, const planner::Schema& schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct RecursiveJoinVectors {
common::ValueVector* srcNodeIDVector = nullptr;
common::ValueVector* dstNodeIDVector = nullptr;
common::ValueVector* pathLengthVector = nullptr;
common::ValueVector* pathVector = nullptr; // STRUCT(LIST(STRUCT), LIST(INTERNAL_ID))
common::ValueVector* pathVector = nullptr; // STRUCT(LIST(NODE), LIST(REL))
common::ValueVector* pathNodesVector = nullptr; // LIST(STRUCT)
common::ValueVector* pathNodesIDDataVector = nullptr; // INTERNAL_ID
common::ValueVector* pathRelsVector = nullptr; // LIST(STRUCT)
Expand Down
118 changes: 92 additions & 26 deletions src/include/processor/operator/semi_masker.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,125 @@
namespace kuzu {
namespace processor {

// Multiple maskers can point to the same SemiMask, thus we associate each masker with an idx
// to indicate the execution sequence of its pipeline. Also, the maskerIdx is used as a flag to
// indicate if a value in the mask is masked or not, as each masker will increment the selected
// value in the mask by 1. More details are described in NodeTableSemiMask.
using mask_with_idx = std::pair<NodeSemiMask*, uint8_t>;
class BaseSemiMasker;

class SemiMaskerInfo {
friend class BaseSemiMasker;

public:
// Multiple maskers can point to the same SemiMask, thus we associate each masker with an idx
// to indicate the execution sequence of its pipeline. Also, the maskerIdx is used as a flag to
// indicate if a value in the mask is masked or not, as each masker will increment the selected
// value in the mask by 1. More details are described in NodeTableSemiMask.
using mask_with_idx = std::pair<NodeSemiMask*, uint8_t>;

SemiMaskerInfo(const DataPos& keyPos,
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable)
: keyPos{keyPos}, masksPerTable{std::move(masksPerTable)} {}
SemiMaskerInfo(const SemiMaskerInfo& other)
: keyPos{other.keyPos}, masksPerTable{other.masksPerTable} {}

inline const std::vector<mask_with_idx>& getSingleTableMasks() const {
assert(masksPerTable.size() == 1);
return masksPerTable.begin()->second;
}

inline const std::vector<mask_with_idx>& getTableMasks(common::table_id_t tableID) const {
assert(masksPerTable.contains(tableID));
return masksPerTable.at(tableID);
}

inline std::unique_ptr<SemiMaskerInfo> copy() const {
return std::make_unique<SemiMaskerInfo>(*this);
}

private:
DataPos keyPos;
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable;
};

class BaseSemiMasker : public PhysicalOperator {
protected:
BaseSemiMasker(const DataPos& keyDataPos,
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
BaseSemiMasker(std::unique_ptr<SemiMaskerInfo> info, std::unique_ptr<PhysicalOperator> child,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::SEMI_MASKER, std::move(child), id, paramsString},
keyDataPos{keyDataPos}, masksPerTable{std::move(masksPerTable)} {}
info{std::move(info)} {}

void initGlobalStateInternal(ExecutionContext* context) override;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

protected:
DataPos keyDataPos;
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable;
std::shared_ptr<common::ValueVector> keyValueVector;
std::unique_ptr<SemiMaskerInfo> info;
common::ValueVector* keyVector;
};

class SingleTableSemiMasker : public BaseSemiMasker {
public:
SingleTableSemiMasker(const DataPos& keyDataPos,
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable,
SingleTableSemiMasker(std::unique_ptr<SemiMaskerInfo> info,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: BaseSemiMasker{keyDataPos, std::move(masksPerTable), std::move(child), id, paramsString} {
}
: BaseSemiMasker{std::move(info), std::move(child), id, paramsString} {}

bool getNextTuplesInternal(ExecutionContext* context) override;
bool getNextTuplesInternal(ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() override {
inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<SingleTableSemiMasker>(
keyDataPos, masksPerTable, children[0]->clone(), id, paramsString);
info->copy(), children[0]->clone(), id, paramsString);
}
};

class MultiTableSemiMasker : public BaseSemiMasker {
public:
MultiTableSemiMasker(const DataPos& keyDataPos,
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable,
MultiTableSemiMasker(std::unique_ptr<SemiMaskerInfo> info,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: BaseSemiMasker{keyDataPos, std::move(masksPerTable), std::move(child), id, paramsString} {
}
: BaseSemiMasker{std::move(info), std::move(child), id, paramsString} {}

bool getNextTuplesInternal(ExecutionContext* context) override;
bool getNextTuplesInternal(ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() override {
inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<MultiTableSemiMasker>(
keyDataPos, masksPerTable, children[0]->clone(), id, paramsString);
info->copy(), children[0]->clone(), id, paramsString);
}
};

class PathSemiMasker : public BaseSemiMasker {
protected:
PathSemiMasker(std::unique_ptr<SemiMaskerInfo> info, std::unique_ptr<PhysicalOperator> child,
uint32_t id, const std::string& paramsString)
: BaseSemiMasker{std::move(info), std::move(child), id, paramsString} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

protected:
common::ValueVector* pathNodesVector;
common::ValueVector* pathNodesIDDataVector;
};

class PathSingleTableSemiMasker : public PathSemiMasker {
public:
PathSingleTableSemiMasker(std::unique_ptr<SemiMaskerInfo> info,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: PathSemiMasker{std::move(info), std::move(child), id, paramsString} {}

bool getNextTuplesInternal(ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<PathSingleTableSemiMasker>(
info->copy(), children[0]->clone(), id, paramsString);
}
};

class PathMultipleTableSemiMasker : public PathSemiMasker {
public:
PathMultipleTableSemiMasker(std::unique_ptr<SemiMaskerInfo> info,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: PathSemiMasker{std::move(info), std::move(child), id, paramsString} {}

bool getNextTuplesInternal(ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<PathMultipleTableSemiMasker>(
info->copy(), children[0]->clone(), id, paramsString);
}
};

Expand Down
Loading

0 comments on commit 8d12790

Please sign in to comment.