Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add turtle file type, refactor copy compilation #2026

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,20 @@ grammar Cypher;
oC_Cypher
: SP ? oC_AnyCypherOption? SP? ( oC_Statement ) ( SP? ';' )? SP? EOF ;

kU_CopyFromCSV
oC_Statement
: oC_Query
| kU_DDL
| kU_CopyFrom
| kU_CopyFromByColumn
| kU_CopyTO
| kU_StandaloneCall
| kU_CreateMacro
| kU_Transaction ;

kU_CopyFrom
: COPY SP oC_SchemaName SP FROM SP kU_FilePaths ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;

kU_CopyFromNPY
kU_CopyFromByColumn
: COPY SP oC_SchemaName SP FROM SP '(' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ')' SP BY SP COLUMN ;

kU_CopyTO
Expand Down Expand Up @@ -167,16 +177,6 @@ oC_Profile

PROFILE : ( 'P' | 'p' ) ( 'R' | 'r' ) ( 'O' | 'o' ) ( 'F' | 'f' ) ( 'I' | 'i' ) ( 'L' | 'l' ) ( 'E' | 'e' ) ;

oC_Statement
: oC_Query
| kU_DDL
| kU_CopyFromNPY
| kU_CopyFromCSV
| kU_CopyTO
| kU_StandaloneCall
| kU_CreateMacro
| kU_Transaction ;

kU_Transaction
: BEGIN SP READ SP TRANSACTION
| BEGIN SP WRITE SP TRANSACTION
Expand Down
279 changes: 158 additions & 121 deletions src/binder/bind/bind_copy.cpp

Large diffs are not rendered by default.

9 changes: 0 additions & 9 deletions src/binder/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,6 @@ void Binder::validateNodeRelTableExist(const std::string& tableName) {
}
}

/**
 * Reports whether `parsingOptionName` is one of the names listed in
 * CopyConstants::STRING_CSV_PARSING_OPTIONS.
 *
 * @param parsingOptionName option name to look up; compared by exact equality and never modified.
 * @return true if the name appears in the list, false otherwise.
 */
bool Binder::validateStringParsingOptionName(std::string& parsingOptionName) {
    // Iterate the elements directly so no index has to be compared against std::size(),
    // which would mix a signed counter with an unsigned size.
    for (const auto& optionName : CopyConstants::STRING_CSV_PARSING_OPTIONS) {
        if (parsingOptionName == optionName) {
            return true;
        }
    }
    return false;
}

void Binder::validateNodeTableHasNoEdge(const Catalog& _catalog, table_id_t tableID) {
for (auto& tableSchema : _catalog.getReadOnlyVersion()->getRelTableSchemas()) {
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
Expand Down
9 changes: 0 additions & 9 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,5 @@ CopyDescription::FileType CopyDescription::getFileTypeFromExtension(const std::s
return fileType;
}

/**
 * Reverse lookup in fileTypeMap: returns the key whose mapped value equals `fileType`.
 *
 * @param fileType file type to find a name for.
 * @return the first key encountered while iterating fileTypeMap that maps to `fileType`.
 * @throws InternalException if no entry in fileTypeMap maps to `fileType`.
 */
std::string CopyDescription::getFileTypeName(FileType fileType) {
    for (const auto& [typeName, mappedType] : fileTypeMap) {
        if (mappedType != fileType) {
            continue;
        }
        return typeName;
    }
    throw InternalException("Unimplemented getFileTypeName().");
}

} // namespace common
} // namespace kuzu
26 changes: 11 additions & 15 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,21 @@ class Binder {
common::property_id_t bindPropertyName(
catalog::TableSchema* tableSchema, const std::string& propertyName);

/*** bind copy from/to ***/
expression_vector bindColumnExpressions(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
/*** bind copy ***/
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindCopyNodeFrom(
std::unique_ptr<common::CopyDescription> copyDescription,
catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyRelFrom(
std::unique_ptr<common::CopyDescription> copyDescription,
catalog::TableSchema* tableSchema);
expression_vector bindCopyNodeColumns(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
expression_vector bindCopyRelColumns(catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);

static std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

std::unique_ptr<common::CSVReaderConfig> bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>*
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>&
parsingOptions);
void bindStringParsingOptions(common::CSVReaderConfig& csvReaderConfig,
const std::string& optionName, std::string& optionValue);
char bindParsingOptionValue(std::string value);
static common::CopyDescription::FileType bindFileType(
const std::vector<std::string>& filePaths);
static common::CopyDescription::FileType bindFileType(const std::string& filePath);

/*** bind query ***/
std::unique_ptr<BoundRegularQuery> bindQuery(const parser::RegularQuery& regularQuery);
Expand Down Expand Up @@ -244,8 +242,6 @@ class Binder {
// TODO(Xiyang): remove this validation once we refactor DDL.
void validateNodeRelTableExist(const std::string& tableName);

static bool validateStringParsingOptionName(std::string& parsingOptionName);

static void validateNodeTableHasNoEdge(
const catalog::Catalog& _catalog, common::table_id_t tableID);

Expand Down
10 changes: 6 additions & 4 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct CSVReaderConfig {
};

struct CopyDescription {
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 };
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3, TURTLE = 4 };

FileType fileType;
std::vector<std::string> filePaths;
Expand All @@ -52,18 +52,20 @@ struct CopyDescription {
}
}

// Returns false when fileType is CSV or TURTLE and true for every other file type.
inline bool parallelRead() const {
    const bool isSerialOnly = fileType == FileType::CSV || fileType == FileType::TURTLE;
    return !isSerialOnly;
}

// Returns a newly allocated CopyDescription that is copy-constructed from *this.
inline std::unique_ptr<CopyDescription> copy() const {
    // No `assert(this)`: inside a member function `this` is never null in well-defined C++,
    // so such a check is tautological and compilers are free to remove it.
    return std::make_unique<CopyDescription>(*this);
}

inline static std::unordered_map<std::string, FileType> fileTypeMap{
{"unknown", FileType::UNKNOWN}, {".csv", FileType::CSV}, {".parquet", FileType::PARQUET},
{".npy", FileType::NPY}};
{".npy", FileType::NPY}, {".ttl", FileType::TURTLE}};

static FileType getFileTypeFromExtension(const std::string& extension);

static std::string getFileTypeName(FileType fileType);
};

} // namespace common
Expand Down
21 changes: 10 additions & 11 deletions src/include/parser/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,23 @@ namespace parser {

class CopyFrom : public Statement {
public:
explicit CopyFrom(std::vector<std::string> filePaths, std::string tableName,
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions,
common::CopyDescription::FileType fileType)
: Statement{common::StatementType::COPY_FROM}, filePaths{std::move(filePaths)},
tableName{std::move(tableName)},
parsingOptions{std::move(parsingOptions)}, fileType{fileType} {}
explicit CopyFrom(bool byColumn_, std::vector<std::string> filePaths, std::string tableName,
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions)
: Statement{common::StatementType::COPY_FROM}, byColumn_{byColumn_}, filePaths{std::move(
filePaths)},
tableName{std::move(tableName)}, parsingOptions{std::move(parsingOptions)} {}

inline bool byColumn() const { return byColumn_; }
inline std::vector<std::string> getFilePaths() const { return filePaths; }
inline std::string getTableName() const { return tableName; }
inline std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> const*
getParsingOptions() const {
return &parsingOptions;
inline const std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>&
getParsingOptionsRef() const {
return parsingOptions;
}
inline common::CopyDescription::FileType getFileType() const { return fileType; }

private:
bool byColumn_;
std::vector<std::string> filePaths;
common::CopyDescription::FileType fileType;
std::string tableName;
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions;
};
Expand Down
24 changes: 11 additions & 13 deletions src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,17 @@ class Transformer {
std::unique_ptr<Statement> transform();

private:
std::unique_ptr<Statement> transformOcStatement(CypherParser::OC_StatementContext& ctx);
std::unique_ptr<Statement> transformStatement(CypherParser::OC_StatementContext& ctx);

/* Copy From / Copy To */
std::unique_ptr<Statement> transformCopyTo(CypherParser::KU_CopyTOContext& ctx);
std::unique_ptr<Statement> transformCopyFrom(CypherParser::KU_CopyFromContext& ctx);
std::unique_ptr<Statement> transformCopyFromByColumn(
CypherParser::KU_CopyFromByColumnContext& ctx);
std::vector<std::string> transformFilePaths(
std::vector<antlr4::tree::TerminalNode*> stringLiteral);
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> transformParsingOptions(
CypherParser::KU_ParsingOptionsContext& ctx);

std::unique_ptr<RegularQuery> transformQuery(CypherParser::OC_QueryContext& ctx);

Expand Down Expand Up @@ -265,12 +275,6 @@ class Transformer {
std::vector<std::pair<std::string, std::string>> transformPropertyDefinitions(
CypherParser::KU_PropertyDefinitionsContext& ctx);

std::unique_ptr<Statement> transformCopyTo(CypherParser::KU_CopyTOContext& ctx);

std::unique_ptr<Statement> transformCopyFrom(CypherParser::KU_CopyFromCSVContext& ctx);

std::unique_ptr<Statement> transformCopyFromNPY(CypherParser::KU_CopyFromNPYContext& ctx);

std::unique_ptr<Statement> transformStandaloneCall(CypherParser::KU_StandaloneCallContext& ctx);

std::vector<std::string> transformPositionalArgs(CypherParser::KU_PositionalArgsContext& ctx);
Expand All @@ -279,12 +283,6 @@ class Transformer {

std::unique_ptr<Statement> transformTransaction(CypherParser::KU_TransactionContext& ctx);

std::vector<std::string> transformFilePaths(
std::vector<antlr4::tree::TerminalNode*> stringLiteral);

std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> transformParsingOptions(
CypherParser::KU_ParsingOptionsContext& ctx);

std::string transformStringLiteral(antlr4::tree::TerminalNode& stringLiteral);

private:
Expand Down
22 changes: 11 additions & 11 deletions src/parser/transform/transform_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@ std::unique_ptr<Statement> Transformer::transformCopyTo(CypherParser::KU_CopyTOC
return std::make_unique<CopyTo>(std::move(filePath), std::move(regularQuery));
}

std::unique_ptr<Statement> Transformer::transformCopyFrom(
CypherParser::KU_CopyFromCSVContext& ctx) {
std::unique_ptr<Statement> Transformer::transformCopyFrom(CypherParser::KU_CopyFromContext& ctx) {
auto filePaths = transformFilePaths(ctx.kU_FilePaths()->StringLiteral());
auto tableName = transformSchemaName(*ctx.oC_SchemaName());
auto parsingOptions = ctx.kU_ParsingOptions() ?
transformParsingOptions(*ctx.kU_ParsingOptions()) :
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>();
return std::make_unique<CopyFrom>(std::move(filePaths), std::move(tableName),
std::move(parsingOptions), CopyDescription::FileType::UNKNOWN);
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions;
if (ctx.kU_ParsingOptions()) {
parsingOptions = transformParsingOptions(*ctx.kU_ParsingOptions());
}
return std::make_unique<CopyFrom>(false /* byColumn */, std::move(filePaths),
std::move(tableName), std::move(parsingOptions));
}

std::unique_ptr<Statement> Transformer::transformCopyFromNPY(
CypherParser::KU_CopyFromNPYContext& ctx) {
std::unique_ptr<Statement> Transformer::transformCopyFromByColumn(
CypherParser::KU_CopyFromByColumnContext& ctx) {
auto filePaths = transformFilePaths(ctx.StringLiteral());
auto tableName = transformSchemaName(*ctx.oC_SchemaName());
auto parsingOptions = std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>();
return std::make_unique<CopyFrom>(std::move(filePaths), std::move(tableName),
std::move(parsingOptions), CopyDescription::FileType::NPY);
return std::make_unique<CopyFrom>(
true /* byColumn */, std::move(filePaths), std::move(tableName), std::move(parsingOptions));
}

std::vector<std::string> Transformer::transformFilePaths(
Expand Down
13 changes: 6 additions & 7 deletions src/parser/transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace kuzu {
namespace parser {

std::unique_ptr<Statement> Transformer::transform() {
auto statement = transformOcStatement(*root.oC_Statement());
auto statement = transformStatement(*root.oC_Statement());
if (root.oC_AnyCypherOption()) {
auto cypherOption = root.oC_AnyCypherOption();
auto explainType =
Expand All @@ -21,16 +21,15 @@ std::unique_ptr<Statement> Transformer::transform() {
return statement;
}

std::unique_ptr<Statement> Transformer::transformOcStatement(
CypherParser::OC_StatementContext& ctx) {
std::unique_ptr<Statement> Transformer::transformStatement(CypherParser::OC_StatementContext& ctx) {
if (ctx.oC_Query()) {
return transformQuery(*ctx.oC_Query());
} else if (ctx.kU_DDL()) {
return transformDDL(*ctx.kU_DDL());
} else if (ctx.kU_CopyFromNPY()) {
return transformCopyFromNPY(*ctx.kU_CopyFromNPY());
} else if (ctx.kU_CopyFromCSV()) {
return transformCopyFrom(*ctx.kU_CopyFromCSV());
} else if (ctx.kU_CopyFromByColumn()) {
return transformCopyFromByColumn(*ctx.kU_CopyFromByColumn());
} else if (ctx.kU_CopyFrom()) {
return transformCopyFrom(*ctx.kU_CopyFrom());
} else if (ctx.kU_CopyTO()) {
return transformCopyTo(*ctx.kU_CopyTO());
} else if (ctx.kU_StandaloneCall()) {
Expand Down
4 changes: 4 additions & 0 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyFrom(LogicalOperator* logicalOperator) {
auto copyFrom = (LogicalCopyFrom*)logicalOperator;
auto info = copyFrom->getInfo();
if (info->copyDesc->fileType == CopyDescription::FileType::TURTLE) {
throw NotImplementedException("PlanMapper::mapCopyFrom");

Check warning on line 23 in src/processor/map/map_copy_from.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/map/map_copy_from.cpp#L23

Added line #L23 was not covered by tests
}
switch (copyFrom->getInfo()->tableSchema->getTableType()) {
case TableType::NODE:
return mapCopyNodeFrom(logicalOperator);
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/persistent/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ void Reader::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont
}

bool Reader::getNextTuplesInternal(ExecutionContext* context) {
sharedState->copyDescription->fileType == common::CopyDescription::FileType::CSV ?
readNextDataChunk<ReaderSharedState::ReadMode::SERIAL>() :
readNextDataChunk<ReaderSharedState::ReadMode::PARALLEL>();
sharedState->copyDescription->parallelRead() ?
readNextDataChunk<ReaderSharedState::ReadMode::PARALLEL>() :
readNextDataChunk<ReaderSharedState::ReadMode::SERIAL>();
return dataChunk->state->selVector->selectedSize != 0;
}

Expand Down
3 changes: 3 additions & 0 deletions test/test_files/rdf/ddl.test
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@
-STATEMENT Call table_info('test_TRIPLES') RETURN *;
---- 1
1|_PREDICT_ID|INTERNAL_ID
#-STATEMENT COPY test_RESOURCE FROM '/Users/xiyangfeng/Desktop/ku_play/test.ttl';
#---- error
#PlanMapper::mapCopyFrom
Loading