Skip to content

Commit

Permalink
Merge pull request #1887 from kuzudb/create-refactor
Browse files Browse the repository at this point in the history
Refactor create operators
  • Loading branch information
andyfengHKU committed Aug 4, 2023
2 parents 8a49c40 + 92d3315 commit 42d4bc7
Show file tree
Hide file tree
Showing 17 changed files with 392 additions and 337 deletions.
105 changes: 60 additions & 45 deletions src/include/planner/logical_plan/logical_operator/logical_create.h
Original file line number Diff line number Diff line change
@@ -1,71 +1,86 @@
#pragma once

#include "flatten_resolver.h"
#include "logical_update.h"
#include "base_logical_operator.h"
#include "binder/expression/rel_expression.h"

namespace kuzu {
namespace planner {

class LogicalCreateNode : public LogicalUpdateNode {
public:
LogicalCreateNode(std::vector<std::shared_ptr<binder::NodeExpression>> nodes,
binder::expression_vector primaryKeys, std::shared_ptr<LogicalOperator> child)
: LogicalUpdateNode{LogicalOperatorType::CREATE_NODE, std::move(nodes), std::move(child)},
primaryKeys{std::move(primaryKeys)} {}
~LogicalCreateNode() override = default;

void computeFactorizedSchema() override;
void computeFlatSchema() override;

inline f_group_pos_set getGroupsPosToFlatten() {
// Flatten all inputs. E.g. MATCH (a) CREATE (b). We need to create b for each tuple in the
// match clause. This is to simplify operator implementation.
auto childSchema = children[0]->getSchema();
return factorization::FlattenAll::getGroupsPosToFlatten(
childSchema->getGroupsPosInScope(), childSchema);
struct LogicalCreateNodeInfo {
std::shared_ptr<binder::NodeExpression> node;
std::shared_ptr<binder::Expression> primaryKey;

LogicalCreateNodeInfo(std::shared_ptr<binder::NodeExpression> node,
std::shared_ptr<binder::Expression> primaryKey)
: node{std::move(node)}, primaryKey{std::move(primaryKey)} {}
LogicalCreateNodeInfo(const LogicalCreateNodeInfo& other)
: node{other.node}, primaryKey{other.primaryKey} {}

inline std::unique_ptr<LogicalCreateNodeInfo> copy() const {
return std::make_unique<LogicalCreateNodeInfo>(*this);
}
};

struct LogicalCreateRelInfo {
std::shared_ptr<binder::RelExpression> rel;
std::vector<binder::expression_pair> setItems;

inline std::shared_ptr<binder::Expression> getPrimaryKey(size_t idx) const {
return primaryKeys[idx];
LogicalCreateRelInfo(
std::shared_ptr<binder::RelExpression> rel, std::vector<binder::expression_pair> setItems)
: rel{std::move(rel)}, setItems{std::move(setItems)} {}
LogicalCreateRelInfo(const LogicalCreateRelInfo& other)
: rel{other.rel}, setItems{other.setItems} {}

inline std::unique_ptr<LogicalCreateRelInfo> copy() const {
return std::make_unique<LogicalCreateRelInfo>(*this);
}
};

class LogicalCreateNode : public LogicalOperator {
public:
LogicalCreateNode(std::vector<std::unique_ptr<LogicalCreateNodeInfo>> infos,
std::shared_ptr<LogicalOperator> child)
: LogicalOperator{LogicalOperatorType::CREATE_NODE, std::move(child)}, infos{std::move(
infos)} {}

void computeFactorizedSchema() final;
void computeFlatSchema() final;

std::string getExpressionsForPrinting() const final;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCreateNode>(nodes, primaryKeys, children[0]->copy());
f_group_pos_set getGroupsPosToFlatten();

inline const std::vector<std::unique_ptr<LogicalCreateNodeInfo>>& getInfosRef() const {
return infos;
}

std::unique_ptr<LogicalOperator> copy() final;

private:
binder::expression_vector primaryKeys;
std::vector<std::unique_ptr<LogicalCreateNodeInfo>> infos;
};

class LogicalCreateRel : public LogicalUpdateRel {
class LogicalCreateRel : public LogicalOperator {
public:
LogicalCreateRel(std::vector<std::shared_ptr<binder::RelExpression>> rels,
std::vector<std::vector<binder::expression_pair>> setItemsPerRel,
LogicalCreateRel(std::vector<std::unique_ptr<LogicalCreateRelInfo>> infos,
std::shared_ptr<LogicalOperator> child)
: LogicalUpdateRel{LogicalOperatorType::CREATE_REL, std::move(rels), std::move(child)},
setItemsPerRel{std::move(setItemsPerRel)} {}
~LogicalCreateRel() override = default;
: LogicalOperator{LogicalOperatorType::CREATE_REL, std::move(child)}, infos{std::move(
infos)} {}

inline void computeFactorizedSchema() override { copyChildSchema(0); }
inline void computeFlatSchema() override { copyChildSchema(0); }
inline void computeFactorizedSchema() final { copyChildSchema(0); }
inline void computeFlatSchema() final { copyChildSchema(0); }

inline f_group_pos_set getGroupsPosToFlatten() {
auto childSchema = children[0]->getSchema();
return factorization::FlattenAll::getGroupsPosToFlatten(
childSchema->getGroupsPosInScope(), childSchema);
}
std::string getExpressionsForPrinting() const final;

inline std::vector<binder::expression_pair> getSetItems(uint32_t idx) const {
return setItemsPerRel[idx];
}
f_group_pos_set getGroupsPosToFlatten();

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCreateRel>(rels, setItemsPerRel, children[0]->copy());
}
inline const std::vector<std::unique_ptr<LogicalCreateRelInfo>>& getInfosRef() { return infos; }

std::unique_ptr<LogicalOperator> copy() final;

private:
std::vector<std::vector<binder::expression_pair>> setItemsPerRel;
std::vector<std::unique_ptr<LogicalCreateRelInfo>> infos;
};

} // namespace planner
} // namespace kuzu
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "logical_update.h"
#include "planner/logical_plan/logical_operator/flatten_resolver.h"
#include "base_logical_operator.h"
#include "binder/expression/rel_expression.h"

namespace kuzu {
namespace planner {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#pragma once

#include "base_logical_operator.h"
#include "logical_update.h"
#include "planner/logical_plan/logical_operator/flatten_resolver.h"

namespace kuzu {
namespace planner {
Expand Down
62 changes: 0 additions & 62 deletions src/include/planner/logical_plan/logical_operator/logical_update.h

This file was deleted.

100 changes: 13 additions & 87 deletions src/include/processor/operator/update/create.h
Original file line number Diff line number Diff line change
@@ -1,117 +1,43 @@
#pragma once

#include "expression_evaluator/base_evaluator.h"
#include "insert_executor.h"
#include "processor/operator/physical_operator.h"
#include "storage/store/node_table.h"

namespace kuzu {
namespace processor {

struct CreateNodeInfo {
catalog::NodeTableSchema* schema;
storage::NodeTable* table;
std::unique_ptr<evaluator::BaseExpressionEvaluator> primaryKeyEvaluator;
std::vector<storage::RelTable*> relTablesToInit;
DataPos outNodeIDVectorPos;

CreateNodeInfo(catalog::NodeTableSchema* schema, storage::NodeTable* table,
std::unique_ptr<evaluator::BaseExpressionEvaluator> primaryKeyEvaluator,
std::vector<storage::RelTable*> relTablesToInit, const DataPos& dataPos)
: schema{schema}, table{table}, primaryKeyEvaluator{std::move(primaryKeyEvaluator)},
relTablesToInit{std::move(relTablesToInit)}, outNodeIDVectorPos{dataPos} {}

inline std::unique_ptr<CreateNodeInfo> clone() {
return std::make_unique<CreateNodeInfo>(schema, table,
primaryKeyEvaluator != nullptr ? primaryKeyEvaluator->clone() : nullptr,
relTablesToInit, outNodeIDVectorPos);
}
};

class CreateNode : public PhysicalOperator {
public:
CreateNode(std::vector<std::unique_ptr<CreateNodeInfo>> createNodeInfos,
CreateNode(std::vector<std::unique_ptr<NodeInsertExecutor>> executors,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::CREATE_NODE, std::move(child), id, paramsString},
createNodeInfos{std::move(createNodeInfos)} {}
executors{std::move(executors)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) override;
bool getNextTuplesInternal(ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<CreateNodeInfo>> clonedCreateNodeInfos;
for (auto& createNodeInfo : createNodeInfos) {
clonedCreateNodeInfos.push_back(createNodeInfo->clone());
}
return std::make_unique<CreateNode>(
std::move(clonedCreateNodeInfos), children[0]->clone(), id, paramsString);
}
std::unique_ptr<PhysicalOperator> clone() final;

private:
std::vector<std::unique_ptr<CreateNodeInfo>> createNodeInfos;
std::vector<common::ValueVector*> outValueVectors;
};

struct CreateRelInfo {
storage::RelTable* table;
DataPos srcNodePos;
common::table_id_t srcNodeTableID;
DataPos dstNodePos;
common::table_id_t dstNodeTableID;
std::vector<std::unique_ptr<evaluator::BaseExpressionEvaluator>> evaluators;
uint32_t relIDEvaluatorIdx;

CreateRelInfo(storage::RelTable* table, const DataPos& srcNodePos,
common::table_id_t srcNodeTableID, const DataPos& dstNodePos,
common::table_id_t dstNodeTableID,
std::vector<std::unique_ptr<evaluator::BaseExpressionEvaluator>> evaluators,
uint32_t relIDEvaluatorIdx)
: table{table}, srcNodePos{srcNodePos}, srcNodeTableID{srcNodeTableID},
dstNodePos{dstNodePos}, dstNodeTableID{dstNodeTableID}, evaluators{std::move(evaluators)},
relIDEvaluatorIdx{relIDEvaluatorIdx} {}

std::unique_ptr<CreateRelInfo> clone() {
std::vector<std::unique_ptr<evaluator::BaseExpressionEvaluator>> clonedEvaluators;
for (auto& evaluator : evaluators) {
clonedEvaluators.push_back(evaluator->clone());
}
return make_unique<CreateRelInfo>(table, srcNodePos, srcNodeTableID, dstNodePos,
dstNodeTableID, std::move(clonedEvaluators), relIDEvaluatorIdx);
}
std::vector<std::unique_ptr<NodeInsertExecutor>> executors;
};

class CreateRel : public PhysicalOperator {
public:
CreateRel(storage::RelsStatistics& relsStatistics,
std::vector<std::unique_ptr<CreateRelInfo>> createRelInfos,
CreateRel(std::vector<std::unique_ptr<RelInsertExecutor>> executors,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::CREATE_REL, std::move(child), id, paramsString},
relsStatistics{relsStatistics}, createRelInfos{std::move(createRelInfos)} {}
executors{std::move(executors)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

bool getNextTuplesInternal(ExecutionContext* context) override;
bool getNextTuplesInternal(ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<CreateRelInfo>> clonedCreateRelInfos;
for (auto& createRelInfo : createRelInfos) {
clonedCreateRelInfos.push_back(createRelInfo->clone());
}
return make_unique<CreateRel>(relsStatistics, std::move(clonedCreateRelInfos),
children[0]->clone(), id, paramsString);
}

private:
struct CreateRelVectors {
common::ValueVector* srcNodeIDVector;
common::ValueVector* dstNodeIDVector;
std::vector<common::ValueVector*> propertyVectors;
};
std::unique_ptr<PhysicalOperator> clone() final;

private:
storage::RelsStatistics& relsStatistics;
std::vector<std::unique_ptr<CreateRelInfo>> createRelInfos;
std::vector<std::unique_ptr<CreateRelVectors>> createRelVectorsPerRel;
std::vector<std::unique_ptr<RelInsertExecutor>> executors;
};

} // namespace processor
Expand Down
Loading

0 comments on commit 42d4bc7

Please sign in to comment.