Skip to content

Commit

Permalink
Merge pull request #1325 from kuzudb/sink-projection-push-down
Browse files Browse the repository at this point in the history
Sink projection push down
  • Loading branch information
andyfengHKU committed Feb 28, 2023
2 parents 93dee2d + 70399a1 commit f33e003
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 26 deletions.
9 changes: 9 additions & 0 deletions src/include/optimizer/projection_push_down_optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,18 @@ class ProjectionPushDownOptimizer {
void visitIntersect(planner::LogicalOperator* op);
void visitProjection(planner::LogicalOperator* op);
void visitOrderBy(planner::LogicalOperator* op);
void visitUnwind(planner::LogicalOperator* op);
void visitSetNodeProperty(planner::LogicalOperator* op);
void visitSetRelProperty(planner::LogicalOperator* op);
void visitCreateNode(planner::LogicalOperator* op);
void visitCreateRel(planner::LogicalOperator* op);
void visitDeleteNode(planner::LogicalOperator* op);
void visitDeleteRel(planner::LogicalOperator* op);

void collectPropertiesInUse(std::shared_ptr<binder::Expression> expression);

binder::expression_vector pruneExpressions(const binder::expression_vector& expressions);

private:
binder::expression_set propertiesInUse;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class LogicalAccumulate : public LogicalOperator {
return binder::ExpressionUtil::toString(expressions);
}

// Replaces the stored expression list; the by-value argument is moved into the member.
inline void setExpressions(binder::expression_vector expressions_) {
    this->expressions = std::move(expressions_);
}
// Returns a copy of the stored expression list.
inline binder::expression_vector getExpressions() const {
    return expressions;
}
// Returns the schema of the first child operator.
inline Schema* getSchemaBeforeSink() const {
    const auto& firstChild = children[0];
    return firstChild->getSchema();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ class LogicalHashJoin : public LogicalOperator {
return binder::ExpressionUtil::toString(joinNodeIDs);
}

inline void setExpressionsToMaterialize(binder::expression_set expressions) {
expressionsToMaterialize.clear();
for (auto& expression : expressions) {
expressionsToMaterialize.push_back(expression);
}
// Replaces the list of expressions to materialize; the by-value argument is moved into the
// member.
inline void setExpressionsToMaterialize(binder::expression_vector expressions) {
    this->expressionsToMaterialize = std::move(expressions);
}
inline binder::expression_vector getExpressionsToMaterialize() const {
return expressionsToMaterialize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ struct LogicalIntersectBuildInfo {
std::shared_ptr<binder::Expression> keyNodeID, binder::expression_vector expressions)
: keyNodeID{std::move(keyNodeID)}, expressionsToMaterialize{std::move(expressions)} {}

// Replaces the expressions this build side materializes; the by-value argument is moved into
// the member.
inline void setExpressionsToMaterialize(binder::expression_vector expressions) {
    this->expressionsToMaterialize = std::move(expressions);
}
// Returns a copy of the expressions this build side materializes.
inline binder::expression_vector getExpressionsToMaterialize() const {
    const auto& materialized = expressionsToMaterialize;
    return materialized;
}

// Creates a new LogicalIntersectBuildInfo constructed from this object's key node ID and
// expressions to materialize.
inline std::unique_ptr<LogicalIntersectBuildInfo> copy() {
    auto duplicate =
        std::make_unique<LogicalIntersectBuildInfo>(keyNodeID, expressionsToMaterialize);
    return duplicate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class LogicalOrderBy : public LogicalOperator {
}
// Returns a copy of the isAscOrders flags.
inline std::vector<bool> getIsAscOrders() const {
    return isAscOrders;
}
// Returns the schema of the first child operator.
inline Schema* getSchemaBeforeOrderBy() const {
    const auto& firstChild = children[0];
    return firstChild->getSchema();
}
// Replaces the expressions materialized by the order-by; the by-value argument is moved into
// the member.
inline void setExpressionsToMaterialize(binder::expression_vector expressions) {
    this->expressionsToMaterialize = std::move(expressions);
}
// Returns a copy of the expressions materialized by the order-by.
inline binder::expression_vector getExpressionsToMaterialize() const {
    const auto& materialized = expressionsToMaterialize;
    return materialized;
}
Expand Down
155 changes: 140 additions & 15 deletions src/optimizer/projection_push_down_optimizer.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#include "optimizer/projection_push_down_optimizer.h"

#include "planner/logical_plan/logical_operator/logical_accumulate.h"
#include "planner/logical_plan/logical_operator/logical_create.h"
#include "planner/logical_plan/logical_operator/logical_delete.h"
#include "planner/logical_plan/logical_operator/logical_filter.h"
#include "planner/logical_plan/logical_operator/logical_hash_join.h"
#include "planner/logical_plan/logical_operator/logical_intersect.h"
#include "planner/logical_plan/logical_operator/logical_order_by.h"
#include "planner/logical_plan/logical_operator/logical_projection.h"
#include "planner/logical_plan/logical_operator/logical_set.h"
#include "planner/logical_plan/logical_operator/logical_unwind.h"

using namespace kuzu::common;
using namespace kuzu::planner;
Expand Down Expand Up @@ -39,6 +43,27 @@ void ProjectionPushDownOptimizer::visitOperator(LogicalOperator* op) {
case LogicalOperatorType::ORDER_BY: {
visitOrderBy(op);
} break;
case LogicalOperatorType::UNWIND: {
visitUnwind(op);
} break;
case LogicalOperatorType::CREATE_NODE: {
visitCreateNode(op);
} break;
case LogicalOperatorType::CREATE_REL: {
visitCreateRel(op);
} break;
case LogicalOperatorType::DELETE_NODE: {
visitDeleteNode(op);
} break;
case LogicalOperatorType::DELETE_REL: {
visitDeleteRel(op);
} break;
case LogicalOperatorType::SET_NODE_PROPERTY: {
visitSetNodeProperty(op);
} break;
case LogicalOperatorType::SET_REL_PROPERTY: {
visitSetRelProperty(op);
} break;
default:
break;
}
Expand All @@ -49,9 +74,15 @@ void ProjectionPushDownOptimizer::visitOperator(LogicalOperator* op) {

// Prunes the accumulate operator's expressions with pruneExpressions. When the pruned list is
// shorter than the original, the pruned list is stored on the operator and a LogicalProjection
// over the same expressions is inserted as the new child 0.
void ProjectionPushDownOptimizer::visitAccumulate(planner::LogicalOperator* op) {
auto accumulate = (LogicalAccumulate*)op;
// NOTE(review): the loop header and body below look like the pre-change implementation shown
// inline by the diff view; with them present the braces in this function do not balance.
// Confirm against the real file before relying on this text.
for (auto& expression : accumulate->getExpressions()) {
collectPropertiesInUse(expression);
auto expressionsBeforePruning = accumulate->getExpressions();
auto expressionsAfterPruning = pruneExpressions(expressionsBeforePruning);
// Equal sizes are treated as "nothing was pruned": leave the plan untouched.
if (expressionsBeforePruning.size() == expressionsAfterPruning.size()) {
return;
}
accumulate->setExpressions(expressionsAfterPruning);
// The projection wraps the current child 0 and then replaces it as child 0.
auto projection = std::make_shared<LogicalProjection>(
std::move(expressionsAfterPruning), accumulate->getChild(0));
accumulate->setChild(0, std::move(projection));
}

void ProjectionPushDownOptimizer::visitFilter(planner::LogicalOperator* op) {
Expand All @@ -68,22 +99,14 @@ void ProjectionPushDownOptimizer::visitHashJoin(planner::LogicalOperator* op) {
return;
}
auto expressionsBeforePruning = hashJoin->getExpressionsToMaterialize();
expression_set expressionsAfterPruning;
for (auto& expression : expressionsBeforePruning) {
if (expression->expressionType != common::PROPERTY ||
propertiesInUse.contains(expression)) {
expressionsAfterPruning.insert(expression);
}
}
auto expressionsAfterPruning = pruneExpressions(expressionsBeforePruning);
if (expressionsBeforePruning.size() == expressionsAfterPruning.size()) {
// TODO(Xiyang): replace this with a separate optimizer.
return;
}
hashJoin->setExpressionsToMaterialize(expressionsAfterPruning);
auto projectionExpressions =
expression_vector{expressionsAfterPruning.begin(), expressionsAfterPruning.end()};
auto projection = std::make_shared<LogicalProjection>(
std::move(projectionExpressions), hashJoin->getChild(1));
std::move(expressionsAfterPruning), hashJoin->getChild(1));
hashJoin->setChild(1, std::move(projection));
}

Expand All @@ -93,9 +116,31 @@ void ProjectionPushDownOptimizer::visitIntersect(planner::LogicalOperator* op) {
for (auto i = 0u; i < intersect->getNumBuilds(); ++i) {
auto buildInfo = intersect->getBuildInfo(i);
collectPropertiesInUse(buildInfo->keyNodeID);
// Note: we have a potential bug under intersect.cpp. The following code ensures build key
// and intersect key always appear as the first and second column. Should be removed once
// the bug is fixed.
expression_vector expressionsBeforePruning;
expression_vector expressionsAfterPruning;
for (auto& expression : buildInfo->expressionsToMaterialize) {
collectPropertiesInUse(expression);
if (expression->getUniqueName() == intersect->getIntersectNodeID()->getUniqueName() ||
expression->getUniqueName() == buildInfo->keyNodeID->getUniqueName()) {
continue;
}
expressionsBeforePruning.push_back(expression);
}
expressionsAfterPruning.push_back(buildInfo->keyNodeID);
expressionsAfterPruning.push_back(intersect->getIntersectNodeID());
for (auto& expression : pruneExpressions(expressionsBeforePruning)) {
expressionsAfterPruning.push_back(expression);
}
if (expressionsBeforePruning.size() == expressionsAfterPruning.size()) {
return;
}
buildInfo->setExpressionsToMaterialize(expressionsAfterPruning);
auto childIdx = i + 1; // skip probe
auto projection = std::make_shared<LogicalProjection>(
std::move(expressionsAfterPruning), intersect->getChild(childIdx));
intersect->setChild(childIdx, std::move(projection));
}
}

Expand All @@ -115,8 +160,76 @@ void ProjectionPushDownOptimizer::visitOrderBy(planner::LogicalOperator* op) {
for (auto& expression : orderBy->getExpressionsToOrderBy()) {
collectPropertiesInUse(expression);
}
for (auto& expression : orderBy->getExpressionsToMaterialize()) {
collectPropertiesInUse(expression);
auto expressionsBeforePruning = orderBy->getExpressionsToMaterialize();
auto expressionsAfterPruning = pruneExpressions(expressionsBeforePruning);
if (expressionsBeforePruning.size() == expressionsAfterPruning.size()) {
return;
}
orderBy->setExpressionsToMaterialize(expressionsAfterPruning);
auto projection = std::make_shared<LogicalProjection>(
std::move(expressionsAfterPruning), orderBy->getChild(0));
orderBy->setChild(0, std::move(projection));
}

// Hands the UNWIND operator's expression to collectPropertiesInUse.
void ProjectionPushDownOptimizer::visitUnwind(planner::LogicalOperator* op) {
    auto* unwindOp = static_cast<LogicalUnwind*>(op);
    auto unwindExpression = unwindOp->getExpression();
    collectPropertiesInUse(unwindExpression);
}

// For every node created by the operator, hands its primary key expression to
// collectPropertiesInUse.
void ProjectionPushDownOptimizer::visitCreateNode(planner::LogicalOperator* op) {
    auto* createNodeOp = static_cast<LogicalCreateNode*>(op);
    auto nodeIdx = 0u;
    while (nodeIdx < createNodeOp->getNumNodes()) {
        collectPropertiesInUse(createNodeOp->getPrimaryKey(nodeIdx));
        ++nodeIdx;
    }
}

// For every rel created by the operator, hands the following expressions to
// collectPropertiesInUse: the internal ID properties of the source node, the destination node
// and the rel itself, plus the second element of each set item of that rel.
void ProjectionPushDownOptimizer::visitCreateRel(planner::LogicalOperator* op) {
    auto* createRel = static_cast<LogicalCreateRel*>(op);
    // Unsigned index, matching the node visitors, so the comparison against the rel count does
    // not mix signed and unsigned operands.
    for (auto i = 0u; i < createRel->getNumRels(); ++i) {
        auto rel = createRel->getRel(i);
        collectPropertiesInUse(rel->getSrcNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getDstNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getInternalIDProperty());
        for (auto& setItem : createRel->getSetItems(i)) {
            collectPropertiesInUse(setItem.second);
        }
    }
}

// For every node deleted by the operator, hands the node's internal ID property and then its
// primary key expression to collectPropertiesInUse.
void ProjectionPushDownOptimizer::visitDeleteNode(planner::LogicalOperator* op) {
    auto* deleteNodeOp = static_cast<LogicalDeleteNode*>(op);
    for (auto nodeIdx = 0u; nodeIdx < deleteNodeOp->getNumNodes(); ++nodeIdx) {
        auto internalID = deleteNodeOp->getNode(nodeIdx)->getInternalIDProperty();
        collectPropertiesInUse(internalID);
        auto primaryKey = deleteNodeOp->getPrimaryKey(nodeIdx);
        collectPropertiesInUse(primaryKey);
    }
}

// For every rel deleted by the operator, hands the internal ID properties of the source node,
// the destination node and the rel itself to collectPropertiesInUse.
void ProjectionPushDownOptimizer::visitDeleteRel(planner::LogicalOperator* op) {
    auto* deleteRel = static_cast<LogicalDeleteRel*>(op);
    // Unsigned index, matching the node visitors, so the comparison against the rel count does
    // not mix signed and unsigned operands.
    for (auto i = 0u; i < deleteRel->getNumRels(); ++i) {
        auto rel = deleteRel->getRel(i);
        collectPropertiesInUse(rel->getSrcNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getDstNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getInternalIDProperty());
    }
}

// For every node updated by the operator, hands the node's internal ID property and then the
// second element of its set item to collectPropertiesInUse.
void ProjectionPushDownOptimizer::visitSetNodeProperty(planner::LogicalOperator* op) {
    auto* setNodeOp = static_cast<LogicalSetNodeProperty*>(op);
    for (auto nodeIdx = 0u; nodeIdx < setNodeOp->getNumNodes(); ++nodeIdx) {
        auto internalID = setNodeOp->getNode(nodeIdx)->getInternalIDProperty();
        collectPropertiesInUse(internalID);
        collectPropertiesInUse(setNodeOp->getSetItem(nodeIdx).second);
    }
}

// For every rel updated by the operator, hands the following expressions to
// collectPropertiesInUse: the internal ID properties of the source node, the destination node
// and the rel itself, plus the second element of that rel's set item.
void ProjectionPushDownOptimizer::visitSetRelProperty(planner::LogicalOperator* op) {
    auto* setRelProperty = static_cast<LogicalSetRelProperty*>(op);
    // Unsigned index, matching the node visitors, so the comparison against the rel count does
    // not mix signed and unsigned operands.
    for (auto i = 0u; i < setRelProperty->getNumRels(); ++i) {
        auto rel = setRelProperty->getRel(i);
        collectPropertiesInUse(rel->getSrcNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getDstNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getInternalIDProperty());
        collectPropertiesInUse(setRelProperty->getSetItem(i).second);
    }
}

Expand All @@ -131,5 +244,17 @@ void ProjectionPushDownOptimizer::collectPropertiesInUse(
}
}

// Returns the expressions that survive pruning, in the order they appear in `expressions`.
//
// An expression is dropped only when its type is common::PROPERTY and it is not contained in
// propertiesInUse; every other expression is kept. Duplicates (as judged by expression_set) are
// emitted once, at the position of their first occurrence.
//
// The result used to be produced by iterating an expression_set, which made the column order
// depend on the set's iteration order rather than on the input. Building the vector directly
// keeps the same membership while making the order predictable.
binder::expression_vector ProjectionPushDownOptimizer::pruneExpressions(
    const binder::expression_vector& expressions) {
    expression_set alreadyEmitted;
    expression_vector expressionsAfterPruning;
    expressionsAfterPruning.reserve(expressions.size());
    for (auto& expression : expressions) {
        const bool isUnusedProperty = expression->expressionType == common::PROPERTY &&
                                      !propertiesInUse.contains(expression);
        if (isUnusedProperty) {
            continue;
        }
        // insert() reports whether the element was newly added; skip repeats.
        if (alreadyEmitted.insert(expression).second) {
            expressionsAfterPruning.push_back(expression);
        }
    }
    return expressionsAfterPruning;
}

} // namespace optimizer
} // namespace kuzu
17 changes: 12 additions & 5 deletions src/processor/mapper/map_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ static FactorizedTableScan* getTableScanForAccHashJoin(HashJoinProbe* hashJoinPr
return (FactorizedTableScan*)op;
}

// Walks down from `tableScan` through operators that have exactly one child until an operator
// of type SEMI_MASKER is reached, and returns that operator. Reaching a non-SEMI_MASKER
// operator whose child count is not one trips the assert.
static SemiMasker* getSemiMasker(FactorizedTableScan* tableScan) {
    PhysicalOperator* current = tableScan;
    // Search on current pipeline.
    for (;;) {
        if (current->getNumChildren() != 1) {
            break;
        }
        if (current->getOperatorType() == PhysicalOperatorType::SEMI_MASKER) {
            break;
        }
        current = current->getChild(0);
    }
    assert(current->getOperatorType() == PhysicalOperatorType::SEMI_MASKER);
    return static_cast<SemiMasker*>(current);
}

static void constructAccPipeline(FactorizedTableScan* tableScan, HashJoinProbe* hashJoinProbe) {
auto resultCollector = tableScan->moveUnaryChild();
hashJoinProbe->addChild(std::move(resultCollector));
Expand All @@ -59,11 +70,7 @@ static void mapASPJoin(Expression* joinNodeID, HashJoinProbe* hashJoinProbe) {
assert(scanNodeIDCandidates.size() == 1);
// set semi masker
auto tableScan = getTableScanForAccHashJoin(hashJoinProbe);
// TODO(Xiyang): `tableScan->getChild(0)->getChild(0)`. This is not a good practice, can we
// change this to a more meaningful way?
assert(tableScan->getChild(0)->getChild(0)->getOperatorType() ==
PhysicalOperatorType::SEMI_MASKER);
auto semiMasker = (SemiMasker*)tableScan->getChild(0)->getChild(0);
auto semiMasker = getSemiMasker(tableScan);
auto sharedState = scanNodeIDCandidates[0]->getSharedState();
assert(sharedState->getNumTableStates() == 1);
semiMasker->setSharedState(sharedState->getTableState(0));
Expand Down
3 changes: 3 additions & 0 deletions src/processor/operator/intersect/intersect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ void Intersect::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* c
for (auto i = 0u; i < dataInfo.payloadsDataPos.size(); i++) {
auto vector = resultSet->getValueVector(dataInfo.payloadsDataPos[i]);
// Always skip the first two columns in the fTable: build key and intersect key.
// TODO(Guodong): this is a potential bug because you cannot guarantee intersect key is
// the second column. Once this is solved, go back and refactor projection push down for
// intersect.
columnIdxesToScanFrom.push_back(i + 2);
vectorsToReadInto.push_back(vector);
}
Expand Down
1 change: 0 additions & 1 deletion test/test_files/tinysnb/match/node.test
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@
-QUERY MATCH (a:organisation) RETURN COUNT(*)
---- 1
3

0 comments on commit f33e003

Please sign in to comment.