Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sink projection push down #1325

Merged
merged 1 commit into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/include/optimizer/projection_push_down_optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,18 @@ class ProjectionPushDownOptimizer {
void visitIntersect(planner::LogicalOperator* op);
void visitProjection(planner::LogicalOperator* op);
void visitOrderBy(planner::LogicalOperator* op);
void visitUnwind(planner::LogicalOperator* op);
void visitSetNodeProperty(planner::LogicalOperator* op);
void visitSetRelProperty(planner::LogicalOperator* op);
void visitCreateNode(planner::LogicalOperator* op);
void visitCreateRel(planner::LogicalOperator* op);
void visitDeleteNode(planner::LogicalOperator* op);
void visitDeleteRel(planner::LogicalOperator* op);

void collectPropertiesInUse(std::shared_ptr<binder::Expression> expression);

binder::expression_vector pruneExpressions(const binder::expression_vector& expressions);

private:
binder::expression_set propertiesInUse;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class LogicalAccumulate : public LogicalOperator {
return binder::ExpressionUtil::toString(expressions);
}

// Replaces the expressions held by this operator. The argument is taken by
// value and moved into the member, so callers may pass either a copy or an
// rvalue.
inline void setExpressions(binder::expression_vector expressions_) {
    this->expressions = std::move(expressions_);
}
// Returns a copy of the expressions held by this operator.
inline binder::expression_vector getExpressions() const {
    return this->expressions;
}
// Returns the schema of the first child, i.e. the schema of this operator's
// input.
inline Schema* getSchemaBeforeSink() const {
    const auto& sinkInput = children[0];
    return sinkInput->getSchema();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ class LogicalHashJoin : public LogicalOperator {
return binder::ExpressionUtil::toString(joinNodeIDs);
}

inline void setExpressionsToMaterialize(binder::expression_set expressions) {
expressionsToMaterialize.clear();
for (auto& expression : expressions) {
expressionsToMaterialize.push_back(expression);
}
inline void setExpressionsToMaterialize(binder::expression_vector expressions) {
expressionsToMaterialize = std::move(expressions);
}
inline binder::expression_vector getExpressionsToMaterialize() const {
return expressionsToMaterialize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ struct LogicalIntersectBuildInfo {
std::shared_ptr<binder::Expression> keyNodeID, binder::expression_vector expressions)
: keyNodeID{std::move(keyNodeID)}, expressionsToMaterialize{std::move(expressions)} {}

// Replaces the expressions this build side materializes. The argument is
// taken by value and moved into the member.
inline void setExpressionsToMaterialize(binder::expression_vector expressions) {
    this->expressionsToMaterialize = std::move(expressions);
}
// Returns a copy of the expressions this build side materializes.
inline binder::expression_vector getExpressionsToMaterialize() const {
    return this->expressionsToMaterialize;
}

inline std::unique_ptr<LogicalIntersectBuildInfo> copy() {
return make_unique<LogicalIntersectBuildInfo>(keyNodeID, expressionsToMaterialize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class LogicalOrderBy : public LogicalOperator {
}
inline std::vector<bool> getIsAscOrders() const { return isAscOrders; }
// Returns the schema of the first child, i.e. the schema of this operator's
// input.
inline Schema* getSchemaBeforeOrderBy() const {
    const auto& orderByInput = children[0];
    return orderByInput->getSchema();
}
// Replaces the expressions this operator materializes. The argument is taken
// by value and moved into the member.
inline void setExpressionsToMaterialize(binder::expression_vector expressions) {
    this->expressionsToMaterialize = std::move(expressions);
}
// Returns a copy of the expressions this operator materializes.
inline binder::expression_vector getExpressionsToMaterialize() const {
    return this->expressionsToMaterialize;
}
Expand Down
155 changes: 140 additions & 15 deletions src/optimizer/projection_push_down_optimizer.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#include "optimizer/projection_push_down_optimizer.h"

#include "planner/logical_plan/logical_operator/logical_accumulate.h"
#include "planner/logical_plan/logical_operator/logical_create.h"
#include "planner/logical_plan/logical_operator/logical_delete.h"
#include "planner/logical_plan/logical_operator/logical_filter.h"
#include "planner/logical_plan/logical_operator/logical_hash_join.h"
#include "planner/logical_plan/logical_operator/logical_intersect.h"
#include "planner/logical_plan/logical_operator/logical_order_by.h"
#include "planner/logical_plan/logical_operator/logical_projection.h"
#include "planner/logical_plan/logical_operator/logical_set.h"
#include "planner/logical_plan/logical_operator/logical_unwind.h"

using namespace kuzu::common;
using namespace kuzu::planner;
Expand Down Expand Up @@ -39,6 +43,27 @@ void ProjectionPushDownOptimizer::visitOperator(LogicalOperator* op) {
case LogicalOperatorType::ORDER_BY: {
visitOrderBy(op);
} break;
case LogicalOperatorType::UNWIND: {
visitUnwind(op);
} break;
case LogicalOperatorType::CREATE_NODE: {
visitCreateNode(op);
} break;
case LogicalOperatorType::CREATE_REL: {
visitCreateRel(op);
} break;
case LogicalOperatorType::DELETE_NODE: {
visitDeleteNode(op);
} break;
case LogicalOperatorType::DELETE_REL: {
visitDeleteRel(op);
} break;
case LogicalOperatorType::SET_NODE_PROPERTY: {
visitSetNodeProperty(op);
} break;
case LogicalOperatorType::SET_REL_PROPERTY: {
visitSetRelProperty(op);
} break;
default:
break;
}
Expand All @@ -49,9 +74,15 @@ void ProjectionPushDownOptimizer::visitOperator(LogicalOperator* op) {

void ProjectionPushDownOptimizer::visitAccumulate(planner::LogicalOperator* op) {
auto accumulate = (LogicalAccumulate*)op;
for (auto& expression : accumulate->getExpressions()) {
collectPropertiesInUse(expression);
auto expressionsBeforePruning = accumulate->getExpressions();
auto expressionsAfterPruning = pruneExpressions(expressionsBeforePruning);
if (expressionsBeforePruning.size() == expressionsAfterPruning.size()) {
return;
}
accumulate->setExpressions(expressionsAfterPruning);
auto projection = std::make_shared<LogicalProjection>(
std::move(expressionsAfterPruning), accumulate->getChild(0));
accumulate->setChild(0, std::move(projection));
}

void ProjectionPushDownOptimizer::visitFilter(planner::LogicalOperator* op) {
Expand All @@ -68,22 +99,14 @@ void ProjectionPushDownOptimizer::visitHashJoin(planner::LogicalOperator* op) {
return;
}
auto expressionsBeforePruning = hashJoin->getExpressionsToMaterialize();
expression_set expressionsAfterPruning;
for (auto& expression : expressionsBeforePruning) {
if (expression->expressionType != common::PROPERTY ||
propertiesInUse.contains(expression)) {
expressionsAfterPruning.insert(expression);
}
}
auto expressionsAfterPruning = pruneExpressions(expressionsBeforePruning);
if (expressionsBeforePruning.size() == expressionsAfterPruning.size()) {
// TODO(Xiyang): replace this with a separate optimizer.
return;
}
hashJoin->setExpressionsToMaterialize(expressionsAfterPruning);
auto projectionExpressions =
expression_vector{expressionsAfterPruning.begin(), expressionsAfterPruning.end()};
auto projection = std::make_shared<LogicalProjection>(
std::move(projectionExpressions), hashJoin->getChild(1));
std::move(expressionsAfterPruning), hashJoin->getChild(1));
hashJoin->setChild(1, std::move(projection));
}

Expand All @@ -93,9 +116,31 @@ void ProjectionPushDownOptimizer::visitIntersect(planner::LogicalOperator* op) {
for (auto i = 0u; i < intersect->getNumBuilds(); ++i) {
auto buildInfo = intersect->getBuildInfo(i);
collectPropertiesInUse(buildInfo->keyNodeID);
// Note: we have a potential bug under intersect.cpp. The following code ensures build key
// and intersect key always appear as the first and second column. Should be removed once
// the bug is fixed.
expression_vector expressionsBeforePruning;
expression_vector expressionsAfterPruning;
for (auto& expression : buildInfo->expressionsToMaterialize) {
collectPropertiesInUse(expression);
if (expression->getUniqueName() == intersect->getIntersectNodeID()->getUniqueName() ||
expression->getUniqueName() == buildInfo->keyNodeID->getUniqueName()) {
continue;
}
expressionsBeforePruning.push_back(expression);
}
expressionsAfterPruning.push_back(buildInfo->keyNodeID);
expressionsAfterPruning.push_back(intersect->getIntersectNodeID());
for (auto& expression : pruneExpressions(expressionsBeforePruning)) {
expressionsAfterPruning.push_back(expression);
}
if (expressionsBeforePruning.size() == expressionsAfterPruning.size()) {
return;
}
buildInfo->setExpressionsToMaterialize(expressionsAfterPruning);
auto childIdx = i + 1; // skip probe
auto projection = std::make_shared<LogicalProjection>(
std::move(expressionsAfterPruning), intersect->getChild(childIdx));
intersect->setChild(childIdx, std::move(projection));
}
}

Expand All @@ -115,8 +160,76 @@ void ProjectionPushDownOptimizer::visitOrderBy(planner::LogicalOperator* op) {
for (auto& expression : orderBy->getExpressionsToOrderBy()) {
collectPropertiesInUse(expression);
}
for (auto& expression : orderBy->getExpressionsToMaterialize()) {
collectPropertiesInUse(expression);
auto expressionsBeforePruning = orderBy->getExpressionsToMaterialize();
auto expressionsAfterPruning = pruneExpressions(expressionsBeforePruning);
if (expressionsBeforePruning.size() == expressionsAfterPruning.size()) {
return;
}
orderBy->setExpressionsToMaterialize(expressionsAfterPruning);
auto projection = std::make_shared<LogicalProjection>(
std::move(expressionsAfterPruning), orderBy->getChild(0));
orderBy->setChild(0, std::move(projection));
}

// Visitor for UNWIND: passes the operator's expression to
// collectPropertiesInUse.
void ProjectionPushDownOptimizer::visitUnwind(planner::LogicalOperator* op) {
    auto unwindOperator = (LogicalUnwind*)op;
    auto unwindExpression = unwindOperator->getExpression();
    collectPropertiesInUse(unwindExpression);
}

// Visitor for CREATE_NODE: passes the primary key expression of every node
// handled by the operator to collectPropertiesInUse.
void ProjectionPushDownOptimizer::visitCreateNode(planner::LogicalOperator* op) {
    auto createNodeOperator = (LogicalCreateNode*)op;
    const auto numNodes = createNodeOperator->getNumNodes();
    for (auto nodeIdx = 0u; nodeIdx < numNodes; ++nodeIdx) {
        auto primaryKey = createNodeOperator->getPrimaryKey(nodeIdx);
        collectPropertiesInUse(primaryKey);
    }
}

// Visitor for CREATE_REL. For every rel handled by the operator, passes to
// collectPropertiesInUse:
//   - the internal ID property of the rel's source node,
//   - the internal ID property of the rel's destination node,
//   - the internal ID property of the rel itself,
//   - the `second` element of each of the rel's set items (presumably the
//     value expression being assigned -- confirm against LogicalCreateRel).
void ProjectionPushDownOptimizer::visitCreateRel(planner::LogicalOperator* op) {
    auto createRel = (LogicalCreateRel*)op;
    // Unsigned index, consistent with the node visitors in this file, so the
    // loop condition does not compare a signed int against the rel count.
    for (auto i = 0u; i < createRel->getNumRels(); ++i) {
        auto rel = createRel->getRel(i);
        collectPropertiesInUse(rel->getSrcNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getDstNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getInternalIDProperty());
        for (auto& setItem : createRel->getSetItems(i)) {
            collectPropertiesInUse(setItem.second);
        }
    }
}

// Visitor for DELETE_NODE: for every node handled by the operator, passes the
// node's internal ID property and then its primary key expression to
// collectPropertiesInUse.
void ProjectionPushDownOptimizer::visitDeleteNode(planner::LogicalOperator* op) {
    auto deleteNodeOperator = (LogicalDeleteNode*)op;
    const auto numNodes = deleteNodeOperator->getNumNodes();
    for (auto nodeIdx = 0u; nodeIdx < numNodes; ++nodeIdx) {
        auto node = deleteNodeOperator->getNode(nodeIdx);
        collectPropertiesInUse(node->getInternalIDProperty());
        collectPropertiesInUse(deleteNodeOperator->getPrimaryKey(nodeIdx));
    }
}

// Visitor for DELETE_REL. For every rel handled by the operator, passes the
// internal ID properties of the rel's source node, destination node and the
// rel itself to collectPropertiesInUse.
void ProjectionPushDownOptimizer::visitDeleteRel(planner::LogicalOperator* op) {
    auto deleteRel = (LogicalDeleteRel*)op;
    // Unsigned index, consistent with the node visitors in this file, so the
    // loop condition does not compare a signed int against the rel count.
    for (auto i = 0u; i < deleteRel->getNumRels(); ++i) {
        auto rel = deleteRel->getRel(i);
        collectPropertiesInUse(rel->getSrcNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getDstNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getInternalIDProperty());
    }
}

// Visitor for SET_NODE_PROPERTY: for every node handled by the operator,
// passes the node's internal ID property and then the `second` element of the
// matching set item to collectPropertiesInUse.
void ProjectionPushDownOptimizer::visitSetNodeProperty(planner::LogicalOperator* op) {
    auto setOperator = (LogicalSetNodeProperty*)op;
    const auto numNodes = setOperator->getNumNodes();
    for (auto nodeIdx = 0u; nodeIdx < numNodes; ++nodeIdx) {
        auto node = setOperator->getNode(nodeIdx);
        collectPropertiesInUse(node->getInternalIDProperty());
        collectPropertiesInUse(setOperator->getSetItem(nodeIdx).second);
    }
}

// Visitor for SET_REL_PROPERTY. For every rel handled by the operator, passes
// to collectPropertiesInUse the internal ID properties of the rel's source
// node, destination node and the rel itself, followed by the `second` element
// of the matching set item (presumably the value expression being assigned --
// confirm against LogicalSetRelProperty).
void ProjectionPushDownOptimizer::visitSetRelProperty(planner::LogicalOperator* op) {
    auto setRelProperty = (LogicalSetRelProperty*)op;
    // Unsigned index, consistent with the node visitors in this file, so the
    // loop condition does not compare a signed int against the rel count.
    for (auto i = 0u; i < setRelProperty->getNumRels(); ++i) {
        auto rel = setRelProperty->getRel(i);
        collectPropertiesInUse(rel->getSrcNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getDstNode()->getInternalIDProperty());
        collectPropertiesInUse(rel->getInternalIDProperty());
        collectPropertiesInUse(setRelProperty->getSetItem(i).second);
    }
}

Expand All @@ -131,5 +244,17 @@ void ProjectionPushDownOptimizer::collectPropertiesInUse(
}
}

// Returns the subset of `expressions` that still has to be kept:
//   - every expression whose type is not common::PROPERTY, and
//   - every PROPERTY expression that is contained in propertiesInUse.
// Duplicates (as decided by expression_set) are dropped, so the result has
// the same size as before and callers can keep comparing sizes to detect
// whether anything was pruned.
//
// The survivors are returned in the order of their first occurrence in
// `expressions`, not in expression_set iteration order, which need not match
// the input order. This keeps the materialized column order deterministic.
binder::expression_vector ProjectionPushDownOptimizer::pruneExpressions(
    const binder::expression_vector& expressions) {
    expression_set seenExpressions;
    expression_vector expressionsAfterPruning;
    for (auto& expression : expressions) {
        // Only PROPERTY expressions are candidates for pruning.
        if (expression->expressionType == common::PROPERTY &&
            !propertiesInUse.contains(expression)) {
            continue;
        }
        // insert(...).second is true only the first time an expression is seen.
        if (seenExpressions.insert(expression).second) {
            expressionsAfterPruning.push_back(expression);
        }
    }
    return expressionsAfterPruning;
}

} // namespace optimizer
} // namespace kuzu
17 changes: 12 additions & 5 deletions src/processor/mapper/map_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ static FactorizedTableScan* getTableScanForAccHashJoin(HashJoinProbe* hashJoinPr
return (FactorizedTableScan*)op;
}

// Starting at `tableScan`, follows child 0 for as long as the current
// operator has exactly one child and is not a SEMI_MASKER. The operator the
// walk stops on is asserted to be a SEMI_MASKER and returned as such.
static SemiMasker* getSemiMasker(FactorizedTableScan* tableScan) {
    PhysicalOperator* current = (PhysicalOperator*)tableScan;
    // Search on current pipeline.
    for (;;) {
        if (current->getNumChildren() != 1 ||
            current->getOperatorType() == PhysicalOperatorType::SEMI_MASKER) {
            break;
        }
        current = current->getChild(0);
    }
    assert(current->getOperatorType() == PhysicalOperatorType::SEMI_MASKER);
    return (SemiMasker*)current;
}

static void constructAccPipeline(FactorizedTableScan* tableScan, HashJoinProbe* hashJoinProbe) {
auto resultCollector = tableScan->moveUnaryChild();
hashJoinProbe->addChild(std::move(resultCollector));
Expand All @@ -59,11 +70,7 @@ static void mapASPJoin(Expression* joinNodeID, HashJoinProbe* hashJoinProbe) {
assert(scanNodeIDCandidates.size() == 1);
// set semi masker
auto tableScan = getTableScanForAccHashJoin(hashJoinProbe);
// TODO(Xiyang): `tableScan->getChild(0)->getChild(0)`. This is not a good practice, can we
// change this to a more meaningful way?
assert(tableScan->getChild(0)->getChild(0)->getOperatorType() ==
PhysicalOperatorType::SEMI_MASKER);
auto semiMasker = (SemiMasker*)tableScan->getChild(0)->getChild(0);
auto semiMasker = getSemiMasker(tableScan);
auto sharedState = scanNodeIDCandidates[0]->getSharedState();
assert(sharedState->getNumTableStates() == 1);
semiMasker->setSharedState(sharedState->getTableState(0));
Expand Down
3 changes: 3 additions & 0 deletions src/processor/operator/intersect/intersect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ void Intersect::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* c
for (auto i = 0u; i < dataInfo.payloadsDataPos.size(); i++) {
auto vector = resultSet->getValueVector(dataInfo.payloadsDataPos[i]);
// Always skip the first two columns in the fTable: build key and intersect key.
// TODO(Guodong): this is a potential bug because you cannot guarantee intersect key is
// the second column. Once this is solved, go back and refactor projection push down for
// intersect.
columnIdxesToScanFrom.push_back(i + 2);
vectorsToReadInto.push_back(vector);
}
Expand Down
1 change: 0 additions & 1 deletion test/test_files/tinysnb/match/node.test
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@
-QUERY MATCH (a:organisation) RETURN COUNT(*)
---- 1
3