Skip to content

Commit

Permalink
migrate ddl to task schedular framework
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Jan 11, 2023
1 parent 5754eaf commit 5a4d0c6
Show file tree
Hide file tree
Showing 27 changed files with 182 additions and 93 deletions.
4 changes: 2 additions & 2 deletions src/binder/bind/bind_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ unique_ptr<BoundStatement> Binder::bindCreateNodeClause(const Statement& stateme
auto primaryKeyIdx = bindPrimaryKey(
createNodeClause.getPKColName(), createNodeClause.getPropertyNameDataTypes());
return make_unique<BoundCreateNodeClause>(
tableName, move(boundPropertyNameDataTypes), primaryKeyIdx);
tableName, std::move(boundPropertyNameDataTypes), primaryKeyIdx);
}

unique_ptr<BoundStatement> Binder::bindCreateRelClause(const Statement& statement) {
Expand All @@ -40,7 +40,7 @@ unique_ptr<BoundStatement> Binder::bindCreateRelClause(const Statement& statemen
srcDstTableIDs.emplace_back(bindNodeTableID(srcTableName), bindNodeTableID(dstTableName));
}
return make_unique<BoundCreateRelClause>(
tableName, move(propertyNameDataTypes), relMultiplicity, srcDstTableIDs);
tableName, std::move(propertyNameDataTypes), relMultiplicity, srcDstTableIDs);
}

unique_ptr<BoundStatement> Binder::bindDropTable(const Statement& statement) {
Expand Down
6 changes: 6 additions & 0 deletions src/include/binder/bound_statement_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ class BoundStatementResult {
return result;
}

inline shared_ptr<Expression> getSingleExpressionToCollect() {
auto expressionsToCollect = getExpressionsToCollect();
assert(expressionsToCollect.size() == 1);
return expressionsToCollect[0];
}

inline unique_ptr<BoundStatementResult> copy() {
return make_unique<BoundStatementResult>(columns, expressionsToCollectPerColumn);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ namespace planner {
class LogicalCreateNodeTable : public LogicalCreateTable {
public:
LogicalCreateNodeTable(string tableName, vector<PropertyNameDataType> propertyNameDataTypes,
uint32_t primaryKeyIdx)
uint32_t primaryKeyIdx, shared_ptr<Expression> outputExpression)
: LogicalCreateTable{LogicalOperatorType::CREATE_NODE_TABLE, std::move(tableName),
std::move(propertyNameDataTypes)},
std::move(propertyNameDataTypes), std::move(outputExpression)},
primaryKeyIdx{primaryKeyIdx} {}

inline uint32_t getPrimaryKeyIdx() const { return primaryKeyIdx; }

inline unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCreateNodeTable>(tableName, propertyNameDataTypes, primaryKeyIdx);
return make_unique<LogicalCreateNodeTable>(
tableName, propertyNameDataTypes, primaryKeyIdx, outputExpression);
}

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ namespace planner {
class LogicalCreateRelTable : public LogicalCreateTable {
public:
LogicalCreateRelTable(string tableName, vector<PropertyNameDataType> propertyNameDataTypes,
RelMultiplicity relMultiplicity, vector<pair<table_id_t, table_id_t>> srcDstTableIDs)
RelMultiplicity relMultiplicity, vector<pair<table_id_t, table_id_t>> srcDstTableIDs,
shared_ptr<Expression> outputExpression)
: LogicalCreateTable{LogicalOperatorType::CREATE_REL_TABLE, std::move(tableName),
std::move(propertyNameDataTypes)},
std::move(propertyNameDataTypes), std::move(outputExpression)},
relMultiplicity{relMultiplicity}, srcDstTableIDs{std::move(srcDstTableIDs)} {}

inline RelMultiplicity getRelMultiplicity() const { return relMultiplicity; }
Expand All @@ -20,7 +21,7 @@ class LogicalCreateRelTable : public LogicalCreateTable {

inline unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCreateRelTable>(
tableName, propertyNameDataTypes, relMultiplicity, srcDstTableIDs);
tableName, propertyNameDataTypes, relMultiplicity, srcDstTableIDs, outputExpression);
}

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ namespace planner {
class LogicalCreateTable : public LogicalDDL {
public:
LogicalCreateTable(LogicalOperatorType operatorType, string tableName,
vector<PropertyNameDataType> propertyNameDataTypes)
: LogicalDDL{operatorType, std::move(tableName)}, propertyNameDataTypes{
std::move(propertyNameDataTypes)} {}
vector<PropertyNameDataType> propertyNameDataTypes, shared_ptr<Expression> outputExpression)
: LogicalDDL{operatorType, std::move(tableName), std::move(outputExpression)},
propertyNameDataTypes{std::move(propertyNameDataTypes)} {}

inline vector<PropertyNameDataType> getPropertyNameDataTypes() const {
return propertyNameDataTypes;
Expand Down
15 changes: 12 additions & 3 deletions src/include/planner/logical_plan/logical_operator/logical_ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,26 @@ namespace planner {

class LogicalDDL : public LogicalOperator {
public:
LogicalDDL(LogicalOperatorType operatorType, string tableName)
: LogicalOperator{operatorType}, tableName{std::move(tableName)} {}
LogicalDDL(
LogicalOperatorType operatorType, string tableName, shared_ptr<Expression> outputExpression)
: LogicalOperator{operatorType}, tableName{std::move(tableName)},
outputExpression{std::move(outputExpression)} {}

inline string getTableName() const { return tableName; }
inline shared_ptr<Expression> getOutputExpression() const { return outputExpression; }

inline string getExpressionsForPrinting() const override { return tableName; }

inline void computeSchema() override { schema = make_unique<Schema>(); }
inline void computeSchema() override {
schema = make_unique<Schema>();
auto groupPos = schema->createGroup();
schema->insertToGroupAndScope(outputExpression, groupPos);
schema->setGroupAsSingleState(groupPos);
}

protected:
string tableName;
shared_ptr<Expression> outputExpression;
};

} // namespace planner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ namespace kuzu {
namespace planner {

class LogicalDropProperty : public LogicalDDL {

public:
explicit LogicalDropProperty(table_id_t tableID, property_id_t propertyID, string tableName)
: LogicalDDL{LogicalOperatorType::DROP_PROPERTY, std::move(tableName)}, tableID{tableID},
propertyID{propertyID} {}
explicit LogicalDropProperty(table_id_t tableID, property_id_t propertyID, string tableName,
shared_ptr<Expression> outputExpression)
: LogicalDDL{LogicalOperatorType::DROP_PROPERTY, std::move(tableName),
std::move(outputExpression)},
tableID{tableID}, propertyID{propertyID} {}

inline table_id_t getTableID() const { return tableID; }

inline property_id_t getPropertyID() const { return propertyID; }

inline unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalDropProperty>(tableID, propertyID, tableName);
return make_unique<LogicalDropProperty>(tableID, propertyID, tableName, outputExpression);
}

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ namespace kuzu {
namespace planner {

class LogicalDropTable : public LogicalDDL {

public:
explicit LogicalDropTable(table_id_t tableID, string tableName)
: LogicalDDL{LogicalOperatorType::DROP_TABLE, std::move(tableName)}, tableID{tableID} {}
explicit LogicalDropTable(
table_id_t tableID, string tableName, shared_ptr<Expression> outputExpression)
: LogicalDDL{LogicalOperatorType::DROP_TABLE, std::move(tableName),
std::move(outputExpression)},
tableID{tableID} {}

inline table_id_t getTableID() const { return tableID; }

inline unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalDropTable>(tableID, tableName);
return make_unique<LogicalDropTable>(tableID, tableName, outputExpression);
}

private:
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/mapper/plan_mapper.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "binder/expression/node_expression.h"
#include "common/statement_type.h"
#include "planner/logical_plan/logical_plan.h"
#include "processor/mapper/expression_mapper.h"
#include "processor/operator/result_collector.h"
Expand Down
18 changes: 10 additions & 8 deletions src/include/processor/operator/ddl/create_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,25 @@ namespace processor {
class CreateNodeTable : public CreateTable {
public:
CreateNodeTable(Catalog* catalog, string tableName,
vector<PropertyNameDataType> propertyNameDataTypes, uint32_t primaryKeyIdx, uint32_t id,
const string& paramsString, NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
vector<PropertyNameDataType> propertyNameDataTypes, uint32_t primaryKeyIdx,
const DataPos& outputPos, uint32_t id, const string& paramsString,
NodesStatisticsAndDeletedIDs* nodesStatistics)
: CreateTable{PhysicalOperatorType::CREATE_NODE_TABLE, catalog, std::move(tableName),
std::move(propertyNameDataTypes), id, paramsString},
primaryKeyIdx{primaryKeyIdx}, nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs} {
}
std::move(propertyNameDataTypes), outputPos, id, paramsString},
primaryKeyIdx{primaryKeyIdx}, nodesStatistics{nodesStatistics} {}

void executeDDLInternal() override;

string execute() override;
std::string getOutputMsg() override;

unique_ptr<PhysicalOperator> clone() override {
return make_unique<CreateNodeTable>(catalog, tableName, propertyNameDataTypes,
primaryKeyIdx, id, paramsString, nodesStatisticsAndDeletedIDs);
primaryKeyIdx, outputPos, id, paramsString, nodesStatistics);
}

private:
uint32_t primaryKeyIdx;
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
NodesStatisticsAndDeletedIDs* nodesStatistics;
};

} // namespace processor
Expand Down
10 changes: 6 additions & 4 deletions src/include/processor/operator/ddl/create_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ class CreateRelTable : public CreateTable {
public:
CreateRelTable(Catalog* catalog, string tableName,
vector<PropertyNameDataType> propertyNameDataTypes, RelMultiplicity relMultiplicity,
vector<pair<table_id_t, table_id_t>> srcDstTableIDs, uint32_t id,
vector<pair<table_id_t, table_id_t>> srcDstTableIDs, const DataPos& outputPos, uint32_t id,
const string& paramsString, RelsStatistics* relsStatistics)
: CreateTable{PhysicalOperatorType::CREATE_REL_TABLE, catalog, std::move(tableName),
std::move(propertyNameDataTypes), id, paramsString},
std::move(propertyNameDataTypes), outputPos, id, paramsString},
relMultiplicity{relMultiplicity}, srcDstTableIDs{std::move(srcDstTableIDs)},
relsStatistics{relsStatistics} {}

string execute() override;
void executeDDLInternal() override;

std::string getOutputMsg() override;

unique_ptr<PhysicalOperator> clone() override {
return make_unique<CreateRelTable>(catalog, tableName, propertyNameDataTypes,
relMultiplicity, srcDstTableIDs, id, paramsString, relsStatistics);
relMultiplicity, srcDstTableIDs, outputPos, id, paramsString, relsStatistics);
}

private:
Expand Down
11 changes: 8 additions & 3 deletions src/include/processor/operator/ddl/create_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ namespace processor {
class CreateTable : public DDL {
public:
CreateTable(PhysicalOperatorType operatorType, Catalog* catalog, string tableName,
vector<PropertyNameDataType> propertyNameDataTypes, uint32_t id, const string& paramsString)
: DDL{operatorType, catalog, id, paramsString}, tableName{std::move(tableName)},
propertyNameDataTypes{move(propertyNameDataTypes)} {}
vector<PropertyNameDataType> propertyNameDataTypes, const DataPos& outputPos, uint32_t id,
const string& paramsString)
: DDL{operatorType, catalog, outputPos, id, paramsString}, tableName{std::move(tableName)},
propertyNameDataTypes{std::move(propertyNameDataTypes)} {}

virtual ~CreateTable() override = default;

void executeDDLInternal() override = 0;

std::string getOutputMsg() override = 0;

protected:
string tableName;
vector<PropertyNameDataType> propertyNameDataTypes;
Expand Down
19 changes: 13 additions & 6 deletions src/include/processor/operator/ddl/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,28 @@ namespace processor {

class DDL : public PhysicalOperator {
public:
DDL(PhysicalOperatorType operatorType, Catalog* catalog, uint32_t id,
DDL(PhysicalOperatorType operatorType, Catalog* catalog, const DataPos& outputPos, uint32_t id,
const string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog} {}
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog}, outputPos{outputPos} {
}
virtual ~DDL() override = default;

inline bool isSource() const override { return true; }

virtual string execute() = 0;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override {
throw InternalException("getNextTupleInternal() should not be called on DDL operator.");
}
bool getNextTuplesInternal() override;

protected:
virtual std::string getOutputMsg() = 0;
virtual void executeDDLInternal() = 0;

protected:
Catalog* catalog;
DataPos outputPos;
ValueVector* outputVector;

bool hasExecuted = false;
};

} // namespace processor
Expand Down
13 changes: 8 additions & 5 deletions src/include/processor/operator/ddl/drop_property.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ namespace processor {
class DropProperty : public DDL {
public:
DropProperty(Catalog* catalog, table_id_t tableID, property_id_t propertyID,
StorageManager& storageManager, uint32_t id, const string& paramsString)
: DDL{PhysicalOperatorType::DROP_PROPERTY, catalog, id, paramsString}, tableID{tableID},
propertyID{propertyID}, storageManager{storageManager} {}
StorageManager& storageManager, const DataPos& outputPos, uint32_t id,
const string& paramsString)
: DDL{PhysicalOperatorType::DROP_PROPERTY, catalog, outputPos, id, paramsString},
tableID{tableID}, propertyID{propertyID}, storageManager{storageManager} {}

string execute() override;
void executeDDLInternal() override;

std::string getOutputMsg() override;

unique_ptr<PhysicalOperator> clone() override {
return make_unique<DropProperty>(
catalog, tableID, propertyID, storageManager, id, paramsString);
catalog, tableID, propertyID, storageManager, outputPos, id, paramsString);
}

protected:
Expand Down
22 changes: 10 additions & 12 deletions src/include/processor/operator/ddl/drop_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,18 @@ namespace processor {

class DropTable : public DDL {
public:
DropTable(Catalog* catalog, table_id_t tableID, StorageManager& storageManager, uint32_t id,
const string& paramsString)
: DDL{PhysicalOperatorType::DROP_TABLE, catalog, id, paramsString}, tableID{tableID},
storageManager{storageManager} {}

string execute() override {
auto tableSchema = catalog->getReadOnlyVersion()->getTableSchema(tableID);
catalog->removeTableSchema(tableSchema);
return StringUtils::string_format("%sTable: %s has been dropped.",
tableSchema->isNodeTable ? "Node" : "Rel", tableSchema->tableName.c_str());
}
DropTable(Catalog* catalog, table_id_t tableID, StorageManager& storageManager,
const DataPos& outputPos, uint32_t id, const string& paramsString)
: DDL{PhysicalOperatorType::DROP_TABLE, catalog, outputPos, id, paramsString},
tableID{tableID}, storageManager{storageManager} {}

void executeDDLInternal() override;

std::string getOutputMsg() override;

unique_ptr<PhysicalOperator> clone() override {
return make_unique<DropTable>(catalog, tableID, storageManager, id, paramsString);
return make_unique<DropTable>(
catalog, tableID, storageManager, outputPos, id, paramsString);
}

protected:
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum class PhysicalOperatorType : uint8_t {
class PhysicalOperatorUtils {
public:
static std::string operatorTypeToString(PhysicalOperatorType operatorType);
static bool isDDLOperator(PhysicalOperatorType operatorType);
};

struct OperatorMetrics {
Expand Down
13 changes: 6 additions & 7 deletions src/include/processor/physical_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,23 @@

#include <memory>

#include "common/statement_type.h"
#include "processor/operator/physical_operator.h"

namespace kuzu {
namespace processor {

class PhysicalPlan {
public:
explicit PhysicalPlan(
unique_ptr<PhysicalOperator> lastOperator, common::StatementType statementType)
: lastOperator{std::move(lastOperator)}, statementType{statementType} {}
explicit PhysicalPlan(unique_ptr<PhysicalOperator> lastOperator)
: lastOperator{std::move(lastOperator)} {}

inline bool isDDL() const { return StatementTypeUtils::isDDL(statementType); }
inline bool isCopyCSV() const { return StatementTypeUtils::isCopyCSV(statementType); }
inline bool isCopyCSV() const {
return lastOperator->getOperatorType() == PhysicalOperatorType::COPY_NODE_CSV ||
lastOperator->getOperatorType() == PhysicalOperatorType::COPY_REL_CSV;
}

public:
unique_ptr<PhysicalOperator> lastOperator;
common::StatementType statementType;
};

class PhysicalPlanUtil {
Expand Down
Loading

0 comments on commit 5a4d0c6

Please sign in to comment.