Skip to content

Commit

Permalink
Merge pull request #1985 from kuzudb/node-copy-refactor
Browse files Browse the repository at this point in the history
Reader compilation refactor
  • Loading branch information
andyfengHKU committed Sep 2, 2023
2 parents 8dd9ba4 + cb81646 commit bc16021
Show file tree
Hide file tree
Showing 31 changed files with 350 additions and 349 deletions.
72 changes: 51 additions & 21 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,68 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
if (fileType != CopyDescription::FileType::CSV) {
throw BinderException("COPY TO currently only supports csv files.");
}
return std::make_unique<BoundCopyTo>(
CopyDescription(std::vector<std::string>{boundFilePath}, fileType, columnNames),
std::move(query));
auto copyDescription = std::make_unique<CopyDescription>(
fileType, std::vector<std::string>{boundFilePath}, columnNames);
return std::make_unique<BoundCopyTo>(std::move(copyDescription), std::move(query));
}

// As a temporary constraint, we require npy files loaded with COPY FROM BY COLUMN keyword.
// And csv and parquet files loaded with COPY FROM keyword.
static void validateCopyNpyKeyword(
CopyDescription::FileType expectedType, CopyDescription::FileType actualType) {
if (expectedType == CopyDescription::FileType::UNKNOWN &&
actualType == CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");
}
if (expectedType == CopyDescription::FileType::NPY && actualType != expectedType) {
throw BinderException("Please use COPY FROM statement for copying csv and parquet files.");
}
}

static void validateCopyNpyFilesMatchSchema(uint32_t numFiles, catalog::TableSchema* schema) {
if (schema->properties.size() != numFiles) {
throw BinderException(StringUtils::string_format(
"Number of npy files is not equal to number of properties in table {}.",
schema->tableName));
}
}

std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& statement) {
auto& copyStatement = (CopyFrom&)statement;
auto catalogContent = catalog.getReadOnlyVersion();
auto tableName = copyStatement.getTableName();
// Bind to table schema.
validateTableExist(catalog, tableName);
auto tableID = catalogContent->getTableID(tableName);
auto tableSchema = catalogContent->getTableSchema(tableID);
// Bind csv reader configuration
auto csvReaderConfig = bindParsingOptions(copyStatement.getParsingOptions());
auto boundFilePaths = bindFilePaths(copyStatement.getFilePaths());
// Bind file type.
auto actualFileType = bindFileType(boundFilePaths);
auto expectedFileType = copyStatement.getFileType();
if (expectedFileType == CopyDescription::FileType::UNKNOWN &&
actualFileType == CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");
}
if (expectedFileType == CopyDescription::FileType::NPY && actualFileType != expectedFileType) {
throw BinderException("Please use COPY FROM statement for copying csv and parquet files.");
}
validateCopyNpyKeyword(expectedFileType, actualFileType);
if (actualFileType == CopyDescription::FileType::NPY) {
auto tableSchema = catalogContent->getTableSchema(tableID);
if (tableSchema->properties.size() != boundFilePaths.size()) {
throw BinderException(StringUtils::string_format(
"Number of npy files is not equal to number of properties in table {}.",
tableSchema->tableName));
validateCopyNpyFilesMatchSchema(boundFilePaths.size(), tableSchema);
}
// Bind execution mode.
// For CSV file, and table with SERIAL columns, we need to read in serial from files.
bool preservingOrder = actualFileType == CopyDescription::FileType::CSV;
expression_vector columnExpressions;
for (auto& property : tableSchema->getProperties()) {
if (property->getDataType()->getLogicalTypeID() != common::LogicalTypeID::SERIAL) {
columnExpressions.push_back(createVariable(
property->getName(), common::LogicalType{common::LogicalTypeID::ARROW_COLUMN}));
} else {
preservingOrder = true;
}
}
return std::make_unique<BoundCopyFrom>(
CopyDescription(boundFilePaths, actualFileType, csvReaderConfig), tableID, tableName);
auto copyDescription = std::make_unique<CopyDescription>(
actualFileType, boundFilePaths, std::move(csvReaderConfig));
auto nodeOffsetExpression =
createVariable("nodeOffset", common::LogicalType{common::LogicalTypeID::INT64});
return std::make_unique<BoundCopyFrom>(std::move(copyDescription), tableSchema,
std::move(columnExpressions), std::move(nodeOffsetExpression), preservingOrder);
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
Expand All @@ -76,9 +106,9 @@ std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& f
return boundFilePaths;
}

CSVReaderConfig Binder::bindParsingOptions(
std::unique_ptr<CSVReaderConfig> Binder::bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>* parsingOptions) {
CSVReaderConfig csvReaderConfig;
auto csvReaderConfig = std::make_unique<CSVReaderConfig>();
for (auto& parsingOption : *parsingOptions) {
auto copyOptionName = parsingOption.first;
StringUtils::toUpper(copyOptionName);
Expand All @@ -91,7 +121,7 @@ CSVReaderConfig Binder::bindParsingOptions(
throw BinderException(
"The value type of parsing csv option " + copyOptionName + " must be boolean.");
}
csvReaderConfig.hasHeader =
csvReaderConfig->hasHeader =
((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue<bool>();
} else if (boundCopyOptionExpression->dataType.getLogicalTypeID() ==
LogicalTypeID::STRING &&
Expand All @@ -102,7 +132,7 @@ CSVReaderConfig Binder::bindParsingOptions(
}
auto copyOptionValue =
((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue<std::string>();
bindStringParsingOptions(csvReaderConfig, copyOptionName, copyOptionValue);
bindStringParsingOptions(*csvReaderConfig, copyOptionName, copyOptionValue);
} else {
throw BinderException("Unrecognized parsing csv option: " + copyOptionName + ".");
}
Expand Down
20 changes: 0 additions & 20 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,6 @@ using namespace kuzu::utf8proc;
namespace kuzu {
namespace common {

// Copy From
CopyDescription::CopyDescription(
const std::vector<std::string>& filePaths, FileType fileType, CSVReaderConfig csvReaderConfig)
: filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(csvReaderConfig);
}

// Copy To
CopyDescription::CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames)
: filePaths{filePaths}, fileType{fileType}, columnNames{columnNames} {}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePaths{copyDescription.filePaths}, csvReaderConfig{nullptr},
fileType{copyDescription.fileType}, columnNames{copyDescription.columnNames} {
if (copyDescription.csvReaderConfig != nullptr) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}
}

CopyDescription::FileType CopyDescription::getFileTypeFromExtension(const std::string& extension) {
CopyDescription::FileType fileType = CopyDescription::fileTypeMap[extension];
if (fileType == FileType::UNKNOWN) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class Binder {

std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

common::CSVReaderConfig bindParsingOptions(
std::unique_ptr<common::CSVReaderConfig> bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>*
parsingOptions);
void bindStringParsingOptions(common::CSVReaderConfig& csvReaderConfig,
Expand Down
32 changes: 20 additions & 12 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,30 @@ namespace binder {

class BoundCopyFrom : public BoundStatement {
public:
BoundCopyFrom(
common::CopyDescription copyDescription, common::table_id_t tableID, std::string tableName)
BoundCopyFrom(std::unique_ptr<common::CopyDescription> copyDescription,
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
std::shared_ptr<Expression> nodeOffsetExpression, bool preservingOrder_)
: BoundStatement{common::StatementType::COPY_FROM,
BoundStatementResult::createSingleStringColumnResult()},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)} {}

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline common::table_id_t getTableID() const { return tableID; }

inline std::string getTableName() const { return tableName; }
copyDescription{std::move(copyDescription)}, tableSchema{tableSchema},
columnExpressions{std::move(columnExpressions)},
nodeOffsetExpression{std::move(nodeOffsetExpression)}, preservingOrder_{
preservingOrder_} {}

inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }
inline catalog::TableSchema* getTableSchema() const { return tableSchema; }
inline expression_vector getColumnExpressions() const { return columnExpressions; }
inline std::shared_ptr<Expression> getNodeOffsetExpression() const {
return nodeOffsetExpression;
}
inline bool preservingOrder() const { return preservingOrder_; }

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
std::string tableName;
std::unique_ptr<common::CopyDescription> copyDescription;
catalog::TableSchema* tableSchema;
expression_vector columnExpressions;
std::shared_ptr<Expression> nodeOffsetExpression;
bool preservingOrder_;
};

} // namespace binder
Expand Down
10 changes: 5 additions & 5 deletions src/include/binder/copy/bound_copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ namespace binder {

class BoundCopyTo : public BoundStatement {
public:
BoundCopyTo(
common::CopyDescription copyDescription, std::unique_ptr<BoundRegularQuery> regularQuery)
BoundCopyTo(std::unique_ptr<common::CopyDescription> copyDescription,
std::unique_ptr<BoundRegularQuery> regularQuery)
: BoundStatement{common::StatementType::COPY_TO, BoundStatementResult::createEmptyResult()},
regularQuery{std::move(regularQuery)}, copyDescription{std::move(copyDescription)} {}
copyDescription{std::move(copyDescription)}, regularQuery{std::move(regularQuery)} {}

inline common::CopyDescription getCopyDescription() const { return copyDescription; }
inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }

inline BoundRegularQuery* getRegularQuery() const { return regularQuery.get(); }

private:
common::CopyDescription copyDescription;
std::unique_ptr<common::CopyDescription> copyDescription;
std::unique_ptr<BoundRegularQuery> regularQuery;
};

Expand Down
36 changes: 22 additions & 14 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
#include "common/types/types_include.h"
#include "common/types/value.h"

namespace spdlog {
class logger;
}

namespace kuzu {
namespace common {

Expand All @@ -33,15 +29,32 @@ struct CSVReaderConfig {
struct CopyDescription {
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 };

FileType fileType;
std::vector<std::string> filePaths;
std::vector<std::string> columnNames;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;

// Copy From
CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
CSVReaderConfig csvReaderConfig);
CopyDescription(FileType fileType, const std::vector<std::string>& filePaths,
std::unique_ptr<CSVReaderConfig> csvReaderConfig)
: fileType{fileType}, filePaths{filePaths}, csvReaderConfig{std::move(csvReaderConfig)} {}

// Copy To
CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames);
CopyDescription(FileType fileType, const std::vector<std::string>& filePaths,
const std::vector<std::string>& columnNames)
: fileType{fileType}, filePaths{filePaths}, columnNames{columnNames}, csvReaderConfig{
nullptr} {}

CopyDescription(const CopyDescription& copyDescription);
CopyDescription(const CopyDescription& other)
: fileType{other.fileType}, filePaths{other.filePaths}, columnNames{other.columnNames} {
if (other.csvReaderConfig != nullptr) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*other.csvReaderConfig);
}
}

inline std::unique_ptr<CopyDescription> copy() const {
return std::make_unique<CopyDescription>(*this);
}

inline static std::unordered_map<std::string, FileType> fileTypeMap{
{"unknown", FileType::UNKNOWN}, {".csv", FileType::CSV}, {".parquet", FileType::PARQUET},
Expand All @@ -50,11 +63,6 @@ struct CopyDescription {
static FileType getFileTypeFromExtension(const std::string& extension);

static std::string getFileTypeName(FileType fileType);

const std::vector<std::string> filePaths;
const std::vector<std::string> columnNames;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
FileType fileType;
};

} // namespace common
Expand Down
2 changes: 1 addition & 1 deletion src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class Transformer {

std::unique_ptr<Statement> transformCopyTo(CypherParser::KU_CopyTOContext& ctx);

std::unique_ptr<Statement> transformCopyFromCSV(CypherParser::KU_CopyFromCSVContext& ctx);
std::unique_ptr<Statement> transformCopyFrom(CypherParser::KU_CopyFromCSVContext& ctx);

std::unique_ptr<Statement> transformCopyFromNPY(CypherParser::KU_CopyFromNPYContext& ctx);

Expand Down
44 changes: 18 additions & 26 deletions src/include/planner/logical_plan/copy/logical_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,53 +10,45 @@ namespace planner {
class LogicalCopyFrom : public LogicalOperator {

public:
LogicalCopyFrom(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, bool hasSerial, binder::expression_vector dataColumnExpressions,
LogicalCopyFrom(std::unique_ptr<common::CopyDescription> copyDescription,
catalog::TableSchema* tableSchema, binder::expression_vector columnExpressions,
std::shared_ptr<binder::Expression> nodeOffsetExpression,
std::shared_ptr<binder::Expression> outputExpression)
std::shared_ptr<binder::Expression> outputExpression, bool orderPreserving_)
: LogicalOperator{LogicalOperatorType::COPY_FROM},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
preservingOrder{hasSerial}, dataColumnExpressions{std::move(dataColumnExpressions)},
nodeOffsetExpression{std::move(nodeOffsetExpression)}, outputExpression{
std::move(outputExpression)} {}
copyDescription{std::move(copyDescription)}, tableSchema{tableSchema},
columnExpressions{std::move(columnExpressions)}, nodeOffsetExpression{std::move(
nodeOffsetExpression)},
outputExpression{std::move(outputExpression)}, orderPreserving_{orderPreserving_} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline common::table_id_t getTableID() const { return tableID; }

inline std::vector<std::shared_ptr<binder::Expression>> getDataColumnExpressions() const {
return dataColumnExpressions;
}
inline std::string getExpressionsForPrinting() const override { return tableSchema->tableName; }

inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }
inline catalog::TableSchema* getTableSchema() const { return tableSchema; }
inline binder::expression_vector getColumnExpressions() const { return columnExpressions; }
inline std::shared_ptr<binder::Expression> getNodeOffsetExpression() const {
return nodeOffsetExpression;
}

inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}

inline bool isPreservingOrder() { return preservingOrder; }
inline bool orderPreserving() { return orderPreserving_; }

void computeFactorizedSchema() override;
void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopyFrom>(copyDescription, tableID, tableName, preservingOrder,
dataColumnExpressions, nodeOffsetExpression, outputExpression);
return make_unique<LogicalCopyFrom>(copyDescription->copy(), tableSchema, columnExpressions,
nodeOffsetExpression, outputExpression, orderPreserving_);
}

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
// Used for printing only.
std::string tableName;
binder::expression_vector dataColumnExpressions;
std::unique_ptr<common::CopyDescription> copyDescription;
catalog::TableSchema* tableSchema;
binder::expression_vector columnExpressions;
std::shared_ptr<binder::Expression> nodeOffsetExpression;
std::shared_ptr<binder::Expression> outputExpression;
bool preservingOrder;
bool orderPreserving_;
};

} // namespace planner
Expand Down
13 changes: 7 additions & 6 deletions src/include/planner/logical_plan/copy/logical_copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,28 @@ namespace planner {

class LogicalCopyTo : public LogicalOperator {
public:
LogicalCopyTo(
std::shared_ptr<LogicalOperator> child, const common::CopyDescription& copyDescription)
: LogicalOperator{LogicalOperatorType::COPY_TO, child}, copyDescription{copyDescription} {}
LogicalCopyTo(std::shared_ptr<LogicalOperator> child,
std::unique_ptr<common::CopyDescription> copyDescription)
: LogicalOperator{LogicalOperatorType::COPY_TO, std::move(child)},
copyDescription{std::move(copyDescription)} {}

f_group_pos_set getGroupsPosToFlatten();

inline std::string getExpressionsForPrinting() const override { return std::string{}; }

inline common::CopyDescription getCopyDescription() const { return copyDescription; }
inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }

void computeFactorizedSchema() override;

void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopyTo>(children[0]->copy(), copyDescription);
return make_unique<LogicalCopyTo>(children[0]->copy(), copyDescription->copy());
}

private:
std::shared_ptr<binder::Expression> outputExpression;
common::CopyDescription copyDescription;
std::unique_ptr<common::CopyDescription> copyDescription;
};

} // namespace planner
Expand Down
Loading

0 comments on commit bc16021

Please sign in to comment.