Skip to content

Commit

Permalink
add support for import db
Browse files Browse the repository at this point in the history
  • Loading branch information
Kuzu CI committed Feb 29, 2024
1 parent 29f2534 commit 181f31f
Show file tree
Hide file tree
Showing 48 changed files with 3,552 additions and 3,176 deletions.
10 changes: 9 additions & 1 deletion scripts/antlr4/Cypher.g4.copy
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ oC_Statement
| kU_CommentOn
| kU_Transaction
| kU_Extension
| kU_ExportDatabase;
| kU_ExportDatabase
| kU_ImportDatabase;

kU_CopyFrom
: COPY SP oC_SchemaName ( ( SP? '(' SP? kU_ColumnNames SP? ')' SP? ) | SP ) FROM SP (kU_FilePaths | oC_Variable) ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
Expand All @@ -47,6 +48,10 @@ kU_CopyTO
kU_ExportDatabase
: EXPORT SP DATABASE SP StringLiteral ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;

kU_ImportDatabase
: IMPORT SP DATABASE SP StringLiteral;


kU_StandaloneCall
: CALL SP oC_SymbolicName SP? '=' SP? oC_Literal ;

Expand Down Expand Up @@ -89,6 +94,8 @@ COLUMN : ( 'C' | 'c' ) ( 'O' | 'o' ) ( 'L' | 'l' ) ( 'U' | 'u' ) ( 'M' | 'm' ) (

EXPORT: ( 'E' | 'e') ( 'X' | 'x') ( 'P' | 'p') ( 'O' | 'o') ( 'R' | 'r') ( 'T' | 't');

IMPORT: ( 'I' | 'i') ( 'M' | 'm') ( 'P' | 'p') ( 'O' | 'o') ( 'R' | 'r') ( 'T' | 't');

DATABASE: ( 'D' | 'd') ( 'A' | 'a') ( 'T' | 't') ( 'A' | 'a') ( 'B' | 'b') ( 'A' | 'a') ( 'S' | 's')( 'E' | 'e');

kU_DDL
Expand Down Expand Up @@ -754,6 +761,7 @@ kU_NonReservedKeywords
| BEGIN
| END
| IN
| IMPORT
| EXPORT
| DATABASE
;
Expand Down
10 changes: 9 additions & 1 deletion src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ oC_Statement
| kU_CommentOn
| kU_Transaction
| kU_Extension
| kU_ExportDatabase;
| kU_ExportDatabase
| kU_ImportDatabase;

kU_CopyFrom
: COPY SP oC_SchemaName ( ( SP? '(' SP? kU_ColumnNames SP? ')' SP? ) | SP ) FROM SP (kU_FilePaths | oC_Variable) ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
Expand All @@ -47,6 +48,10 @@ kU_CopyTO
kU_ExportDatabase
: EXPORT SP DATABASE SP StringLiteral ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;

kU_ImportDatabase
: IMPORT SP DATABASE SP StringLiteral;


kU_StandaloneCall
: CALL SP oC_SymbolicName SP? '=' SP? oC_Literal ;

Expand Down Expand Up @@ -89,6 +94,8 @@ COLUMN : ( 'C' | 'c' ) ( 'O' | 'o' ) ( 'L' | 'l' ) ( 'U' | 'u' ) ( 'M' | 'm' ) (

EXPORT: ( 'E' | 'e') ( 'X' | 'x') ( 'P' | 'p') ( 'O' | 'o') ( 'R' | 'r') ( 'T' | 't');

IMPORT: ( 'I' | 'i') ( 'M' | 'm') ( 'P' | 'p') ( 'O' | 'o') ( 'R' | 'r') ( 'T' | 't');

DATABASE: ( 'D' | 'd') ( 'A' | 'a') ( 'T' | 't') ( 'A' | 'a') ( 'B' | 'b') ( 'A' | 'a') ( 'S' | 's')( 'E' | 'e');

kU_DDL
Expand Down Expand Up @@ -754,6 +761,7 @@ kU_NonReservedKeywords
| BEGIN
| END
| IN
| IMPORT
| EXPORT
| DATABASE
;
Expand Down
3 changes: 2 additions & 1 deletion src/binder/bind/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ add_library(
bind_transaction.cpp
bind_updating_clause.cpp
bind_extension.cpp
bind_export_database.cpp)
bind_export_database.cpp
bind_import_database.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_binder_bind>
Expand Down
42 changes: 42 additions & 0 deletions src/binder/bind/bind_import_database.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "binder/binder.h"
#include "binder/copy/bound_import_database.h"
#include "common/exception/binder.h"
#include "parser/port_db.h"

using namespace kuzu::common;
using namespace kuzu::parser;

namespace kuzu {
namespace binder {

std::string getFilePath(
common::VirtualFileSystem* vfs, const std::string boundFilePath, const std::string fileName) {
auto filePath = vfs->joinPath(boundFilePath, fileName);
if (!vfs->fileOrPathExists(filePath)) {
throw BinderException(stringFormat("File {} does not exist.", filePath));
}
auto fileInfo = vfs->openFile(filePath, O_RDONLY
#ifdef _WIN32
| _O_BINARY
#endif
);
auto fsize = fileInfo->getFileSize();
auto buffer = std::make_unique<char[]>(fsize);
fileInfo->readFile(buffer.get(), fsize);
return std::string(buffer.get(), fsize);
}

std::unique_ptr<BoundStatement> Binder::bindImportDatabaseClause(const Statement& statement) {
auto& importDatabaseStatement = ku_dynamic_cast<const Statement&, const ImportDB&>(statement);
auto boundFilePath = importDatabaseStatement.getFilePath();
if (!vfs->fileOrPathExists(boundFilePath)) {
throw BinderException(stringFormat("Directory {} does not exist.", boundFilePath));
}
std::string finalQueryStatements;
finalQueryStatements += getFilePath(vfs, boundFilePath, ImportDBConstants::SCHEMA_NAME);
finalQueryStatements += getFilePath(vfs, boundFilePath, ImportDBConstants::COPY_NAME);
finalQueryStatements += getFilePath(vfs, boundFilePath, ImportDBConstants::MACRO_NAME);
return std::make_unique<BoundImportDatabase>(boundFilePath, finalQueryStatements);
}
} // namespace binder
} // namespace kuzu
3 changes: 3 additions & 0 deletions src/binder/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ std::unique_ptr<BoundStatement> Binder::bind(const Statement& statement) {
case StatementType::EXPORT_DATABASE: {
boundStatement = bindExportDatabaseClause(statement);
} break;
case StatementType::IMPORT_DATABASE: {
boundStatement = bindImportDatabaseClause(statement);
} break;
default: {
KU_UNREACHABLE;
}
Expand Down
3 changes: 3 additions & 0 deletions src/binder/bound_statement_visitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ void BoundStatementVisitor::visit(const BoundStatement& statement) {
case StatementType::EXPORT_DATABASE: {
visitExportDatabase(statement);
} break;
case StatementType::IMPORT_DATABASE: {
visitImportDatabase(statement);
} break;
default:
KU_UNREACHABLE;
}
Expand Down
1 change: 1 addition & 0 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class Binder {
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);

std::unique_ptr<BoundStatement> bindExportDatabaseClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindImportDatabaseClause(const parser::Statement& statement);

/*** bind file scan ***/
std::unordered_map<std::string, common::Value> bindParsingOptions(
Expand Down
1 change: 1 addition & 0 deletions src/include/binder/bound_statement_visitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class BoundStatementVisitor {
virtual void visitCopyFrom(const BoundStatement&) {}
virtual void visitCopyTo(const BoundStatement&) {}
virtual void visitExportDatabase(const BoundStatement&) {}
virtual void visitImportDatabase(const BoundStatement&) {}
virtual void visitStandaloneCall(const BoundStatement&) {}
virtual void visitCommentOn(const BoundStatement&) {}
virtual void visitExplain(const BoundStatement&);
Expand Down
23 changes: 23 additions & 0 deletions src/include/binder/copy/bound_import_database.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once
#include "binder/bound_statement.h"

namespace kuzu {
namespace binder {

class BoundImportDatabase : public BoundStatement {
public:
BoundImportDatabase(std::string filePath, std::string query)
: BoundStatement{common::StatementType::IMPORT_DATABASE,
BoundStatementResult::createEmptyResult()},
filePath{std::move(filePath)}, query{query} {}

inline std::string getFilePath() const { return filePath; }
inline std::string getQuery() const { return query; }

private:
std::string filePath;
std::string query;
};

} // namespace binder
} // namespace kuzu
6 changes: 6 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,5 +208,11 @@ struct CopyToCSVConstants {
static constexpr const uint64_t DEFAULT_CSV_FLUSH_SIZE = 4096 * 8;
};

struct ImportDBConstants {
static constexpr char SCHEMA_NAME[] = "schema.cypher";
static constexpr char COPY_NAME[] = "copy.cypher";
static constexpr char MACRO_NAME[] = "macro.cypher";
};

} // namespace common
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/common/enums/statement_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum class StatementType : uint8_t {
TRANSACTION = 30,
EXTENSION = 31,
EXPORT_DATABASE = 32,
IMPORT_DATABASE = 33,
};

struct StatementTypeUtils {
Expand Down
2 changes: 2 additions & 0 deletions src/include/main/client_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class ClientContext {

std::unique_ptr<QueryResult> query(std::string_view queryStatement);

void runQuery(std::string query);

private:
inline void resetActiveQuery() { activeQuery.reset(); }

Expand Down
1 change: 1 addition & 0 deletions src/include/parser/parsed_statement_visitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class StatementVisitor {
virtual void visitTransaction(const Statement& /*statement*/) {}
virtual void visitExtension(const Statement& /*statement*/) {}
virtual void visitExportDatabase(const Statement& /*statement*/) {}
virtual void visitImportDatabase(const Statement& /*statement*/) {}
// LCOV_EXCL_STOP
};

Expand Down
11 changes: 11 additions & 0 deletions src/include/parser/port_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,16 @@ class ExportDB : public Statement {
std::string filePath;
};

class ImportDB : public Statement {
public:
explicit ImportDB(std::string filePath)
: Statement{common::StatementType::IMPORT_DATABASE}, filePath{std::move(filePath)} {}

inline std::string getFilePath() const { return filePath; }

private:
std::string filePath;
};

} // namespace parser
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class Transformer {
parsing_option_t transformParsingOptions(CypherParser::KU_ParsingOptionsContext& ctx);

std::unique_ptr<Statement> transformExportDatabase(CypherParser::KU_ExportDatabaseContext& ctx);
std::unique_ptr<Statement> transformImportDatabase(CypherParser::KU_ImportDatabaseContext& ctx);

// Transform query statement.
std::unique_ptr<Statement> transformQuery(CypherParser::OC_QueryContext& ctx);
Expand Down
3 changes: 2 additions & 1 deletion src/include/planner/operator/logical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ enum class LogicalOperatorType : uint8_t {
UNION_ALL,
UNWIND,
EXTENSION,
EXPORT_DATABASE
EXPORT_DATABASE,
IMPORT_DATABASE,
};

class LogicalOperatorUtils {
Expand Down
29 changes: 29 additions & 0 deletions src/include/planner/operator/persistent/logical_import_db.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include "planner/operator/logical_operator.h"

namespace kuzu {
namespace planner {

class LogicalImportDatabase : public LogicalOperator {
public:
explicit LogicalImportDatabase(std::string query)
: LogicalOperator{LogicalOperatorType::IMPORT_DATABASE}, query{query} {}

inline std::string getExpressionsForPrinting() const override { return std::string{}; }

void computeFactorizedSchema() override;
void computeFlatSchema() override;

inline std::string getQuery() const { return query; }

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalImportDatabase>(query);
}

private:
std::string query;
};

} // namespace planner
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class Planner {

// Plan export database
std::unique_ptr<LogicalPlan> planExportDatabase(const binder::BoundStatement& statement);
std::unique_ptr<LogicalPlan> planImportDatabase(const binder::BoundStatement& statement);

// Plan query.
std::vector<std::unique_ptr<LogicalPlan>> planQuery(
Expand Down
27 changes: 27 additions & 0 deletions src/include/processor/operator/persistent/import_db.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "processor/operator/physical_operator.h"

namespace kuzu {
namespace processor {

class ImportDB : public PhysicalOperator {
public:
ImportDB(std::string query, uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::IMPORT_DATABASE, id, paramsString}, query{query} {}

bool canParallel() const override { return false; }

bool isSource() const override { return true; }

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ImportDB>(query, id, paramsString);
}

private:
std::string query;
};
} // namespace processor
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ enum class PhysicalOperatorType : uint8_t {
FLATTEN,
HASH_JOIN_BUILD,
HASH_JOIN_PROBE,
IMPORT_DATABASE,
INDEX_LOOKUP,
INDEX_SCAN,
INSERT,
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class PlanMapper {
std::unique_ptr<PhysicalOperator> mapTransaction(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapExtension(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapExportDatabase(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapImportDatabase(planner::LogicalOperator* logicalOperator);

std::unique_ptr<PhysicalOperator> createCopyRel(
std::shared_ptr<PartitionerSharedState> partitionerSharedState,
Expand Down
35 changes: 31 additions & 4 deletions src/main/client_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,10 @@ std::unique_ptr<PreparedStatement> ClientContext::prepareNoLock(
try {
// parsing
if (parsedStatement->getStatementType() != StatementType::TRANSACTION) {
auto txContext = this->transactionContext.get();
if (txContext->isAutoTransaction()) {
txContext->beginAutoTransaction(preparedStatement->readOnly);
if (transactionContext->isAutoTransaction()) {
transactionContext->beginAutoTransaction(preparedStatement->readOnly);
} else {
txContext->validateManualTransaction(
transactionContext->validateManualTransaction(
preparedStatement->allowActiveTransaction(), preparedStatement->readOnly);
}
if (!this->getTx()->isReadOnly()) {
Expand Down Expand Up @@ -425,5 +424,33 @@ void ClientContext::commitUDFTrx(bool isAutoCommitTrx) {
}
}

void ClientContext::runQuery(std::string query) {
// TODO(Jimain): this is special for "Import database". Should refactor after we support
// multiple query statements in one Tx.
// Currently, we split multiple query statements into single query and execute them one by one,
// each with an auto transaction. The correct way is to execute them in one transaction. But we
// do not support DDL and copy in one Tx.
if (transactionContext->hasActiveTransaction()) {
transactionContext->commit();
}
auto parsedStatements = std::vector<std::unique_ptr<Statement>>();
try {
parsedStatements = parseQuery(query);
} catch (std::exception& exception) { throw ConnectionException(exception.what()); }

Check warning on line 439 in src/main/client_context.cpp

View check run for this annotation

Codecov / codecov/patch

src/main/client_context.cpp#L439

Added line #L439 was not covered by tests
if (parsedStatements.empty()) {
throw ConnectionException("Connection Exception: Query is empty.");

Check warning on line 441 in src/main/client_context.cpp

View check run for this annotation

Codecov / codecov/patch

src/main/client_context.cpp#L441

Added line #L441 was not covered by tests
}
try {
for (auto& statement : parsedStatements) {
auto preparedStatement = prepareNoLock(statement.get());
auto currentQueryResult =
executeAndAutoCommitIfNecessaryNoLock(preparedStatement.get());
if (!currentQueryResult->isSuccess()) {
throw ConnectionException(currentQueryResult->errMsg);

Check warning on line 449 in src/main/client_context.cpp

View check run for this annotation

Codecov / codecov/patch

src/main/client_context.cpp#L449

Added line #L449 was not covered by tests
}
}
} catch (std::exception& exception) { throw ConnectionException(exception.what()); }

Check warning on line 452 in src/main/client_context.cpp

View check run for this annotation

Codecov / codecov/patch

src/main/client_context.cpp#L452

Added line #L452 was not covered by tests
return;
}
} // namespace main
} // namespace kuzu
Loading

0 comments on commit 181f31f

Please sign in to comment.