Skip to content

Commit

Permalink
Merge pull request #1990 from kuzudb/node-group
Browse files Browse the repository at this point in the history
Move CopyRel to query processing pipeline
  • Loading branch information
ray6080 committed Sep 6, 2023
2 parents 650cabf + af05d5f commit bf7414f
Show file tree
Hide file tree
Showing 81 changed files with 1,361 additions and 764 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.8.2 LANGUAGES CXX)
project(Kuzu VERSION 0.0.8.3 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
1 change: 1 addition & 0 deletions src/binder/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
add_subdirectory(bind)
add_subdirectory(bind_expression)
add_subdirectory(copy)
add_subdirectory(expression)
add_subdirectory(query)
add_subdirectory(rewriter)
Expand Down
78 changes: 62 additions & 16 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
#include "binder/copy/bound_copy_to.h"
#include "binder/expression/literal_expression.h"
#include "common/string_utils.h"
#include "common/table_type.h"
#include "parser/copy.h"

using namespace kuzu::binder;
using namespace kuzu::catalog;
using namespace kuzu::common;
using namespace kuzu::parser;

Expand Down Expand Up @@ -36,21 +39,60 @@ static void validateCopyNpyKeyword(
CopyDescription::FileType expectedType, CopyDescription::FileType actualType) {
if (expectedType == CopyDescription::FileType::UNKNOWN &&
actualType == CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");
throw BinderException(ExceptionMessage::validateCopyNPYByColumnException());
}
if (expectedType == CopyDescription::FileType::NPY && actualType != expectedType) {
throw BinderException("Please use COPY FROM statement for copying csv and parquet files.");
throw BinderException(ExceptionMessage::validateCopyCSVParquetByColumnException());
}
}

static void validateCopyNpyFilesMatchSchema(uint32_t numFiles, catalog::TableSchema* schema) {
static void validateCopyNpyFilesMatchSchema(uint32_t numFiles, TableSchema* schema) {
if (schema->properties.size() != numFiles) {
throw BinderException(StringUtils::string_format(
"Number of npy files is not equal to number of properties in table {}.",
schema->tableName));
}
}

static void validateCopyNpyNotForRelTables(TableSchema* schema) {
if (schema->tableType == TableType::REL) {
throw BinderException(
ExceptionMessage::validateCopyNpyNotForRelTablesException(schema->tableName));
}
}

expression_vector Binder::bindColumnExpressions(TableSchema* tableSchema) {
expression_vector columnExpressions;
if (tableSchema->tableType == TableType::REL) {
// For rel table, add FROM and TO columns as data columns to be read from file.
columnExpressions.push_back(createVariable(std::string(Property::REL_FROM_PROPERTY_NAME),
LogicalType{LogicalTypeID::ARROW_COLUMN}));
columnExpressions.push_back(createVariable(
std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
}
for (auto& property : tableSchema->properties) {
if (property->getDataType()->getLogicalTypeID() == LogicalTypeID::SERIAL ||
TableSchema::isReservedPropertyName(property->getName())) {
continue;
} else {
columnExpressions.push_back(
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
}
}
return columnExpressions;
}

bool Binder::bindPreservingOrder(TableSchema* tableSchema, CopyDescription::FileType fileType) {
bool preservingOrder = fileType == CopyDescription::FileType::CSV;
for (auto& property : tableSchema->properties) {
if (property->getDataType()->getLogicalTypeID() == LogicalTypeID::SERIAL) {
preservingOrder = true;
break;
}
}
return preservingOrder;
}

std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& statement) {
auto& copyStatement = (CopyFrom&)statement;
auto catalogContent = catalog.getReadOnlyVersion();
Expand All @@ -68,25 +110,29 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
validateCopyNpyKeyword(expectedFileType, actualFileType);
if (actualFileType == CopyDescription::FileType::NPY) {
validateCopyNpyFilesMatchSchema(boundFilePaths.size(), tableSchema);
validateCopyNpyNotForRelTables(tableSchema);
}
// Bind execution mode.
// For CSV file, and table with SERIAL columns, we need to read in serial from files.
bool preservingOrder = actualFileType == CopyDescription::FileType::CSV;
expression_vector columnExpressions;
for (auto& property : tableSchema->getProperties()) {
if (property->getDataType()->getLogicalTypeID() != common::LogicalTypeID::SERIAL) {
columnExpressions.push_back(createVariable(
property->getName(), common::LogicalType{common::LogicalTypeID::ARROW_COLUMN}));
} else {
preservingOrder = true;
}
}
auto preservingOrder = bindPreservingOrder(tableSchema, actualFileType);
// Bind expressions.
auto columnExpressions = bindColumnExpressions(tableSchema);
auto copyDescription = std::make_unique<CopyDescription>(
actualFileType, boundFilePaths, std::move(csvReaderConfig));
auto nodeOffsetExpression =
createVariable("nodeOffset", common::LogicalType{common::LogicalTypeID::INT64});
return std::make_unique<BoundCopyFrom>(std::move(copyDescription), tableSchema,
std::move(columnExpressions), std::move(nodeOffsetExpression), preservingOrder);
createVariable(std::string(Property::OFFSET_NAME), LogicalType{LogicalTypeID::INT64});
auto boundOffsetExpression = tableSchema->tableType == TableType::REL ?
createVariable(std::string(Property::REL_BOUND_OFFSET_NAME),
LogicalType{LogicalTypeID::ARROW_COLUMN}) :
nullptr;
auto nbrOffsetExpression = tableSchema->tableType == TableType::REL ?
createVariable(std::string(Property::REL_NBR_OFFSET_NAME),
LogicalType{LogicalTypeID::ARROW_COLUMN}) :
nullptr;
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
tableSchema, std::move(columnExpressions), std::move(nodeOffsetExpression),
std::move(boundOffsetExpression), std::move(nbrOffsetExpression), preservingOrder);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
Expand Down
13 changes: 6 additions & 7 deletions src/binder/bind/bind_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ std::unique_ptr<BoundStatement> Binder::bindCreateRdfGraphClause(
StatementType::CREATE_RDF_GRAPH, rdfGraphName, std::move(boundCreateInfo));
}

std::unique_ptr<BoundStatement> Binder::bindDropTableClause(const parser::Statement& statement) {
std::unique_ptr<BoundStatement> Binder::bindDropTableClause(const Statement& statement) {
auto& dropTable = (DropTable&)statement;
auto tableName = dropTable.getTableName();
validateTableExist(catalog, tableName);
Expand All @@ -132,7 +132,7 @@ std::unique_ptr<BoundStatement> Binder::bindDropTableClause(const parser::Statem
return make_unique<BoundDropTable>(tableID, tableName);
}

std::unique_ptr<BoundStatement> Binder::bindRenameTableClause(const parser::Statement& statement) {
std::unique_ptr<BoundStatement> Binder::bindRenameTableClause(const Statement& statement) {
auto renameTable = (RenameTable&)statement;
auto tableName = renameTable.getTableName();
auto catalogContent = catalog.getReadOnlyVersion();
Expand All @@ -144,7 +144,7 @@ std::unique_ptr<BoundStatement> Binder::bindRenameTableClause(const parser::Stat
catalogContent->getTableID(tableName), tableName, renameTable.getNewName());
}

std::unique_ptr<BoundStatement> Binder::bindAddPropertyClause(const parser::Statement& statement) {
std::unique_ptr<BoundStatement> Binder::bindAddPropertyClause(const Statement& statement) {
auto& addProperty = (AddProperty&)statement;
auto tableName = addProperty.getTableName();
validateTableExist(catalog, tableName);
Expand All @@ -163,23 +163,22 @@ std::unique_ptr<BoundStatement> Binder::bindAddPropertyClause(const parser::Stat
tableID, addProperty.getPropertyName(), std::move(dataType), defaultVal, tableName);
}

std::unique_ptr<BoundStatement> Binder::bindDropPropertyClause(const parser::Statement& statement) {
std::unique_ptr<BoundStatement> Binder::bindDropPropertyClause(const Statement& statement) {
auto& dropProperty = (DropProperty&)statement;
auto tableName = dropProperty.getTableName();
validateTableExist(catalog, tableName);
auto catalogContent = catalog.getReadOnlyVersion();
auto tableID = catalogContent->getTableID(tableName);
auto tableSchema = catalogContent->getTableSchema(tableID);
auto propertyID = bindPropertyName(tableSchema, dropProperty.getPropertyName());
if (tableSchema->getTableType() == catalog::TableType::NODE &&
if (tableSchema->getTableType() == TableType::NODE &&
reinterpret_cast<NodeTableSchema*>(tableSchema)->getPrimaryKeyPropertyID() == propertyID) {
throw BinderException("Cannot drop primary key of a node table.");
}
return make_unique<BoundDropProperty>(tableID, propertyID, tableName);
}

std::unique_ptr<BoundStatement> Binder::bindRenamePropertyClause(
const parser::Statement& statement) {
std::unique_ptr<BoundStatement> Binder::bindRenamePropertyClause(const Statement& statement) {
auto& renameProperty = (RenameProperty&)statement;
auto tableName = renameProperty.getTableName();
validateTableExist(catalog, tableName);
Expand Down
8 changes: 8 additions & 0 deletions src/binder/copy/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
add_library(
kuzu_binder_copy
OBJECT
bound_copy_from.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_binder_copy>
PARENT_SCOPE)
20 changes: 20 additions & 0 deletions src/binder/copy/bound_copy_from.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "binder/copy/bound_copy_from.h"

namespace kuzu {
namespace binder {

std::unique_ptr<BoundCopyFromInfo> BoundCopyFromInfo::copy() {
expression_vector copiedColumnExpressions;
copiedColumnExpressions.reserve(columnExpressions.size());
for (const auto& columnExpression : columnExpressions) {
copiedColumnExpressions.push_back(columnExpression->copy());
}
return std::make_unique<BoundCopyFromInfo>(copyDesc->copy(), tableSchema,
std::move(copiedColumnExpressions), offsetExpression->copy(),
tableSchema->tableType == common::TableType::REL ? boundOffsetExpression->copy() : nullptr,
tableSchema->tableType == common::TableType::REL ? nbrOffsetExpression->copy() : nullptr,
preservingOrder);
}

} // namespace binder
} // namespace kuzu
17 changes: 9 additions & 8 deletions src/catalog/catalog_content.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common/string_utils.h"
#include "storage/storage_utils.h"

using namespace kuzu::binder;
using namespace kuzu::common;
using namespace kuzu::storage;
using namespace kuzu::transaction;
Expand All @@ -28,11 +29,11 @@ static void assignPropertyIDAndTableID(
}
}

table_id_t CatalogContent::addNodeTableSchema(const binder::BoundCreateTableInfo& info) {
table_id_t CatalogContent::addNodeTableSchema(const BoundCreateTableInfo& info) {
table_id_t tableID = assignNextTableID();
auto properties = Property::copy(info.properties);
assignPropertyIDAndTableID(properties, tableID);
auto extraInfo = (binder::BoundNodeExtraCreateTableInfo*)info.extraInfo.get();
auto extraInfo = (BoundNodeExtraCreateTableInfo*)info.extraInfo.get();
auto nodeTableSchema = std::make_unique<NodeTableSchema>(
info.tableName, tableID, extraInfo->primaryKeyIdx, std::move(properties));
tableNameToIDMap.emplace(nodeTableSchema->tableName, tableID);
Expand All @@ -47,12 +48,12 @@ static void addRelInternalIDProperty(std::vector<std::unique_ptr<Property>>& pro
properties.insert(properties.begin(), std::move(relInternalIDProperty));
}

table_id_t CatalogContent::addRelTableSchema(const binder::BoundCreateTableInfo& info) {
table_id_t CatalogContent::addRelTableSchema(const BoundCreateTableInfo& info) {
table_id_t tableID = assignNextTableID();
auto properties = Property::copy(info.properties);
addRelInternalIDProperty(properties);
assignPropertyIDAndTableID(properties, tableID);
auto extraInfo = (binder::BoundRelExtraCreateTableInfo*)info.extraInfo.get();
auto extraInfo = (BoundRelExtraCreateTableInfo*)info.extraInfo.get();
getNodeTableSchema(extraInfo->srcTableID)->addFwdRelTableID(tableID);
getNodeTableSchema(extraInfo->dstTableID)->addBwdRelTableID(tableID);
auto relTableSchema = std::make_unique<RelTableSchema>(info.tableName, tableID,
Expand All @@ -63,13 +64,13 @@ table_id_t CatalogContent::addRelTableSchema(const binder::BoundCreateTableInfo&
return tableID;
}

common::table_id_t CatalogContent::addRdfGraphSchema(const binder::BoundCreateTableInfo& info) {
common::table_id_t CatalogContent::addRdfGraphSchema(const BoundCreateTableInfo& info) {
table_id_t rdfGraphID = assignNextTableID();
auto extraInfo = (binder::BoundRdfExtraCreateTableInfo*)info.extraInfo.get();
auto extraInfo = (BoundRdfExtraCreateTableInfo*)info.extraInfo.get();
auto nodeInfo = extraInfo->nodeInfo.get();
auto relInfo = extraInfo->relInfo.get();
auto nodeExtraInfo = (binder::BoundNodeExtraCreateTableInfo*)nodeInfo->extraInfo.get();
auto relExtraInfo = (binder::BoundRelExtraCreateTableInfo*)relInfo->extraInfo.get();
auto nodeExtraInfo = (BoundNodeExtraCreateTableInfo*)nodeInfo->extraInfo.get();
auto relExtraInfo = (BoundRelExtraCreateTableInfo*)relInfo->extraInfo.get();
// Node table schema
auto nodeTableID = addNodeTableSchema(*nodeInfo);
// Rel table schema
Expand Down
13 changes: 13 additions & 0 deletions src/common/exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,18 @@ std::string ExceptionMessage::overLargeStringValueException(const std::string& l
"Maximum length of strings is 4096. Input string's length is {}.", length);
}

std::string ExceptionMessage::violateUniquenessOfRelAdjColumn(const std::string& tableName,
const std::string& offset, const std::string& multiplicity, const std::string& direction) {
return StringUtils::string_format("RelTable {} is a {} table, but node(nodeOffset: {}) "
"has more than one neighbour in the {} direction.",
tableName, offset, multiplicity, direction);
}

std::string ExceptionMessage::validateCopyNpyNotForRelTablesException(
const std::string& tableName) {
return StringUtils::string_format(
"Copy from npy files to rel table {} is not supported yet.", tableName);
}

} // namespace common
} // namespace kuzu
1 change: 1 addition & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ void ValueVector::initializeValueBuffer() {

void ArrowColumnVector::setArrowColumn(
ValueVector* vector, std::shared_ptr<arrow::ChunkedArray> column) {
assert(vector->dataType.getLogicalTypeID() == LogicalTypeID::ARROW_COLUMN);
auto arrowColumnBuffer =
reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
arrowColumnBuffer->column = std::move(column);
Expand Down
10 changes: 7 additions & 3 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,23 @@ class Binder {
std::unique_ptr<common::LogicalType> bindDataType(const std::string& dataType);

/*** bind copy from/to ***/
static bool bindPreservingOrder(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
expression_vector bindColumnExpressions(catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);

std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);
static std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

std::unique_ptr<common::CSVReaderConfig> bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>*
parsingOptions);
void bindStringParsingOptions(common::CSVReaderConfig& csvReaderConfig,
const std::string& optionName, std::string& optionValue);
char bindParsingOptionValue(std::string value);
common::CopyDescription::FileType bindFileType(const std::vector<std::string>& filePaths);
common::CopyDescription::FileType bindFileType(const std::string& filePath);
static common::CopyDescription::FileType bindFileType(
const std::vector<std::string>& filePaths);
static common::CopyDescription::FileType bindFileType(const std::string& filePath);

/*** bind query ***/
std::unique_ptr<BoundRegularQuery> bindQuery(const parser::RegularQuery& regularQuery);
Expand Down
50 changes: 30 additions & 20 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,42 @@
namespace kuzu {
namespace binder {

struct BoundCopyFromInfo {
std::unique_ptr<common::CopyDescription> copyDesc;
catalog::TableSchema* tableSchema;
expression_vector columnExpressions;
std::shared_ptr<Expression> offsetExpression;
// `boundOffsetExpression` and `nbrOffsetExpression` are for rel tables only.
std::shared_ptr<Expression> boundOffsetExpression;
std::shared_ptr<Expression> nbrOffsetExpression;

bool preservingOrder;

BoundCopyFromInfo(std::unique_ptr<common::CopyDescription> copyDesc,
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
std::shared_ptr<Expression> offsetExpression,
std::shared_ptr<Expression> boundOffsetExpression,
std::shared_ptr<Expression> nbrOffsetExpression, bool preservingOrder)
: copyDesc{std::move(copyDesc)}, tableSchema{tableSchema}, columnExpressions{std::move(
columnExpressions)},
offsetExpression{std::move(offsetExpression)}, boundOffsetExpression{std::move(
boundOffsetExpression)},
nbrOffsetExpression{std::move(nbrOffsetExpression)}, preservingOrder{preservingOrder} {}

std::unique_ptr<BoundCopyFromInfo> copy();
};

class BoundCopyFrom : public BoundStatement {
public:
BoundCopyFrom(std::unique_ptr<common::CopyDescription> copyDescription,
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
std::shared_ptr<Expression> nodeOffsetExpression, bool preservingOrder_)
explicit BoundCopyFrom(std::unique_ptr<BoundCopyFromInfo> info)
: BoundStatement{common::StatementType::COPY_FROM,
BoundStatementResult::createSingleStringColumnResult()},
copyDescription{std::move(copyDescription)}, tableSchema{tableSchema},
columnExpressions{std::move(columnExpressions)},
nodeOffsetExpression{std::move(nodeOffsetExpression)}, preservingOrder_{
preservingOrder_} {}

inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }
inline catalog::TableSchema* getTableSchema() const { return tableSchema; }
inline expression_vector getColumnExpressions() const { return columnExpressions; }
inline std::shared_ptr<Expression> getNodeOffsetExpression() const {
return nodeOffsetExpression;
}
inline bool preservingOrder() const { return preservingOrder_; }
info{std::move(info)} {}

inline BoundCopyFromInfo* getInfo() const { return info.get(); }

private:
std::unique_ptr<common::CopyDescription> copyDescription;
catalog::TableSchema* tableSchema;
expression_vector columnExpressions;
std::shared_ptr<Expression> nodeOffsetExpression;
bool preservingOrder_;
std::unique_ptr<BoundCopyFromInfo> info;
};

} // namespace binder
Expand Down
Loading

0 comments on commit bf7414f

Please sign in to comment.