Skip to content

Commit

Permalink
Merge pull request #2026 from kuzudb/copy-rdf-compilation
Browse files Browse the repository at this point in the history
Add turtle file type, refactor copy compilation
  • Loading branch information
andyfengHKU committed Sep 12, 2023
2 parents 6840827 + d1a26f9 commit c578381
Show file tree
Hide file tree
Showing 15 changed files with 2,052 additions and 2,029 deletions.
24 changes: 12 additions & 12 deletions src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,20 @@ grammar Cypher;
oC_Cypher
: SP ? oC_AnyCypherOption? SP? ( oC_Statement ) ( SP? ';' )? SP? EOF ;

kU_CopyFromCSV
oC_Statement
: oC_Query
| kU_DDL
| kU_CopyFrom
| kU_CopyFromByColumn
| kU_CopyTO
| kU_StandaloneCall
| kU_CreateMacro
| kU_Transaction ;

kU_CopyFrom
: COPY SP oC_SchemaName SP FROM SP kU_FilePaths ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;

kU_CopyFromNPY
kU_CopyFromByColumn
: COPY SP oC_SchemaName SP FROM SP '(' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ')' SP BY SP COLUMN ;

kU_CopyTO
Expand Down Expand Up @@ -167,16 +177,6 @@ oC_Profile

PROFILE : ( 'P' | 'p' ) ( 'R' | 'r' ) ( 'O' | 'o' ) ( 'F' | 'f' ) ( 'I' | 'i' ) ( 'L' | 'l' ) ( 'E' | 'e' ) ;

oC_Statement
: oC_Query
| kU_DDL
| kU_CopyFromNPY
| kU_CopyFromCSV
| kU_CopyTO
| kU_StandaloneCall
| kU_CreateMacro
| kU_Transaction ;

kU_Transaction
: BEGIN SP READ SP TRANSACTION
| BEGIN SP WRITE SP TRANSACTION
Expand Down
279 changes: 158 additions & 121 deletions src/binder/bind/bind_copy.cpp

Large diffs are not rendered by default.

9 changes: 0 additions & 9 deletions src/binder/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,6 @@ void Binder::validateNodeRelTableExist(const std::string& tableName) {
}
}

bool Binder::validateStringParsingOptionName(std::string& parsingOptionName) {
for (auto i = 0; i < std::size(CopyConstants::STRING_CSV_PARSING_OPTIONS); i++) {
if (parsingOptionName == CopyConstants::STRING_CSV_PARSING_OPTIONS[i]) {
return true;
}
}
return false;
}

void Binder::validateNodeTableHasNoEdge(const Catalog& _catalog, table_id_t tableID) {
for (auto& tableSchema : _catalog.getReadOnlyVersion()->getRelTableSchemas()) {
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
Expand Down
9 changes: 0 additions & 9 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,5 @@ CopyDescription::FileType CopyDescription::getFileTypeFromExtension(const std::s
return fileType;
}

std::string CopyDescription::getFileTypeName(FileType fileType) {
for (const auto& fileTypeItem : fileTypeMap) {
if (fileTypeItem.second == fileType) {
return fileTypeItem.first;
}
}
throw InternalException("Unimplemented getFileTypeName().");
}

} // namespace common
} // namespace kuzu
26 changes: 11 additions & 15 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,21 @@ class Binder {
common::property_id_t bindPropertyName(
catalog::TableSchema* tableSchema, const std::string& propertyName);

/*** bind copy from/to ***/
expression_vector bindColumnExpressions(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
/*** bind copy ***/
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindCopyNodeFrom(
std::unique_ptr<common::CopyDescription> copyDescription,
catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyRelFrom(
std::unique_ptr<common::CopyDescription> copyDescription,
catalog::TableSchema* tableSchema);
expression_vector bindCopyNodeColumns(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
expression_vector bindCopyRelColumns(catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);

static std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

std::unique_ptr<common::CSVReaderConfig> bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>*
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>&
parsingOptions);
void bindStringParsingOptions(common::CSVReaderConfig& csvReaderConfig,
const std::string& optionName, std::string& optionValue);
char bindParsingOptionValue(std::string value);
static common::CopyDescription::FileType bindFileType(
const std::vector<std::string>& filePaths);
static common::CopyDescription::FileType bindFileType(const std::string& filePath);

/*** bind query ***/
std::unique_ptr<BoundRegularQuery> bindQuery(const parser::RegularQuery& regularQuery);
Expand Down Expand Up @@ -244,8 +242,6 @@ class Binder {
// TODO(Xiyang): remove this validation once we refactor DDL.
void validateNodeRelTableExist(const std::string& tableName);

static bool validateStringParsingOptionName(std::string& parsingOptionName);

static void validateNodeTableHasNoEdge(
const catalog::Catalog& _catalog, common::table_id_t tableID);

Expand Down
10 changes: 6 additions & 4 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct CSVReaderConfig {
};

struct CopyDescription {
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 };
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3, TURTLE = 4 };

FileType fileType;
std::vector<std::string> filePaths;
Expand All @@ -52,18 +52,20 @@ struct CopyDescription {
}
}

inline bool parallelRead() const {
return fileType != FileType::CSV && fileType != FileType::TURTLE;
}

inline std::unique_ptr<CopyDescription> copy() const {
assert(this);
return std::make_unique<CopyDescription>(*this);
}

inline static std::unordered_map<std::string, FileType> fileTypeMap{
{"unknown", FileType::UNKNOWN}, {".csv", FileType::CSV}, {".parquet", FileType::PARQUET},
{".npy", FileType::NPY}};
{".npy", FileType::NPY}, {".ttl", FileType::TURTLE}};

static FileType getFileTypeFromExtension(const std::string& extension);

static std::string getFileTypeName(FileType fileType);
};

} // namespace common
Expand Down
21 changes: 10 additions & 11 deletions src/include/parser/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,23 @@ namespace parser {

class CopyFrom : public Statement {
public:
explicit CopyFrom(std::vector<std::string> filePaths, std::string tableName,
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions,
common::CopyDescription::FileType fileType)
: Statement{common::StatementType::COPY_FROM}, filePaths{std::move(filePaths)},
tableName{std::move(tableName)},
parsingOptions{std::move(parsingOptions)}, fileType{fileType} {}
explicit CopyFrom(bool byColumn_, std::vector<std::string> filePaths, std::string tableName,
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions)
: Statement{common::StatementType::COPY_FROM}, byColumn_{byColumn_}, filePaths{std::move(
filePaths)},
tableName{std::move(tableName)}, parsingOptions{std::move(parsingOptions)} {}

inline bool byColumn() const { return byColumn_; }
inline std::vector<std::string> getFilePaths() const { return filePaths; }
inline std::string getTableName() const { return tableName; }
inline std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> const*
getParsingOptions() const {
return &parsingOptions;
inline const std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>&
getParsingOptionsRef() const {
return parsingOptions;
}
inline common::CopyDescription::FileType getFileType() const { return fileType; }

private:
bool byColumn_;
std::vector<std::string> filePaths;
common::CopyDescription::FileType fileType;
std::string tableName;
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions;
};
Expand Down
24 changes: 11 additions & 13 deletions src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,17 @@ class Transformer {
std::unique_ptr<Statement> transform();

private:
std::unique_ptr<Statement> transformOcStatement(CypherParser::OC_StatementContext& ctx);
std::unique_ptr<Statement> transformStatement(CypherParser::OC_StatementContext& ctx);

/* Copy From */
std::unique_ptr<Statement> transformCopyTo(CypherParser::KU_CopyTOContext& ctx);
std::unique_ptr<Statement> transformCopyFrom(CypherParser::KU_CopyFromContext& ctx);
std::unique_ptr<Statement> transformCopyFromByColumn(
CypherParser::KU_CopyFromByColumnContext& ctx);
std::vector<std::string> transformFilePaths(
std::vector<antlr4::tree::TerminalNode*> stringLiteral);
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> transformParsingOptions(
CypherParser::KU_ParsingOptionsContext& ctx);

std::unique_ptr<RegularQuery> transformQuery(CypherParser::OC_QueryContext& ctx);

Expand Down Expand Up @@ -265,12 +275,6 @@ class Transformer {
std::vector<std::pair<std::string, std::string>> transformPropertyDefinitions(
CypherParser::KU_PropertyDefinitionsContext& ctx);

std::unique_ptr<Statement> transformCopyTo(CypherParser::KU_CopyTOContext& ctx);

std::unique_ptr<Statement> transformCopyFrom(CypherParser::KU_CopyFromCSVContext& ctx);

std::unique_ptr<Statement> transformCopyFromNPY(CypherParser::KU_CopyFromNPYContext& ctx);

std::unique_ptr<Statement> transformStandaloneCall(CypherParser::KU_StandaloneCallContext& ctx);

std::vector<std::string> transformPositionalArgs(CypherParser::KU_PositionalArgsContext& ctx);
Expand All @@ -279,12 +283,6 @@ class Transformer {

std::unique_ptr<Statement> transformTransaction(CypherParser::KU_TransactionContext& ctx);

std::vector<std::string> transformFilePaths(
std::vector<antlr4::tree::TerminalNode*> stringLiteral);

std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> transformParsingOptions(
CypherParser::KU_ParsingOptionsContext& ctx);

std::string transformStringLiteral(antlr4::tree::TerminalNode& stringLiteral);

private:
Expand Down
22 changes: 11 additions & 11 deletions src/parser/transform/transform_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@ std::unique_ptr<Statement> Transformer::transformCopyTo(CypherParser::KU_CopyTOC
return std::make_unique<CopyTo>(std::move(filePath), std::move(regularQuery));
}

std::unique_ptr<Statement> Transformer::transformCopyFrom(
CypherParser::KU_CopyFromCSVContext& ctx) {
std::unique_ptr<Statement> Transformer::transformCopyFrom(CypherParser::KU_CopyFromContext& ctx) {
auto filePaths = transformFilePaths(ctx.kU_FilePaths()->StringLiteral());
auto tableName = transformSchemaName(*ctx.oC_SchemaName());
auto parsingOptions = ctx.kU_ParsingOptions() ?
transformParsingOptions(*ctx.kU_ParsingOptions()) :
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>();
return std::make_unique<CopyFrom>(std::move(filePaths), std::move(tableName),
std::move(parsingOptions), CopyDescription::FileType::UNKNOWN);
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions;
if (ctx.kU_ParsingOptions()) {
parsingOptions = transformParsingOptions(*ctx.kU_ParsingOptions());
}
return std::make_unique<CopyFrom>(false /* byColumn */, std::move(filePaths),
std::move(tableName), std::move(parsingOptions));
}

std::unique_ptr<Statement> Transformer::transformCopyFromNPY(
CypherParser::KU_CopyFromNPYContext& ctx) {
std::unique_ptr<Statement> Transformer::transformCopyFromByColumn(
CypherParser::KU_CopyFromByColumnContext& ctx) {
auto filePaths = transformFilePaths(ctx.StringLiteral());
auto tableName = transformSchemaName(*ctx.oC_SchemaName());
auto parsingOptions = std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>();
return std::make_unique<CopyFrom>(std::move(filePaths), std::move(tableName),
std::move(parsingOptions), CopyDescription::FileType::NPY);
return std::make_unique<CopyFrom>(
true /* byColumn */, std::move(filePaths), std::move(tableName), std::move(parsingOptions));
}

std::vector<std::string> Transformer::transformFilePaths(
Expand Down
13 changes: 6 additions & 7 deletions src/parser/transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace kuzu {
namespace parser {

std::unique_ptr<Statement> Transformer::transform() {
auto statement = transformOcStatement(*root.oC_Statement());
auto statement = transformStatement(*root.oC_Statement());
if (root.oC_AnyCypherOption()) {
auto cypherOption = root.oC_AnyCypherOption();
auto explainType =
Expand All @@ -21,16 +21,15 @@ std::unique_ptr<Statement> Transformer::transform() {
return statement;
}

std::unique_ptr<Statement> Transformer::transformOcStatement(
CypherParser::OC_StatementContext& ctx) {
std::unique_ptr<Statement> Transformer::transformStatement(CypherParser::OC_StatementContext& ctx) {
if (ctx.oC_Query()) {
return transformQuery(*ctx.oC_Query());
} else if (ctx.kU_DDL()) {
return transformDDL(*ctx.kU_DDL());
} else if (ctx.kU_CopyFromNPY()) {
return transformCopyFromNPY(*ctx.kU_CopyFromNPY());
} else if (ctx.kU_CopyFromCSV()) {
return transformCopyFrom(*ctx.kU_CopyFromCSV());
} else if (ctx.kU_CopyFromByColumn()) {
return transformCopyFromByColumn(*ctx.kU_CopyFromByColumn());
} else if (ctx.kU_CopyFrom()) {
return transformCopyFrom(*ctx.kU_CopyFrom());
} else if (ctx.kU_CopyTO()) {
return transformCopyTo(*ctx.kU_CopyTO());
} else if (ctx.kU_StandaloneCall()) {
Expand Down
4 changes: 4 additions & 0 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ namespace processor {

std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyFrom(LogicalOperator* logicalOperator) {
auto copyFrom = (LogicalCopyFrom*)logicalOperator;
auto info = copyFrom->getInfo();
if (info->copyDesc->fileType == CopyDescription::FileType::TURTLE) {
throw NotImplementedException("PlanMapper::mapCopyFrom");
}
switch (copyFrom->getInfo()->tableSchema->getTableType()) {
case TableType::NODE:
return mapCopyNodeFrom(logicalOperator);
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/persistent/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ void Reader::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont
}

bool Reader::getNextTuplesInternal(ExecutionContext* context) {
sharedState->copyDescription->fileType == common::CopyDescription::FileType::CSV ?
readNextDataChunk<ReaderSharedState::ReadMode::SERIAL>() :
readNextDataChunk<ReaderSharedState::ReadMode::PARALLEL>();
sharedState->copyDescription->parallelRead() ?
readNextDataChunk<ReaderSharedState::ReadMode::PARALLEL>() :
readNextDataChunk<ReaderSharedState::ReadMode::SERIAL>();
return dataChunk->state->selVector->selectedSize != 0;
}

Expand Down
3 changes: 3 additions & 0 deletions test/test_files/rdf/ddl.test
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@
-STATEMENT Call table_info('test_TRIPLES') RETURN *;
---- 1
1|_PREDICT_ID|INTERNAL_ID
#-STATEMENT COPY test_RESOURCE FROM '/Users/xiyangfeng/Desktop/ku_play/test.ttl';
#---- error
#PlanMapper::mapCopyFrom
Loading

0 comments on commit c578381

Please sign in to comment.