Skip to content

Commit

Permalink
clean
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Jan 12, 2023
1 parent c2c8959 commit d2cdd02
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 60 deletions.
3 changes: 0 additions & 3 deletions src/include/main/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,6 @@ class Connection {

unique_ptr<QueryResult> queryResultWithError(std::string& errMsg);

// void setQuerySummaryAndPreparedStatement(
// Statement* statement, Binder& binder, PreparedStatement* preparedStatement);

std::unique_ptr<PreparedStatement> prepareNoLock(
const std::string& query, bool enumerateAllPlans = false);

Expand Down
9 changes: 5 additions & 4 deletions src/include/main/prepared_statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,22 @@ class PreparedStatement {
friend class kuzu::transaction::TinySnbCopyCSVTransactionTest;

public:
inline void setStatementType(StatementType statementType_) {
statementType = statementType_;
allowActiveTransaction = !StatementTypeUtils::isDDLOrCopyCSV(statementType);
inline bool allowActiveTransaction() const {
return !StatementTypeUtils::isDDLOrCopyCSV(statementType);
}

inline bool isSuccess() const { return success; }

inline string getErrorMessage() const { return errMsg; }

inline bool isReadOnly() const { return readOnly; }

inline expression_vector getExpressionsToCollect() {
return statementResult->getExpressionsToCollect();
}

private:
StatementType statementType;
bool allowActiveTransaction = false;
bool success = true;
bool readOnly = false;
string errMsg;
Expand Down
21 changes: 9 additions & 12 deletions src/include/processor/operator/copy_csv/copy_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,21 @@ class CopyCSV : public PhysicalOperator {

inline bool isSource() const override { return true; }

virtual string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) = 0;
string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext);

bool getNextTuplesInternal() override {
throw InternalException("getNextTupleInternal() should not be called on CopyCSV operator.");
}

protected:
void errorIfTableIsNonEmpty(TablesStatistics* tablesStatistics) {
auto numTuples = tablesStatistics->getReadOnlyVersion()
->tableStatisticPerTable.at(tableID)
->getNumTuples();
if (numTuples > 0) {
auto tableName = catalog->getReadOnlyVersion()->getTableSchema(tableID)->tableName;
throw CopyCSVException(
"COPY CSV commands can be executed only on completely empty tables. Table: " +
tableName + " has " + to_string(numTuples) + " many tuples.");
}
}
void errorIfTableIsNonEmpty();

std::string getOutputMsg(uint64_t numTuplesCopied);

virtual uint64_t executeInternal(
TaskScheduler* taskScheduler, ExecutionContext* executionContext) = 0;

virtual uint64_t getNumTuplesInTable() = 0;

protected:
Catalog* catalog;
Expand Down
16 changes: 10 additions & 6 deletions src/include/processor/operator/copy_csv/copy_node_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@ namespace processor {
class CopyNodeCSV : public CopyCSV {
public:
CopyNodeCSV(Catalog* catalog, CSVDescription csvDescription, table_id_t tableID, WAL* wal,
uint32_t id, const string& paramsString, NodesStore* nodesStore)
NodesStatisticsAndDeletedIDs* nodesStatistics, uint32_t id, const string& paramsString)
: CopyCSV{PhysicalOperatorType::COPY_NODE_CSV, catalog, std::move(csvDescription), tableID,
wal, id, paramsString},
nodesStore{nodesStore} {}

string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;
nodesStatistics{nodesStatistics} {}

unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyNodeCSV>(
catalog, csvDescription, tableID, wal, id, paramsString, nodesStore);
catalog, csvDescription, tableID, wal, nodesStatistics, id, paramsString);
}

protected:
uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

uint64_t getNumTuplesInTable() override;

private:
NodesStore* nodesStore;
NodesStatisticsAndDeletedIDs* nodesStatistics;
};

} // namespace processor
Expand Down
21 changes: 12 additions & 9 deletions src/include/processor/operator/copy_csv/copy_rel_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,25 @@ namespace processor {
class CopyRelCSV : public CopyCSV {
public:
CopyRelCSV(Catalog* catalog, CSVDescription csvDescription, table_id_t tableID, WAL* wal,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs, uint32_t id,
const string& paramsString, RelsStatistics* relsStatistics)
NodesStatisticsAndDeletedIDs* nodesStatistics, RelsStatistics* relsStatistics, uint32_t id,
const string& paramsString)
: CopyCSV{PhysicalOperatorType::COPY_REL_CSV, catalog, std::move(csvDescription), tableID,
wal, id, paramsString},
nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs}, relsStatistics{
relsStatistics} {}

string execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;
nodesStatistics{nodesStatistics}, relsStatistics{relsStatistics} {}

unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyRelCSV>(catalog, csvDescription, tableID, wal,
nodesStatisticsAndDeletedIDs, id, paramsString, relsStatistics);
return make_unique<CopyRelCSV>(catalog, csvDescription, tableID, wal, nodesStatistics,
relsStatistics, id, paramsString);
}

protected:
uint64_t executeInternal(
TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

uint64_t getNumTuplesInTable() override;

private:
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
NodesStatisticsAndDeletedIDs* nodesStatistics;
RelsStatistics* relsStatistics;
};

Expand Down
4 changes: 2 additions & 2 deletions src/main/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ std::unique_ptr<PreparedStatement> Connection::prepareNoLock(
// binding
auto binder = Binder(*database->catalog);
auto boundStatement = binder.bind(*statement);
preparedStatement->setStatementType(boundStatement->getStatementType());
preparedStatement->statementType = boundStatement->getStatementType();
preparedStatement->readOnly = boundStatement->isReadOnly();
preparedStatement->parameterMap = binder.getParameterMap();
preparedStatement->statementResult = boundStatement->getStatementResult()->copy();
Expand Down Expand Up @@ -265,7 +265,7 @@ void Connection::beginTransactionIfAutoCommit(PreparedStatement* preparedStateme
if (!preparedStatement->isReadOnly() && activeTransaction && activeTransaction->isReadOnly()) {
throw ConnectionException("Can't execute a write query inside a read-only transaction.");
}
if (!preparedStatement->allowActiveTransaction && activeTransaction) {
if (!preparedStatement->allowActiveTransaction() && activeTransaction) {
throw ConnectionException(
"DDL and CopyCSV statements are automatically wrapped in a "
"transaction and committed. As such, they cannot be part of an "
Expand Down
12 changes: 6 additions & 6 deletions src/processor/mapper/map_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyCSVToPhysical(
LogicalOperator* logicalOperator) {
auto copyCSV = (LogicalCopyCSV*)logicalOperator;
auto tableName = catalog->getReadOnlyVersion()->getTableName(copyCSV->getTableID());
auto nodesStatistics = &storageManager.getNodesStore().getNodesStatisticsAndDeletedIDs();
auto relsStatistics = &storageManager.getRelsStore().getRelsStatistics();
if (catalog->getReadOnlyVersion()->containNodeTable(tableName)) {
return make_unique<CopyNodeCSV>(catalog, copyCSV->getCSVDescription(),
copyCSV->getTableID(), storageManager.getWAL(), getOperatorID(),
copyCSV->getExpressionsForPrinting(), &storageManager.getNodesStore());
copyCSV->getTableID(), storageManager.getWAL(), nodesStatistics, getOperatorID(),
copyCSV->getExpressionsForPrinting());
} else {
return make_unique<CopyRelCSV>(catalog, copyCSV->getCSVDescription(), copyCSV->getTableID(),
storageManager.getWAL(),
&storageManager.getNodesStore().getNodesStatisticsAndDeletedIDs(), getOperatorID(),
copyCSV->getExpressionsForPrinting(),
&storageManager.getRelsStore().getRelsStatistics());
storageManager.getWAL(), nodesStatistics, relsStatistics, getOperatorID(),
copyCSV->getExpressionsForPrinting());
}
}

Expand Down
1 change: 1 addition & 0 deletions src/processor/operator/copy_csv/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
add_library(kuzu_processor_operator_copy_csv
OBJECT
copy_csv.cpp
copy_node_csv.cpp
copy_rel_csv.cpp)

Expand Down
32 changes: 32 additions & 0 deletions src/processor/operator/copy_csv/copy_csv.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "processor/operator/copy_csv/copy_csv.h"

namespace kuzu {
namespace processor {

string CopyCSV::execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
registerProfilingMetrics(executionContext->profiler);
metrics->executionTime.start();
errorIfTableIsNonEmpty();
auto numTuplesCopied = executeInternal(taskScheduler, executionContext);
metrics->executionTime.stop();
metrics->numOutputTuple.increase(numTuplesCopied);
return getOutputMsg(numTuplesCopied);
}

void CopyCSV::errorIfTableIsNonEmpty() {
auto numTuples = getNumTuplesInTable();
if (getNumTuplesInTable() > 0) {
auto tableName = catalog->getReadOnlyVersion()->getTableSchema(tableID)->tableName;
throw CopyCSVException(
"COPY CSV commands can be executed only on completely empty tables. Table: " +
tableName + " has " + to_string(numTuples) + " many tuples.");
}
}

std::string CopyCSV::getOutputMsg(uint64_t numTuplesCopied) {
return StringUtils::string_format("%d number of tuples has been copied to table: %s.",
numTuplesCopied, catalog->getReadOnlyVersion()->getTableName(tableID).c_str());
}

} // namespace processor
} // namespace kuzu
21 changes: 14 additions & 7 deletions src/processor/operator/copy_csv/copy_node_csv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@
namespace kuzu {
namespace processor {

string CopyNodeCSV::execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
auto nodeCSVCopier = make_unique<InMemArrowNodeCopier>(csvDescription, wal->getDirectory(),
*taskScheduler, *catalog, tableID, &nodesStore->getNodesStatisticsAndDeletedIDs());
errorIfTableIsNonEmpty(&nodesStore->getNodesStatisticsAndDeletedIDs());
// Note: This copy function will update the maxNodeOffset in nodesStatisticsAndDeletedIDs.
uint64_t CopyNodeCSV::executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
auto nodeCSVCopier = make_unique<InMemArrowNodeCopier>(
csvDescription, wal->getDirectory(), *taskScheduler, *catalog, tableID, nodesStatistics);
auto numNodesCopied = nodeCSVCopier->copy();
wal->logCopyNodeCSVRecord(tableID);
return StringUtils::string_format("%d number of nodes has been copied to nodeTable: %s.",
numNodesCopied, catalog->getReadOnlyVersion()->getTableName(tableID).c_str());
return numNodesCopied;
}

uint64_t CopyNodeCSV::getNumTuplesInTable() {
// TODO(Ziyi): this chains looks weird. Fix when refactoring table statistics. Ditto in
// CopyRelCSV.
return ((TablesStatistics*)nodesStatistics)
->getReadOnlyVersion()
->tableStatisticPerTable[tableID]
->getNumTuples();
}

} // namespace processor
Expand Down
18 changes: 11 additions & 7 deletions src/processor/operator/copy_csv/copy_rel_csv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
namespace kuzu {
namespace processor {

string CopyRelCSV::execute(TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
uint64_t CopyRelCSV::executeInternal(
kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
auto relCSVCopier = make_unique<InMemRelCSVCopier>(csvDescription, wal->getDirectory(),
*taskScheduler, *catalog, nodesStatisticsAndDeletedIDs->getMaxNodeOffsetPerTable(),
*taskScheduler, *catalog, nodesStatistics->getMaxNodeOffsetPerTable(),
executionContext->bufferManager, tableID, relsStatistics);
errorIfTableIsNonEmpty(relsStatistics);
// Note: This copy function will update the numRelsPerDirectionBoundTable and numRels
// information in relsStatistics for this relTable.
auto numRelsCopied = relCSVCopier->copy();
wal->logCopyRelCSVRecord(tableID);
return StringUtils::string_format("%d number of rels has been copied to relTable: %s.",
numRelsCopied, catalog->getReadOnlyVersion()->getTableName(tableID).c_str());
return numRelsCopied;
}

uint64_t CopyRelCSV::getNumTuplesInTable() {
return ((TablesStatistics*)relsStatistics)
->getReadOnlyVersion()
->tableStatisticPerTable[tableID]
->getNumTuples();
}

} // namespace processor
Expand Down
1 change: 1 addition & 0 deletions src/processor/operator/ddl/ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bool DDL::getNextTuplesInternal() {
hasExecuted = true;
executeDDLInternal();
outputVector->setValue<std::string>(0, getOutputMsg());
metrics->numOutputTuple.increase(1);
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions test/runner/e2e_copy_csv_transaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,10 @@ TEST_F(TinySnbCopyCSVTransactionTest, CopyNodeCSVOutputMsg) {
conn->query(createKnowsTableCMD);
auto result = conn->query(copyPersonTableCMD);
ASSERT_EQ(TestHelper::convertResultToString(*result),
vector<string>{"8 number of nodes has been copied to nodeTable: person."});
vector<string>{"8 number of tuples has been copied to table: person."});
result = conn->query(copyKnowsTableCMD);
ASSERT_EQ(TestHelper::convertResultToString(*result),
vector<string>{"14 number of rels has been copied to relTable: knows."});
vector<string>{"14 number of tuples has been copied to table: knows."});
}

TEST_F(TinySnbCopyCSVTransactionTest, CopyCSVStatementWithActiveTransactionErrorTest) {
Expand Down
2 changes: 1 addition & 1 deletion third_party/antlr4_cypher/cypher_lexer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

// Generated from /Users/xiyangfeng/kuzu/kuzu/src/antlr4/Cypher.g4 by ANTLR 4.9
// Generated from src/antlr4/Cypher.g4 by ANTLR 4.9


#include "cypher_lexer.h"
Expand Down
2 changes: 1 addition & 1 deletion third_party/antlr4_cypher/cypher_parser.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

// Generated from /Users/xiyangfeng/kuzu/kuzu/src/antlr4/Cypher.g4 by ANTLR 4.9
// Generated from src/antlr4/Cypher.g4 by ANTLR 4.9



Expand Down

0 comments on commit d2cdd02

Please sign in to comment.