Skip to content

Commit

Permalink
Merge pull request #1174 from kuzudb/issue-1073
Browse files Browse the repository at this point in the history
Issue 1073
  • Loading branch information
andyfengHKU committed Jan 12, 2023
2 parents a6254ff + bff7607 commit 53aa895
Show file tree
Hide file tree
Showing 21 changed files with 2,282 additions and 2,366 deletions.
4 changes: 1 addition & 3 deletions src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ grammar Cypher;
}

oC_Cypher
: SP ? oC_AnyCypherOption? SP? oC_Statement ( SP? ';' )? SP? EOF
| SP ? kU_DDL ( SP? ';' )? SP? EOF
| SP ? kU_CopyCSV ( SP? ';' )? SP? EOF ;
: SP ? oC_AnyCypherOption? SP? ( oC_Statement | kU_DDL | kU_CopyCSV ) ( SP? ';' )? SP? EOF ;

kU_CopyCSV
: COPY SP oC_SchemaName SP FROM SP StringLiteral ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
Expand Down
3 changes: 0 additions & 3 deletions src/include/main/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,6 @@ class Connection {

unique_ptr<QueryResult> queryResultWithError(std::string& errMsg);

void setQuerySummaryAndPreparedStatement(
Statement* statement, Binder& binder, PreparedStatement* preparedStatement);

std::unique_ptr<PreparedStatement> prepareNoLock(
const std::string& query, bool enumerateAllPlans = false);

Expand Down
8 changes: 7 additions & 1 deletion src/include/main/prepared_statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@ class PreparedStatement {
friend class kuzu::transaction::TinySnbCopyCSVTransactionTest;

public:
inline bool allowActiveTransaction() const {
return !StatementTypeUtils::isDDLOrCopyCSV(statementType);
}

inline bool isSuccess() const { return success; }

inline string getErrorMessage() const { return errMsg; }

inline bool isReadOnly() const { return readOnly; }

inline expression_vector getExpressionsToCollect() {
return statementResult->getExpressionsToCollect();
}

private:
StatementType statementType;
bool allowActiveTransaction = false;
bool success = true;
bool readOnly = false;
string errMsg;
Expand Down
11 changes: 0 additions & 11 deletions src/include/parser/query/regular_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,9 @@ class RegularQuery : public Statement {

inline vector<bool> getIsUnionAll() const { return isUnionAll; }

inline void setEnableExplain(bool option) { enable_explain = option; }

inline bool isEnableExplain() const { return enable_explain; }

inline void setEnableProfile(bool option) { enable_profile = option; }

inline bool isEnableProfile() const { return enable_profile; }

private:
vector<unique_ptr<SingleQuery>> singleQueries;
vector<bool> isUnionAll;
// If explain is enabled, we do not execute query but return physical plan only.
bool enable_explain = false;
bool enable_profile = false;
};

} // namespace parser
Expand Down
9 changes: 9 additions & 0 deletions src/include/parser/statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,17 @@ class Statement {

inline StatementType getStatementType() const { return statementType; }

inline void enableExplain() { explain = true; }
inline bool isExplain() const { return explain; }

inline void enableProfile() { profile = true; }
inline bool isProfile() const { return profile; }

private:
StatementType statementType;
// If explain is enabled, we do not execute query but return physical plan only.
bool explain = false;
bool profile = false;
};

} // namespace parser
Expand Down
6 changes: 2 additions & 4 deletions src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ class Transformer {
public:
explicit Transformer(CypherParser::OC_CypherContext& root) : root{root} {}

unique_ptr<RegularQuery> transformQuery();

unique_ptr<Statement> transform();

private:
Expand Down Expand Up @@ -200,7 +198,7 @@ class Transformer {

string transformSymbolicName(CypherParser::OC_SymbolicNameContext& ctx);

unique_ptr<Statement> transformDDL();
unique_ptr<Statement> transformDDL(CypherParser::KU_DDLContext& ctx);

unique_ptr<Statement> transformCreateNodeClause(CypherParser::KU_CreateNodeContext& ctx);

Expand All @@ -224,7 +222,7 @@ class Transformer {

vector<string> transformNodeLabels(CypherParser::KU_NodeLabelsContext& ctx);

unique_ptr<Statement> transformCopyCSV();
unique_ptr<Statement> transformCopyCSV(CypherParser::KU_CopyCSVContext& ctx);

unordered_map<string, unique_ptr<ParsedExpression>> transformParsingOptions(
CypherParser::KU_ParsingOptionsContext& ctx);
Expand Down
21 changes: 9 additions & 12 deletions src/include/processor/operator/copy_csv/copy_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,21 @@ class CopyCSV : public PhysicalOperator {

inline bool isSource() const override { return true; }

virtual string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) = 0;
string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext);

bool getNextTuplesInternal() override {
throw InternalException("getNextTupleInternal() should not be called on CopyCSV operator.");
}

protected:
void errorIfTableIsNonEmpty(TablesStatistics* tablesStatistics) {
auto numTuples = tablesStatistics->getReadOnlyVersion()
->tableStatisticPerTable.at(tableID)
->getNumTuples();
if (numTuples > 0) {
auto tableName = catalog->getReadOnlyVersion()->getTableSchema(tableID)->tableName;
throw CopyCSVException(
"COPY CSV commands can be executed only on completely empty tables. Table: " +
tableName + " has " + to_string(numTuples) + " many tuples.");
}
}
void errorIfTableIsNonEmpty();

std::string getOutputMsg(uint64_t numTuplesCopied);

virtual uint64_t executeInternal(
TaskScheduler* taskScheduler, ExecutionContext* executionContext) = 0;

virtual uint64_t getNumTuplesInTable() = 0;

protected:
Catalog* catalog;
Expand Down
16 changes: 10 additions & 6 deletions src/include/processor/operator/copy_csv/copy_node_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@ namespace processor {
class CopyNodeCSV : public CopyCSV {
public:
CopyNodeCSV(Catalog* catalog, CSVDescription csvDescription, table_id_t tableID, WAL* wal,
uint32_t id, const string& paramsString, NodesStore* nodesStore)
NodesStatisticsAndDeletedIDs* nodesStatistics, uint32_t id, const string& paramsString)
: CopyCSV{PhysicalOperatorType::COPY_NODE_CSV, catalog, std::move(csvDescription), tableID,
wal, id, paramsString},
nodesStore{nodesStore} {}

string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;
nodesStatistics{nodesStatistics} {}

unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyNodeCSV>(
catalog, csvDescription, tableID, wal, id, paramsString, nodesStore);
catalog, csvDescription, tableID, wal, nodesStatistics, id, paramsString);
}

protected:
uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

uint64_t getNumTuplesInTable() override;

private:
NodesStore* nodesStore;
NodesStatisticsAndDeletedIDs* nodesStatistics;
};

} // namespace processor
Expand Down
21 changes: 12 additions & 9 deletions src/include/processor/operator/copy_csv/copy_rel_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,25 @@ namespace processor {
class CopyRelCSV : public CopyCSV {
public:
CopyRelCSV(Catalog* catalog, CSVDescription csvDescription, table_id_t tableID, WAL* wal,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs, uint32_t id,
const string& paramsString, RelsStatistics* relsStatistics)
NodesStatisticsAndDeletedIDs* nodesStatistics, RelsStatistics* relsStatistics, uint32_t id,
const string& paramsString)
: CopyCSV{PhysicalOperatorType::COPY_REL_CSV, catalog, std::move(csvDescription), tableID,
wal, id, paramsString},
nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs}, relsStatistics{
relsStatistics} {}

string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;
nodesStatistics{nodesStatistics}, relsStatistics{relsStatistics} {}

unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyRelCSV>(catalog, csvDescription, tableID, wal,
nodesStatisticsAndDeletedIDs, id, paramsString, relsStatistics);
return make_unique<CopyRelCSV>(catalog, csvDescription, tableID, wal, nodesStatistics,
relsStatistics, id, paramsString);
}

protected:
uint64_t executeInternal(
TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

uint64_t getNumTuplesInTable() override;

private:
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
NodesStatisticsAndDeletedIDs* nodesStatistics;
RelsStatistics* relsStatistics;
};

Expand Down
28 changes: 4 additions & 24 deletions src/main/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,6 @@ unique_ptr<QueryResult> Connection::queryResultWithError(std::string& errMsg) {
return queryResult;
}

void Connection::setQuerySummaryAndPreparedStatement(
Statement* statement, Binder& binder, PreparedStatement* preparedStatement) {
switch (statement->getStatementType()) {
case StatementType::QUERY: {
auto parsedQuery = (RegularQuery*)statement;
preparedStatement->preparedSummary.isExplain = parsedQuery->isEnableExplain();
preparedStatement->preparedSummary.isProfile = parsedQuery->isEnableProfile();
preparedStatement->parameterMap = binder.getParameterMap();
preparedStatement->allowActiveTransaction = true;
} break;
case StatementType::COPY_CSV:
case StatementType::CREATE_REL_CLAUSE:
case StatementType::CREATE_NODE_CLAUSE:
case StatementType::DROP_PROPERTY:
case StatementType::DROP_TABLE: {
preparedStatement->allowActiveTransaction = false;
} break;
default:
assert(false);
}
}

std::unique_ptr<PreparedStatement> Connection::prepareNoLock(
const string& query, bool enumerateAllPlans) {
auto preparedStatement = make_unique<PreparedStatement>();
Expand All @@ -79,12 +57,14 @@ std::unique_ptr<PreparedStatement> Connection::prepareNoLock(
try {
// parsing
auto statement = Parser::parseQuery(query);
preparedStatement->preparedSummary.isExplain = statement->isExplain();
preparedStatement->preparedSummary.isProfile = statement->isProfile();
// binding
auto binder = Binder(*database->catalog);
auto boundStatement = binder.bind(*statement);
setQuerySummaryAndPreparedStatement(statement.get(), binder, preparedStatement.get());
preparedStatement->statementType = boundStatement->getStatementType();
preparedStatement->readOnly = boundStatement->isReadOnly();
preparedStatement->parameterMap = binder.getParameterMap();
preparedStatement->statementResult = boundStatement->getStatementResult()->copy();
// planning
auto& nodeStatistics =
Expand Down Expand Up @@ -285,7 +265,7 @@ void Connection::beginTransactionIfAutoCommit(PreparedStatement* preparedStateme
if (!preparedStatement->isReadOnly() && activeTransaction && activeTransaction->isReadOnly()) {
throw ConnectionException("Can't execute a write query inside a read-only transaction.");
}
if (!preparedStatement->allowActiveTransaction && activeTransaction) {
if (!preparedStatement->allowActiveTransaction() && activeTransaction) {
throw ConnectionException(
"DDL and CopyCSV statements are automatically wrapped in a "
"transaction and committed. As such, they cannot be part of an "
Expand Down
30 changes: 15 additions & 15 deletions src/parser/transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,24 @@ namespace kuzu {
namespace parser {

unique_ptr<Statement> Transformer::transform() {
unique_ptr<Statement> statement;
if (root.oC_Statement()) {
return transformQuery();
statement = transformQuery(*root.oC_Statement()->oC_Query());
} else if (root.kU_DDL()) {
return transformDDL();
statement = transformDDL(*root.kU_DDL());
} else {
return transformCopyCSV();
statement = transformCopyCSV(*root.kU_CopyCSV());
}
}

unique_ptr<RegularQuery> Transformer::transformQuery() {
auto regularQuery = transformQuery(*root.oC_Statement()->oC_Query());
if (root.oC_AnyCypherOption()) {
auto cypherOption = root.oC_AnyCypherOption();
regularQuery->setEnableExplain(cypherOption->oC_Explain() != nullptr);
regularQuery->setEnableProfile(cypherOption->oC_Profile() != nullptr);
if (cypherOption->oC_Explain()) {
statement->enableExplain();
}
if (cypherOption->oC_Profile()) {
statement->enableProfile();
}
}
return regularQuery;
return statement;
}

unique_ptr<RegularQuery> Transformer::transformQuery(CypherParser::OC_QueryContext& ctx) {
Expand Down Expand Up @@ -872,9 +873,9 @@ string Transformer::transformSymbolicName(CypherParser::OC_SymbolicNameContext&
}
}

unique_ptr<Statement> Transformer::transformDDL() {
if (root.kU_DDL()->kU_CreateNode()) {
return transformCreateNodeClause(*root.kU_DDL()->kU_CreateNode());
unique_ptr<Statement> Transformer::transformDDL(CypherParser::KU_DDLContext& ctx) {
if (ctx.kU_CreateNode()) {
return transformCreateNodeClause(*ctx.kU_CreateNode());
} else if (root.kU_DDL()->kU_CreateRel()) {
return transformCreateRelClause(*root.kU_DDL()->kU_CreateRel());
} else if (root.kU_DDL()->kU_DropTable()) {
Expand Down Expand Up @@ -972,8 +973,7 @@ vector<string> Transformer::transformNodeLabels(CypherParser::KU_NodeLabelsConte
return nodeLabels;
}

unique_ptr<Statement> Transformer::transformCopyCSV() {
auto& ctx = *root.kU_CopyCSV();
unique_ptr<Statement> Transformer::transformCopyCSV(CypherParser::KU_CopyCSVContext& ctx) {
auto csvFileName = transformStringLiteral(*ctx.StringLiteral());
auto tableName = transformSchemaName(*ctx.oC_SchemaName());
auto parsingOptions = ctx.kU_ParsingOptions() ?
Expand Down
12 changes: 6 additions & 6 deletions src/processor/mapper/map_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyCSVToPhysical(
LogicalOperator* logicalOperator) {
auto copyCSV = (LogicalCopyCSV*)logicalOperator;
auto tableName = catalog->getReadOnlyVersion()->getTableName(copyCSV->getTableID());
auto nodesStatistics = &storageManager.getNodesStore().getNodesStatisticsAndDeletedIDs();
auto relsStatistics = &storageManager.getRelsStore().getRelsStatistics();
if (catalog->getReadOnlyVersion()->containNodeTable(tableName)) {
return make_unique<CopyNodeCSV>(catalog, copyCSV->getCSVDescription(),
copyCSV->getTableID(), storageManager.getWAL(), getOperatorID(),
copyCSV->getExpressionsForPrinting(), &storageManager.getNodesStore());
copyCSV->getTableID(), storageManager.getWAL(), nodesStatistics, getOperatorID(),
copyCSV->getExpressionsForPrinting());
} else {
return make_unique<CopyRelCSV>(catalog, copyCSV->getCSVDescription(), copyCSV->getTableID(),
storageManager.getWAL(),
&storageManager.getNodesStore().getNodesStatisticsAndDeletedIDs(), getOperatorID(),
copyCSV->getExpressionsForPrinting(),
&storageManager.getRelsStore().getRelsStatistics());
storageManager.getWAL(), nodesStatistics, relsStatistics, getOperatorID(),
copyCSV->getExpressionsForPrinting());
}
}

Expand Down
1 change: 1 addition & 0 deletions src/processor/operator/copy_csv/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
add_library(kuzu_processor_operator_copy_csv
OBJECT
copy_csv.cpp
copy_node_csv.cpp
copy_rel_csv.cpp)

Expand Down
32 changes: 32 additions & 0 deletions src/processor/operator/copy_csv/copy_csv.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "processor/operator/copy_csv/copy_csv.h"

namespace kuzu {
namespace processor {

string CopyCSV::execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
registerProfilingMetrics(executionContext->profiler);
metrics->executionTime.start();
errorIfTableIsNonEmpty();
auto numTuplesCopied = executeInternal(taskScheduler, executionContext);
metrics->executionTime.stop();
metrics->numOutputTuple.increase(numTuplesCopied);
return getOutputMsg(numTuplesCopied);
}

void CopyCSV::errorIfTableIsNonEmpty() {
auto numTuples = getNumTuplesInTable();
if (numTuples > 0) {
auto tableName = catalog->getReadOnlyVersion()->getTableSchema(tableID)->tableName;
throw CopyCSVException(
"COPY CSV commands can be executed only on completely empty tables. Table: " +
tableName + " has " + to_string(numTuples) + " many tuples.");
}
}

std::string CopyCSV::getOutputMsg(uint64_t numTuplesCopied) {
return StringUtils::string_format("%d number of tuples has been copied to table: %s.",
numTuplesCopied, catalog->getReadOnlyVersion()->getTableName(tableID).c_str());
}

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit 53aa895

Please sign in to comment.