Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factorization aware dp #1384

Merged
merged 1 commit into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions src/binder/query/query_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ std::unordered_set<uint32_t> SubqueryGraph::getRelNbrPositions() const {
return result;
}

std::unordered_set<SubqueryGraph, SubqueryGraphHasher> SubqueryGraph::getNbrSubgraphs(
uint32_t size) const {
subquery_graph_set_t SubqueryGraph::getNbrSubgraphs(uint32_t size) const {
auto result = getBaseNbrSubgraph();
for (auto i = 1u; i < size; ++i) {
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> tmp;
Expand Down Expand Up @@ -106,8 +105,8 @@ std::unordered_set<uint32_t> SubqueryGraph::getNodePositionsIgnoringNodeSelector
return result;
}

std::unordered_set<SubqueryGraph, SubqueryGraphHasher> SubqueryGraph::getBaseNbrSubgraph() const {
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> result;
subquery_graph_set_t SubqueryGraph::getBaseNbrSubgraph() const {
subquery_graph_set_t result;
for (auto& nodePos : getNodeNbrPositions()) {
auto nbr = SubqueryGraph(queryGraph);
nbr.addQueryNode(nodePos);
Expand All @@ -121,9 +120,8 @@ std::unordered_set<SubqueryGraph, SubqueryGraphHasher> SubqueryGraph::getBaseNbr
return result;
}

std::unordered_set<SubqueryGraph, SubqueryGraphHasher> SubqueryGraph::getNextNbrSubgraphs(
const SubqueryGraph& prevNbr) const {
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> result;
subquery_graph_set_t SubqueryGraph::getNextNbrSubgraphs(const SubqueryGraph& prevNbr) const {
subquery_graph_set_t result;
for (auto& nodePos : prevNbr.getNodeNbrPositions()) {
if (queryNodesSelector[nodePos]) {
continue;
Expand Down
11 changes: 7 additions & 4 deletions src/include/binder/query/reading_clause/query_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ const uint8_t MAX_NUM_VARIABLES = 64;

class QueryGraph;
struct SubqueryGraph;
// Forward declaration so the aliases below can name the hasher before it is defined.
struct SubqueryGraphHasher;
// Hash set of SubqueryGraph values, hashed with SubqueryGraphHasher.
using subquery_graph_set_t = std::unordered_set<SubqueryGraph, SubqueryGraphHasher>;
// Hash map from SubqueryGraph to an arbitrary value type T, hashed with SubqueryGraphHasher.
template<typename T>
using subquery_graph_V_map_t = std::unordered_map<SubqueryGraph, T, SubqueryGraphHasher>;

// hash on node bitset if subgraph has no rel
struct SubqueryGraphHasher {
Expand Down Expand Up @@ -51,7 +55,7 @@ struct SubqueryGraph {

std::unordered_set<uint32_t> getNodeNbrPositions() const;
std::unordered_set<uint32_t> getRelNbrPositions() const;
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> getNbrSubgraphs(uint32_t size) const;
subquery_graph_set_t getNbrSubgraphs(uint32_t size) const;
std::vector<uint32_t> getConnectedNodePos(const SubqueryGraph& nbr) const;

// E.g. query graph (a)-[e1]->(b) and subgraph (a)-[e1], although (b) is not in subgraph, we
Expand All @@ -65,9 +69,8 @@ struct SubqueryGraph {
}

private:
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> getBaseNbrSubgraph() const;
std::unordered_set<SubqueryGraph, SubqueryGraphHasher> getNextNbrSubgraphs(
const SubqueryGraph& prevNbr) const;
subquery_graph_set_t getBaseNbrSubgraph() const;
subquery_graph_set_t getNextNbrSubgraphs(const SubqueryGraph& prevNbr) const;
};

// QueryGraph represents a connected pattern specified in MATCH clause.
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ using frame_group_idx_t = page_group_idx_t;
using list_header_t = uint32_t;
using property_id_t = uint32_t;
constexpr property_id_t INVALID_PROPERTY_ID = UINT32_MAX;
using vector_idx_t = uint32_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand Down
27 changes: 23 additions & 4 deletions src/include/planner/logical_plan/logical_operator/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@
namespace kuzu {
namespace planner {

typedef uint32_t f_group_pos;
typedef std::unordered_set<f_group_pos> f_group_pos_set;
using f_group_pos = uint32_t;
using f_group_pos_set = std::unordered_set<f_group_pos>;
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved
constexpr f_group_pos INVALID_F_GROUP_POS = UINT32_MAX;

// Forward declarations needed by the schema_map_t alias below.
class Schema;
struct SchemaHasher;
struct SchemaApproximateEquality;
// Hash map keyed by Schema pointer with value type T. Keys are hashed with SchemaHasher and
// compared with SchemaApproximateEquality instead of the default pointer hash/equality.
template<typename T>
using schema_map_t = std::unordered_map<Schema*, T, SchemaHasher, SchemaApproximateEquality>;

class FactorizationGroup {
friend class Schema;

Expand Down Expand Up @@ -57,7 +63,9 @@ class FactorizationGroup {

class Schema {
public:
inline f_group_pos getNumGroups() const { return groups.size(); }
inline size_t getNumGroups() const { return groups.size(); }
inline size_t getNumFlatGroups() const { return getNumGroups(true /* isFlat */); }
inline size_t getNumUnFlatGroups() const { return getNumGroups(false /* isFlat */); }

inline FactorizationGroup* getGroup(std::shared_ptr<binder::Expression> expression) const {
return getGroup(getGroupPos(expression->getUniqueName()));
Expand Down Expand Up @@ -114,12 +122,15 @@ class Schema {
}

// Get the group positions containing at least one expression in scope.
std::unordered_set<f_group_pos> getGroupsPosInScope() const;
f_group_pos_set getGroupsPosInScope() const;

std::unique_ptr<Schema> copy() const;

void clear();

private:
size_t getNumGroups(bool isFlat) const;

private:
std::vector<std::unique_ptr<FactorizationGroup>> groups;
std::unordered_map<std::string, uint32_t> expressionNameToGroupPos;
Expand All @@ -128,6 +139,14 @@ class Schema {
binder::expression_vector expressionsInScope;
};

// Hash functor for Schema pointers; supplied as the hasher of schema_map_t.
struct SchemaHasher {
// Returns the hash value for the schema that `schema` points to.
std::size_t operator()(const Schema* const& schema) const;
};

// Key-equality functor for Schema pointers; supplied as the key comparator of schema_map_t.
struct SchemaApproximateEquality {
// Returns true when the schemas pointed to by `left` and `right` are considered equal.
bool operator()(const Schema* const& left, const Schema* const& right) const;
};

class SchemaUtils {
public:
static std::vector<binder::expression_vector> getExpressionsPerGroup(
Expand Down
2 changes: 2 additions & 0 deletions src/include/planner/logical_plan/logical_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace kuzu {
namespace planner {

class LogicalPlan;

class LogicalPlan {
public:
LogicalPlan() : estCardinality{1}, cost{0} {}
Expand Down
25 changes: 19 additions & 6 deletions src/include/planner/subplans_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ namespace kuzu {
namespace planner {

const uint64_t MAX_LEVEL_TO_PLAN_EXACTLY = 7;
const uint64_t MAX_NUM_SUBGRAPHS_PER_LEVEL = 100;
const uint64_t MAX_NUM_SUBGRAPHS_PER_LEVEL = 50;
const uint64_t MAX_NUM_PLANS_PER_SUBGRAPH = 50;

class SubPlansTable {
typedef std::unordered_map<SubqueryGraph, std::vector<std::unique_ptr<LogicalPlan>>,
SubqueryGraphHasher>
SubqueryGraphPlansMap;
struct PlanSet;
// Each dp level is a map from sub query graph to a set of plans
using dp_level_t = subquery_graph_V_map_t<std::unique_ptr<PlanSet>>;

public:
void resize(uint32_t newSize);
Expand All @@ -31,12 +31,25 @@ class SubPlansTable {
std::vector<SubqueryGraph> getSubqueryGraphs(uint32_t level);

void addPlan(const SubqueryGraph& subqueryGraph, std::unique_ptr<LogicalPlan> plan);
void finalizeLevel(uint32_t level);

void clear();

private:
std::vector<std::unique_ptr<SubqueryGraphPlansMap>> subPlans;
struct PlanSet {
std::vector<std::unique_ptr<LogicalPlan>> plans;
schema_map_t<common::vector_idx_t> schemaToPlanIdx;
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved

inline std::vector<std::unique_ptr<LogicalPlan>>& getPlans() { return plans; }

void addPlan(std::unique_ptr<LogicalPlan> plan);
};

// Returns the DP level that holds plans for `subqueryGraph`. Levels are indexed by the
// subgraph's total number of variables.
dp_level_t* getDPLevel(const SubqueryGraph& subqueryGraph) const {
    const auto levelIdx = subqueryGraph.getTotalNumVariables();
    return dpLevels[levelIdx].get();
}

private:
std::vector<std::unique_ptr<dp_level_t>> dpLevels;
};

} // namespace planner
Expand Down
1 change: 0 additions & 1 deletion src/planner/join_order_enumerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ void JoinOrderEnumerator::planLevel(uint32_t level) {
} else {
planLevelExactly(level);
}
context->subPlansTable->finalizeLevel(level);
}

void JoinOrderEnumerator::planLevelExactly(uint32_t level) {
Expand Down
45 changes: 45 additions & 0 deletions src/planner/operator/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,51 @@ void Schema::clear() {
clearExpressionsInScope();
}

// Counts the in-scope factorization groups whose flat/unFlat state equals `isFlat`.
size_t Schema::getNumGroups(bool isFlat) const {
    auto numMatching = 0u;
    for (const auto pos : getGroupsPosInScope()) {
        if (groups[pos]->isFlat() == isFlat) {
            ++numMatching;
        }
    }
    return numMatching;
}

// Hashes a schema by XOR-ing the hashes of its in-scope flat and unFlat group counts.
std::size_t SchemaHasher::operator()(const Schema* const& schema) const {
    const std::hash<size_t> countHasher{};
    const auto flatHash = countHasher(schema->getNumFlatGroups());
    const auto unFlatHash = countHasher(schema->getNumUnFlatGroups());
    return flatHash ^ unFlatHash;
}

// We use this equality in join order enumeration to make sure at each DP level, we don't just keep
// the best plan, but keep best plan for each unique factorization schema.
// In order to balance enumeration time, we use an approximate equality check to reduce computation.
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved
// We check the following
// - number of factorization groups
// - number of flat factorization groups in scope
// - number of unFlat factorization groups in scope
// - number of expressions
// - if an expression has the same flat/unFlat flag in both schemas
bool SchemaApproximateEquality::operator()(
    const Schema* const& left, const Schema* const& right) const {
    if (left->getNumGroups() != right->getNumGroups()) {
        return false;
    }
    // SchemaHasher hashes on the flat and unFlat group counts. An unordered container requires
    // keys that compare equal to hash equally, so both counts must match here as well; otherwise
    // the same expressions grouped into a different number of flat groups would compare equal
    // while landing in different buckets.
    if (left->getNumFlatGroups() != right->getNumFlatGroups() ||
        left->getNumUnFlatGroups() != right->getNumUnFlatGroups()) {
        return false;
    }
    if (left->getExpressionsInScope().size() != right->getExpressionsInScope().size()) {
        return false;
    }
    for (auto& expression : left->getExpressionsInScope()) {
        if (!right->isExpressionInScope(*expression)) {
            return false;
        }
        // The expression may live at different group positions in each schema; only the
        // flat/unFlat state of its group has to agree.
        auto leftGroupPos = left->getGroupPos(*expression);
        auto rightGroupPos = right->getGroupPos(*expression);
        if (left->getGroup(leftGroupPos)->isFlat() != right->getGroup(rightGroupPos)->isFlat()) {
            return false;
        }
    }
    return true;
}

std::vector<binder::expression_vector> SchemaUtils::getExpressionsPerGroup(
const binder::expression_vector& expressions, const Schema& schema) {
std::vector<binder::expression_vector> result;
Expand Down
58 changes: 31 additions & 27 deletions src/planner/subplans_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,60 +6,64 @@ namespace kuzu {
namespace planner {

void SubPlansTable::resize(uint32_t newSize) {
auto prevSize = subPlans.size();
subPlans.resize(newSize);
auto prevSize = dpLevels.size();
dpLevels.resize(newSize);
for (auto i = prevSize; i < newSize; ++i) {
subPlans[i] = std::make_unique<SubqueryGraphPlansMap>();
dpLevels[i] = std::make_unique<dp_level_t>();
}
}

bool SubPlansTable::containSubgraphPlans(const SubqueryGraph& subqueryGraph) const {
return subPlans[subqueryGraph.getTotalNumVariables()]->contains(subqueryGraph);
return getDPLevel(subqueryGraph)->contains(subqueryGraph);
}

std::vector<std::unique_ptr<LogicalPlan>>& SubPlansTable::getSubgraphPlans(
const SubqueryGraph& subqueryGraph) {
auto subqueryGraphPlansMap = subPlans[subqueryGraph.getTotalNumVariables()].get();
KU_ASSERT(subqueryGraphPlansMap->contains(subqueryGraph));
return subqueryGraphPlansMap->at(subqueryGraph);
auto dpLevel = getDPLevel(subqueryGraph);
KU_ASSERT(dpLevel->contains(subqueryGraph));
return dpLevel->at(subqueryGraph)->getPlans();
}

std::vector<SubqueryGraph> SubPlansTable::getSubqueryGraphs(uint32_t level) {
std::vector<SubqueryGraph> result;
for (auto& [subGraph, plans] : *subPlans[level]) {
for (auto& [subGraph, _] : *dpLevels[level]) {
result.push_back(subGraph);
}
return result;
}

void SubPlansTable::addPlan(const SubqueryGraph& subqueryGraph, std::unique_ptr<LogicalPlan> plan) {
assert(subPlans[subqueryGraph.getTotalNumVariables()]);
auto subgraphPlansMap = subPlans[subqueryGraph.getTotalNumVariables()].get();
if (subgraphPlansMap->size() > MAX_NUM_SUBGRAPHS_PER_LEVEL) {
auto dpLevel = getDPLevel(subqueryGraph);
if (dpLevel->size() > MAX_NUM_SUBGRAPHS_PER_LEVEL) {
return;
}
if (!subgraphPlansMap->contains(subqueryGraph)) {
subgraphPlansMap->emplace(subqueryGraph, std::vector<std::unique_ptr<LogicalPlan>>());
if (!dpLevel->contains(subqueryGraph)) {
dpLevel->emplace(subqueryGraph, std::make_unique<PlanSet>());
}
subgraphPlansMap->at(subqueryGraph).push_back(std::move(plan));
dpLevel->at(subqueryGraph)->addPlan(std::move(plan));
}

void SubPlansTable::finalizeLevel(uint32_t level) {
// cap number of plans per subgraph
for (auto& [subgraph, plans] : *subPlans[level]) {
if (plans.size() < MAX_NUM_PLANS_PER_SUBGRAPH) {
continue;
}
sort(plans.begin(), plans.end(),
[](const std::unique_ptr<LogicalPlan>& a, const std::unique_ptr<LogicalPlan>& b)
-> bool { return a->getCost() < b->getCost(); });
plans.resize(MAX_NUM_PLANS_PER_SUBGRAPH);
void SubPlansTable::clear() {
for (auto& dpLevel : dpLevels) {
dpLevel->clear();
}
}

void SubPlansTable::clear() {
for (auto& subPlan : subPlans) {
subPlan->clear();
// Adds `plan` to this set, keeping at most one plan per factorization schema (as judged by the
// hasher/equality of schemaToPlanIdx).
// - Once MAX_NUM_PLANS_PER_SUBGRAPH plans are stored, every further plan is dropped.
// - A plan with a schema not seen before is appended.
// - A plan whose schema matches a stored one replaces it only when it is strictly cheaper.
// NOTE(review): because of the capacity check, a cheaper plan for an already-stored schema is
// also dropped once the set is full - confirm this is intended.
void SubPlansTable::PlanSet::addPlan(std::unique_ptr<LogicalPlan> plan) {
    if (plans.size() >= MAX_NUM_PLANS_PER_SUBGRAPH) {
        return;
    }
    auto schema = plan->getSchema();
    // Single lookup instead of contains() followed by at().
    auto it = schemaToPlanIdx.find(schema);
    if (it == schemaToPlanIdx.end()) { // add plan if this is a new factorization schema
        schemaToPlanIdx.insert({schema, plans.size()});
        plans.push_back(std::move(plan));
        return;
    }
    // swap plan for lower cost under the same factorization schema
    auto idx = it->second;
    assert(idx < MAX_NUM_PLANS_PER_SUBGRAPH);
    if (plans[idx]->getCost() > plan->getCost()) {
        // The stored key was taken from the plan that is about to be discarded. Re-key the entry
        // to the incoming plan's schema (while the old plan is still alive) so the map never keeps
        // a pointer whose lifetime may be tied to the discarded plan.
        schemaToPlanIdx.erase(it);
        schemaToPlanIdx.insert({schema, idx});
        plans[idx] = std::move(plan);
    }
}

Expand Down
17 changes: 10 additions & 7 deletions src/processor/mapper/map_intersect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace processor {
std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalIntersectToPhysical(
LogicalOperator* logicalOperator) {
auto logicalIntersect = (LogicalIntersect*)logicalOperator;
auto intersectNodeID = logicalIntersect->getIntersectNodeID();
auto outSchema = logicalIntersect->getSchema();
std::vector<std::unique_ptr<PhysicalOperator>> children;
children.resize(logicalOperator->getNumChildren());
Expand All @@ -22,17 +23,19 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalIntersectToPhysical(
auto buildSchema = logicalIntersect->getChild(i)->getSchema();
auto buildSidePrevOperator = mapLogicalOperatorToPhysical(logicalIntersect->getChild(i));
std::vector<DataPos> payloadsDataPos;
auto buildDataInfo =
generateBuildDataInfo(*buildSchema, {keyNodeID}, buildSchema->getExpressionsInScope());
for (auto& [dataPos, _] : buildDataInfo.payloadsPosAndType) {
auto expression = buildSchema->getGroup(dataPos.dataChunkPos)
->getExpressions()[dataPos.valueVectorPos];
if (expression->getUniqueName() ==
logicalIntersect->getIntersectNodeID()->getUniqueName()) {
binder::expression_vector expressionsToMaterialize;
expressionsToMaterialize.push_back(keyNodeID);
expressionsToMaterialize.push_back(intersectNodeID);
for (auto& expression : buildSchema->getExpressionsInScope()) {
if (expression->getUniqueName() == keyNodeID->getUniqueName() ||
expression->getUniqueName() == intersectNodeID->getUniqueName()) {
continue;
}
expressionsToMaterialize.push_back(expression);
payloadsDataPos.emplace_back(outSchema->getExpressionPos(*expression));
}
auto buildDataInfo =
generateBuildDataInfo(*buildSchema, {keyNodeID}, expressionsToMaterialize);
auto sharedState = std::make_shared<IntersectSharedState>();
sharedStates.push_back(sharedState);
children[i] = make_unique<IntersectBuild>(
Expand Down
Loading