Skip to content

Commit

Permalink
Merge pull request #1850 from kuzudb/mapper-refactor
Browse files Browse the repository at this point in the history
Rename mapper functions
  • Loading branch information
andyfengHKU committed Jul 23, 2023
2 parents 4af13f0 + 5ff4810 commit 77c7829
Show file tree
Hide file tree
Showing 35 changed files with 197 additions and 276 deletions.
156 changes: 54 additions & 102 deletions src/include/processor/mapper/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include "binder/expression/node_expression.h"
#include "common/statement_type.h"
#include "planner/logical_plan/logical_operator/logical_copy.h"
#include "planner/logical_plan/logical_plan.h"
#include "processor/mapper/expression_mapper.h"
#include "processor/operator/result_collector.h"
Expand All @@ -28,109 +27,64 @@ class PlanMapper {
planner::LogicalPlan* logicalPlan, const binder::expression_vector& expressionsToCollect);

private:
std::unique_ptr<PhysicalOperator> mapLogicalOperatorToPhysical(
const std::shared_ptr<planner::LogicalOperator>& logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalScanFrontierToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalScanNodeToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalIndexScanNodeToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalUnwindToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalExtendToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalRecursiveExtendToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalPathPropertyProbeToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalFlattenToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalFilterToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalProjectionToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalScanNodePropertyToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalSemiMaskerToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalHashJoinToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalIntersectToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCrossProductToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalMultiplicityReducerToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalNodeLabelFilterToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalSkipToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalLimitToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalAggregateToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalDistinctToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalOrderByToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalUnionAllToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalAccumulateToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalExpressionsScanToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCreateNodeToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCreateRelToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalSetNodePropertyToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalSetRelPropertyToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalDeleteNodeToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalDeleteRelToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCreateNodeTableToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCreateRelTableToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCopyToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCopyNodeToPhysical(planner::LogicalCopy* copy);
std::unique_ptr<PhysicalOperator> mapLogicalCopyRelToPhysical(planner::LogicalCopy* copy);
std::unique_ptr<PhysicalOperator> mapLogicalDropTableToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalRenameTableToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalAddPropertyToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalDropPropertyToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalRenamePropertyToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalStandaloneCallToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalInQueryCallToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalExplainToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCreateMacroToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapOperator(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapScanFrontier(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapScanNode(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapIndexScanNode(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapUnwind(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapExtend(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapRecursiveExtend(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapPathPropertyProbe(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapFlatten(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapFilter(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapProjection(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapScanNodeProperty(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapSemiMasker(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapHashJoin(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapIntersect(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCrossProduct(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapMultiplicityReducer(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapNodeLabelFilter(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapSkip(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLimit(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapAggregate(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapDistinct(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapOrderBy(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapUnionAll(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapAccumulate(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapExpressionsScan(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCreateNode(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCreateRel(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapSetNodeProperty(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapSetRelProperty(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapDeleteNode(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapDeleteRel(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCreateNodeTable(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCreateRelTable(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCopy(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCopyNode(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCopyRel(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapDropTable(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapRenameTable(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapAddProperty(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapDropProperty(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapRenameProperty(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapStandaloneCall(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapInQueryCall(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapExplain(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCreateMacro(planner::LogicalOperator* logicalOperator);

std::unique_ptr<ResultCollector> createResultCollector(
const binder::expression_vector& expressions, planner::Schema* schema,
std::unique_ptr<PhysicalOperator> prevOperator);
std::unique_ptr<PhysicalOperator> createFactorizedTableScan(
const binder::expression_vector& expressions, planner::Schema* schema,
std::shared_ptr<FactorizedTable> table, std::unique_ptr<PhysicalOperator> prevOperator);

inline uint32_t getOperatorID() { return physicalOperatorID++; }

std::unique_ptr<HashJoinBuildInfo> createHashBuildInfo(const planner::Schema& buildSideSchema,
const binder::expression_vector& keys, const binder::expression_vector& payloads);

std::unique_ptr<PhysicalOperator> createHashAggregate(
const binder::expression_vector& keyExpressions,
const binder::expression_vector& dependentKeyExpressions,
Expand All @@ -139,19 +93,17 @@ class PlanMapper {
std::vector<DataPos> aggregatesOutputPos, planner::Schema* inSchema,
planner::Schema* outSchema, std::unique_ptr<PhysicalOperator> prevOperator,
const std::string& paramsString);
std::unique_ptr<PhysicalOperator> appendResultCollectorIfNotCopy(
std::unique_ptr<PhysicalOperator> lastOperator,
binder::expression_vector expressionsToCollect, planner::Schema* schema);

inline uint32_t getOperatorID() { return physicalOperatorID++; }

static void mapSIPJoin(PhysicalOperator* probe);

static std::vector<DataPos> getExpressionsDataPos(
const binder::expression_vector& expressions, const planner::Schema& schema);

std::unique_ptr<PhysicalOperator> appendResultCollectorIfNotCopy(
std::unique_ptr<PhysicalOperator> lastOperator,
binder::expression_vector expressionsToCollect, planner::Schema* schema);

static void setPhysicalPlanIfProfile(
planner::LogicalPlan* logicalPlan, PhysicalPlan* physicalPlan);

public:
storage::StorageManager& storageManager;
storage::MemoryManager* memoryManager;
Expand Down
5 changes: 2 additions & 3 deletions src/processor/mapper/map_accumulate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ using namespace kuzu::planner;
namespace kuzu {
namespace processor {

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalAccumulateToPhysical(
LogicalOperator* logicalOperator) {
std::unique_ptr<PhysicalOperator> PlanMapper::mapAccumulate(LogicalOperator* logicalOperator) {
auto logicalAccumulate = (LogicalAccumulate*)logicalOperator;
auto outSchema = logicalAccumulate->getSchema();
auto inSchema = logicalAccumulate->getChild(0)->getSchema();
// append result collector
auto prevOperator = mapLogicalOperatorToPhysical(logicalAccumulate->getChild(0));
auto prevOperator = mapOperator(logicalAccumulate->getChild(0).get());
auto expressions = logicalAccumulate->getExpressions();
auto resultCollector = createResultCollector(expressions, inSchema, std::move(prevOperator));
// append factorized table scan
Expand Down
5 changes: 2 additions & 3 deletions src/processor/mapper/map_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,11 @@ static binder::expression_vector getKeyExpressions(
return result;
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalAggregateToPhysical(
LogicalOperator* logicalOperator) {
std::unique_ptr<PhysicalOperator> PlanMapper::mapAggregate(LogicalOperator* logicalOperator) {
auto& logicalAggregate = (const LogicalAggregate&)*logicalOperator;
auto outSchema = logicalAggregate.getSchema();
auto inSchema = logicalAggregate.getChild(0)->getSchema();
auto prevOperator = mapLogicalOperatorToPhysical(logicalOperator->getChild(0));
auto prevOperator = mapOperator(logicalOperator->getChild(0).get());
auto paramsString = logicalAggregate.getExpressionsForPrinting();
std::vector<std::unique_ptr<AggregateFunction>> aggregateFunctions;
for (auto& expression : logicalAggregate.getAggregateExpressions()) {
Expand Down
15 changes: 9 additions & 6 deletions src/processor/mapper/map_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@ using namespace kuzu::storage;
namespace kuzu {
namespace processor {

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyToPhysical(
LogicalOperator* logicalOperator) {
std::unique_ptr<PhysicalOperator> PlanMapper::mapCopy(LogicalOperator* logicalOperator) {
auto copy = (LogicalCopy*)logicalOperator;
auto tableName = catalog->getReadOnlyVersion()->getTableName(copy->getTableID());
if (catalog->getReadOnlyVersion()->containNodeTable(tableName)) {
return mapLogicalCopyNodeToPhysical(copy);
return mapCopyNode(logicalOperator);
} else {
return mapLogicalCopyRelToPhysical(copy);
return mapCopyRel(logicalOperator);
}
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyNodeToPhysical(LogicalCopy* copy) {
std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNode(
planner::LogicalOperator* logicalOperator) {
auto copy = (LogicalCopy*)logicalOperator;
auto fileType = copy->getCopyDescription().fileType;
if (fileType != common::CopyDescription::FileType::CSV &&
fileType != common::CopyDescription::FileType::PARQUET &&
Expand Down Expand Up @@ -91,7 +92,9 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyNodeToPhysical(Logic
outputExpressions, outSchema, copyNodeSharedState->table, std::move(copyNode));
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyRelToPhysical(LogicalCopy* copy) {
std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyRel(
planner::LogicalOperator* logicalOperator) {
auto copy = (LogicalCopy*)logicalOperator;
auto relsStatistics = &storageManager.getRelsStore().getRelsStatistics();
auto table = storageManager.getRelsStore().getRelTable(copy->getTableID());
return std::make_unique<CopyRel>(catalog, copy->getCopyDescription(), table,
Expand Down
10 changes: 4 additions & 6 deletions src/processor/mapper/map_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ using namespace kuzu::storage;
namespace kuzu {
namespace processor {

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCreateNodeToPhysical(
LogicalOperator* logicalOperator) {
std::unique_ptr<PhysicalOperator> PlanMapper::mapCreateNode(LogicalOperator* logicalOperator) {
auto logicalCreateNode = (LogicalCreateNode*)logicalOperator;
auto outSchema = logicalCreateNode->getSchema();
auto inSchema = logicalCreateNode->getChild(0)->getSchema();
auto prevOperator = mapLogicalOperatorToPhysical(logicalOperator->getChild(0));
auto prevOperator = mapOperator(logicalOperator->getChild(0).get());
auto& nodesStore = storageManager.getNodesStore();
auto catalogContent = catalog->getReadOnlyVersion();
std::vector<std::unique_ptr<CreateNodeInfo>> createNodeInfos;
Expand All @@ -40,11 +39,10 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCreateNodeToPhysical(
getOperatorID(), logicalCreateNode->getExpressionsForPrinting());
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCreateRelToPhysical(
LogicalOperator* logicalOperator) {
std::unique_ptr<PhysicalOperator> PlanMapper::mapCreateRel(LogicalOperator* logicalOperator) {
auto logicalCreateRel = (LogicalCreateRel*)logicalOperator;
auto inSchema = logicalCreateRel->getChild(0)->getSchema();
auto prevOperator = mapLogicalOperatorToPhysical(logicalOperator->getChild(0));
auto prevOperator = mapOperator(logicalOperator->getChild(0).get());
auto& relStore = storageManager.getRelsStore();
std::vector<std::unique_ptr<CreateRelInfo>> createRelInfos;
for (auto i = 0u; i < logicalCreateRel->getNumRels(); ++i) {
Expand Down
2 changes: 1 addition & 1 deletion src/processor/mapper/map_create_macro.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ using namespace kuzu::planner;
namespace kuzu {
namespace processor {

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCreateMacroToPhysical(
std::unique_ptr<PhysicalOperator> PlanMapper::mapCreateMacro(
planner::LogicalOperator* logicalOperator) {
auto logicalCreateMacro = (LogicalCreateMacro*)logicalOperator;
auto outSchema = logicalCreateMacro->getSchema();
Expand Down
7 changes: 3 additions & 4 deletions src/processor/mapper/map_cross_product.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@ using namespace kuzu::planner;
namespace kuzu {
namespace processor {

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCrossProductToPhysical(
LogicalOperator* logicalOperator) {
std::unique_ptr<PhysicalOperator> PlanMapper::mapCrossProduct(LogicalOperator* logicalOperator) {
auto logicalCrossProduct = (LogicalCrossProduct*)logicalOperator;
auto outSchema = logicalCrossProduct->getSchema();
// map build side
auto buildSideSchema = logicalCrossProduct->getBuildSideSchema();
auto buildSidePrevOperator = mapLogicalOperatorToPhysical(logicalCrossProduct->getChild(1));
auto buildSidePrevOperator = mapOperator(logicalCrossProduct->getChild(1).get());
auto resultCollector = createResultCollector(buildSideSchema->getExpressionsInScope(),
buildSideSchema, std::move(buildSidePrevOperator));
// map probe side
auto probeSidePrevOperator = mapLogicalOperatorToPhysical(logicalCrossProduct->getChild(0));
auto probeSidePrevOperator = mapOperator(logicalCrossProduct->getChild(0).get());
std::vector<DataPos> outVecPos;
std::vector<uint32_t> colIndicesToScan;
auto expressions = buildSideSchema->getExpressionsInScope();
Expand Down
Loading

0 comments on commit 77c7829

Please sign in to comment.