Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement copy rdf rel table #2033

Merged
merged 1 commit into from
Sep 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dataset/copy-test/node/turtle/copy.cypher

This file was deleted.

2 changes: 2 additions & 0 deletions dataset/copy-test/rdf/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
COPY taxonomy_RESOURCE FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
COPY taxonomy_TRIPLES FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
49 changes: 34 additions & 15 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,24 +191,27 @@
auto nodeOffset =
createVariable(std::string(Property::OFFSET_NAME), LogicalType{LogicalTypeID::INT64});
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
tableSchema, std::move(columns), std::move(nodeOffset), nullptr, nullptr, containsSerial);
tableSchema, std::move(columns), std::move(nodeOffset), nullptr /* boundOffsetExpression */,
nullptr /* nbrOffsetExpression */, nullptr /* predicateOffsetExpression */, containsSerial);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(
std::unique_ptr<CopyDescription> copyDescription, TableSchema* tableSchema) {
// For table with SERIAL columns, we need to read in serial from files.
auto containsSerial = bindContainsSerial(tableSchema);
auto columns = bindCopyRelColumns(tableSchema);
auto columns = bindCopyRelColumns(tableSchema, copyDescription->fileType);
auto nodeOffset =
createVariable(std::string(Property::OFFSET_NAME), LogicalType{LogicalTypeID::INT64});
auto boundOffset = createVariable(
std::string(Property::REL_BOUND_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
auto nbrOffset = createVariable(
std::string(Property::REL_NBR_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
auto boundCopyFromInfo =
std::make_unique<BoundCopyFromInfo>(std::move(copyDescription), tableSchema,
std::move(columns), std::move(nodeOffset), boundOffset, nbrOffset, containsSerial);
auto predicateOffset = createVariable(
std::string(Property::REL_PREDICATE_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
tableSchema, std::move(columns), std::move(nodeOffset), std::move(boundOffset),
std::move(nbrOffset), std::move(predicateOffset), containsSerial);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

Expand Down Expand Up @@ -253,18 +256,34 @@
return columnExpressions;
}

expression_vector Binder::bindCopyRelColumns(TableSchema* tableSchema) {
expression_vector Binder::bindCopyRelColumns(
TableSchema* tableSchema, CopyDescription::FileType fileType) {
expression_vector columnExpressions;
columnExpressions.push_back(createVariable(
std::string(Property::REL_FROM_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
columnExpressions.push_back(createVariable(
std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
}
switch (fileType) {
case common::CopyDescription::FileType::TURTLE: {
columnExpressions.push_back(createVariable("SUBJECT", LogicalType{LogicalTypeID::STRING}));
columnExpressions.push_back(
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
createVariable("PREDICATE", LogicalType{LogicalTypeID::STRING}));
columnExpressions.push_back(createVariable("OBJECT", LogicalType{LogicalTypeID::STRING}));
} break;
case common::CopyDescription::FileType::CSV:
case common::CopyDescription::FileType::PARQUET:
case common::CopyDescription::FileType::NPY: {
columnExpressions.push_back(createVariable(std::string(Property::REL_FROM_PROPERTY_NAME),
LogicalType{LogicalTypeID::ARROW_COLUMN}));
columnExpressions.push_back(createVariable(
std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
}
columnExpressions.push_back(
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
}
} break;
default: {
throw NotImplementedException{"Binder::bindCopyRelColumns"};

Check warning on line 285 in src/binder/bind/bind_copy.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_copy.cpp#L284-L285

Added lines #L284 - L285 were not covered by tests
}
}
return columnExpressions;
}
Expand Down
2 changes: 2 additions & 0 deletions src/binder/copy/bound_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ std::unique_ptr<BoundCopyFromInfo> BoundCopyFromInfo::copy() {
std::move(copiedColumnExpressions), offsetExpression->copy(),
tableSchema->tableType == common::TableType::REL ? boundOffsetExpression->copy() : nullptr,
tableSchema->tableType == common::TableType::REL ? nbrOffsetExpression->copy() : nullptr,
tableSchema->tableType == common::TableType::REL ? predicateOffsetExpression->copy() :
nullptr,
containsSerial);
}

Expand Down
6 changes: 6 additions & 0 deletions src/common/data_chunk/data_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,11 @@ void DataChunk::insert(uint32_t pos, std::shared_ptr<ValueVector> valueVector) {
valueVectors[pos] = std::move(valueVector);
}

void DataChunk::resetAuxiliaryBuffer() {
for (auto& valueVector : valueVectors) {
valueVector->resetAuxiliaryBuffer();
}
}

} // namespace common
} // namespace kuzu
3 changes: 2 additions & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ class Binder {
catalog::TableSchema* tableSchema);
expression_vector bindCopyNodeColumns(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
expression_vector bindCopyRelColumns(catalog::TableSchema* tableSchema);
expression_vector bindCopyRelColumns(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);
std::unique_ptr<common::CSVReaderConfig> bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>&
Expand Down
16 changes: 10 additions & 6 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@ struct BoundCopyFromInfo {
// `boundOffsetExpression` and `nbrOffsetExpression` are for rel tables only.
std::shared_ptr<Expression> boundOffsetExpression;
std::shared_ptr<Expression> nbrOffsetExpression;
std::shared_ptr<Expression> predicateOffsetExpression;

bool containsSerial;

BoundCopyFromInfo(std::unique_ptr<common::CopyDescription> copyDesc,
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
std::shared_ptr<Expression> offsetExpression,
std::shared_ptr<Expression> boundOffsetExpression,
std::shared_ptr<Expression> nbrOffsetExpression, bool containsSerial)
: copyDesc{std::move(copyDesc)}, tableSchema{tableSchema}, columnExpressions{std::move(
columnExpressions)},
offsetExpression{std::move(offsetExpression)}, boundOffsetExpression{std::move(
boundOffsetExpression)},
nbrOffsetExpression{std::move(nbrOffsetExpression)}, containsSerial{containsSerial} {}
std::shared_ptr<Expression> nbrOffsetExpression,
std::shared_ptr<Expression> predicateOffsetExpression, bool containsSerial)
: copyDesc{std::move(copyDesc)}, tableSchema{tableSchema},
columnExpressions{std::move(columnExpressions)}, offsetExpression{std::move(
offsetExpression)},
boundOffsetExpression{std::move(boundOffsetExpression)}, nbrOffsetExpression{std::move(
nbrOffsetExpression)},
predicateOffsetExpression{std::move(predicateOffsetExpression)}, containsSerial{
containsSerial} {}

std::unique_ptr<BoundCopyFromInfo> copy();
};
Expand Down
1 change: 1 addition & 0 deletions src/include/catalog/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Property {
static constexpr std::string_view OFFSET_NAME = "_OFFSET_";
static constexpr std::string_view REL_BOUND_OFFSET_NAME = "_BOUND_OFFSET_";
static constexpr std::string_view REL_NBR_OFFSET_NAME = "_NBR_OFFSET_";
static constexpr std::string_view REL_PREDICATE_OFFSET_NAME = "_PREDICATE_OFFSET_";

Property(std::string name, std::unique_ptr<common::LogicalType> dataType)
: Property{std::move(name), std::move(dataType), common::INVALID_PROPERTY_ID,
Expand Down
5 changes: 3 additions & 2 deletions src/include/catalog/table_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ class TableSchema {
TableSchema(std::string tableName, common::table_id_t tableID, common::TableType tableType,
std::vector<std::unique_ptr<Property>> properties)
: tableName{std::move(tableName)}, tableID{tableID}, tableType{tableType},
properties{std::move(properties)},
nextPropertyID{(common::property_id_t)this->properties.size()}, comment{} {}
properties{std::move(properties)}, nextPropertyID{(
common::property_id_t)this->properties.size()},
comment{"" /* empty comment */} {}
TableSchema(common::TableType tableType, std::string tableName, common::table_id_t tableID,
std::vector<std::unique_ptr<Property>> properties, std::string comment,
common::property_id_t nextPropertyID)
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/data_chunk/data_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class DataChunk {

void insert(uint32_t pos, std::shared_ptr<ValueVector> valueVector);

void resetAuxiliaryBuffer();

inline uint32_t getNumValueVectors() const { return valueVectors.size(); }

inline std::shared_ptr<ValueVector> getValueVector(uint64_t valueVectorPos) {
Expand Down
5 changes: 5 additions & 0 deletions src/include/processor/operator/index_lookup.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ class IndexLookup : public PhysicalOperator {
void indexLookup(const IndexLookupInfo& info);
static void lookupOnArray(
const IndexLookupInfo& info, arrow::Array* array, common::offset_t* offsets);
void fillOffsetArraysFromArrowVector(const IndexLookupInfo& info,
common::ValueVector* keyVector,
std::vector<std::shared_ptr<arrow::Array>>& offsetArraysVector);
void fillOffsetArraysFromVector(const IndexLookupInfo& info, common::ValueVector* keyVector,
std::vector<std::shared_ptr<arrow::Array>>& offsetArraysVector);

private:
std::vector<std::unique_ptr<IndexLookupInfo>> infos;
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ class PlanMapper {
const std::vector<std::shared_ptr<binder::Expression>>& dataColumnExpressions,
const std::shared_ptr<binder::Expression>& offsetExpression, bool readingInSerial);
std::unique_ptr<PhysicalOperator> createIndexLookup(catalog::RelTableSchema* tableSchema,
const std::vector<DataPos>& dataPoses, const DataPos& boundOffsetDataPos,
const DataPos& nbrOffsetDataPos, std::unique_ptr<PhysicalOperator> readerOp);
std::vector<DataPos>& dataPoses, const DataPos& boundOffsetDataPos,
const DataPos& nbrOffsetDataPos, const DataPos& predicateOffsetDataPos,
std::unique_ptr<PhysicalOperator> readerOp, common::CopyDescription::FileType fileType);
std::unique_ptr<PhysicalOperator> createCopyRelColumnsOrLists(
std::shared_ptr<CopyRelSharedState> sharedState, planner::LogicalCopyFrom* copyFrom,
bool isColumns, std::unique_ptr<PhysicalOperator> copyRelColumns);
Expand Down
6 changes: 6 additions & 0 deletions src/include/storage/index/hash_index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@
inline bool lookup(int64_t key, common::offset_t& result) {
return lookupInternalWithoutLock(reinterpret_cast<const uint8_t*>(&key), result);
}
inline bool lookup(const char* key, common::offset_t& result) {

Check warning on line 95 in src/include/storage/index/hash_index_builder.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/index/hash_index_builder.h#L95

Added line #L95 was not covered by tests
return lookupInternalWithoutLock(reinterpret_cast<const uint8_t*>(key), result);
}

// Non-thread safe. This should only be called in the copyCSV and never be called in parallel.
void flush();
Expand Down Expand Up @@ -167,6 +170,9 @@
hashIndexBuilderForInt64->lookup(key, result) :
hashIndexBuilderForString->lookup(key, result);
}
inline bool lookup(const char* key, common::offset_t& result) {
return hashIndexBuilderForString->lookup(key, result);
}

// Non-thread safe. This should only be called in the copyCSV and never be called in parallel.
inline void flush() {
Expand Down
2 changes: 2 additions & 0 deletions src/planner/operator/copy/logical_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ void LogicalCopyFrom::computeFactorizedSchema() {
if (info->tableSchema->tableType == TableType::REL) {
schema->insertToGroupAndScope(info->boundOffsetExpression, unflatGroup);
schema->insertToGroupAndScope(info->nbrOffsetExpression, unflatGroup);
schema->insertToGroupAndScope(info->predicateOffsetExpression, unflatGroup);
}
schema->insertToGroupAndScope(info->offsetExpression, flatGroup);
schema->insertToGroupAndScope(outputExpression, flatGroup);
Expand All @@ -26,6 +27,7 @@ void LogicalCopyFrom::computeFlatSchema() {
if (info->tableSchema->tableType == TableType::REL) {
schema->insertToGroupAndScope(info->boundOffsetExpression, 0);
schema->insertToGroupAndScope(info->nbrOffsetExpression, 0);
schema->insertToGroupAndScope(info->predicateOffsetExpression, 0);
}
schema->insertToGroupAndScope(info->offsetExpression, 0);
schema->insertToGroupAndScope(outputExpression, 0);
Expand Down
31 changes: 25 additions & 6 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(
}

std::unique_ptr<PhysicalOperator> PlanMapper::createIndexLookup(RelTableSchema* tableSchema,
const std::vector<DataPos>& dataPoses, const DataPos& boundOffsetDataPos,
const DataPos& nbrOffsetDataPos, std::unique_ptr<PhysicalOperator> readerOp) {
std::vector<DataPos>& dataPoses, const DataPos& boundOffsetDataPos,
const DataPos& nbrOffsetDataPos, const DataPos& predicateOffsetDataPos,
std::unique_ptr<PhysicalOperator> readerOp, CopyDescription::FileType fileType) {
auto boundNodeTableID = tableSchema->getBoundTableID(RelDataDirection::FWD);
auto boundNodePKIndex =
storageManager.getNodesStore().getNodeTable(boundNodeTableID)->getPKIndex();
Expand All @@ -88,8 +89,18 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createIndexLookup(RelTableSchema*
indexLookupInfos.push_back(
std::make_unique<IndexLookupInfo>(tableSchema->getSrcPKDataType()->copy(), boundNodePKIndex,
dataPoses[0], boundOffsetDataPos));
indexLookupInfos.push_back(std::make_unique<IndexLookupInfo>(
tableSchema->getDstPKDataType()->copy(), nbrNodePKIndex, dataPoses[1], nbrOffsetDataPos));
if (fileType == CopyDescription::FileType::TURTLE) {
indexLookupInfos.push_back(
std::make_unique<IndexLookupInfo>(std::make_unique<LogicalType>(LogicalTypeID::STRING),
boundNodePKIndex, dataPoses[1], predicateOffsetDataPos));
indexLookupInfos.push_back(
std::make_unique<IndexLookupInfo>(tableSchema->getDstPKDataType()->copy(),
nbrNodePKIndex, dataPoses[2], nbrOffsetDataPos));
} else {
indexLookupInfos.push_back(
std::make_unique<IndexLookupInfo>(tableSchema->getDstPKDataType()->copy(),
nbrNodePKIndex, dataPoses[1], nbrOffsetDataPos));
}
return std::make_unique<IndexLookup>(
std::move(indexLookupInfos), std::move(readerOp), getOperatorID(), tableSchema->tableName);
}
Expand All @@ -110,9 +121,17 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRelColumnsOrLists(
DataPos{outFSchema->getExpressionPos(*copyFromInfo->boundOffsetExpression)};
auto nbrOffsetDataPos =
DataPos{outFSchema->getExpressionPos(*copyFromInfo->nbrOffsetExpression)};
auto predicateOffsetDataPos =
DataPos{outFSchema->getExpressionPos(*copyFromInfo->predicateOffsetExpression)};
auto indexLookup = createIndexLookup(tableSchema, readerInfo->dataColumnsPos,
boundOffsetDataPos, nbrOffsetDataPos, std::move(reader));
CopyRelInfo copyRelInfo{tableSchema, readerInfo->dataColumnsPos, offsetDataPos,
boundOffsetDataPos, nbrOffsetDataPos, predicateOffsetDataPos, std::move(reader),
copyFromInfo->copyDesc->fileType);
auto dataColumnPosCopy = readerInfo->dataColumnsPos;
if (copyFromInfo->copyDesc->fileType == CopyDescription::FileType::TURTLE) {
dataColumnPosCopy[1] = dataColumnPosCopy[2];
dataColumnPosCopy[2] = predicateOffsetDataPos;
}
CopyRelInfo copyRelInfo{tableSchema, std::move(dataColumnPosCopy), offsetDataPos,
boundOffsetDataPos, nbrOffsetDataPos, storageManager.getWAL()};
if (isColumns) {
return std::make_unique<CopyRelColumns>(copyRelInfo, std::move(sharedState),
Expand Down
Loading