Skip to content

Commit

Permalink
Implement copy rdf rel table
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Sep 15, 2023
1 parent 0eed60d commit 6900b62
Show file tree
Hide file tree
Showing 24 changed files with 181 additions and 70 deletions.
1 change: 0 additions & 1 deletion dataset/copy-test/node/turtle/copy.cypher

This file was deleted.

2 changes: 2 additions & 0 deletions dataset/copy-test/rdf/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
COPY taxonomy_RESOURCE FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
COPY taxonomy_TRIPLES FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
File renamed without changes.
File renamed without changes.
49 changes: 34 additions & 15 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,24 +191,27 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(
auto nodeOffset =
createVariable(std::string(Property::OFFSET_NAME), LogicalType{LogicalTypeID::INT64});
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
tableSchema, std::move(columns), std::move(nodeOffset), nullptr, nullptr, containsSerial);
tableSchema, std::move(columns), std::move(nodeOffset), nullptr /* boundOffsetExpression */,
nullptr /* nbrOffsetExpression */, nullptr /* predicateOffsetExpression */, containsSerial);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

// Binds a COPY FROM statement whose target is a rel table.
// Creates the column expressions for the input file, the INT64 offset variable and the
// ARROW_COLUMN bound / nbr / predicate offset variables, packs them into a BoundCopyFromInfo and
// returns the resulting BoundCopyFrom statement.
// NOTE(review): this span is a rendered diff, so superseded lines (the one-argument
// bindCopyRelColumns call and the BoundCopyFromInfo construction without the predicate offset)
// appear next to their replacements.
std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(
std::unique_ptr<CopyDescription> copyDescription, TableSchema* tableSchema) {
// For table with SERIAL columns, we need to read in serial from files.
auto containsSerial = bindContainsSerial(tableSchema);
// The file type is read from copyDescription here, before copyDescription is moved into
// BoundCopyFromInfo further down.
auto columns = bindCopyRelColumns(tableSchema);
auto columns = bindCopyRelColumns(tableSchema, copyDescription->fileType);
auto nodeOffset =
createVariable(std::string(Property::OFFSET_NAME), LogicalType{LogicalTypeID::INT64});
auto boundOffset = createVariable(
std::string(Property::REL_BOUND_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
auto nbrOffset = createVariable(
std::string(Property::REL_NBR_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
auto boundCopyFromInfo =
std::make_unique<BoundCopyFromInfo>(std::move(copyDescription), tableSchema,
std::move(columns), std::move(nodeOffset), boundOffset, nbrOffset, containsSerial);
// The predicate offset variable is created for every rel copy, regardless of file type.
auto predicateOffset = createVariable(
std::string(Property::REL_PREDICATE_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
tableSchema, std::move(columns), std::move(nodeOffset), std::move(boundOffset),
std::move(nbrOffset), std::move(predicateOffset), containsSerial);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

Expand Down Expand Up @@ -253,18 +256,34 @@ expression_vector Binder::bindCopyNodeColumns(
return columnExpressions;
}

// Builds the expressions for the columns read from the input file when copying into a rel table.
//  - TURTLE: three STRING columns named SUBJECT, PREDICATE and OBJECT.
//  - CSV / PARQUET / NPY: the FROM and TO columns followed by one ARROW_COLUMN per property for
//    which skipPropertyInFile returns false.
//  - Any other file type: throws NotImplementedException.
// NOTE(review): this span is a rendered diff, so superseded lines (the one-argument signature,
// the column pushes ahead of the switch, and the property-name createVariable inside the TURTLE
// case) appear interleaved with their replacements.
expression_vector Binder::bindCopyRelColumns(TableSchema* tableSchema) {
expression_vector Binder::bindCopyRelColumns(
TableSchema* tableSchema, CopyDescription::FileType fileType) {
expression_vector columnExpressions;
columnExpressions.push_back(createVariable(
std::string(Property::REL_FROM_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
columnExpressions.push_back(createVariable(
std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
}
switch (fileType) {
case common::CopyDescription::FileType::TURTLE: {
// Triples are exposed as fixed STRING columns; tableSchema's properties are not consulted here.
columnExpressions.push_back(createVariable("SUBJECT", LogicalType{LogicalTypeID::STRING}));
columnExpressions.push_back(
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
createVariable("PREDICATE", LogicalType{LogicalTypeID::STRING}));
columnExpressions.push_back(createVariable("OBJECT", LogicalType{LogicalTypeID::STRING}));
} break;
case common::CopyDescription::FileType::CSV:
case common::CopyDescription::FileType::PARQUET:
case common::CopyDescription::FileType::NPY: {
// FROM and TO come first, then the properties in schema order.
columnExpressions.push_back(createVariable(std::string(Property::REL_FROM_PROPERTY_NAME),
LogicalType{LogicalTypeID::ARROW_COLUMN}));
columnExpressions.push_back(createVariable(
std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
}
columnExpressions.push_back(
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
}
} break;
default: {
throw NotImplementedException{"Binder::bindCopyRelColumns"};
}
}
return columnExpressions;
}
Expand Down
2 changes: 2 additions & 0 deletions src/binder/copy/bound_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ std::unique_ptr<BoundCopyFromInfo> BoundCopyFromInfo::copy() {
std::move(copiedColumnExpressions), offsetExpression->copy(),
tableSchema->tableType == common::TableType::REL ? boundOffsetExpression->copy() : nullptr,
tableSchema->tableType == common::TableType::REL ? nbrOffsetExpression->copy() : nullptr,
tableSchema->tableType == common::TableType::REL ? predicateOffsetExpression->copy() :
nullptr,
containsSerial);
}

Expand Down
6 changes: 6 additions & 0 deletions src/common/data_chunk/data_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,11 @@ void DataChunk::insert(uint32_t pos, std::shared_ptr<ValueVector> valueVector) {
valueVectors[pos] = std::move(valueVector);
}

void DataChunk::resetAuxiliaryBuffer() {
for (auto& valueVector : valueVectors) {
valueVector->resetAuxiliaryBuffer();
}
}

} // namespace common
} // namespace kuzu
3 changes: 2 additions & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ class Binder {
catalog::TableSchema* tableSchema);
expression_vector bindCopyNodeColumns(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
expression_vector bindCopyRelColumns(catalog::TableSchema* tableSchema);
expression_vector bindCopyRelColumns(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);
std::unique_ptr<common::CSVReaderConfig> bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>&
Expand Down
16 changes: 10 additions & 6 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@ struct BoundCopyFromInfo {
// `boundOffsetExpression`, `nbrOffsetExpression` and `predicateOffsetExpression` are for rel
// tables only.
std::shared_ptr<Expression> boundOffsetExpression;
std::shared_ptr<Expression> nbrOffsetExpression;
std::shared_ptr<Expression> predicateOffsetExpression;

bool containsSerial;

// Takes ownership of the copy description and stores the (non-owning) table schema pointer, the
// file column expressions, the offset expressions and the containsSerial flag.
// boundOffsetExpression, nbrOffsetExpression and predicateOffsetExpression may be null; the
// node-table binding path in this change passes nullptr for all three.
// NOTE(review): this span is a rendered diff, so the superseded parameter list and initializer
// list (without predicateOffsetExpression) appear next to their replacements.
BoundCopyFromInfo(std::unique_ptr<common::CopyDescription> copyDesc,
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
std::shared_ptr<Expression> offsetExpression,
std::shared_ptr<Expression> boundOffsetExpression,
std::shared_ptr<Expression> nbrOffsetExpression, bool containsSerial)
: copyDesc{std::move(copyDesc)}, tableSchema{tableSchema}, columnExpressions{std::move(
columnExpressions)},
offsetExpression{std::move(offsetExpression)}, boundOffsetExpression{std::move(
boundOffsetExpression)},
nbrOffsetExpression{std::move(nbrOffsetExpression)}, containsSerial{containsSerial} {}
std::shared_ptr<Expression> nbrOffsetExpression,
std::shared_ptr<Expression> predicateOffsetExpression, bool containsSerial)
: copyDesc{std::move(copyDesc)}, tableSchema{tableSchema},
columnExpressions{std::move(columnExpressions)}, offsetExpression{std::move(
offsetExpression)},
boundOffsetExpression{std::move(boundOffsetExpression)}, nbrOffsetExpression{std::move(
nbrOffsetExpression)},
predicateOffsetExpression{std::move(predicateOffsetExpression)}, containsSerial{
containsSerial} {}

std::unique_ptr<BoundCopyFromInfo> copy();
};
Expand Down
1 change: 1 addition & 0 deletions src/include/catalog/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Property {
static constexpr std::string_view OFFSET_NAME = "_OFFSET_";
static constexpr std::string_view REL_BOUND_OFFSET_NAME = "_BOUND_OFFSET_";
static constexpr std::string_view REL_NBR_OFFSET_NAME = "_NBR_OFFSET_";
static constexpr std::string_view REL_PREDICATE_OFFSET_NAME = "_PREDICATE_OFFSET_";

Property(std::string name, std::unique_ptr<common::LogicalType> dataType)
: Property{std::move(name), std::move(dataType), common::INVALID_PROPERTY_ID,
Expand Down
5 changes: 3 additions & 2 deletions src/include/catalog/table_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ class TableSchema {
// Constructs a schema from a name, id, table type and property list. nextPropertyID is set to
// the number of properties and the comment starts out empty.
// `this->properties` names the member rather than the moved-from constructor parameter; this
// assumes `properties` is declared before `nextPropertyID` so it is already initialised when its
// size is read — TODO confirm against the member declaration order.
// NOTE(review): this span is a rendered diff, so the superseded initializer lines (ending in
// `comment{}`) appear next to their replacements.
TableSchema(std::string tableName, common::table_id_t tableID, common::TableType tableType,
std::vector<std::unique_ptr<Property>> properties)
: tableName{std::move(tableName)}, tableID{tableID}, tableType{tableType},
properties{std::move(properties)},
nextPropertyID{(common::property_id_t)this->properties.size()}, comment{} {}
properties{std::move(properties)}, nextPropertyID{(
common::property_id_t)this->properties.size()},
comment{"" /* empty comment */} {}
TableSchema(common::TableType tableType, std::string tableName, common::table_id_t tableID,
std::vector<std::unique_ptr<Property>> properties, std::string comment,
common::property_id_t nextPropertyID)
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/data_chunk/data_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class DataChunk {

void insert(uint32_t pos, std::shared_ptr<ValueVector> valueVector);

void resetAuxiliaryBuffer();

// Returns how many value vector slots this chunk holds, narrowed to 32 bits.
inline uint32_t getNumValueVectors() const {
    return static_cast<uint32_t>(valueVectors.size());
}

inline std::shared_ptr<ValueVector> getValueVector(uint64_t valueVectorPos) {
Expand Down
5 changes: 5 additions & 0 deletions src/include/processor/operator/index_lookup.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ class IndexLookup : public PhysicalOperator {
void indexLookup(const IndexLookupInfo& info);
static void lookupOnArray(
const IndexLookupInfo& info, arrow::Array* array, common::offset_t* offsets);
void fillOffsetArraysFromArrowVector(const IndexLookupInfo& info,
common::ValueVector* keyVector,
std::vector<std::shared_ptr<arrow::Array>>& offsetArraysVector);
void fillOffsetArraysFromVector(const IndexLookupInfo& info, common::ValueVector* keyVector,
std::vector<std::shared_ptr<arrow::Array>>& offsetArraysVector);

private:
std::vector<std::unique_ptr<IndexLookupInfo>> infos;
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ class PlanMapper {
const std::vector<std::shared_ptr<binder::Expression>>& dataColumnExpressions,
const std::shared_ptr<binder::Expression>& offsetExpression, bool readingInSerial);
std::unique_ptr<PhysicalOperator> createIndexLookup(catalog::RelTableSchema* tableSchema,
const std::vector<DataPos>& dataPoses, const DataPos& boundOffsetDataPos,
const DataPos& nbrOffsetDataPos, std::unique_ptr<PhysicalOperator> readerOp);
std::vector<DataPos>& dataPoses, const DataPos& boundOffsetDataPos,
const DataPos& nbrOffsetDataPos, const DataPos& predicateOffsetDataPos,
std::unique_ptr<PhysicalOperator> readerOp, common::CopyDescription::FileType fileType);
std::unique_ptr<PhysicalOperator> createCopyRelColumnsOrLists(
std::shared_ptr<CopyRelSharedState> sharedState, planner::LogicalCopyFrom* copyFrom,
bool isColumns, std::unique_ptr<PhysicalOperator> copyRelColumns);
Expand Down
6 changes: 6 additions & 0 deletions src/include/storage/index/hash_index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class HashIndexBuilder : public BaseHashIndex {
// Looks up an INT64 key. The key is passed to lookupInternalWithoutLock as a pointer to its raw
// bytes; the return value and `result` are whatever that call produces.
inline bool lookup(int64_t key, common::offset_t& result) {
    const auto* keyBytes = reinterpret_cast<const uint8_t*>(&key);
    return lookupInternalWithoutLock(keyBytes, result);
}
// Looks up a C-string key. The character pointer is reinterpreted as a byte pointer and passed
// to lookupInternalWithoutLock; the return value and `result` are whatever that call produces.
inline bool lookup(const char* key, common::offset_t& result) {
    const auto* keyBytes = reinterpret_cast<const uint8_t*>(key);
    return lookupInternalWithoutLock(keyBytes, result);
}

// Not thread-safe. This should only be called in copyCSV and must never be called in parallel.
void flush();
Expand Down Expand Up @@ -167,6 +170,9 @@ class PrimaryKeyIndexBuilder {
hashIndexBuilderForInt64->lookup(key, result) :
hashIndexBuilderForString->lookup(key, result);
}
// Looks up a C-string key by forwarding to the string hash index builder.
// NOTE(review): this always dereferences hashIndexBuilderForString without checking the index's
// key type — presumably callers only use it on STRING-keyed indexes; confirm.
inline bool lookup(const char* key, common::offset_t& result) {
    const bool found = hashIndexBuilderForString->lookup(key, result);
    return found;
}

// Not thread-safe. This should only be called in copyCSV and must never be called in parallel.
inline void flush() {
Expand Down
2 changes: 2 additions & 0 deletions src/planner/operator/copy/logical_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ void LogicalCopyFrom::computeFactorizedSchema() {
if (info->tableSchema->tableType == TableType::REL) {
schema->insertToGroupAndScope(info->boundOffsetExpression, unflatGroup);
schema->insertToGroupAndScope(info->nbrOffsetExpression, unflatGroup);
schema->insertToGroupAndScope(info->predicateOffsetExpression, unflatGroup);
}
schema->insertToGroupAndScope(info->offsetExpression, flatGroup);
schema->insertToGroupAndScope(outputExpression, flatGroup);
Expand All @@ -26,6 +27,7 @@ void LogicalCopyFrom::computeFlatSchema() {
if (info->tableSchema->tableType == TableType::REL) {
schema->insertToGroupAndScope(info->boundOffsetExpression, 0);
schema->insertToGroupAndScope(info->nbrOffsetExpression, 0);
schema->insertToGroupAndScope(info->predicateOffsetExpression, 0);
}
schema->insertToGroupAndScope(info->offsetExpression, 0);
schema->insertToGroupAndScope(outputExpression, 0);
Expand Down
31 changes: 25 additions & 6 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(
}

std::unique_ptr<PhysicalOperator> PlanMapper::createIndexLookup(RelTableSchema* tableSchema,
const std::vector<DataPos>& dataPoses, const DataPos& boundOffsetDataPos,
const DataPos& nbrOffsetDataPos, std::unique_ptr<PhysicalOperator> readerOp) {
std::vector<DataPos>& dataPoses, const DataPos& boundOffsetDataPos,
const DataPos& nbrOffsetDataPos, const DataPos& predicateOffsetDataPos,
std::unique_ptr<PhysicalOperator> readerOp, CopyDescription::FileType fileType) {
auto boundNodeTableID = tableSchema->getBoundTableID(RelDataDirection::FWD);
auto boundNodePKIndex =
storageManager.getNodesStore().getNodeTable(boundNodeTableID)->getPKIndex();
Expand All @@ -88,8 +89,18 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createIndexLookup(RelTableSchema*
indexLookupInfos.push_back(
std::make_unique<IndexLookupInfo>(tableSchema->getSrcPKDataType()->copy(), boundNodePKIndex,
dataPoses[0], boundOffsetDataPos));
indexLookupInfos.push_back(std::make_unique<IndexLookupInfo>(
tableSchema->getDstPKDataType()->copy(), nbrNodePKIndex, dataPoses[1], nbrOffsetDataPos));
if (fileType == CopyDescription::FileType::TURTLE) {
indexLookupInfos.push_back(
std::make_unique<IndexLookupInfo>(std::make_unique<LogicalType>(LogicalTypeID::STRING),
boundNodePKIndex, dataPoses[1], predicateOffsetDataPos));
indexLookupInfos.push_back(
std::make_unique<IndexLookupInfo>(tableSchema->getDstPKDataType()->copy(),
nbrNodePKIndex, dataPoses[2], nbrOffsetDataPos));
} else {
indexLookupInfos.push_back(
std::make_unique<IndexLookupInfo>(tableSchema->getDstPKDataType()->copy(),
nbrNodePKIndex, dataPoses[1], nbrOffsetDataPos));
}
return std::make_unique<IndexLookup>(
std::move(indexLookupInfos), std::move(readerOp), getOperatorID(), tableSchema->tableName);
}
Expand All @@ -110,9 +121,17 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRelColumnsOrLists(
DataPos{outFSchema->getExpressionPos(*copyFromInfo->boundOffsetExpression)};
auto nbrOffsetDataPos =
DataPos{outFSchema->getExpressionPos(*copyFromInfo->nbrOffsetExpression)};
auto predicateOffsetDataPos =
DataPos{outFSchema->getExpressionPos(*copyFromInfo->predicateOffsetExpression)};
auto indexLookup = createIndexLookup(tableSchema, readerInfo->dataColumnsPos,
boundOffsetDataPos, nbrOffsetDataPos, std::move(reader));
CopyRelInfo copyRelInfo{tableSchema, readerInfo->dataColumnsPos, offsetDataPos,
boundOffsetDataPos, nbrOffsetDataPos, predicateOffsetDataPos, std::move(reader),
copyFromInfo->copyDesc->fileType);
auto dataColumnPosCopy = readerInfo->dataColumnsPos;
if (copyFromInfo->copyDesc->fileType == CopyDescription::FileType::TURTLE) {
dataColumnPosCopy[1] = dataColumnPosCopy[2];
dataColumnPosCopy[2] = predicateOffsetDataPos;
}
CopyRelInfo copyRelInfo{tableSchema, std::move(dataColumnPosCopy), offsetDataPos,
boundOffsetDataPos, nbrOffsetDataPos, storageManager.getWAL()};
if (isColumns) {
return std::make_unique<CopyRelColumns>(copyRelInfo, std::move(sharedState),
Expand Down
Loading

0 comments on commit 6900b62

Please sign in to comment.