Skip to content

Commit

Permalink
Merge pull request #1910 from kuzudb/dml-insert
Browse files Browse the repository at this point in the history
Rework node table insert interface
  • Loading branch information
ray6080 committed Aug 10, 2023
2 parents 1f1da48 + 2cf6cc2 commit c8708f6
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 161 deletions.
5 changes: 2 additions & 3 deletions src/binder/bind/bind_updating_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,8 @@ std::unique_ptr<BoundCreateInfo> Binder::bindCreateNodeInfo(
throw BinderException("Create node " + node->toString() + " expects primary key " +
primaryKey->getName() + " as input.");
}
auto extraInfo = std::make_unique<ExtraCreateNodeInfo>(std::move(primaryKeyExpression));
return std::make_unique<BoundCreateInfo>(
UpdateTableType::NODE, std::move(node), std::move(setItems), std::move(extraInfo));
UpdateTableType::NODE, std::move(node), std::move(setItems));
}

std::unique_ptr<BoundCreateInfo> Binder::bindCreateRelInfo(
Expand Down Expand Up @@ -180,7 +179,7 @@ std::unique_ptr<BoundCreateInfo> Binder::bindCreateRelInfo(
}
}
return std::make_unique<BoundCreateInfo>(
UpdateTableType::REL, std::move(rel), std::move(setItems), nullptr /* extraInfo */);
UpdateTableType::REL, std::move(rel), std::move(setItems));
}

std::unique_ptr<BoundUpdatingClause> Binder::bindSetClause(const UpdatingClause& updatingClause) {
Expand Down
30 changes: 4 additions & 26 deletions src/include/binder/query/updating_clause/bound_create_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,18 @@
namespace kuzu {
namespace binder {

struct ExtraCreateInfo {
virtual ~ExtraCreateInfo() = default;
virtual std::unique_ptr<ExtraCreateInfo> copy() const = 0;
};

struct ExtraCreateNodeInfo : public ExtraCreateInfo {
std::shared_ptr<Expression> primaryKey;

explicit ExtraCreateNodeInfo(std::shared_ptr<Expression> primaryKey)
: primaryKey{std::move(primaryKey)} {}
ExtraCreateNodeInfo(const ExtraCreateNodeInfo& other) : primaryKey{other.primaryKey} {}

inline std::unique_ptr<ExtraCreateInfo> copy() const final {
return std::make_unique<ExtraCreateNodeInfo>(*this);
}
};

struct BoundCreateInfo {
UpdateTableType updateTableType;
std::shared_ptr<Expression> nodeOrRel;
std::vector<expression_pair> setItems;
std::unique_ptr<ExtraCreateInfo> extraInfo;

BoundCreateInfo(UpdateTableType updateTableType, std::shared_ptr<Expression> nodeOrRel,
std::vector<expression_pair> setItems, std::unique_ptr<ExtraCreateInfo> extraInfo)
: updateTableType{updateTableType}, nodeOrRel{std::move(nodeOrRel)},
setItems{std::move(setItems)}, extraInfo{std::move(extraInfo)} {}
std::vector<expression_pair> setItems)
: updateTableType{updateTableType}, nodeOrRel{std::move(nodeOrRel)}, setItems{std::move(
setItems)} {}
BoundCreateInfo(const BoundCreateInfo& other)
: updateTableType{other.updateTableType}, nodeOrRel{other.nodeOrRel}, setItems{
other.setItems} {
if (other.extraInfo) {
extraInfo = other.extraInfo->copy();
}
}
other.setItems} {}

inline std::unique_ptr<BoundCreateInfo> copy() {
return std::make_unique<BoundCreateInfo>(*this);
Expand Down
13 changes: 6 additions & 7 deletions src/include/planner/logical_plan/persistent/logical_create.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@ namespace planner {

struct LogicalCreateNodeInfo {
std::shared_ptr<binder::NodeExpression> node;
std::shared_ptr<binder::Expression> primaryKey;
std::vector<binder::expression_pair> setItems;
binder::expression_vector propertiesToReturn;

LogicalCreateNodeInfo(std::shared_ptr<binder::NodeExpression> node,
std::shared_ptr<binder::Expression> primaryKey,
binder::expression_vector propertiesToReturn)
: node{std::move(node)}, primaryKey{std::move(primaryKey)}, propertiesToReturn{std::move(
propertiesToReturn)} {}
std::vector<binder::expression_pair> setItems, binder::expression_vector propertiesToReturn)
: node{std::move(node)}, setItems{std::move(setItems)}, propertiesToReturn{std::move(
propertiesToReturn)} {}
LogicalCreateNodeInfo(const LogicalCreateNodeInfo& other)
: node{other.node}, primaryKey{other.primaryKey}, propertiesToReturn{
other.propertiesToReturn} {}
: node{other.node}, setItems{other.setItems}, propertiesToReturn{other.propertiesToReturn} {
}

inline std::unique_ptr<LogicalCreateNodeInfo> copy() const {
return std::make_unique<LogicalCreateNodeInfo>(*this);
Expand Down
43 changes: 25 additions & 18 deletions src/include/processor/operator/persistent/insert_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ namespace processor {
// TODO(Guodong): the following class should be moved to storage.
class NodeInsertExecutor {
public:
NodeInsertExecutor(storage::NodeTable* table,
std::unique_ptr<evaluator::ExpressionEvaluator> primaryKeyEvaluator,
std::vector<storage::RelTable*> relTablesToInit, const DataPos& outNodeIDVectorPos)
: table{table}, primaryKeyEvaluator{std::move(primaryKeyEvaluator)},
relTablesToInit{std::move(relTablesToInit)}, outNodeIDVectorPos{outNodeIDVectorPos} {}
NodeInsertExecutor(storage::NodeTable* table, std::vector<storage::RelTable*> relTablesToInit,
const DataPos& nodeIDVectorPos, std::vector<DataPos> propertyLhsPositions,
std::vector<std::unique_ptr<evaluator::ExpressionEvaluator>> propertyRhsEvaluators,
std::unordered_map<common::property_id_t, common::vector_idx_t> propertyIDToVectorIdx)
: table{table}, relTablesToInit{std::move(relTablesToInit)},
nodeIDVectorPos{nodeIDVectorPos}, propertyLhsPositions{std::move(propertyLhsPositions)},
propertyRhsEvaluators{std::move(propertyRhsEvaluators)}, propertyIDToVectorIdx{std::move(
propertyIDToVectorIdx)} {}
NodeInsertExecutor(const NodeInsertExecutor& other);

void init(ResultSet* resultSet, ExecutionContext* context);
Expand All @@ -30,23 +33,27 @@ class NodeInsertExecutor {

private:
storage::NodeTable* table;
std::unique_ptr<evaluator::ExpressionEvaluator> primaryKeyEvaluator;
std::vector<storage::RelTable*> relTablesToInit;
DataPos outNodeIDVectorPos;

common::ValueVector* primaryKeyVector = nullptr;
common::ValueVector* outNodeIDVector = nullptr;
DataPos nodeIDVectorPos;
std::vector<DataPos> propertyLhsPositions;
std::vector<std::unique_ptr<evaluator::ExpressionEvaluator>> propertyRhsEvaluators;
// TODO(Guodong): remove this.
std::unordered_map<common::property_id_t, common::vector_idx_t> propertyIDToVectorIdx;

common::ValueVector* nodeIDVector;
std::vector<common::ValueVector*> propertyLhsVectors;
std::vector<common::ValueVector*> propertyRhsVectors;
};

class RelInsertExecutor {
public:
RelInsertExecutor(storage::RelsStatistics& relsStatistics, storage::RelTable* table,
const DataPos& srcNodePos, const DataPos& dstNodePos,
std::vector<DataPos> lhsVectorPositions,
std::vector<std::unique_ptr<evaluator::ExpressionEvaluator>> evaluators)
std::vector<DataPos> propertyLhsPositions,
std::vector<std::unique_ptr<evaluator::ExpressionEvaluator>> propertyRhsEvaluators)
: relsStatistics{relsStatistics}, table{table}, srcNodePos{srcNodePos},
dstNodePos{dstNodePos}, lhsVectorPositions{std::move(lhsVectorPositions)},
evaluators{std::move(evaluators)} {}
dstNodePos{dstNodePos}, propertyLhsPositions{std::move(propertyLhsPositions)},
propertyRhsEvaluators{std::move(propertyRhsEvaluators)} {}
RelInsertExecutor(const RelInsertExecutor& other);

void init(ResultSet* resultSet, ExecutionContext* context);
Expand All @@ -65,13 +72,13 @@ class RelInsertExecutor {
storage::RelTable* table;
DataPos srcNodePos;
DataPos dstNodePos;
std::vector<DataPos> lhsVectorPositions;
std::vector<std::unique_ptr<evaluator::ExpressionEvaluator>> evaluators;
std::vector<DataPos> propertyLhsPositions;
std::vector<std::unique_ptr<evaluator::ExpressionEvaluator>> propertyRhsEvaluators;

common::ValueVector* srcNodeIDVector = nullptr;
common::ValueVector* dstNodeIDVector = nullptr;
std::vector<common::ValueVector*> lhsVectors;
std::vector<common::ValueVector*> rhsVectors;
std::vector<common::ValueVector*> propertyLhsVectors;
std::vector<common::ValueVector*> propertyRhsVectors;
};

} // namespace processor
Expand Down
10 changes: 7 additions & 3 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ class NodeTable {
void read(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIds,
const std::vector<common::ValueVector*>& outputVectors);
common::offset_t insert(transaction::Transaction* transaction, common::ValueVector* primaryKey);
void insert(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors,
const std::unordered_map<common::property_id_t, common::vector_idx_t>&
propertyIDToVectorIdx);

void update(common::property_id_t propertyID, common::ValueVector* nodeIDVector,
common::ValueVector* vectorToWriteFrom);
void delete_(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
Expand Down Expand Up @@ -82,14 +86,14 @@ class NodeTable {
const std::vector<common::column_id_t>& columnIds,
const std::vector<common::ValueVector*>& outputVectors);

void setPropertiesToNull(common::offset_t offset);
void insertPK(common::offset_t offset, common::ValueVector* primaryKeyVector);
void insertPK(common::ValueVector* nodeIDVector, common::ValueVector* primaryKeyVector);
inline uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
return propertyColumns.begin()->second->getNumNodeGroups(transaction);
}

private:
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
// TODO(Guodong): use vector here
std::map<common::property_id_t, std::unique_ptr<NodeColumn>> propertyColumns;
BMFileHandle* dataFH;
BMFileHandle* metadataFH;
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class RelTable {
common::ValueVector* relIDVector);
void updateRel(common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector,
common::ValueVector* relIDVector, common::ValueVector* propertyVector, uint32_t propertyID);
void initEmptyRelsForNewNode(common::nodeID_t& nodeID);
void initEmptyRelsForNewNode(common::ValueVector* nodeIDVector);
void batchInitEmptyRelsForNewNodes(common::table_id_t relTableID, uint64_t numNodesInTable);
void addProperty(const catalog::Property& property, catalog::RelTableSchema& relTableSchema);

Expand Down
8 changes: 4 additions & 4 deletions src/optimizer/projection_push_down_optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ void ProjectionPushDownOptimizer::visitUnwind(planner::LogicalOperator* op) {
void ProjectionPushDownOptimizer::visitCreateNode(planner::LogicalOperator* op) {
auto createNode = (LogicalCreateNode*)op;
for (auto& info : createNode->getInfosRef()) {
if (info->primaryKey != nullptr) {
collectExpressionsInUse(info->primaryKey);
for (auto& setItem : info->setItems) {
collectExpressionsInUse(setItem.second);
}
}
}
Expand Down Expand Up @@ -202,8 +202,8 @@ void ProjectionPushDownOptimizer::visitMerge(planner::LogicalOperator* op) {
auto merge = (LogicalMerge*)op;
collectExpressionsInUse(merge->getMark());
for (auto& info : merge->getCreateNodeInfosRef()) {
if (info->primaryKey != nullptr) {
collectExpressionsInUse(info->primaryKey);
for (auto& setItem : info->setItems) {
collectExpressionsInUse(setItem.second);
}
}
for (auto& info : merge->getCreateRelInfosRef()) {
Expand Down
10 changes: 1 addition & 9 deletions src/planner/plan/append_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ std::unique_ptr<LogicalCreateNodeInfo> QueryPlanner::createLogicalCreateNodeInfo
BoundCreateInfo* boundCreateInfo) {
auto node = std::static_pointer_cast<NodeExpression>(boundCreateInfo->nodeOrRel);
auto propertiesToReturn = getProperties(*node);
auto extraInfo = (ExtraCreateNodeInfo*)boundCreateInfo->extraInfo.get();
return std::make_unique<LogicalCreateNodeInfo>(
node, extraInfo->primaryKey, std::move(propertiesToReturn));
node, boundCreateInfo->setItems, std::move(propertiesToReturn));
}

std::unique_ptr<LogicalCreateRelInfo> QueryPlanner::createLogicalCreateRelInfo(
Expand Down Expand Up @@ -51,13 +50,6 @@ void QueryPlanner::appendCreateNode(
createNode->setChild(0, plan.getLastOperator());
createNode->computeFactorizedSchema();
plan.setLastOperator(createNode);
// Apply SET after CREATE
auto boundSetNodePropertyInfos = createLogicalSetPropertyInfo(boundCreateInfos);
std::vector<BoundSetPropertyInfo*> setInfoPtrs;
for (auto& setInfo : boundSetNodePropertyInfos) {
setInfoPtrs.push_back(setInfo.get());
}
appendSetNodeProperty(setInfoPtrs, plan);
}

void QueryPlanner::appendCreateRel(
Expand Down
37 changes: 25 additions & 12 deletions src/processor/map/map_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,44 @@ using namespace kuzu::catalog;
namespace kuzu {
namespace processor {

static std::vector<DataPos> populateLhsVectorPositions(
const std::vector<binder::expression_pair>& setItems, const Schema& outSchema) {
std::vector<DataPos> result;
for (auto& [lhs, rhs] : setItems) {
if (outSchema.isExpressionInScope(*lhs)) {
result.emplace_back(outSchema.getExpressionPos(*lhs));
} else {
result.emplace_back(INVALID_DATA_CHUNK_POS, INVALID_DATA_CHUNK_POS);
}
}
return result;
}

std::unique_ptr<NodeInsertExecutor> PlanMapper::getNodeInsertExecutor(
storage::NodesStore* nodesStore, storage::RelsStore* relsStore,
planner::LogicalCreateNodeInfo* info, const planner::Schema& inSchema,
const planner::Schema& outSchema) {
auto node = info->node;
auto nodeTableID = node->getSingleTableID();
auto table = nodesStore->getNodeTable(nodeTableID);
std::unique_ptr<ExpressionEvaluator> evaluator = nullptr;
if (info->primaryKey != nullptr) {
evaluator = expressionMapper.mapExpression(info->primaryKey, inSchema);
}
std::vector<RelTable*> relTablesToInit;
for (auto& schema : catalog->getReadOnlyVersion()->getRelTableSchemas()) {
if (schema->isSrcOrDstTable(nodeTableID)) {
relTablesToInit.push_back(relsStore->getRelTable(schema->tableID));
}
}
auto nodeIDPos = DataPos(outSchema.getExpressionPos(*node->getInternalIDProperty()));
return std::make_unique<NodeInsertExecutor>(
table, std::move(evaluator), std::move(relTablesToInit), nodeIDPos);
std::vector<DataPos> lhsVectorPositions = populateLhsVectorPositions(info->setItems, outSchema);
std::vector<std::unique_ptr<ExpressionEvaluator>> evaluators;
std::unordered_map<common::property_id_t, common::vector_idx_t> propertyIDToVectorIdx;
for (auto i = 0u; i < info->setItems.size(); ++i) {
auto& [lhs, rhs] = info->setItems[i];
auto propertyExpression = (binder::PropertyExpression*)lhs.get();
evaluators.push_back(expressionMapper.mapExpression(rhs, inSchema));
propertyIDToVectorIdx.insert({propertyExpression->getPropertyID(nodeTableID), i});
}
return std::make_unique<NodeInsertExecutor>(table, std::move(relTablesToInit), nodeIDPos,
std::move(lhsVectorPositions), std::move(evaluators), std::move(propertyIDToVectorIdx));
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapCreateNode(LogicalOperator* logicalOperator) {
Expand All @@ -57,14 +75,9 @@ std::unique_ptr<RelInsertExecutor> PlanMapper::getRelInsertExecutor(storage::Rel
auto dstNode = rel->getDstNode();
auto srcNodePos = DataPos(inSchema.getExpressionPos(*srcNode->getInternalIDProperty()));
auto dstNodePos = DataPos(inSchema.getExpressionPos(*dstNode->getInternalIDProperty()));
std::vector<DataPos> lhsVectorPositions;
auto lhsVectorPositions = populateLhsVectorPositions(info->setItems, outSchema);
std::vector<std::unique_ptr<ExpressionEvaluator>> evaluators;
for (auto& [lhs, rhs] : info->setItems) {
if (outSchema.isExpressionInScope(*lhs)) {
lhsVectorPositions.emplace_back(outSchema.getExpressionPos(*lhs));
} else {
lhsVectorPositions.emplace_back(INVALID_DATA_CHUNK_POS, INVALID_DATA_CHUNK_POS);
}
evaluators.push_back(expressionMapper.mapExpression(rhs, inSchema));
}
return std::make_unique<RelInsertExecutor>(relsStore->getRelsStatistics(), table, srcNodePos,
Expand Down
Loading

0 comments on commit c8708f6

Please sign in to comment.