Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use semi mask when scanning properties for path #1709

Merged
merged 1 commit into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ struct PlannerKnobs {
static constexpr uint64_t BUILD_PENALTY = 2;
// Avoid doing probe to build SIP if we have to accumulate a probe side that is much bigger than
// build side. Also avoid doing build to probe SIP if probe side is not much bigger than build.
static constexpr uint64_t ACC_HJ_PROBE_BUILD_RATIO = 5;
static constexpr uint64_t SIP_RATIO = 5;
};

struct ClientContextConstants {
Expand Down
10 changes: 8 additions & 2 deletions src/include/optimizer/acc_hash_join_optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class HashJoinSIPOptimizer : public LogicalOperatorVisitor {

void visitIntersect(planner::LogicalOperator* op) override;

void visitPathPropertyProbe(planner::LogicalOperator* op) override;

bool isProbeSideQualified(planner::LogicalOperator* probeRoot);

std::vector<planner::LogicalOperator*> resolveOperatorsToApplySemiMask(
Expand All @@ -35,8 +37,12 @@ class HashJoinSIPOptimizer : public LogicalOperatorVisitor {
std::vector<planner::LogicalOperator*> resolveShortestPathExtendToApplySemiMask(
const binder::Expression& nodeID, planner::LogicalOperator* root);

std::shared_ptr<planner::LogicalOperator> appendSemiMask(
std::vector<planner::LogicalOperator*> ops,
std::shared_ptr<planner::LogicalOperator> appendNodeSemiMasker(
std::vector<planner::LogicalOperator*> opsToApplySemiMask,
std::shared_ptr<planner::LogicalOperator> child);
std::shared_ptr<planner::LogicalOperator> appendPathSemiMasker(
std::shared_ptr<binder::Expression> pathExpression,
std::vector<planner::LogicalOperator*> opsToApplySemiMask,
std::shared_ptr<planner::LogicalOperator> child);
std::shared_ptr<planner::LogicalOperator> appendAccumulate(
std::shared_ptr<planner::LogicalOperator> child);
Expand Down
6 changes: 6 additions & 0 deletions src/include/optimizer/logical_operator_visitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ class LogicalOperatorVisitor {
return op;
}

virtual void visitPathPropertyProbe(planner::LogicalOperator* op) {}
virtual std::shared_ptr<planner::LogicalOperator> visitPathPropertyProbeReplace(
std::shared_ptr<planner::LogicalOperator> op) {
return op;
}

virtual void visitHashJoin(planner::LogicalOperator* op) {}
virtual std::shared_ptr<planner::LogicalOperator> visitHashJoinReplace(
std::shared_ptr<planner::LogicalOperator> op) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/optimizer/projection_push_down_optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ProjectionPushDownOptimizer : public LogicalOperatorVisitor {
private:
void visitOperator(planner::LogicalOperator* op);

void visitRecursiveExtend(planner::LogicalOperator* op) override;
void visitPathPropertyProbe(planner::LogicalOperator* op) override;
void visitExtend(planner::LogicalOperator* op) override;
void visitAccumulate(planner::LogicalOperator* op) override;
void visitFilter(planner::LogicalOperator* op) override;
Expand Down
4 changes: 2 additions & 2 deletions src/include/planner/join_order_enumerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ class JoinOrderEnumerator {
void createRecursivePlan(std::shared_ptr<NodeExpression> boundNode,
std::shared_ptr<NodeExpression> recursiveNode, std::shared_ptr<RelExpression> recursiveRel,
ExtendDirection direction, LogicalPlan& plan);
void createRecursiveNodePropertyScanPlan(
void createPathNodePropertyScanPlan(
std::shared_ptr<NodeExpression> recursiveNode, LogicalPlan& plan);
void createRecursiveRelPropertyScanPlan(std::shared_ptr<NodeExpression> recursiveNode,
void createPathRelPropertyScanPlan(std::shared_ptr<NodeExpression> recursiveNode,
std::shared_ptr<NodeExpression> nbrNode, std::shared_ptr<RelExpression> recursiveRel,
ExtendDirection direction, LogicalPlan& plan);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class BaseLogicalExtend : public LogicalOperator {
BaseLogicalExtend(LogicalOperatorType operatorType,
std::shared_ptr<binder::NodeExpression> boundNode,
std::shared_ptr<binder::NodeExpression> nbrNode, std::shared_ptr<binder::RelExpression> rel,
ExtendDirection direction, std::vector<std::shared_ptr<LogicalOperator>> children)
: LogicalOperator{operatorType, std::move(children)}, boundNode{std::move(boundNode)},
ExtendDirection direction, std::shared_ptr<LogicalOperator> child)
: LogicalOperator{operatorType, std::move(child)}, boundNode{std::move(boundNode)},
nbrNode{std::move(nbrNode)}, rel{std::move(rel)}, direction{direction} {}

inline std::shared_ptr<binder::NodeExpression> getBoundNode() const { return boundNode; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enum class LogicalOperatorType : uint8_t {
LIMIT,
MULTIPLICITY_REDUCER,
ORDER_BY,
PATH_PROPERTY_PROBE,
PROJECTION,
RECURSIVE_EXTEND,
RENAME_TABLE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ class LogicalExtend : public BaseLogicalExtend {
ExtendDirection direction, binder::expression_vector properties, bool hasAtMostOneNbr,
std::shared_ptr<LogicalOperator> child)
: BaseLogicalExtend{LogicalOperatorType::EXTEND, std::move(boundNode), std::move(nbrNode),
std::move(rel), direction,
std::vector<std::shared_ptr<LogicalOperator>>{std::move(child)}},
std::move(rel), direction, std::move(child)},
properties{std::move(properties)}, hasAtMostOneNbr{hasAtMostOneNbr} {}

f_group_pos_set getGroupsPosToFlatten() override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "base_logical_extend.h"
#include "recursive_join_type.h"
#include "side_way_info_passing.h"

namespace kuzu {
namespace planner {
Expand All @@ -11,10 +12,9 @@ class LogicalRecursiveExtend : public BaseLogicalExtend {
LogicalRecursiveExtend(std::shared_ptr<binder::NodeExpression> boundNode,
std::shared_ptr<binder::NodeExpression> nbrNode, std::shared_ptr<binder::RelExpression> rel,
ExtendDirection direction, RecursiveJoinType joinType,
std::vector<std::shared_ptr<LogicalOperator>> children,
std::shared_ptr<LogicalOperator> recursiveChild)
std::shared_ptr<LogicalOperator> child, std::shared_ptr<LogicalOperator> recursiveChild)
: BaseLogicalExtend{LogicalOperatorType::RECURSIVE_EXTEND, std::move(boundNode),
std::move(nbrNode), std::move(rel), direction, std::move(children)},
std::move(nbrNode), std::move(rel), direction, std::move(child)},
joinType{joinType}, recursiveChild{std::move(recursiveChild)} {}

f_group_pos_set getGroupsPosToFlatten() override;
Expand All @@ -27,20 +27,44 @@ class LogicalRecursiveExtend : public BaseLogicalExtend {
inline std::shared_ptr<LogicalOperator> getRecursiveChild() const { return recursiveChild; }

inline std::unique_ptr<LogicalOperator> copy() override {
std::vector<std::shared_ptr<LogicalOperator>> copiedChildren;
copiedChildren.reserve(children.size());
for (auto& child : children) {
copiedChildren.push_back(child->copy());
}
return std::make_unique<LogicalRecursiveExtend>(boundNode, nbrNode, rel, direction,
joinType, std::move(copiedChildren), recursiveChild->copy());
joinType, children[0]->copy(), recursiveChild->copy());
}

private:
RecursiveJoinType joinType;
std::shared_ptr<LogicalOperator> recursiveChild;
};

class LogicalPathPropertyProbe : public LogicalOperator {
public:
LogicalPathPropertyProbe(std::shared_ptr<binder::RelExpression> recursiveRel,
std::shared_ptr<LogicalOperator> probeChild, std::shared_ptr<LogicalOperator> nodeChild,
std::shared_ptr<LogicalOperator> relChild)
: LogicalOperator{LogicalOperatorType::PATH_PROPERTY_PROBE,
std::vector<std::shared_ptr<LogicalOperator>>{
std::move(probeChild), std::move(nodeChild), std::move(relChild)}},
recursiveRel{std::move(recursiveRel)}, sip{SidewaysInfoPassing::NONE} {}

void computeFactorizedSchema() override { copyChildSchema(0); }
void computeFlatSchema() override { copyChildSchema(0); }

std::string getExpressionsForPrinting() const override { return recursiveRel->toString(); }

inline std::shared_ptr<binder::RelExpression> getRel() const { return recursiveRel; }
inline void setSIP(SidewaysInfoPassing sip_) { sip = sip_; }
inline SidewaysInfoPassing getSIP() const { return sip; }

inline std::unique_ptr<LogicalOperator> copy() override {
return std::make_unique<LogicalPathPropertyProbe>(
recursiveRel, children[0]->copy(), children[1]->copy(), children[2]->copy());
}

private:
std::shared_ptr<binder::RelExpression> recursiveRel;
SidewaysInfoPassing sip;
};

class LogicalScanFrontier : public LogicalOperator {
public:
LogicalScanFrontier(std::shared_ptr<binder::NodeExpression> node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,36 @@
namespace kuzu {
namespace planner {

enum class SemiMaskType : uint8_t {
NODE = 0,
PATH = 1,
};

class LogicalSemiMasker : public LogicalOperator {
public:
LogicalSemiMasker(std::shared_ptr<binder::NodeExpression> node,
std::vector<LogicalOperator*> ops, std::shared_ptr<LogicalOperator> child)
: LogicalOperator{LogicalOperatorType::SEMI_MASKER, std::move(child)},
node{std::move(node)}, ops{std::move(ops)} {}
LogicalSemiMasker(SemiMaskType type, std::shared_ptr<binder::Expression> key,
std::shared_ptr<binder::NodeExpression> node, std::vector<LogicalOperator*> ops,
std::shared_ptr<LogicalOperator> child)
: LogicalOperator{LogicalOperatorType::SEMI_MASKER, std::move(child)}, type{type},
key{std::move(key)}, node{std::move(node)}, ops{std::move(ops)} {}

inline void computeFactorizedSchema() override { copyChildSchema(0); }
inline void computeFlatSchema() override { copyChildSchema(0); }

inline std::string getExpressionsForPrinting() const override { return node->toString(); }

inline SemiMaskType getType() const { return type; }
inline std::shared_ptr<binder::Expression> getKey() const { return key; }
inline std::shared_ptr<binder::NodeExpression> getNode() const { return node; }
inline std::vector<LogicalOperator*> getOperators() const { return ops; }

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalSemiMasker>(node, ops, children[0]->copy());
throw common::RuntimeException("LogicalSemiMasker::copy() should not be called.");

Check warning on line 33 in src/include/planner/logical_plan/logical_operator/logical_semi_masker.h

View check run for this annotation

Codecov / codecov/patch

src/include/planner/logical_plan/logical_operator/logical_semi_masker.h#L33

Added line #L33 was not covered by tests
}

private:
SemiMaskType type;
std::shared_ptr<binder::Expression> key;
std::shared_ptr<binder::NodeExpression> node;
std::vector<LogicalOperator*> ops;
};
Expand Down
4 changes: 3 additions & 1 deletion src/include/processor/mapper/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class PlanMapper {
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalRecursiveExtendToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalPathPropertyProbeToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalFlattenToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalFilterToPhysical(
Expand Down Expand Up @@ -126,7 +128,7 @@ class PlanMapper {
planner::Schema* outSchema, std::unique_ptr<PhysicalOperator> prevOperator,
const std::string& paramsString);

static void mapAccHashJoin(PhysicalOperator* probe);
static void mapSIPJoin(PhysicalOperator* probe);

static std::vector<DataPos> getExpressionsDataPos(
const binder::expression_vector& expressions, const planner::Schema& schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct RecursiveJoinVectors {
common::ValueVector* srcNodeIDVector = nullptr;
common::ValueVector* dstNodeIDVector = nullptr;
common::ValueVector* pathLengthVector = nullptr;
common::ValueVector* pathVector = nullptr; // STRUCT(LIST(STRUCT), LIST(INTERNAL_ID))
common::ValueVector* pathVector = nullptr; // STRUCT(LIST(NODE), LIST(REL))
common::ValueVector* pathNodesVector = nullptr; // LIST(STRUCT)
common::ValueVector* pathNodesIDDataVector = nullptr; // INTERNAL_ID
common::ValueVector* pathRelsVector = nullptr; // LIST(STRUCT)
Expand Down
118 changes: 92 additions & 26 deletions src/include/processor/operator/semi_masker.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,125 @@
namespace kuzu {
namespace processor {

// Multiple maskers can point to the same SemiMask, thus we associate each masker with an idx
// to indicate the execution sequence of its pipeline. Also, the maskerIdx is used as a flag to
// indicate if a value in the mask is masked or not, as each masker will increment the selected
// value in the mask by 1. More details are described in NodeTableSemiMask.
using mask_with_idx = std::pair<NodeSemiMask*, uint8_t>;
class BaseSemiMasker;

class SemiMaskerInfo {
friend class BaseSemiMasker;

public:
// Multiple maskers can point to the same SemiMask, thus we associate each masker with an idx
// to indicate the execution sequence of its pipeline. Also, the maskerIdx is used as a flag to
// indicate if a value in the mask is masked or not, as each masker will increment the selected
// value in the mask by 1. More details are described in NodeTableSemiMask.
using mask_with_idx = std::pair<NodeSemiMask*, uint8_t>;

SemiMaskerInfo(const DataPos& keyPos,
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable)
: keyPos{keyPos}, masksPerTable{std::move(masksPerTable)} {}
SemiMaskerInfo(const SemiMaskerInfo& other)
: keyPos{other.keyPos}, masksPerTable{other.masksPerTable} {}

inline const std::vector<mask_with_idx>& getSingleTableMasks() const {
assert(masksPerTable.size() == 1);
return masksPerTable.begin()->second;
}

inline const std::vector<mask_with_idx>& getTableMasks(common::table_id_t tableID) const {
assert(masksPerTable.contains(tableID));
return masksPerTable.at(tableID);
}

inline std::unique_ptr<SemiMaskerInfo> copy() const {
return std::make_unique<SemiMaskerInfo>(*this);
}

private:
DataPos keyPos;
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable;
};

class BaseSemiMasker : public PhysicalOperator {
protected:
BaseSemiMasker(const DataPos& keyDataPos,
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
BaseSemiMasker(std::unique_ptr<SemiMaskerInfo> info, std::unique_ptr<PhysicalOperator> child,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::SEMI_MASKER, std::move(child), id, paramsString},
keyDataPos{keyDataPos}, masksPerTable{std::move(masksPerTable)} {}
info{std::move(info)} {}

void initGlobalStateInternal(ExecutionContext* context) override;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

protected:
DataPos keyDataPos;
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable;
std::shared_ptr<common::ValueVector> keyValueVector;
std::unique_ptr<SemiMaskerInfo> info;
common::ValueVector* keyVector;
};

class SingleTableSemiMasker : public BaseSemiMasker {
public:
SingleTableSemiMasker(const DataPos& keyDataPos,
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable,
SingleTableSemiMasker(std::unique_ptr<SemiMaskerInfo> info,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: BaseSemiMasker{keyDataPos, std::move(masksPerTable), std::move(child), id, paramsString} {
}
: BaseSemiMasker{std::move(info), std::move(child), id, paramsString} {}

bool getNextTuplesInternal(ExecutionContext* context) override;
bool getNextTuplesInternal(ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() override {
inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<SingleTableSemiMasker>(
keyDataPos, masksPerTable, children[0]->clone(), id, paramsString);
info->copy(), children[0]->clone(), id, paramsString);
}
};

class MultiTableSemiMasker : public BaseSemiMasker {
public:
MultiTableSemiMasker(const DataPos& keyDataPos,
std::unordered_map<common::table_id_t, std::vector<mask_with_idx>> masksPerTable,
MultiTableSemiMasker(std::unique_ptr<SemiMaskerInfo> info,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: BaseSemiMasker{keyDataPos, std::move(masksPerTable), std::move(child), id, paramsString} {
}
: BaseSemiMasker{std::move(info), std::move(child), id, paramsString} {}

bool getNextTuplesInternal(ExecutionContext* context) override;
bool getNextTuplesInternal(ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() override {
inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<MultiTableSemiMasker>(
keyDataPos, masksPerTable, children[0]->clone(), id, paramsString);
info->copy(), children[0]->clone(), id, paramsString);
}
};

class PathSemiMasker : public BaseSemiMasker {
protected:
PathSemiMasker(std::unique_ptr<SemiMaskerInfo> info, std::unique_ptr<PhysicalOperator> child,
uint32_t id, const std::string& paramsString)
: BaseSemiMasker{std::move(info), std::move(child), id, paramsString} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

protected:
common::ValueVector* pathNodesVector;
common::ValueVector* pathNodesIDDataVector;
};

class PathSingleTableSemiMasker : public PathSemiMasker {
public:
PathSingleTableSemiMasker(std::unique_ptr<SemiMaskerInfo> info,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: PathSemiMasker{std::move(info), std::move(child), id, paramsString} {}

bool getNextTuplesInternal(ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<PathSingleTableSemiMasker>(
info->copy(), children[0]->clone(), id, paramsString);
}
};

class PathMultipleTableSemiMasker : public PathSemiMasker {
public:
PathMultipleTableSemiMasker(std::unique_ptr<SemiMaskerInfo> info,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: PathSemiMasker{std::move(info), std::move(child), id, paramsString} {}

bool getNextTuplesInternal(ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<PathMultipleTableSemiMasker>(
info->copy(), children[0]->clone(), id, paramsString);
}
};

Expand Down
Loading
Loading