Skip to content

Commit

Permalink
Keep optimal plan per factorization schema during enumeration
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Mar 20, 2023
1 parent 8299c02 commit 3a06d27
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 71 deletions.
12 changes: 5 additions & 7 deletions src/binder/query/query_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ std::unordered_set<uint32_t> SubqueryGraph::getRelNbrPositions() const {
return result;
}

std::unordered_set<SubqueryGraph, SubqueryGraphHasher> SubqueryGraph::getNbrSubgraphs(
uint32_t size) const {
subquery_graph_set_t SubqueryGraph::getNbrSubgraphs(uint32_t size) const {
auto result = getBaseNbrSubgraph();
for (auto i = 1u; i < size; ++i) {
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> tmp;
Expand Down Expand Up @@ -106,8 +105,8 @@ std::unordered_set<uint32_t> SubqueryGraph::getNodePositionsIgnoringNodeSelector
return result;
}

std::unordered_set<SubqueryGraph, SubqueryGraphHasher> SubqueryGraph::getBaseNbrSubgraph() const {
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> result;
subquery_graph_set_t SubqueryGraph::getBaseNbrSubgraph() const {
subquery_graph_set_t result;
for (auto& nodePos : getNodeNbrPositions()) {
auto nbr = SubqueryGraph(queryGraph);
nbr.addQueryNode(nodePos);
Expand All @@ -121,9 +120,8 @@ std::unordered_set<SubqueryGraph, SubqueryGraphHasher> SubqueryGraph::getBaseNbr
return result;
}

std::unordered_set<SubqueryGraph, SubqueryGraphHasher> SubqueryGraph::getNextNbrSubgraphs(
const SubqueryGraph& prevNbr) const {
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> result;
subquery_graph_set_t SubqueryGraph::getNextNbrSubgraphs(const SubqueryGraph& prevNbr) const {
subquery_graph_set_t result;
for (auto& nodePos : prevNbr.getNodeNbrPositions()) {
if (queryNodesSelector[nodePos]) {
continue;
Expand Down
11 changes: 7 additions & 4 deletions src/include/binder/query/reading_clause/query_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ const uint8_t MAX_NUM_VARIABLES = 64;

class QueryGraph;
struct SubqueryGraph;
struct SubqueryGraphHasher;
using subquery_graph_set_t = std::unordered_set<SubqueryGraph, SubqueryGraphHasher>;
template<typename T>
using subquery_graph_V_map_t = std::unordered_map<SubqueryGraph, T, SubqueryGraphHasher>;

// hash on node bitset if subgraph has no rel
struct SubqueryGraphHasher {
Expand Down Expand Up @@ -51,7 +55,7 @@ struct SubqueryGraph {

std::unordered_set<uint32_t> getNodeNbrPositions() const;
std::unordered_set<uint32_t> getRelNbrPositions() const;
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> getNbrSubgraphs(uint32_t size) const;
subquery_graph_set_t getNbrSubgraphs(uint32_t size) const;
std::vector<uint32_t> getConnectedNodePos(const SubqueryGraph& nbr) const;

// E.g. query graph (a)-[e1]->(b) and subgraph (a)-[e1], although (b) is not in subgraph, we
Expand All @@ -65,9 +69,8 @@ struct SubqueryGraph {
}

private:
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> getBaseNbrSubgraph() const;
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> getNextNbrSubgraphs(
const SubqueryGraph& prevNbr) const;
subquery_graph_set_t getBaseNbrSubgraph() const;
subquery_graph_set_t getNextNbrSubgraphs(const SubqueryGraph& prevNbr) const;
};

// QueryGraph represents a connected pattern specified in MATCH clause.
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ using frame_group_idx_t = page_group_idx_t;
using list_header_t = uint32_t;
using property_id_t = uint32_t;
constexpr property_id_t INVALID_PROPERTY_ID = UINT32_MAX;
using vector_idx_t = uint32_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand Down
27 changes: 23 additions & 4 deletions src/include/planner/logical_plan/logical_operator/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@
namespace kuzu {
namespace planner {

typedef uint32_t f_group_pos;
typedef std::unordered_set<f_group_pos> f_group_pos_set;
using f_group_pos = uint32_t;
using f_group_pos_set = std::unordered_set<f_group_pos>;
constexpr f_group_pos INVALID_F_GROUP_POS = UINT32_MAX;

class Schema;
struct SchemaHasher;
struct SchemaApproximateEquality;
template<typename T>
using schema_map_t = std::unordered_map<Schema*, T, SchemaHasher, SchemaApproximateEquality>;

class FactorizationGroup {
friend class Schema;

Expand Down Expand Up @@ -57,7 +63,9 @@ class FactorizationGroup {

class Schema {
public:
inline f_group_pos getNumGroups() const { return groups.size(); }
inline size_t getNumGroups() const { return groups.size(); }
inline size_t getNumFlatGroups() const { return getNumGroups(true /* isFlat */); }
inline size_t getNumUnFlatGroups() const { return getNumGroups(false /* isFlat */); }

inline FactorizationGroup* getGroup(std::shared_ptr<binder::Expression> expression) const {
return getGroup(getGroupPos(expression->getUniqueName()));
Expand Down Expand Up @@ -114,12 +122,15 @@ class Schema {
}

// Get the group positions containing at least one expression in scope.
std::unordered_set<f_group_pos> getGroupsPosInScope() const;
f_group_pos_set getGroupsPosInScope() const;

std::unique_ptr<Schema> copy() const;

void clear();

private:
size_t getNumGroups(bool isFlat) const;

private:
std::vector<std::unique_ptr<FactorizationGroup>> groups;
std::unordered_map<std::string, uint32_t> expressionNameToGroupPos;
Expand All @@ -128,6 +139,14 @@ class Schema {
binder::expression_vector expressionsInScope;
};

// Hash functor used as the hasher of schema_map_t. The key type is a Schema pointer, but the hash
// is computed from the pointed-to schema (see the definition in schema.cpp), not from the pointer
// value, so the argument must point to a live Schema.
struct SchemaHasher {
std::size_t operator()(const Schema* const& schema) const;
};

// Key-equality functor of schema_map_t. Compares the two pointed-to schemas with an approximate
// structural check (see the definition in schema.cpp) instead of comparing pointer values; both
// arguments must point to live Schema objects.
struct SchemaApproximateEquality {
bool operator()(const Schema* const& left, const Schema* const& right) const;
};

class SchemaUtils {
public:
static std::vector<binder::expression_vector> getExpressionsPerGroup(
Expand Down
2 changes: 2 additions & 0 deletions src/include/planner/logical_plan/logical_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace kuzu {
namespace planner {

class LogicalPlan;

class LogicalPlan {
public:
LogicalPlan() : estCardinality{1}, cost{0} {}
Expand Down
25 changes: 19 additions & 6 deletions src/include/planner/subplans_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ namespace kuzu {
namespace planner {

const uint64_t MAX_LEVEL_TO_PLAN_EXACTLY = 7;
const uint64_t MAX_NUM_SUBGRAPHS_PER_LEVEL = 100;
const uint64_t MAX_NUM_SUBGRAPHS_PER_LEVEL = 50;
const uint64_t MAX_NUM_PLANS_PER_SUBGRAPH = 50;

class SubPlansTable {
typedef std::unordered_map<SubqueryGraph, std::vector<std::unique_ptr<LogicalPlan>>,
SubqueryGraphHasher>
SubqueryGraphPlansMap;
struct PlanSet;
// Each dp level is a map from sub query graph to a set of plans
using dp_level_t = subquery_graph_V_map_t<std::unique_ptr<PlanSet>>;

public:
void resize(uint32_t newSize);
Expand All @@ -31,12 +31,25 @@ class SubPlansTable {
std::vector<SubqueryGraph> getSubqueryGraphs(uint32_t level);

void addPlan(const SubqueryGraph& subqueryGraph, std::unique_ptr<LogicalPlan> plan);
void finalizeLevel(uint32_t level);

void clear();

private:
std::vector<std::unique_ptr<SubqueryGraphPlansMap>> subPlans;
// Plans kept for a single subquery graph, with at most one plan retained per (approximately
// equal) factorization schema.
struct PlanSet {
// Owned plans; positions in this vector are the values stored in schemaToPlanIdx.
std::vector<std::unique_ptr<LogicalPlan>> plans;
// Maps a plan's schema to the index of that plan in `plans`. Keys are raw Schema pointers that are
// hashed/compared through SchemaHasher / SchemaApproximateEquality.
schema_map_t<common::vector_idx_t> schemaToPlanIdx;

// Mutable access to the stored plans.
inline std::vector<std::unique_ptr<LogicalPlan>>& getPlans() { return plans; }

// Takes ownership of `plan` and records it in this set (see definition in subplans_table.cpp).
void addPlan(std::unique_ptr<LogicalPlan> plan);
};

// Returns the DP level that stores plans for `subqueryGraph`. Levels are indexed by the subgraph's
// total number of variables; the index is not bounds-checked here.
dp_level_t* getDPLevel(const SubqueryGraph& subqueryGraph) const {
    const auto level = subqueryGraph.getTotalNumVariables();
    const auto& dpLevel = dpLevels[level];
    return dpLevel.get();
}

private:
std::vector<std::unique_ptr<dp_level_t>> dpLevels;
};

} // namespace planner
Expand Down
1 change: 0 additions & 1 deletion src/planner/join_order_enumerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ void JoinOrderEnumerator::planLevel(uint32_t level) {
} else {
planLevelExactly(level);
}
context->subPlansTable->finalizeLevel(level);
}

void JoinOrderEnumerator::planLevelExactly(uint32_t level) {
Expand Down
45 changes: 45 additions & 0 deletions src/planner/operator/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,51 @@ void Schema::clear() {
clearExpressionsInScope();
}

size_t Schema::getNumGroups(bool isFlat) const {
auto result = 0u;
for (auto groupPos : getGroupsPosInScope()) {
result += groups[groupPos]->isFlat() == isFlat;
}
return result;
}

// Hashes a schema for use as a schema_map_t key.
//
// Only quantities that SchemaApproximateEquality requires to match are hashed: the total number
// of groups, the number of unFlat groups in scope and the number of expressions in scope. This
// guarantees that two schemas comparing equal also hash equal, which unordered containers
// require. (The number of flat groups in scope is intentionally NOT used: the equality functor
// never compares it, so equal schemas may disagree on it.)
std::size_t SchemaHasher::operator()(const Schema* const& schema) const {
    // Order-dependent combine so that e.g. (2, 3) and (3, 2) do not collide and identical inputs
    // do not cancel out to 0, as they would with a plain XOR.
    auto combine = [](std::size_t seed, std::size_t value) -> std::size_t {
        return seed ^ (std::hash<std::size_t>{}(value) + 0x9e3779b9 + (seed << 6) + (seed >> 2));
    };
    std::size_t seed = 0;
    seed = combine(seed, schema->getNumGroups());
    seed = combine(seed, schema->getNumUnFlatGroups());
    seed = combine(seed, schema->getExpressionsInScope().size());
    return seed;
}

// Equality used by join order enumeration so that each DP level keeps the best plan per unique
// factorization schema rather than a single best plan overall.
// To keep enumeration cheap the comparison is approximate. Two schemas are treated as equal when
// all of the following hold:
// - they have the same number of factorization groups
// - they have the same number of unFlat factorization groups
// - they have the same number of expressions in scope
// - every expression in scope of the left schema is also in scope of the right schema and sits
//   in a group with the same flat/unFlat flag
bool SchemaApproximateEquality::operator()(
    const Schema* const& left, const Schema* const& right) const {
    // Cheap cardinality checks first; short-circuiting keeps the original evaluation order.
    const bool sameCounts =
        left->getNumGroups() == right->getNumGroups() &&
        left->getNumUnFlatGroups() == right->getNumUnFlatGroups() &&
        left->getExpressionsInScope().size() == right->getExpressionsInScope().size();
    if (!sameCounts) {
        return false;
    }
    for (auto& expr : left->getExpressionsInScope()) {
        if (!right->isExpressionInScope(*expr)) {
            return false;
        }
        const bool leftIsFlat = left->getGroup(left->getGroupPos(*expr))->isFlat();
        const bool rightIsFlat = right->getGroup(right->getGroupPos(*expr))->isFlat();
        if (leftIsFlat != rightIsFlat) {
            return false;
        }
    }
    return true;
}

std::vector<binder::expression_vector> SchemaUtils::getExpressionsPerGroup(
const binder::expression_vector& expressions, const Schema& schema) {
std::vector<binder::expression_vector> result;
Expand Down
58 changes: 31 additions & 27 deletions src/planner/subplans_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,60 +6,64 @@ namespace kuzu {
namespace planner {

void SubPlansTable::resize(uint32_t newSize) {
auto prevSize = subPlans.size();
subPlans.resize(newSize);
auto prevSize = dpLevels.size();
dpLevels.resize(newSize);
for (auto i = prevSize; i < newSize; ++i) {
subPlans[i] = std::make_unique<SubqueryGraphPlansMap>();
dpLevels[i] = std::make_unique<dp_level_t>();
}
}

bool SubPlansTable::containSubgraphPlans(const SubqueryGraph& subqueryGraph) const {
return subPlans[subqueryGraph.getTotalNumVariables()]->contains(subqueryGraph);
return getDPLevel(subqueryGraph)->contains(subqueryGraph);
}

std::vector<std::unique_ptr<LogicalPlan>>& SubPlansTable::getSubgraphPlans(
const SubqueryGraph& subqueryGraph) {
auto subqueryGraphPlansMap = subPlans[subqueryGraph.getTotalNumVariables()].get();
KU_ASSERT(subqueryGraphPlansMap->contains(subqueryGraph));
return subqueryGraphPlansMap->at(subqueryGraph);
auto dpLevel = getDPLevel(subqueryGraph);
KU_ASSERT(dpLevel->contains(subqueryGraph));
return dpLevel->at(subqueryGraph)->getPlans();
}

std::vector<SubqueryGraph> SubPlansTable::getSubqueryGraphs(uint32_t level) {
std::vector<SubqueryGraph> result;
for (auto& [subGraph, plans] : *subPlans[level]) {
for (auto& [subGraph, _] : *dpLevels[level]) {
result.push_back(subGraph);
}
return result;
}

void SubPlansTable::addPlan(const SubqueryGraph& subqueryGraph, std::unique_ptr<LogicalPlan> plan) {
assert(subPlans[subqueryGraph.getTotalNumVariables()]);
auto subgraphPlansMap = subPlans[subqueryGraph.getTotalNumVariables()].get();
if (subgraphPlansMap->size() > MAX_NUM_SUBGRAPHS_PER_LEVEL) {
auto dpLevel = getDPLevel(subqueryGraph);
if (dpLevel->size() > MAX_NUM_SUBGRAPHS_PER_LEVEL) {
return;
}
if (!subgraphPlansMap->contains(subqueryGraph)) {
subgraphPlansMap->emplace(subqueryGraph, std::vector<std::unique_ptr<LogicalPlan>>());
if (!dpLevel->contains(subqueryGraph)) {
dpLevel->emplace(subqueryGraph, std::make_unique<PlanSet>());
}
subgraphPlansMap->at(subqueryGraph).push_back(std::move(plan));
dpLevel->at(subqueryGraph)->addPlan(std::move(plan));
}

void SubPlansTable::finalizeLevel(uint32_t level) {
// cap number of plans per subgraph
for (auto& [subgraph, plans] : *subPlans[level]) {
if (plans.size() < MAX_NUM_PLANS_PER_SUBGRAPH) {
continue;
}
sort(plans.begin(), plans.end(),
[](const std::unique_ptr<LogicalPlan>& a, const std::unique_ptr<LogicalPlan>& b)
-> bool { return a->getCost() < b->getCost(); });
plans.resize(MAX_NUM_PLANS_PER_SUBGRAPH);
void SubPlansTable::clear() {
for (auto& dpLevel : dpLevels) {
dpLevel->clear();
}
}

void SubPlansTable::clear() {
for (auto& subPlan : subPlans) {
subPlan->clear();
// Adds `plan` to this set, keeping at most one plan (the cheapest seen so far) per approximately
// equal factorization schema.
// - New schema: the plan is appended, unless the set already holds MAX_NUM_PLANS_PER_SUBGRAPH
//   plans, in which case it is dropped.
// - Known schema: the stored plan is replaced only if `plan` is strictly cheaper. This applies
//   even when the set is full, because a replacement does not grow the set.
void SubPlansTable::PlanSet::addPlan(std::unique_ptr<LogicalPlan> plan) {
    auto schema = plan->getSchema();
    auto it = schemaToPlanIdx.find(schema); // single lookup instead of contains() + at()
    if (it == schemaToPlanIdx.end()) {
        // The cap only limits how many distinct schemas are tracked.
        if (plans.size() >= MAX_NUM_PLANS_PER_SUBGRAPH) {
            return;
        }
        schemaToPlanIdx.insert({schema, plans.size()});
        plans.push_back(std::move(plan));
        return;
    }
    auto idx = it->second;
    assert(idx < MAX_NUM_PLANS_PER_SUBGRAPH);
    if (plans[idx]->getCost() <= plan->getCost()) {
        return; // existing plan is at least as cheap; keep it
    }
    // The map key is a raw Schema* taken from the plan stored at `idx`.
    // NOTE(review): presumably that schema's lifetime is tied to the plan being replaced -- confirm.
    // Re-key the entry with the incoming plan's schema BEFORE the old plan is released, so the map
    // never hashes or compares through a pointer into a destroyed plan.
    schemaToPlanIdx.erase(it);
    schemaToPlanIdx.insert({schema, idx});
    plans[idx] = std::move(plan);
}

Expand Down
17 changes: 10 additions & 7 deletions src/processor/mapper/map_intersect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace processor {
std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalIntersectToPhysical(
LogicalOperator* logicalOperator) {
auto logicalIntersect = (LogicalIntersect*)logicalOperator;
auto intersectNodeID = logicalIntersect->getIntersectNodeID();
auto outSchema = logicalIntersect->getSchema();
std::vector<std::unique_ptr<PhysicalOperator>> children;
children.resize(logicalOperator->getNumChildren());
Expand All @@ -22,17 +23,19 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalIntersectToPhysical(
auto buildSchema = logicalIntersect->getChild(i)->getSchema();
auto buildSidePrevOperator = mapLogicalOperatorToPhysical(logicalIntersect->getChild(i));
std::vector<DataPos> payloadsDataPos;
auto buildDataInfo =
generateBuildDataInfo(*buildSchema, {keyNodeID}, buildSchema->getExpressionsInScope());
for (auto& [dataPos, _] : buildDataInfo.payloadsPosAndType) {
auto expression = buildSchema->getGroup(dataPos.dataChunkPos)
->getExpressions()[dataPos.valueVectorPos];
if (expression->getUniqueName() ==
logicalIntersect->getIntersectNodeID()->getUniqueName()) {
binder::expression_vector expressionsToMaterialize;
expressionsToMaterialize.push_back(keyNodeID);
expressionsToMaterialize.push_back(intersectNodeID);
for (auto& expression : buildSchema->getExpressionsInScope()) {
if (expression->getUniqueName() == keyNodeID->getUniqueName() ||
expression->getUniqueName() == intersectNodeID->getUniqueName()) {
continue;
}
expressionsToMaterialize.push_back(expression);
payloadsDataPos.emplace_back(outSchema->getExpressionPos(*expression));
}
auto buildDataInfo =
generateBuildDataInfo(*buildSchema, {keyNodeID}, expressionsToMaterialize);
auto sharedState = std::make_shared<IntersectSharedState>();
sharedStates.push_back(sharedState);
children[i] = make_unique<IntersectBuild>(
Expand Down
Loading

0 comments on commit 3a06d27

Please sign in to comment.