Skip to content

Commit

Permalink
Support COPY TO to write query result to .csv file
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Jul 31, 2023
1 parent ed09f19 commit 8e015b1
Show file tree
Hide file tree
Showing 41 changed files with 3,444 additions and 2,760 deletions.
6 changes: 5 additions & 1 deletion src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ kU_CopyCSV
kU_CopyNPY
: COPY SP oC_SchemaName SP FROM SP '(' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ')' SP BY SP COLUMN ;

kU_CopyTO
: COPY SP '(' oC_Query ')' SP TO SP StringLiteral ;

kU_StandaloneCall
: CALL SP oC_SymbolicName SP? '=' SP? oC_Literal ;

Expand Down Expand Up @@ -53,7 +56,7 @@ kU_ParsingOption

COPY : ( 'C' | 'c' ) ( 'O' | 'o' ) ( 'P' | 'p') ( 'Y' | 'y' ) ;

FROM : ( 'F' | 'f' ) ( 'R' | 'r' ) ( 'O' | 'o' ) ( 'M' | 'm' );
FROM : ( 'F' | 'f' ) ( 'R' | 'r' ) ( 'O' | 'o' ) ( 'M' | 'm' ) ;

NPY : ( 'N' | 'n' ) ( 'P' | 'p' ) ( 'Y' | 'y' ) ;

Expand Down Expand Up @@ -151,6 +154,7 @@ oC_Statement
| kU_DDL
| kU_CopyNPY
| kU_CopyCSV
| kU_CopyTO
| kU_StandaloneCall
| kU_CreateMacro ;

Expand Down
70 changes: 42 additions & 28 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "binder/binder.h"
#include "binder/copy/bound_copy.h"
#include "binder/copy/bound_copy_from.h"
#include "binder/copy/bound_copy_to.h"
#include "binder/expression/literal_expression.h"
#include "common/string_utils.h"
#include "parser/copy.h"
Expand All @@ -10,8 +11,27 @@ using namespace kuzu::parser;
namespace kuzu {
namespace binder {

std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statement) {
auto& copyStatement = (Copy&)statement;
std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statement) {
auto& copyToStatement = (CopyTo&)statement;
auto boundFilePath = copyToStatement.getFilePath();
auto fileType = bindFileType(boundFilePath);
std::vector<std::string> columnNames;
auto query = bindQuery(*copyToStatement.getRegularQuery());
auto columns = query->getStatementResult()->getColumns();
for (auto& column : columns) {
auto columnName = column->hasAlias() ? column->getAlias() : column->toString();
columnNames.push_back(columnName);
}
if (fileType != CopyDescription::FileType::CSV) {
throw BinderException("COPY TO currently only supports csv files.");
}
return std::make_unique<BoundCopyTo>(
CopyDescription(std::vector<std::string>{boundFilePath}, fileType, columnNames),
std::move(query));
}

std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& statement) {
auto& copyStatement = (CopyFrom&)statement;
auto catalogContent = catalog.getReadOnlyVersion();
auto tableName = copyStatement.getTableName();
validateTableExist(catalog, tableName);
Expand All @@ -36,8 +56,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statemen
tableSchema->tableName));
}
}
return std::make_unique<BoundCopy>(
CopyDescription(boundFilePaths, csvReaderConfig, actualFileType), tableID, tableName);
return std::make_unique<BoundCopyFrom>(
CopyDescription(boundFilePaths, actualFileType, csvReaderConfig), tableID, tableName);
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
Expand Down Expand Up @@ -115,33 +135,27 @@ char Binder::bindParsingOptionValue(std::string value) {
return value[value.length() - 1];
}

CopyDescription::FileType Binder::bindFileType(std::vector<std::string> filePaths) {
// We currently only support loading from files with the same type. Loading files with different
// types is not supported.
auto fileName = filePaths[0];
auto csvSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::CSV);
auto parquetSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::PARQUET);
auto npySuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::NPY);
CopyDescription::FileType fileType;
std::string expectedSuffix;
if (fileName.ends_with(csvSuffix)) {
fileType = CopyDescription::FileType::CSV;
expectedSuffix = csvSuffix;
} else if (fileName.ends_with(parquetSuffix)) {
fileType = CopyDescription::FileType::PARQUET;
expectedSuffix = parquetSuffix;
} else if (fileName.ends_with(npySuffix)) {
fileType = CopyDescription::FileType::NPY;
expectedSuffix = npySuffix;
} else {
throw CopyException("Unsupported file type: " + fileName);
CopyDescription::FileType Binder::bindFileType(const std::string& filePath) {
std::filesystem::path fileName(filePath);
auto extension = FileUtils::getFileExtension(fileName);
auto fileType = CopyDescription::getFileTypeFromExtension(extension);
if (fileType == CopyDescription::FileType::UNKNOWN) {
throw CopyException("Unsupported file type: " + filePath);
}
for (auto& path : filePaths) {
if (!path.ends_with(expectedSuffix)) {
return fileType;
}

CopyDescription::FileType Binder::bindFileType(const std::vector<std::string>& filePaths) {
auto expectedFileType = CopyDescription::FileType::UNKNOWN;
for (auto& filePath : filePaths) {
auto fileType = bindFileType(filePath);
expectedFileType =
(expectedFileType == CopyDescription::FileType::UNKNOWN) ? fileType : expectedFileType;
if (fileType != expectedFileType) {
throw CopyException("Loading files with different types is not currently supported.");
}
}
return fileType;
return expectedFileType;
}

} // namespace binder
Expand Down
7 changes: 5 additions & 2 deletions src/binder/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ std::unique_ptr<BoundStatement> Binder::bind(const Statement& statement) {
case StatementType::CREATE_REL_TABLE: {
return bindCreateRelTableClause(statement);
}
case StatementType::COPY: {
return bindCopyClause(statement);
case StatementType::COPY_FROM: {
return bindCopyFromClause(statement);
}
case StatementType::COPY_TO: {
return bindCopyToClause(statement);
}
case StatementType::DROP_TABLE: {
return bindDropTableClause(statement);
Expand Down
3 changes: 2 additions & 1 deletion src/binder/bound_statement_visitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ void BoundStatementVisitor::visit(const kuzu::binder::BoundStatement& statement)
case StatementType::RENAME_PROPERTY: {
visitRenameProperty(statement);
} break;
case StatementType::COPY: {
case StatementType::COPY_FROM:
case StatementType::COPY_TO: {
visitCopy(statement);
} break;
case StatementType::STANDALONE_CALL: {
Expand Down
41 changes: 25 additions & 16 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,41 @@ using namespace kuzu::utf8proc;
namespace kuzu {
namespace common {

// Copy From
CopyDescription::CopyDescription(
const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig, FileType fileType)
const std::vector<std::string>& filePaths, FileType fileType, CSVReaderConfig csvReaderConfig)
: filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(csvReaderConfig);
}

// Copy To
CopyDescription::CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames)
: filePaths{filePaths}, fileType{fileType}, columnNames{columnNames} {}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePaths{copyDescription.filePaths},
csvReaderConfig{nullptr}, fileType{copyDescription.fileType} {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
: filePaths{copyDescription.filePaths}, csvReaderConfig{nullptr},
fileType{copyDescription.fileType}, columnNames{copyDescription.columnNames} {
if (copyDescription.csvReaderConfig != nullptr) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}
}

std::string CopyDescription::getFileTypeName(FileType fileType) {
switch (fileType) {
case FileType::CSV: {
return "csv";
}
case FileType::PARQUET: {
return "parquet";
CopyDescription::FileType CopyDescription::getFileTypeFromExtension(const std::string& extension) {
CopyDescription::FileType fileType = CopyDescription::fileTypeMap[extension];
if (fileType == FileType::UNKNOWN) {
throw CopyException("Unsupported file type " + extension);
}
case FileType::NPY: {
return "npy";
}
default:
throw InternalException("Unimplemented getFileTypeName().");
return fileType;
}

std::string CopyDescription::getFileTypeName(FileType fileType) {
for (const auto& fileTypeItem : fileTypeMap) {
if (fileTypeItem.second == fileType) {
return fileTypeItem.first;
}
}
throw InternalException("Unimplemented getFileTypeName().");
}

} // namespace common
Expand Down
6 changes: 4 additions & 2 deletions src/common/type_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ std::string TypeUtils::toString(const list_entry_t& val, void* valueVector) {
auto dataVector = ListVector::getDataVector(listVector);
for (auto i = 0u; i < val.size; ++i) {
result += castValueToString(*childType, values, dataVector);
result += (val.size - 1 == i ? "]" : ",");
result += (val.size - 1 == i ? "" : ",");
values += ListVector::getDataVector(listVector)->getNumBytesPerValue();
}
result += "]";
return result;
}

Expand All @@ -90,8 +91,9 @@ std::string TypeUtils::toString(const struct_entry_t& val, void* valVector) {
auto fieldVector = StructVector::getFieldVector(structVector, i);
auto value = fieldVector->getData() + fieldVector->getNumBytesPerValue() * val.pos;
result += castValueToString(*field->getType(), value, fieldVector.get());
result += (fields.size() - 1 == i ? "}" : ",");
result += (fields.size() - 1 == i ? "" : ",");
}
result += "}";
return result;
}

Expand Down
9 changes: 6 additions & 3 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "binder/query/bound_regular_query.h"
#include "common/copier_config/copier_config.h"
#include "expression_binder.h"
#include "parser/copy.h"
#include "parser/query/regular_query.h"
#include "query_normalizer.h"

Expand Down Expand Up @@ -92,8 +93,9 @@ class Binder {
catalog::NodeTableSchema::TableSchema* tableSchema, const std::string& propertyName);
common::LogicalType bindDataType(const std::string& dataType);

/*** bind copy csv ***/
std::unique_ptr<BoundStatement> bindCopyClause(const parser::Statement& statement);
/*** bind copy from/to ***/
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);

std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

Expand All @@ -103,7 +105,8 @@ class Binder {
void bindStringParsingOptions(common::CSVReaderConfig& csvReaderConfig,
const std::string& optionName, std::string& optionValue);
char bindParsingOptionValue(std::string value);
common::CopyDescription::FileType bindFileType(std::vector<std::string> filePaths);
common::CopyDescription::FileType bindFileType(const std::vector<std::string>& filePaths);
common::CopyDescription::FileType bindFileType(const std::string& filePath);

/*** bind query ***/
std::unique_ptr<BoundRegularQuery> bindQuery(const parser::RegularQuery& regularQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
#include <vector>

#include "binder/bound_statement.h"
#include "binder/query/bound_regular_query.h"
#include "catalog/catalog_structs.h"
#include "common/copier_config/copier_config.h"

namespace kuzu {
namespace binder {

class BoundCopy : public BoundStatement {
class BoundCopyFrom : public BoundStatement {
public:
BoundCopy(
BoundCopyFrom(
common::CopyDescription copyDescription, common::table_id_t tableID, std::string tableName)
: BoundStatement{common::StatementType::COPY,
: BoundStatement{common::StatementType::COPY_FROM,
BoundStatementResult::createSingleStringColumnResult()},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)} {}

Expand Down
38 changes: 38 additions & 0 deletions src/include/binder/copy/bound_copy_to.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#pragma once

#include <string>
#include <unordered_map>
#include <vector>

#include "binder/bound_statement.h"
#include "binder/query/bound_regular_query.h"
#include "catalog/catalog_structs.h"
#include "common/copier_config/copier_config.h"

namespace kuzu {
namespace binder {

class BoundCopyTo : public BoundStatement {
public:
BoundCopyTo(
common::CopyDescription copyDescription, std::unique_ptr<BoundRegularQuery> regularQuery)
: BoundStatement{common::StatementType::COPY_TO, BoundStatementResult::createEmptyResult()},
regularQuery{std::move(regularQuery)}, copyDescription{std::move(copyDescription)} {}

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline BoundRegularQuery* getRegularQuery() const { return regularQuery.get(); }

inline common::table_id_t getTableID() const { return tableID; }

inline std::string getTableName() const { return tableName; }

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
std::string tableName;
std::unique_ptr<BoundRegularQuery> regularQuery;
};

} // namespace binder
} // namespace kuzu
18 changes: 13 additions & 5 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,26 @@ struct CSVReaderConfig {
struct CopyDescription {
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 };

CopyDescription(const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig,
FileType fileType);
// Copy From
CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
CSVReaderConfig csvReaderConfig);

// Copy To
CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames);

CopyDescription(const CopyDescription& copyDescription);

inline static std::string getFileTypeSuffix(FileType fileType) {
return "." + getFileTypeName(fileType);
}
inline static std::unordered_map<std::string, FileType> fileTypeMap{
{"unknown", FileType::UNKNOWN}, {".csv", FileType::CSV}, {".parquet", FileType::PARQUET},
{".npy", FileType::NPY}};

static FileType getFileTypeFromExtension(const std::string& extension);

static std::string getFileTypeName(FileType fileType);

const std::vector<std::string> filePaths;
const std::vector<std::string> columnNames;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
FileType fileType;
};
Expand Down
5 changes: 3 additions & 2 deletions src/include/common/statement_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ enum class StatementType : uint8_t {
ADD_PROPERTY = 6,
DROP_PROPERTY = 7,
RENAME_PROPERTY = 8,
COPY = 20,
COPY_TO = 19,
COPY_FROM = 20,
STANDALONE_CALL = 21,
EXPLAIN = 22,
CREATE_MACRO = 23,
Expand All @@ -30,7 +31,7 @@ class StatementTypeUtils {
}

static bool isCopyCSV(StatementType statementType) {
return statementType == StatementType::COPY;
return statementType == StatementType::COPY_FROM;
}

static bool isCreateMacro(StatementType statementType) {
Expand Down
Loading

0 comments on commit 8e015b1

Please sign in to comment.