Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move CopyRel to query processing pipeline #1990

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.8.2 LANGUAGES CXX)
project(Kuzu VERSION 0.0.8.3 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
1 change: 1 addition & 0 deletions src/binder/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
add_subdirectory(bind)
add_subdirectory(bind_expression)
add_subdirectory(copy)
add_subdirectory(expression)
add_subdirectory(query)
add_subdirectory(rewriter)
Expand Down
78 changes: 62 additions & 16 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
#include "binder/copy/bound_copy_to.h"
#include "binder/expression/literal_expression.h"
#include "common/string_utils.h"
#include "common/table_type.h"
#include "parser/copy.h"

using namespace kuzu::binder;
using namespace kuzu::catalog;
using namespace kuzu::common;
using namespace kuzu::parser;

Expand Down Expand Up @@ -36,21 +39,60 @@
CopyDescription::FileType expectedType, CopyDescription::FileType actualType) {
if (expectedType == CopyDescription::FileType::UNKNOWN &&
actualType == CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");
throw BinderException(ExceptionMessage::validateCopyNPYByColumnException());

Check warning on line 42 in src/binder/bind/bind_copy.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_copy.cpp#L42

Added line #L42 was not covered by tests
}
if (expectedType == CopyDescription::FileType::NPY && actualType != expectedType) {
throw BinderException("Please use COPY FROM statement for copying csv and parquet files.");
throw BinderException(ExceptionMessage::validateCopyCSVParquetByColumnException());

Check warning on line 45 in src/binder/bind/bind_copy.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_copy.cpp#L45

Added line #L45 was not covered by tests
}
}

static void validateCopyNpyFilesMatchSchema(uint32_t numFiles, catalog::TableSchema* schema) {
static void validateCopyNpyFilesMatchSchema(uint32_t numFiles, TableSchema* schema) {
if (schema->properties.size() != numFiles) {
throw BinderException(StringUtils::string_format(
"Number of npy files is not equal to number of properties in table {}.",
schema->tableName));
}
}

static void validateCopyNpyNotForRelTables(TableSchema* schema) {
if (schema->tableType == TableType::REL) {
throw BinderException(
ExceptionMessage::validateCopyNpyNotForRelTablesException(schema->tableName));
}
}

expression_vector Binder::bindColumnExpressions(TableSchema* tableSchema) {
expression_vector columnExpressions;
if (tableSchema->tableType == TableType::REL) {
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
// For rel table, add FROM and TO columns as data columns to be read from file.
columnExpressions.push_back(createVariable(std::string(Property::REL_FROM_PROPERTY_NAME),
LogicalType{LogicalTypeID::ARROW_COLUMN}));
columnExpressions.push_back(createVariable(
std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
}
for (auto& property : tableSchema->properties) {
if (property->getDataType()->getLogicalTypeID() == LogicalTypeID::SERIAL ||
TableSchema::isReservedPropertyName(property->getName())) {
continue;
} else {
columnExpressions.push_back(
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
}
}
return columnExpressions;
}

Check warning on line 83 in src/binder/bind/bind_copy.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_copy.cpp#L83

Added line #L83 was not covered by tests

bool Binder::bindPreservingOrder(TableSchema* tableSchema, CopyDescription::FileType fileType) {
bool preservingOrder = fileType == CopyDescription::FileType::CSV;
for (auto& property : tableSchema->properties) {
if (property->getDataType()->getLogicalTypeID() == LogicalTypeID::SERIAL) {
preservingOrder = true;
break;
}
}
return preservingOrder;
}

std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& statement) {
auto& copyStatement = (CopyFrom&)statement;
auto catalogContent = catalog.getReadOnlyVersion();
Expand All @@ -68,25 +110,29 @@
validateCopyNpyKeyword(expectedFileType, actualFileType);
if (actualFileType == CopyDescription::FileType::NPY) {
validateCopyNpyFilesMatchSchema(boundFilePaths.size(), tableSchema);
validateCopyNpyNotForRelTables(tableSchema);
}
// Bind execution mode.
// For CSV file, and table with SERIAL columns, we need to read in serial from files.
bool preservingOrder = actualFileType == CopyDescription::FileType::CSV;
expression_vector columnExpressions;
for (auto& property : tableSchema->getProperties()) {
if (property->getDataType()->getLogicalTypeID() != common::LogicalTypeID::SERIAL) {
columnExpressions.push_back(createVariable(
property->getName(), common::LogicalType{common::LogicalTypeID::ARROW_COLUMN}));
} else {
preservingOrder = true;
}
}
auto preservingOrder = bindPreservingOrder(tableSchema, actualFileType);
// Bind expressions.
auto columnExpressions = bindColumnExpressions(tableSchema);
auto copyDescription = std::make_unique<CopyDescription>(
actualFileType, boundFilePaths, std::move(csvReaderConfig));
auto nodeOffsetExpression =
createVariable("nodeOffset", common::LogicalType{common::LogicalTypeID::INT64});
return std::make_unique<BoundCopyFrom>(std::move(copyDescription), tableSchema,
std::move(columnExpressions), std::move(nodeOffsetExpression), preservingOrder);
createVariable(std::string(Property::OFFSET_NAME), LogicalType{LogicalTypeID::INT64});
auto boundOffsetExpression = tableSchema->tableType == TableType::REL ?
createVariable(std::string(Property::REL_BOUND_OFFSET_NAME),
LogicalType{LogicalTypeID::ARROW_COLUMN}) :
nullptr;
auto nbrOffsetExpression = tableSchema->tableType == TableType::REL ?
createVariable(std::string(Property::REL_NBR_OFFSET_NAME),
LogicalType{LogicalTypeID::ARROW_COLUMN}) :
nullptr;
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
tableSchema, std::move(columnExpressions), std::move(nodeOffsetExpression),
std::move(boundOffsetExpression), std::move(nbrOffsetExpression), preservingOrder);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
Expand Down
13 changes: 6 additions & 7 deletions src/binder/bind/bind_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ std::unique_ptr<BoundStatement> Binder::bindCreateRdfGraphClause(
StatementType::CREATE_RDF_GRAPH, rdfGraphName, std::move(boundCreateInfo));
}

std::unique_ptr<BoundStatement> Binder::bindDropTableClause(const parser::Statement& statement) {
std::unique_ptr<BoundStatement> Binder::bindDropTableClause(const Statement& statement) {
auto& dropTable = (DropTable&)statement;
auto tableName = dropTable.getTableName();
validateTableExist(catalog, tableName);
Expand All @@ -132,7 +132,7 @@ std::unique_ptr<BoundStatement> Binder::bindDropTableClause(const parser::Statem
return make_unique<BoundDropTable>(tableID, tableName);
}

std::unique_ptr<BoundStatement> Binder::bindRenameTableClause(const parser::Statement& statement) {
std::unique_ptr<BoundStatement> Binder::bindRenameTableClause(const Statement& statement) {
auto renameTable = (RenameTable&)statement;
auto tableName = renameTable.getTableName();
auto catalogContent = catalog.getReadOnlyVersion();
Expand All @@ -144,7 +144,7 @@ std::unique_ptr<BoundStatement> Binder::bindRenameTableClause(const parser::Stat
catalogContent->getTableID(tableName), tableName, renameTable.getNewName());
}

std::unique_ptr<BoundStatement> Binder::bindAddPropertyClause(const parser::Statement& statement) {
std::unique_ptr<BoundStatement> Binder::bindAddPropertyClause(const Statement& statement) {
auto& addProperty = (AddProperty&)statement;
auto tableName = addProperty.getTableName();
validateTableExist(catalog, tableName);
Expand All @@ -163,23 +163,22 @@ std::unique_ptr<BoundStatement> Binder::bindAddPropertyClause(const parser::Stat
tableID, addProperty.getPropertyName(), std::move(dataType), defaultVal, tableName);
}

std::unique_ptr<BoundStatement> Binder::bindDropPropertyClause(const parser::Statement& statement) {
std::unique_ptr<BoundStatement> Binder::bindDropPropertyClause(const Statement& statement) {
auto& dropProperty = (DropProperty&)statement;
auto tableName = dropProperty.getTableName();
validateTableExist(catalog, tableName);
auto catalogContent = catalog.getReadOnlyVersion();
auto tableID = catalogContent->getTableID(tableName);
auto tableSchema = catalogContent->getTableSchema(tableID);
auto propertyID = bindPropertyName(tableSchema, dropProperty.getPropertyName());
if (tableSchema->getTableType() == catalog::TableType::NODE &&
if (tableSchema->getTableType() == TableType::NODE &&
reinterpret_cast<NodeTableSchema*>(tableSchema)->getPrimaryKeyPropertyID() == propertyID) {
throw BinderException("Cannot drop primary key of a node table.");
}
return make_unique<BoundDropProperty>(tableID, propertyID, tableName);
}

std::unique_ptr<BoundStatement> Binder::bindRenamePropertyClause(
const parser::Statement& statement) {
std::unique_ptr<BoundStatement> Binder::bindRenamePropertyClause(const Statement& statement) {
auto& renameProperty = (RenameProperty&)statement;
auto tableName = renameProperty.getTableName();
validateTableExist(catalog, tableName);
Expand Down
8 changes: 8 additions & 0 deletions src/binder/copy/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
add_library(
kuzu_binder_copy
OBJECT
bound_copy_from.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_binder_copy>
PARENT_SCOPE)
20 changes: 20 additions & 0 deletions src/binder/copy/bound_copy_from.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "binder/copy/bound_copy_from.h"

namespace kuzu {
namespace binder {

std::unique_ptr<BoundCopyFromInfo> BoundCopyFromInfo::copy() {
expression_vector copiedColumnExpressions;
copiedColumnExpressions.reserve(columnExpressions.size());
for (const auto& columnExpression : columnExpressions) {
copiedColumnExpressions.push_back(columnExpression->copy());
}
return std::make_unique<BoundCopyFromInfo>(copyDesc->copy(), tableSchema,
std::move(copiedColumnExpressions), offsetExpression->copy(),
tableSchema->tableType == common::TableType::REL ? boundOffsetExpression->copy() : nullptr,
tableSchema->tableType == common::TableType::REL ? nbrOffsetExpression->copy() : nullptr,
preservingOrder);
}

} // namespace binder
} // namespace kuzu
17 changes: 9 additions & 8 deletions src/catalog/catalog_content.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common/string_utils.h"
#include "storage/storage_utils.h"

using namespace kuzu::binder;
using namespace kuzu::common;
using namespace kuzu::storage;
using namespace kuzu::transaction;
Expand All @@ -28,11 +29,11 @@ static void assignPropertyIDAndTableID(
}
}

table_id_t CatalogContent::addNodeTableSchema(const binder::BoundCreateTableInfo& info) {
table_id_t CatalogContent::addNodeTableSchema(const BoundCreateTableInfo& info) {
table_id_t tableID = assignNextTableID();
auto properties = Property::copy(info.properties);
assignPropertyIDAndTableID(properties, tableID);
auto extraInfo = (binder::BoundNodeExtraCreateTableInfo*)info.extraInfo.get();
auto extraInfo = (BoundNodeExtraCreateTableInfo*)info.extraInfo.get();
auto nodeTableSchema = std::make_unique<NodeTableSchema>(
info.tableName, tableID, extraInfo->primaryKeyIdx, std::move(properties));
tableNameToIDMap.emplace(nodeTableSchema->tableName, tableID);
Expand All @@ -47,12 +48,12 @@ static void addRelInternalIDProperty(std::vector<std::unique_ptr<Property>>& pro
properties.insert(properties.begin(), std::move(relInternalIDProperty));
}

table_id_t CatalogContent::addRelTableSchema(const binder::BoundCreateTableInfo& info) {
table_id_t CatalogContent::addRelTableSchema(const BoundCreateTableInfo& info) {
table_id_t tableID = assignNextTableID();
auto properties = Property::copy(info.properties);
addRelInternalIDProperty(properties);
assignPropertyIDAndTableID(properties, tableID);
auto extraInfo = (binder::BoundRelExtraCreateTableInfo*)info.extraInfo.get();
auto extraInfo = (BoundRelExtraCreateTableInfo*)info.extraInfo.get();
getNodeTableSchema(extraInfo->srcTableID)->addFwdRelTableID(tableID);
getNodeTableSchema(extraInfo->dstTableID)->addBwdRelTableID(tableID);
auto relTableSchema = std::make_unique<RelTableSchema>(info.tableName, tableID,
Expand All @@ -63,13 +64,13 @@ table_id_t CatalogContent::addRelTableSchema(const binder::BoundCreateTableInfo&
return tableID;
}

common::table_id_t CatalogContent::addRdfGraphSchema(const binder::BoundCreateTableInfo& info) {
common::table_id_t CatalogContent::addRdfGraphSchema(const BoundCreateTableInfo& info) {
table_id_t rdfGraphID = assignNextTableID();
auto extraInfo = (binder::BoundRdfExtraCreateTableInfo*)info.extraInfo.get();
auto extraInfo = (BoundRdfExtraCreateTableInfo*)info.extraInfo.get();
auto nodeInfo = extraInfo->nodeInfo.get();
auto relInfo = extraInfo->relInfo.get();
auto nodeExtraInfo = (binder::BoundNodeExtraCreateTableInfo*)nodeInfo->extraInfo.get();
auto relExtraInfo = (binder::BoundRelExtraCreateTableInfo*)relInfo->extraInfo.get();
auto nodeExtraInfo = (BoundNodeExtraCreateTableInfo*)nodeInfo->extraInfo.get();
auto relExtraInfo = (BoundRelExtraCreateTableInfo*)relInfo->extraInfo.get();
// Node table schema
auto nodeTableID = addNodeTableSchema(*nodeInfo);
// Rel table schema
Expand Down
13 changes: 13 additions & 0 deletions src/common/exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,18 @@ std::string ExceptionMessage::overLargeStringValueException(const std::string& l
"Maximum length of strings is 4096. Input string's length is {}.", length);
}

std::string ExceptionMessage::violateUniquenessOfRelAdjColumn(const std::string& tableName,
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
const std::string& offset, const std::string& multiplicity, const std::string& direction) {
return StringUtils::string_format("RelTable {} is a {} table, but node(nodeOffset: {}) "
"has more than one neighbour in the {} direction.",
tableName, offset, multiplicity, direction);
}

std::string ExceptionMessage::validateCopyNpyNotForRelTablesException(
const std::string& tableName) {
return StringUtils::string_format(
"Copy from npy files to rel table {} is not supported yet.", tableName);
}

} // namespace common
} // namespace kuzu
1 change: 1 addition & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ void ValueVector::initializeValueBuffer() {

void ArrowColumnVector::setArrowColumn(
ValueVector* vector, std::shared_ptr<arrow::ChunkedArray> column) {
assert(vector->dataType.getLogicalTypeID() == LogicalTypeID::ARROW_COLUMN);
auto arrowColumnBuffer =
reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
arrowColumnBuffer->column = std::move(column);
Expand Down
10 changes: 7 additions & 3 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,23 @@ class Binder {
std::unique_ptr<common::LogicalType> bindDataType(const std::string& dataType);

/*** bind copy from/to ***/
static bool bindPreservingOrder(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
expression_vector bindColumnExpressions(catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);

std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);
static std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

std::unique_ptr<common::CSVReaderConfig> bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>*
parsingOptions);
void bindStringParsingOptions(common::CSVReaderConfig& csvReaderConfig,
const std::string& optionName, std::string& optionValue);
char bindParsingOptionValue(std::string value);
common::CopyDescription::FileType bindFileType(const std::vector<std::string>& filePaths);
common::CopyDescription::FileType bindFileType(const std::string& filePath);
static common::CopyDescription::FileType bindFileType(
const std::vector<std::string>& filePaths);
static common::CopyDescription::FileType bindFileType(const std::string& filePath);

/*** bind query ***/
std::unique_ptr<BoundRegularQuery> bindQuery(const parser::RegularQuery& regularQuery);
Expand Down
50 changes: 30 additions & 20 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,42 @@
namespace kuzu {
namespace binder {

struct BoundCopyFromInfo {
std::unique_ptr<common::CopyDescription> copyDesc;
catalog::TableSchema* tableSchema;
expression_vector columnExpressions;
std::shared_ptr<Expression> offsetExpression;
// `boundOffsetExpression` and `nbrOffsetExpression` are for rel tables only.
std::shared_ptr<Expression> boundOffsetExpression;
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<Expression> nbrOffsetExpression;

bool preservingOrder;

BoundCopyFromInfo(std::unique_ptr<common::CopyDescription> copyDesc,
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
std::shared_ptr<Expression> offsetExpression,
std::shared_ptr<Expression> boundOffsetExpression,
std::shared_ptr<Expression> nbrOffsetExpression, bool preservingOrder)
: copyDesc{std::move(copyDesc)}, tableSchema{tableSchema}, columnExpressions{std::move(
columnExpressions)},
offsetExpression{std::move(offsetExpression)}, boundOffsetExpression{std::move(
boundOffsetExpression)},
nbrOffsetExpression{std::move(nbrOffsetExpression)}, preservingOrder{preservingOrder} {}

std::unique_ptr<BoundCopyFromInfo> copy();
};

class BoundCopyFrom : public BoundStatement {
public:
BoundCopyFrom(std::unique_ptr<common::CopyDescription> copyDescription,
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
std::shared_ptr<Expression> nodeOffsetExpression, bool preservingOrder_)
explicit BoundCopyFrom(std::unique_ptr<BoundCopyFromInfo> info)
: BoundStatement{common::StatementType::COPY_FROM,
BoundStatementResult::createSingleStringColumnResult()},
copyDescription{std::move(copyDescription)}, tableSchema{tableSchema},
columnExpressions{std::move(columnExpressions)},
nodeOffsetExpression{std::move(nodeOffsetExpression)}, preservingOrder_{
preservingOrder_} {}

inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }
inline catalog::TableSchema* getTableSchema() const { return tableSchema; }
inline expression_vector getColumnExpressions() const { return columnExpressions; }
inline std::shared_ptr<Expression> getNodeOffsetExpression() const {
return nodeOffsetExpression;
}
inline bool preservingOrder() const { return preservingOrder_; }
info{std::move(info)} {}

inline BoundCopyFromInfo* getInfo() const { return info.get(); }

private:
std::unique_ptr<common::CopyDescription> copyDescription;
catalog::TableSchema* tableSchema;
expression_vector columnExpressions;
std::shared_ptr<Expression> nodeOffsetExpression;
bool preservingOrder_;
std::unique_ptr<BoundCopyFromInfo> info;
};

} // namespace binder
Expand Down
Loading