Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reader compilation refactor #1985

Merged
merged 1 commit into from
Sep 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 51 additions & 21 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,68 @@
if (fileType != CopyDescription::FileType::CSV) {
throw BinderException("COPY TO currently only supports csv files.");
}
return std::make_unique<BoundCopyTo>(
CopyDescription(std::vector<std::string>{boundFilePath}, fileType, columnNames),
std::move(query));
auto copyDescription = std::make_unique<CopyDescription>(
fileType, std::vector<std::string>{boundFilePath}, columnNames);
return std::make_unique<BoundCopyTo>(std::move(copyDescription), std::move(query));
}

// Checks that the COPY keyword used by the statement is compatible with the detected file type.
// As a temporary constraint, npy files must be loaded with the COPY FROM BY COLUMN keyword, and
// csv and parquet files must be loaded with the COPY FROM keyword.
//
// expectedType: file type carried by the statement; only UNKNOWN and NPY are examined here
//               (presumably UNKNOWN means the plain COPY FROM keyword was used — confirm
//               against the parser).
// actualType:   file type the caller detected for the input files.
// Throws BinderException when the two do not agree as described above.
static void validateCopyNpyKeyword(
CopyDescription::FileType expectedType, CopyDescription::FileType actualType) {
// No explicit type on the statement, yet the files are npy.
if (expectedType == CopyDescription::FileType::UNKNOWN &&
actualType == CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");

Check warning on line 39 in src/binder/bind/bind_copy.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_copy.cpp#L39

Added line #L39 was not covered by tests
}
// The statement asked for npy, but the files are of another type.
if (expectedType == CopyDescription::FileType::NPY && actualType != expectedType) {
throw BinderException("Please use COPY FROM statement for copying csv and parquet files.");

Check warning on line 42 in src/binder/bind/bind_copy.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_copy.cpp#L42

Added line #L42 was not covered by tests
}
}

// Ensures that the number of npy files equals the number of properties in the table schema.
// Throws BinderException, naming the table, when the two counts differ.
static void validateCopyNpyFilesMatchSchema(uint32_t numFiles, catalog::TableSchema* schema) {
    const bool countsMatch = schema->properties.size() == numFiles;
    if (countsMatch) {
        return;
    }
    auto message = StringUtils::string_format(
        "Number of npy files is not equal to number of properties in table {}.",
        schema->tableName);
    throw BinderException(message);
}

// Binds a COPY FROM statement: looks up the target table, binds the csv parsing options, the
// file paths and the file type, applies the npy-specific validations, and returns a
// BoundCopyFrom describing the copy.
// NOTE(review): this span comes from a diff view and shows removed and added lines side by
// side (the inline keyword checks next to the validateCopyNpyKeyword call, the inline file-count
// check next to the validateCopyNpyFilesMatchSchema call, and two return statements). Confirm
// which lines belong to the merged version before reusing this text as source.
std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& statement) {
auto& copyStatement = (CopyFrom&)statement;
auto catalogContent = catalog.getReadOnlyVersion();
auto tableName = copyStatement.getTableName();
// Bind to table schema.
validateTableExist(catalog, tableName);
auto tableID = catalogContent->getTableID(tableName);
auto tableSchema = catalogContent->getTableSchema(tableID);
// Bind the csv reader configuration from the statement's parsing options.
auto csvReaderConfig = bindParsingOptions(copyStatement.getParsingOptions());
auto boundFilePaths = bindFilePaths(copyStatement.getFilePaths());
// Bind file type: the type detected from the files versus the type carried by the statement.
auto actualFileType = bindFileType(boundFilePaths);
auto expectedFileType = copyStatement.getFileType();
if (expectedFileType == CopyDescription::FileType::UNKNOWN &&
actualFileType == CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");
}
if (expectedFileType == CopyDescription::FileType::NPY && actualFileType != expectedFileType) {
throw BinderException("Please use COPY FROM statement for copying csv and parquet files.");
}
validateCopyNpyKeyword(expectedFileType, actualFileType);
// For npy input, the number of files must equal the number of table properties.
if (actualFileType == CopyDescription::FileType::NPY) {
auto tableSchema = catalogContent->getTableSchema(tableID);
if (tableSchema->properties.size() != boundFilePaths.size()) {
throw BinderException(StringUtils::string_format(
"Number of npy files is not equal to number of properties in table {}.",
tableSchema->tableName));
validateCopyNpyFilesMatchSchema(boundFilePaths.size(), tableSchema);
}
// Bind execution mode.
// Order is preserved for CSV input and whenever the table has a SERIAL property.
bool preservingOrder = actualFileType == CopyDescription::FileType::CSV;
expression_vector columnExpressions;
// Each non-SERIAL property gets an ARROW_COLUMN variable named after the property; a SERIAL
// property contributes no column expression and turns on order preservation instead.
for (auto& property : tableSchema->getProperties()) {
if (property->getDataType()->getLogicalTypeID() != common::LogicalTypeID::SERIAL) {
columnExpressions.push_back(createVariable(
property->getName(), common::LogicalType{common::LogicalTypeID::ARROW_COLUMN}));
} else {
preservingOrder = true;
}
}
return std::make_unique<BoundCopyFrom>(
CopyDescription(boundFilePaths, actualFileType, csvReaderConfig), tableID, tableName);
// Ownership of the csv reader configuration moves into the copy description.
auto copyDescription = std::make_unique<CopyDescription>(
actualFileType, boundFilePaths, std::move(csvReaderConfig));
// INT64 variable named "nodeOffset" that is handed to the bound statement.
auto nodeOffsetExpression =
createVariable("nodeOffset", common::LogicalType{common::LogicalTypeID::INT64});
return std::make_unique<BoundCopyFrom>(std::move(copyDescription), tableSchema,
std::move(columnExpressions), std::move(nodeOffsetExpression), preservingOrder);
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
Expand All @@ -76,9 +106,9 @@
return boundFilePaths;
}

CSVReaderConfig Binder::bindParsingOptions(
std::unique_ptr<CSVReaderConfig> Binder::bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>* parsingOptions) {
CSVReaderConfig csvReaderConfig;
auto csvReaderConfig = std::make_unique<CSVReaderConfig>();
for (auto& parsingOption : *parsingOptions) {
auto copyOptionName = parsingOption.first;
StringUtils::toUpper(copyOptionName);
Expand All @@ -91,7 +121,7 @@
throw BinderException(
"The value type of parsing csv option " + copyOptionName + " must be boolean.");
}
csvReaderConfig.hasHeader =
csvReaderConfig->hasHeader =
((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue<bool>();
} else if (boundCopyOptionExpression->dataType.getLogicalTypeID() ==
LogicalTypeID::STRING &&
Expand All @@ -102,7 +132,7 @@
}
auto copyOptionValue =
((LiteralExpression&)(*boundCopyOptionExpression)).value->getValue<std::string>();
bindStringParsingOptions(csvReaderConfig, copyOptionName, copyOptionValue);
bindStringParsingOptions(*csvReaderConfig, copyOptionName, copyOptionValue);
} else {
throw BinderException("Unrecognized parsing csv option: " + copyOptionName + ".");
}
Expand Down
20 changes: 0 additions & 20 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,6 @@ using namespace kuzu::utf8proc;
namespace kuzu {
namespace common {

// Copy From: stores the input file paths and file type, and keeps a heap-allocated copy of the
// given csv reader configuration.
CopyDescription::CopyDescription(
    const std::vector<std::string>& filePaths, FileType fileType, CSVReaderConfig csvReaderConfig)
    : filePaths{filePaths},
      csvReaderConfig{std::make_unique<CSVReaderConfig>(csvReaderConfig)}, fileType{fileType} {}

// Copy To: stores the output file paths, the file type and the names of the exported columns.
// The csv reader configuration is left unset.
CopyDescription::CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
    const std::vector<std::string>& columnNames)
    : filePaths(filePaths), fileType(fileType), columnNames(columnNames) {}

// Copy constructor: duplicates the paths, file type and column names, and deep-copies the csv
// reader configuration when the source has one (otherwise it stays null).
CopyDescription::CopyDescription(const CopyDescription& copyDescription)
    : filePaths{copyDescription.filePaths}, csvReaderConfig{nullptr},
      fileType{copyDescription.fileType}, columnNames{copyDescription.columnNames} {
    const CSVReaderConfig* sourceConfig = copyDescription.csvReaderConfig.get();
    if (sourceConfig == nullptr) {
        return;
    }
    csvReaderConfig = std::make_unique<CSVReaderConfig>(*sourceConfig);
}

CopyDescription::FileType CopyDescription::getFileTypeFromExtension(const std::string& extension) {
CopyDescription::FileType fileType = CopyDescription::fileTypeMap[extension];
if (fileType == FileType::UNKNOWN) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class Binder {

std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

common::CSVReaderConfig bindParsingOptions(
std::unique_ptr<common::CSVReaderConfig> bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>*
parsingOptions);
void bindStringParsingOptions(common::CSVReaderConfig& csvReaderConfig,
Expand Down
32 changes: 20 additions & 12 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,30 @@ namespace binder {

// Bound form of a COPY FROM statement. Its statement type is COPY_FROM and its result is built
// with BoundStatementResult::createSingleStringColumnResult().
// NOTE(review): this span comes from a diff view and shows removed and added lines side by
// side (two constructor signatures, two initializer lists, two sets of getters and members).
// Confirm which lines belong to the merged version before reusing this text as source.
class BoundCopyFrom : public BoundStatement {
public:
BoundCopyFrom(
common::CopyDescription copyDescription, common::table_id_t tableID, std::string tableName)
// Takes ownership of the copy description; the table schema is stored as a raw pointer.
BoundCopyFrom(std::unique_ptr<common::CopyDescription> copyDescription,
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
std::shared_ptr<Expression> nodeOffsetExpression, bool preservingOrder_)
: BoundStatement{common::StatementType::COPY_FROM,
BoundStatementResult::createSingleStringColumnResult()},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)} {}

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline common::table_id_t getTableID() const { return tableID; }

inline std::string getTableName() const { return tableName; }
copyDescription{std::move(copyDescription)}, tableSchema{tableSchema},
columnExpressions{std::move(columnExpressions)},
nodeOffsetExpression{std::move(nodeOffsetExpression)}, preservingOrder_{
preservingOrder_} {}

// Returns a non-owning pointer; the description stays owned by this statement.
inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }
// Returns the raw schema pointer given at construction (lifetime is managed elsewhere —
// presumably by the catalog; confirm).
inline catalog::TableSchema* getTableSchema() const { return tableSchema; }
// Returns the column expressions by value.
inline expression_vector getColumnExpressions() const { return columnExpressions; }
inline std::shared_ptr<Expression> getNodeOffsetExpression() const {
return nodeOffsetExpression;
}
// Returns the order-preservation flag given at construction.
inline bool preservingOrder() const { return preservingOrder_; }

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
std::string tableName;
std::unique_ptr<common::CopyDescription> copyDescription;
catalog::TableSchema* tableSchema;
expression_vector columnExpressions;
std::shared_ptr<Expression> nodeOffsetExpression;
bool preservingOrder_;
};

} // namespace binder
Expand Down
10 changes: 5 additions & 5 deletions src/include/binder/copy/bound_copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ namespace binder {

// Bound form of a COPY TO statement: a copy description plus the bound query being exported.
// Its statement type is COPY_TO and its result is BoundStatementResult::createEmptyResult().
// NOTE(review): this span comes from a diff view and shows removed and added lines side by
// side (two constructor signatures, two initializer lists, two getCopyDescription getters and
// two copyDescription members). Confirm which lines belong to the merged version.
class BoundCopyTo : public BoundStatement {
public:
BoundCopyTo(
common::CopyDescription copyDescription, std::unique_ptr<BoundRegularQuery> regularQuery)
// Takes ownership of both the copy description and the bound query.
BoundCopyTo(std::unique_ptr<common::CopyDescription> copyDescription,
std::unique_ptr<BoundRegularQuery> regularQuery)
: BoundStatement{common::StatementType::COPY_TO, BoundStatementResult::createEmptyResult()},
regularQuery{std::move(regularQuery)}, copyDescription{std::move(copyDescription)} {}
// This initializer list follows the member declaration order (copyDescription, regularQuery).
copyDescription{std::move(copyDescription)}, regularQuery{std::move(regularQuery)} {}

inline common::CopyDescription getCopyDescription() const { return copyDescription; }
// Returns a non-owning pointer; the description stays owned by this statement.
inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }

// Returns a non-owning pointer to the bound query.
inline BoundRegularQuery* getRegularQuery() const { return regularQuery.get(); }

private:
common::CopyDescription copyDescription;
std::unique_ptr<common::CopyDescription> copyDescription;
std::unique_ptr<BoundRegularQuery> regularQuery;
};

Expand Down
36 changes: 22 additions & 14 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
#include "common/types/types_include.h"
#include "common/types/value.h"

namespace spdlog {
class logger;
}

namespace kuzu {
namespace common {

Expand All @@ -33,15 +29,32 @@ struct CSVReaderConfig {
// Describes a COPY operation: the file type, the file paths, the column names (set by the
// "Copy To" constructor) and an optional csv reader configuration (set by the "Copy From"
// constructor).
// NOTE(review): this span comes from a diff view. It shows removed and added lines side by side
// (old out-of-line constructor declarations next to the new inline definitions, and two sets of
// member declarations) and contains a hunk marker where lines are hidden. Confirm which lines
// belong to the merged version before reusing this text as source.
struct CopyDescription {
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 };

FileType fileType;
std::vector<std::string> filePaths;
std::vector<std::string> columnNames;
// Null unless supplied through the "Copy From" constructor or copied from another description.
std::unique_ptr<CSVReaderConfig> csvReaderConfig;

// Copy From
CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
CSVReaderConfig csvReaderConfig);
// Takes ownership of the csv reader configuration; columnNames is left empty.
CopyDescription(FileType fileType, const std::vector<std::string>& filePaths,
std::unique_ptr<CSVReaderConfig> csvReaderConfig)
: fileType{fileType}, filePaths{filePaths}, csvReaderConfig{std::move(csvReaderConfig)} {}

// Copy To
CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames);
// Stores the column names; no csv reader configuration is attached.
CopyDescription(FileType fileType, const std::vector<std::string>& filePaths,
const std::vector<std::string>& columnNames)
: fileType{fileType}, filePaths{filePaths}, columnNames{columnNames}, csvReaderConfig{
nullptr} {}

CopyDescription(const CopyDescription& copyDescription);
// Copy constructor: deep-copies the csv reader configuration when the source has one.
CopyDescription(const CopyDescription& other)
: fileType{other.fileType}, filePaths{other.filePaths}, columnNames{other.columnNames} {
if (other.csvReaderConfig != nullptr) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*other.csvReaderConfig);
}
}

// Returns a heap-allocated copy of this description, built with the copy constructor.
inline std::unique_ptr<CopyDescription> copy() const {
return std::make_unique<CopyDescription>(*this);
}

// Maps a file-extension string to its FileType.
inline static std::unordered_map<std::string, FileType> fileTypeMap{
{"unknown", FileType::UNKNOWN}, {".csv", FileType::CSV}, {".parquet", FileType::PARQUET},
Expand All @@ -50,11 +63,6 @@ struct CopyDescription {
static FileType getFileTypeFromExtension(const std::string& extension);

static std::string getFileTypeName(FileType fileType);

const std::vector<std::string> filePaths;
const std::vector<std::string> columnNames;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
FileType fileType;
};

} // namespace common
Expand Down
2 changes: 1 addition & 1 deletion src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class Transformer {

std::unique_ptr<Statement> transformCopyTo(CypherParser::KU_CopyTOContext& ctx);

std::unique_ptr<Statement> transformCopyFromCSV(CypherParser::KU_CopyFromCSVContext& ctx);
std::unique_ptr<Statement> transformCopyFrom(CypherParser::KU_CopyFromCSVContext& ctx);

std::unique_ptr<Statement> transformCopyFromNPY(CypherParser::KU_CopyFromNPYContext& ctx);

Expand Down
44 changes: 18 additions & 26 deletions src/include/planner/logical_plan/copy/logical_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,53 +10,45 @@ namespace planner {
// Logical plan operator for COPY FROM (operator type COPY_FROM). Holds the copy description,
// the target table, the column expressions, the node offset expression, the output expression
// and an order-preservation flag.
// NOTE(review): this span comes from a diff view and shows removed and added lines side by
// side (two constructor signatures and initializer lists, duplicated getters, two return
// statements in copy(), and two sets of members). Confirm which lines belong to the merged
// version before reusing this text as source.
class LogicalCopyFrom : public LogicalOperator {

public:
LogicalCopyFrom(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, bool hasSerial, binder::expression_vector dataColumnExpressions,
// Takes ownership of the copy description; the table schema is stored as a raw pointer.
LogicalCopyFrom(std::unique_ptr<common::CopyDescription> copyDescription,
catalog::TableSchema* tableSchema, binder::expression_vector columnExpressions,
std::shared_ptr<binder::Expression> nodeOffsetExpression,
std::shared_ptr<binder::Expression> outputExpression)
std::shared_ptr<binder::Expression> outputExpression, bool orderPreserving_)
: LogicalOperator{LogicalOperatorType::COPY_FROM},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
preservingOrder{hasSerial}, dataColumnExpressions{std::move(dataColumnExpressions)},
nodeOffsetExpression{std::move(nodeOffsetExpression)}, outputExpression{
std::move(outputExpression)} {}
copyDescription{std::move(copyDescription)}, tableSchema{tableSchema},
columnExpressions{std::move(columnExpressions)}, nodeOffsetExpression{std::move(
nodeOffsetExpression)},
outputExpression{std::move(outputExpression)}, orderPreserving_{orderPreserving_} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline common::table_id_t getTableID() const { return tableID; }

inline std::vector<std::shared_ptr<binder::Expression>> getDataColumnExpressions() const {
return dataColumnExpressions;
}
// Prints the operator as the name of the target table.
inline std::string getExpressionsForPrinting() const override { return tableSchema->tableName; }

// Returns a non-owning pointer; the description stays owned by this operator.
inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }
inline catalog::TableSchema* getTableSchema() const { return tableSchema; }
// Returns the column expressions by value.
inline binder::expression_vector getColumnExpressions() const { return columnExpressions; }
inline std::shared_ptr<binder::Expression> getNodeOffsetExpression() const {
return nodeOffsetExpression;
}

inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}

inline bool isPreservingOrder() { return preservingOrder; }
// NOTE(review): unlike the other getters, this one is not const-qualified.
inline bool orderPreserving() { return orderPreserving_; }

void computeFactorizedSchema() override;
void computeFlatSchema() override;

// Clones the operator. The copy description is duplicated through CopyDescription::copy(),
// while the schema pointer and the expressions are passed on as they are.
inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopyFrom>(copyDescription, tableID, tableName, preservingOrder,
dataColumnExpressions, nodeOffsetExpression, outputExpression);
return make_unique<LogicalCopyFrom>(copyDescription->copy(), tableSchema, columnExpressions,
nodeOffsetExpression, outputExpression, orderPreserving_);
}

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
// Used for printing only.
std::string tableName;
binder::expression_vector dataColumnExpressions;
std::unique_ptr<common::CopyDescription> copyDescription;
catalog::TableSchema* tableSchema;
binder::expression_vector columnExpressions;
std::shared_ptr<binder::Expression> nodeOffsetExpression;
std::shared_ptr<binder::Expression> outputExpression;
bool preservingOrder;
bool orderPreserving_;
};

} // namespace planner
Expand Down
13 changes: 7 additions & 6 deletions src/include/planner/logical_plan/copy/logical_copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,28 @@ namespace planner {

// Logical plan operator for COPY TO (operator type COPY_TO). It has a single child operator
// and holds the copy description of the export.
// NOTE(review): this span comes from a diff view and shows removed and added lines side by
// side (two constructors, two getCopyDescription getters, two return statements in copy() and
// two copyDescription members). Confirm which lines belong to the merged version.
class LogicalCopyTo : public LogicalOperator {
public:
LogicalCopyTo(
std::shared_ptr<LogicalOperator> child, const common::CopyDescription& copyDescription)
: LogicalOperator{LogicalOperatorType::COPY_TO, child}, copyDescription{copyDescription} {}
// Takes ownership of the copy description and moves the child into the base operator.
LogicalCopyTo(std::shared_ptr<LogicalOperator> child,
std::unique_ptr<common::CopyDescription> copyDescription)
: LogicalOperator{LogicalOperatorType::COPY_TO, std::move(child)},
copyDescription{std::move(copyDescription)} {}

f_group_pos_set getGroupsPosToFlatten();

// This operator prints no expressions.
inline std::string getExpressionsForPrinting() const override { return std::string{}; }

inline common::CopyDescription getCopyDescription() const { return copyDescription; }
// Returns a non-owning pointer; the description stays owned by this operator.
inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }

void computeFactorizedSchema() override;

void computeFlatSchema() override;

// Clones the operator: the first child is copied and the copy description is duplicated
// through CopyDescription::copy().
inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopyTo>(children[0]->copy(), copyDescription);
return make_unique<LogicalCopyTo>(children[0]->copy(), copyDescription->copy());
}

private:
// NOTE(review): not initialized by either constructor shown here, so it starts out null.
std::shared_ptr<binder::Expression> outputExpression;
common::CopyDescription copyDescription;
std::unique_ptr<common::CopyDescription> copyDescription;
};

} // namespace planner
Expand Down
Loading