From de7cd05f96e6cab07f52a5ca3fc5dd80f563e631 Mon Sep 17 00:00:00 2001
From: ziyi chen
Date: Thu, 14 Sep 2023 10:35:04 -0400
Subject: [PATCH] Implement copy rdf rel table

---
 dataset/copy-test/node/turtle/copy.cypher     |  1 -
 dataset/copy-test/rdf/copy.cypher             |  2 +
 .../{node/turtle => rdf}/schema.cypher        |  0
 .../{node/turtle => rdf}/taxonomy.ttl         |  0
 src/binder/bind/bind_copy.cpp                 | 49 +++++++++----
 src/binder/copy/bound_copy_from.cpp           |  2 +
 src/common/data_chunk/data_chunk.cpp          |  6 ++
 src/include/binder/binder.h                   |  3 +-
 src/include/binder/copy/bound_copy_from.h     | 16 ++--
 src/include/catalog/property.h                |  1 +
 src/include/catalog/table_schema.h            |  5 +-
 src/include/common/data_chunk/data_chunk.h    |  2 +
 src/include/processor/operator/index_lookup.h |  5 ++
 src/include/processor/plan_mapper.h           |  5 +-
 .../storage/index/hash_index_builder.h        |  6 ++
 .../operator/copy/logical_copy_from.cpp       |  2 +
 src/processor/map/map_copy_from.cpp           | 31 ++++++--
 src/processor/operator/index_lookup.cpp       | 73 +++++++++++++------
 .../operator/persistent/copy_node.cpp         | 16 ++--
 src/processor/operator/persistent/reader.cpp  |  1 +
 .../operator/persistent/reader/rdf_reader.cpp |  3 -
 .../operator/persistent/reader_functions.cpp  |  2 +
 test/test_files/copy/copy_rdf.test            | 11 ++-
 23 files changed, 176 insertions(+), 66 deletions(-)
 delete mode 100644 dataset/copy-test/node/turtle/copy.cypher
 create mode 100644 dataset/copy-test/rdf/copy.cypher
 rename dataset/copy-test/{node/turtle => rdf}/schema.cypher (100%)
 rename dataset/copy-test/{node/turtle => rdf}/taxonomy.ttl (100%)

diff --git a/dataset/copy-test/node/turtle/copy.cypher b/dataset/copy-test/node/turtle/copy.cypher
deleted file mode 100644
index f9753868c2..0000000000
--- a/dataset/copy-test/node/turtle/copy.cypher
+++ /dev/null
@@ -1 +0,0 @@
-COPY taxonomy_RESOURCE FROM "dataset/copy-test/node/turtle/taxonomy.ttl" ;
diff --git a/dataset/copy-test/rdf/copy.cypher b/dataset/copy-test/rdf/copy.cypher
new file mode 100644
index 0000000000..d43179a848
--- /dev/null
+++ b/dataset/copy-test/rdf/copy.cypher
@@ -0,0 +1,2 @@
+COPY taxonomy_RESOURCE FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
+COPY taxonomy_TRIPLES FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
diff --git a/dataset/copy-test/node/turtle/schema.cypher b/dataset/copy-test/rdf/schema.cypher
similarity index 100%
rename from dataset/copy-test/node/turtle/schema.cypher
rename to dataset/copy-test/rdf/schema.cypher
diff --git a/dataset/copy-test/node/turtle/taxonomy.ttl b/dataset/copy-test/rdf/taxonomy.ttl
similarity index 100%
rename from dataset/copy-test/node/turtle/taxonomy.ttl
rename to dataset/copy-test/rdf/taxonomy.ttl
diff --git a/src/binder/bind/bind_copy.cpp b/src/binder/bind/bind_copy.cpp
index 922deed7b8..f6d22722b7 100644
--- a/src/binder/bind/bind_copy.cpp
+++ b/src/binder/bind/bind_copy.cpp
@@ -191,7 +191,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(
     auto nodeOffset = createVariable(
         std::string(Property::OFFSET_NAME), LogicalType{LogicalTypeID::INT64});
     auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
-        tableSchema, std::move(columns), std::move(nodeOffset), nullptr, nullptr, containsSerial);
+        tableSchema, std::move(columns), std::move(nodeOffset), nullptr /* boundOffsetExpression */,
+        nullptr /* nbrOffsetExpression */, nullptr /* predicateOffsetExpression */, containsSerial);
     return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
 }
 
@@ -199,16 +200,18 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(
     std::unique_ptr<CopyDescription> copyDescription, TableSchema* tableSchema) {
     // For table with SERIAL columns, we need to read in serial from files.
     auto containsSerial = bindContainsSerial(tableSchema);
-    auto columns = bindCopyRelColumns(tableSchema);
+    auto columns = bindCopyRelColumns(tableSchema, copyDescription->fileType);
     auto nodeOffset = createVariable(
         std::string(Property::OFFSET_NAME), LogicalType{LogicalTypeID::INT64});
     auto boundOffset = createVariable(
         std::string(Property::REL_BOUND_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
     auto nbrOffset = createVariable(
         std::string(Property::REL_NBR_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
-    auto boundCopyFromInfo =
-        std::make_unique<BoundCopyFromInfo>(std::move(copyDescription), tableSchema,
-            std::move(columns), std::move(nodeOffset), boundOffset, nbrOffset, containsSerial);
+    auto predicateOffset = createVariable(
+        std::string(Property::REL_PREDICATE_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
+    auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
+        tableSchema, std::move(columns), std::move(nodeOffset), std::move(boundOffset),
+        std::move(nbrOffset), std::move(predicateOffset), containsSerial);
     return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
 }
 
@@ -253,18 +256,34 @@ expression_vector Binder::bindCopyNodeColumns(
     return columnExpressions;
 }
 
-expression_vector Binder::bindCopyRelColumns(TableSchema* tableSchema) {
+expression_vector Binder::bindCopyRelColumns(
+    TableSchema* tableSchema, CopyDescription::FileType fileType) {
     expression_vector columnExpressions;
-    columnExpressions.push_back(createVariable(
-        std::string(Property::REL_FROM_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
-    columnExpressions.push_back(createVariable(
-        std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
-    for (auto& property : tableSchema->properties) {
-        if (skipPropertyInFile(*property)) {
-            continue;
-        }
+    switch (fileType) {
+    case common::CopyDescription::FileType::TURTLE: {
+        columnExpressions.push_back(createVariable("SUBJECT", LogicalType{LogicalTypeID::STRING}));
         columnExpressions.push_back(
-            createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
+            createVariable("PREDICATE", LogicalType{LogicalTypeID::STRING}));
+        columnExpressions.push_back(createVariable("OBJECT", LogicalType{LogicalTypeID::STRING}));
+    } break;
+    case common::CopyDescription::FileType::CSV:
+    case common::CopyDescription::FileType::PARQUET:
+    case common::CopyDescription::FileType::NPY: {
+        columnExpressions.push_back(createVariable(std::string(Property::REL_FROM_PROPERTY_NAME),
+            LogicalType{LogicalTypeID::ARROW_COLUMN}));
+        columnExpressions.push_back(createVariable(
+            std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
+        for (auto& property : tableSchema->properties) {
+            if (skipPropertyInFile(*property)) {
+                continue;
+            }
+            columnExpressions.push_back(
+                createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
+        }
+    } break;
+    default: {
+        throw NotImplementedException{"Binder::bindCopyRelColumns"};
+    }
     }
     return columnExpressions;
 }
diff --git a/src/binder/copy/bound_copy_from.cpp b/src/binder/copy/bound_copy_from.cpp
index c3305b7322..68f892b72a 100644
--- a/src/binder/copy/bound_copy_from.cpp
+++ b/src/binder/copy/bound_copy_from.cpp
@@ -13,6 +13,8 @@ std::unique_ptr<BoundCopyFromInfo> BoundCopyFromInfo::copy() {
         std::move(copiedColumnExpressions), offsetExpression->copy(),
         tableSchema->tableType == common::TableType::REL ? boundOffsetExpression->copy() : nullptr,
         tableSchema->tableType == common::TableType::REL ?
nbrOffsetExpression->copy() : nullptr, + tableSchema->tableType == common::TableType::REL ? predicateOffsetExpression->copy() : + nullptr, containsSerial); } diff --git a/src/common/data_chunk/data_chunk.cpp b/src/common/data_chunk/data_chunk.cpp index 68b4e6a178..03e82e8890 100644 --- a/src/common/data_chunk/data_chunk.cpp +++ b/src/common/data_chunk/data_chunk.cpp @@ -9,5 +9,11 @@ void DataChunk::insert(uint32_t pos, std::shared_ptr valueVector) { valueVectors[pos] = std::move(valueVector); } +void DataChunk::resetAuxiliaryBuffer() { + for (auto& valueVector : valueVectors) { + valueVector->resetAuxiliaryBuffer(); + } +} + } // namespace common } // namespace kuzu diff --git a/src/include/binder/binder.h b/src/include/binder/binder.h index df85c2d2e9..e4bfb44490 100644 --- a/src/include/binder/binder.h +++ b/src/include/binder/binder.h @@ -113,7 +113,8 @@ class Binder { catalog::TableSchema* tableSchema); expression_vector bindCopyNodeColumns( catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType); - expression_vector bindCopyRelColumns(catalog::TableSchema* tableSchema); + expression_vector bindCopyRelColumns( + catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType); std::unique_ptr bindCopyToClause(const parser::Statement& statement); std::unique_ptr bindParsingOptions( const std::unordered_map>& diff --git a/src/include/binder/copy/bound_copy_from.h b/src/include/binder/copy/bound_copy_from.h index 603e1be6c4..248e01c847 100644 --- a/src/include/binder/copy/bound_copy_from.h +++ b/src/include/binder/copy/bound_copy_from.h @@ -19,6 +19,7 @@ struct BoundCopyFromInfo { // `boundOffsetExpression` and `nbrOffsetExpression` are for rel tables only. std::shared_ptr boundOffsetExpression; std::shared_ptr nbrOffsetExpression; + std::shared_ptr predicateOffsetExpression; bool containsSerial; @@ -26,12 +27,15 @@ struct BoundCopyFromInfo { catalog::TableSchema* tableSchema, expression_vector columnExpressions, std::shared_ptr offsetExpression, std::shared_ptr boundOffsetExpression, - std::shared_ptr nbrOffsetExpression, bool containsSerial) - : copyDesc{std::move(copyDesc)}, tableSchema{tableSchema}, columnExpressions{std::move( - columnExpressions)}, - offsetExpression{std::move(offsetExpression)}, boundOffsetExpression{std::move( - boundOffsetExpression)}, - nbrOffsetExpression{std::move(nbrOffsetExpression)}, containsSerial{containsSerial} {} + std::shared_ptr nbrOffsetExpression, + std::shared_ptr predicateOffsetExpression, bool containsSerial) + : copyDesc{std::move(copyDesc)}, tableSchema{tableSchema}, + columnExpressions{std::move(columnExpressions)}, offsetExpression{std::move( + offsetExpression)}, + boundOffsetExpression{std::move(boundOffsetExpression)}, nbrOffsetExpression{std::move( + nbrOffsetExpression)}, + predicateOffsetExpression{std::move(predicateOffsetExpression)}, containsSerial{ + containsSerial} {} std::unique_ptr copy(); }; diff --git a/src/include/catalog/property.h b/src/include/catalog/property.h index 414adb427f..a23c4b27fd 100644 --- a/src/include/catalog/property.h +++ b/src/include/catalog/property.h @@ -40,6 +40,7 @@ class Property { static constexpr std::string_view OFFSET_NAME = "_OFFSET_"; static constexpr std::string_view REL_BOUND_OFFSET_NAME = "_BOUND_OFFSET_"; static constexpr std::string_view REL_NBR_OFFSET_NAME = "_NBR_OFFSET_"; + static constexpr std::string_view REL_PREDICATE_OFFSET_NAME = "_PREDICATE_OFFSET_"; Property(std::string name, std::unique_ptr dataType) : Property{std::move(name), 
std::move(dataType), common::INVALID_PROPERTY_ID, diff --git a/src/include/catalog/table_schema.h b/src/include/catalog/table_schema.h index 06c3ac3720..a6598f5645 100644 --- a/src/include/catalog/table_schema.h +++ b/src/include/catalog/table_schema.h @@ -16,8 +16,9 @@ class TableSchema { TableSchema(std::string tableName, common::table_id_t tableID, common::TableType tableType, std::vector> properties) : tableName{std::move(tableName)}, tableID{tableID}, tableType{tableType}, - properties{std::move(properties)}, - nextPropertyID{(common::property_id_t)this->properties.size()}, comment{} {} + properties{std::move(properties)}, nextPropertyID{( + common::property_id_t)this->properties.size()}, + comment{"" /* empty comment */} {} TableSchema(common::TableType tableType, std::string tableName, common::table_id_t tableID, std::vector> properties, std::string comment, common::property_id_t nextPropertyID) diff --git a/src/include/common/data_chunk/data_chunk.h b/src/include/common/data_chunk/data_chunk.h index c8e4a6117b..6a014dac8d 100644 --- a/src/include/common/data_chunk/data_chunk.h +++ b/src/include/common/data_chunk/data_chunk.h @@ -27,6 +27,8 @@ class DataChunk { void insert(uint32_t pos, std::shared_ptr valueVector); + void resetAuxiliaryBuffer(); + inline uint32_t getNumValueVectors() const { return valueVectors.size(); } inline std::shared_ptr getValueVector(uint64_t valueVectorPos) { diff --git a/src/include/processor/operator/index_lookup.h b/src/include/processor/operator/index_lookup.h index a4bf95d30e..8c4b13917c 100644 --- a/src/include/processor/operator/index_lookup.h +++ b/src/include/processor/operator/index_lookup.h @@ -47,6 +47,11 @@ class IndexLookup : public PhysicalOperator { void indexLookup(const IndexLookupInfo& info); static void lookupOnArray( const IndexLookupInfo& info, arrow::Array* array, common::offset_t* offsets); + void fillOffsetArraysFromArrowVector(const IndexLookupInfo& info, + common::ValueVector* keyVector, + std::vector>& offsetArraysVector); + void fillOffsetArraysFromVector(const IndexLookupInfo& info, common::ValueVector* keyVector, + std::vector>& offsetArraysVector); private: std::vector> infos; diff --git a/src/include/processor/plan_mapper.h b/src/include/processor/plan_mapper.h index 53207e459a..d803b9bb62 100644 --- a/src/include/processor/plan_mapper.h +++ b/src/include/processor/plan_mapper.h @@ -102,8 +102,9 @@ class PlanMapper { const std::vector>& dataColumnExpressions, const std::shared_ptr& offsetExpression, bool readingInSerial); std::unique_ptr createIndexLookup(catalog::RelTableSchema* tableSchema, - const std::vector& dataPoses, const DataPos& boundOffsetDataPos, - const DataPos& nbrOffsetDataPos, std::unique_ptr readerOp); + std::vector& dataPoses, const DataPos& boundOffsetDataPos, + const DataPos& nbrOffsetDataPos, const DataPos& predicateOffsetDataPos, + std::unique_ptr readerOp, common::CopyDescription::FileType fileType); std::unique_ptr createCopyRelColumnsOrLists( std::shared_ptr sharedState, planner::LogicalCopyFrom* copyFrom, bool isColumns, std::unique_ptr copyRelColumns); diff --git a/src/include/storage/index/hash_index_builder.h b/src/include/storage/index/hash_index_builder.h index a4307fa967..4fd5b717c6 100644 --- a/src/include/storage/index/hash_index_builder.h +++ b/src/include/storage/index/hash_index_builder.h @@ -92,6 +92,9 @@ class HashIndexBuilder : public BaseHashIndex { inline bool lookup(int64_t key, common::offset_t& result) { return lookupInternalWithoutLock(reinterpret_cast(&key), result); } + inline 
bool lookup(const char* key, common::offset_t& result) { + return lookupInternalWithoutLock(reinterpret_cast(key), result); + } // Non-thread safe. This should only be called in the copyCSV and never be called in parallel. void flush(); @@ -167,6 +170,9 @@ class PrimaryKeyIndexBuilder { hashIndexBuilderForInt64->lookup(key, result) : hashIndexBuilderForString->lookup(key, result); } + inline bool lookup(const char* key, common::offset_t& result) { + return hashIndexBuilderForString->lookup(key, result); + } // Non-thread safe. This should only be called in the copyCSV and never be called in parallel. inline void flush() { diff --git a/src/planner/operator/copy/logical_copy_from.cpp b/src/planner/operator/copy/logical_copy_from.cpp index 8956920165..36093b4e3e 100644 --- a/src/planner/operator/copy/logical_copy_from.cpp +++ b/src/planner/operator/copy/logical_copy_from.cpp @@ -13,6 +13,7 @@ void LogicalCopyFrom::computeFactorizedSchema() { if (info->tableSchema->tableType == TableType::REL) { schema->insertToGroupAndScope(info->boundOffsetExpression, unflatGroup); schema->insertToGroupAndScope(info->nbrOffsetExpression, unflatGroup); + schema->insertToGroupAndScope(info->predicateOffsetExpression, unflatGroup); } schema->insertToGroupAndScope(info->offsetExpression, flatGroup); schema->insertToGroupAndScope(outputExpression, flatGroup); @@ -26,6 +27,7 @@ void LogicalCopyFrom::computeFlatSchema() { if (info->tableSchema->tableType == TableType::REL) { schema->insertToGroupAndScope(info->boundOffsetExpression, 0); schema->insertToGroupAndScope(info->nbrOffsetExpression, 0); + schema->insertToGroupAndScope(info->predicateOffsetExpression, 0); } schema->insertToGroupAndScope(info->offsetExpression, 0); schema->insertToGroupAndScope(outputExpression, 0); diff --git a/src/processor/map/map_copy_from.cpp b/src/processor/map/map_copy_from.cpp index 80a3465787..87b58c060d 100644 --- a/src/processor/map/map_copy_from.cpp +++ b/src/processor/map/map_copy_from.cpp @@ -75,8 +75,9 @@ std::unique_ptr PlanMapper::mapCopyNodeFrom( } std::unique_ptr PlanMapper::createIndexLookup(RelTableSchema* tableSchema, - const std::vector& dataPoses, const DataPos& boundOffsetDataPos, - const DataPos& nbrOffsetDataPos, std::unique_ptr readerOp) { + std::vector& dataPoses, const DataPos& boundOffsetDataPos, + const DataPos& nbrOffsetDataPos, const DataPos& predicateOffsetDataPos, + std::unique_ptr readerOp, CopyDescription::FileType fileType) { auto boundNodeTableID = tableSchema->getBoundTableID(RelDataDirection::FWD); auto boundNodePKIndex = storageManager.getNodesStore().getNodeTable(boundNodeTableID)->getPKIndex(); @@ -88,8 +89,18 @@ std::unique_ptr PlanMapper::createIndexLookup(RelTableSchema* indexLookupInfos.push_back( std::make_unique(tableSchema->getSrcPKDataType()->copy(), boundNodePKIndex, dataPoses[0], boundOffsetDataPos)); - indexLookupInfos.push_back(std::make_unique( - tableSchema->getDstPKDataType()->copy(), nbrNodePKIndex, dataPoses[1], nbrOffsetDataPos)); + if (fileType == CopyDescription::FileType::TURTLE) { + indexLookupInfos.push_back( + std::make_unique(std::make_unique(LogicalTypeID::STRING), + boundNodePKIndex, dataPoses[1], predicateOffsetDataPos)); + indexLookupInfos.push_back( + std::make_unique(tableSchema->getDstPKDataType()->copy(), + nbrNodePKIndex, dataPoses[2], nbrOffsetDataPos)); + } else { + indexLookupInfos.push_back( + std::make_unique(tableSchema->getDstPKDataType()->copy(), + nbrNodePKIndex, dataPoses[1], nbrOffsetDataPos)); + } return std::make_unique( std::move(indexLookupInfos), 
std::move(readerOp), getOperatorID(), tableSchema->tableName); } @@ -110,9 +121,17 @@ std::unique_ptr PlanMapper::createCopyRelColumnsOrLists( DataPos{outFSchema->getExpressionPos(*copyFromInfo->boundOffsetExpression)}; auto nbrOffsetDataPos = DataPos{outFSchema->getExpressionPos(*copyFromInfo->nbrOffsetExpression)}; + auto predicateOffsetDataPos = + DataPos{outFSchema->getExpressionPos(*copyFromInfo->predicateOffsetExpression)}; auto indexLookup = createIndexLookup(tableSchema, readerInfo->dataColumnsPos, - boundOffsetDataPos, nbrOffsetDataPos, std::move(reader)); - CopyRelInfo copyRelInfo{tableSchema, readerInfo->dataColumnsPos, offsetDataPos, + boundOffsetDataPos, nbrOffsetDataPos, predicateOffsetDataPos, std::move(reader), + copyFromInfo->copyDesc->fileType); + auto dataColumnPosCopy = readerInfo->dataColumnsPos; + if (copyFromInfo->copyDesc->fileType == CopyDescription::FileType::TURTLE) { + dataColumnPosCopy[1] = dataColumnPosCopy[2]; + dataColumnPosCopy[2] = predicateOffsetDataPos; + } + CopyRelInfo copyRelInfo{tableSchema, std::move(dataColumnPosCopy), offsetDataPos, boundOffsetDataPos, nbrOffsetDataPos, storageManager.getWAL()}; if (isColumns) { return std::make_unique(copyRelInfo, std::move(sharedState), diff --git a/src/processor/operator/index_lookup.cpp b/src/processor/operator/index_lookup.cpp index 7c51f078dc..6cc1fe83e7 100644 --- a/src/processor/operator/index_lookup.cpp +++ b/src/processor/operator/index_lookup.cpp @@ -25,25 +25,23 @@ bool IndexLookup::getNextTuplesInternal(ExecutionContext* context) { return true; } +std::unique_ptr IndexLookup::clone() { + std::vector> copiedInfos; + copiedInfos.reserve(infos.size()); + for (const auto& info : infos) { + copiedInfos.push_back(info->copy()); + } + return make_unique( + std::move(copiedInfos), children[0]->clone(), getOperatorID(), paramsString); +} + void IndexLookup::indexLookup(const IndexLookupInfo& info) { auto keyVector = resultSet->getValueVector(info.keyVectorPos).get(); - // This should be changed to handle non-arrow Vectors once the new csv reader is in. 
- assert(keyVector->dataType.getLogicalTypeID() == LogicalTypeID::ARROW_COLUMN); - auto keyChunkedArray = ArrowColumnVector::getArrowColumn(keyVector); arrow::ArrayVector offsetArraysVector; - offsetArraysVector.reserve(keyChunkedArray->num_chunks()); - for (auto& keyArray : keyChunkedArray->chunks()) { - auto numKeys = keyArray->length(); - std::shared_ptr arrowBuffer; - TableCopyUtils::throwCopyExceptionIfNotOK( - arrow::AllocateBuffer((int64_t)(numKeys * sizeof(offset_t))).Value(&arrowBuffer)); - auto offsets = (offset_t*)arrowBuffer->data(); - if (keyArray->null_count() != 0) { - throw RuntimeException(ExceptionMessage::nullPKException()); - } - lookupOnArray(info, keyArray.get(), offsets); - offsetArraysVector.push_back(TableCopyUtils::createArrowPrimitiveArray( - std::make_shared(), arrowBuffer, numKeys)); + if (keyVector->dataType.getPhysicalType() == PhysicalTypeID::ARROW_COLUMN) { + fillOffsetArraysFromArrowVector(info, keyVector, offsetArraysVector); + } else { + fillOffsetArraysFromVector(info, keyVector, offsetArraysVector); } auto offsetChunkedArray = std::make_shared(offsetArraysVector); ArrowColumnVector::setArrowColumn( @@ -107,14 +105,43 @@ void IndexLookup::lookupOnArray( } } -std::unique_ptr IndexLookup::clone() { - std::vector> copiedInfos; - copiedInfos.reserve(infos.size()); - for (const auto& info : infos) { - copiedInfos.push_back(info->copy()); +void IndexLookup::fillOffsetArraysFromArrowVector(const IndexLookupInfo& info, + common::ValueVector* keyVector, arrow::ArrayVector& offsetArraysVector) { + // This should be changed to handle non-arrow Vectors once the new csv reader is in. + auto keyChunkedArray = ArrowColumnVector::getArrowColumn(keyVector); + offsetArraysVector.reserve(keyChunkedArray->num_chunks()); + for (auto& keyArray : keyChunkedArray->chunks()) { + auto numKeys = keyArray->length(); + std::shared_ptr arrowBuffer; + TableCopyUtils::throwCopyExceptionIfNotOK( + arrow::AllocateBuffer((int64_t)(numKeys * sizeof(offset_t))).Value(&arrowBuffer)); + auto offsets = (offset_t*)arrowBuffer->data(); + if (keyArray->null_count() != 0) { + throw RuntimeException(ExceptionMessage::nullPKException()); + } + lookupOnArray(info, keyArray.get(), offsets); + offsetArraysVector.push_back(TableCopyUtils::createArrowPrimitiveArray( + std::make_shared(), arrowBuffer, numKeys)); } - return make_unique( - std::move(copiedInfos), children[0]->clone(), getOperatorID(), paramsString); +} + +void IndexLookup::fillOffsetArraysFromVector(const kuzu::processor::IndexLookupInfo& info, + common::ValueVector* keyVector, + std::vector>& offsetArraysVector) { + auto numKeys = keyVector->state->selVector->selectedSize; + std::shared_ptr arrowBuffer; + TableCopyUtils::throwCopyExceptionIfNotOK( + arrow::AllocateBuffer((int64_t)(numKeys * sizeof(offset_t))).Value(&arrowBuffer)); + auto offsets = (offset_t*)arrowBuffer->data(); + for (auto i = 0u; i < keyVector->state->selVector->selectedSize; i++) { + info.pkIndex->lookup(&transaction::DUMMY_READ_TRANSACTION, + keyVector->getValue(keyVector->state->selVector->selectedPositions[i]) + .getAsString() + .c_str(), + offsets[i]); + } + offsetArraysVector.push_back(TableCopyUtils::createArrowPrimitiveArray( + std::make_shared(), arrowBuffer, numKeys)); } } // namespace processor diff --git a/src/processor/operator/persistent/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp index 7dc03faa65..adf651f721 100644 --- a/src/processor/operator/persistent/copy_node.cpp +++ b/src/processor/operator/persistent/copy_node.cpp @@ -29,7 
+29,10 @@ void CopyNodeSharedState::initializePrimaryKey(const std::string& directory) { pkIndex = std::make_unique( StorageUtils::getNodeIndexFName(directory, tableSchema->tableID, DBFileType::ORIGINAL), *tableSchema->getPrimaryKey()->getDataType()); - pkIndex->bulkReserve(numRows); + // Since hashIndexBuilder doesn't support dynamic rehash, we need to reserve enough number + // of slots when copying turtle files. + pkIndex->bulkReserve( + copyDesc.fileType == common::CopyDescription::FileType::TURTLE ? numRows * 3 : numRows); } for (auto& property : tableSchema->properties) { if (property->getPropertyID() == tableSchema->getPrimaryKey()->getPropertyID()) { @@ -91,6 +94,7 @@ void CopyNode::executeInternal(ExecutionContext* context) { for (auto& dataPos : copyNodeInfo.dataColumnPoses) { // All tuples in the resultSet are in the same data chunk. auto vectorToAppend = resultSet->getValueVector(dataPos).get(); + vectorToAppend->state->selVector = originalSelVector; appendUniqueValueToPKIndex(sharedState->pkIndex.get(), vectorToAppend); copyToNodeGroup({dataPos}); } @@ -190,6 +194,8 @@ void CopyNode::checkNonNullConstraint(NullColumnChunk* nullChunk, offset_t numNo } void CopyNode::finalize(ExecutionContext* context) { + auto numNodes = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx()) + + sharedState->sharedNodeGroup->getNumNodes(); if (sharedState->sharedNodeGroup) { auto nodeGroupIdx = sharedState->getNextNodeGroupIdx(); writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(), sharedState->pkColumnID, @@ -205,12 +211,12 @@ void CopyNode::finalize(ExecutionContext* context) { sharedState->tableSchema->getBwdRelTableIDSet().end()); for (auto relTableID : connectedRelTableIDs) { copyNodeInfo.relsStore->getRelTable(relTableID) - ->batchInitEmptyRelsForNewNodes(relTableID, sharedState->numRows); + ->batchInitEmptyRelsForNewNodes(relTableID, numNodes); } sharedState->table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable( - sharedState->table->getTableID(), sharedState->numRows); + sharedState->table->getTableID(), numNodes); auto outputMsg = StringUtils::string_format("{} number of tuples has been copied to table: {}.", - sharedState->numRows, sharedState->tableSchema->tableName.c_str()); + numNodes, sharedState->tableSchema->tableName.c_str()); FactorizedTableUtils::appendStringToTable( sharedState->fTable.get(), outputMsg, context->memoryManager); } @@ -252,7 +258,7 @@ void CopyNode::appendUniqueValueToPKIndex( localNodeGroup->getNumNodes(); for (auto i = 0u; i < vectorToAppend->state->getNumSelectedValues(); i++) { auto uriStr = vectorToAppend->getValue(i).getAsString(); - if (!pkIndex->lookup((int64_t)uriStr.c_str(), result)) { + if (!pkIndex->lookup(uriStr.c_str(), result)) { pkIndex->append(uriStr.c_str(), offset++); selVector->selectedPositions[nextPos++] = i; } diff --git a/src/processor/operator/persistent/reader.cpp b/src/processor/operator/persistent/reader.cpp index 08fb6a583a..a6321cc470 100644 --- a/src/processor/operator/persistent/reader.cpp +++ b/src/processor/operator/persistent/reader.cpp @@ -70,6 +70,7 @@ void Reader::readNextDataChunk() { initFunc(*readFuncData, sharedState->copyDescription->filePaths, morsel->fileIdx, *sharedState->copyDescription->csvReaderConfig, sharedState->tableSchema); } + dataChunk->resetAuxiliaryBuffer(); readFunc(*readFuncData, morsel->blockIdx, dataChunk.get()); if (dataChunk->state->selVector->selectedSize > 0) { leftArrowArrays.appendFromDataChunk(dataChunk.get()); diff --git 
a/src/processor/operator/persistent/reader/rdf_reader.cpp b/src/processor/operator/persistent/reader/rdf_reader.cpp
index b5c81a0587..fdd7f88365 100644
--- a/src/processor/operator/persistent/reader/rdf_reader.cpp
+++ b/src/processor/operator/persistent/reader/rdf_reader.cpp
@@ -68,9 +68,6 @@ bool RDFReader::isSerdTypeSupported(SerdType serdType) {
 }
 
 offset_t RDFReader::read(DataChunk* dataChunk) {
-    for (auto& vector : dataChunk->valueVectors) {
-        vector->resetAuxiliaryBuffer();
-    }
     if (status) {
         return 0;
     }
diff --git a/src/processor/operator/persistent/reader_functions.cpp b/src/processor/operator/persistent/reader_functions.cpp
index 51aa15b8fa..07dd6d96f6 100644
--- a/src/processor/operator/persistent/reader_functions.cpp
+++ b/src/processor/operator/persistent/reader_functions.cpp
@@ -169,6 +169,7 @@ std::vector ReaderFunctions::countRowsInNodeCSVFile(
     block_idx_t numBlocks = 0;
     while (true) {
         dataChunk->state->selVector->selectedSize = 0;
+        dataChunk->resetAuxiliaryBuffer();
         auto numRowsRead = reader->ParseCSV(*dataChunk);
         if (numRowsRead == 0) {
             break;
@@ -221,6 +222,7 @@ std::vector ReaderFunctions::countRowsInRDFFile(
     row_idx_t numRowsInFile = 0;
     block_idx_t numBlocks = 0;
     while (true) {
+        dataChunk->resetAuxiliaryBuffer();
         auto numRowsRead = reader->read(dataChunk.get());
         if (numRowsRead == 0) {
             break;
diff --git a/test/test_files/copy/copy_rdf.test b/test/test_files/copy/copy_rdf.test
index 6c565a99d1..e30f4a5eaf 100644
--- a/test/test_files/copy/copy_rdf.test
+++ b/test/test_files/copy/copy_rdf.test
@@ -1,5 +1,6 @@
 -GROUP CopyRDFTest
--DATASET CSV copy-test/node/turtle
+-DATASET CSV copy-test/rdf
+-BUFFER_POOL_SIZE 536870912
 
 --
 
@@ -8,7 +9,7 @@
 -LOG CountRDFNodeTable
 -STATEMENT MATCH (s:taxonomy_RESOURCE) RETURN COUNT(s._IRI)
 ---- 1
-570211
+1138397
 
 -LOG QueryRDFNodeTable
 -STATEMENT MATCH (s:taxonomy_RESOURCE) RETURN s._IRI ORDER BY s._IRI LIMIT 5
@@ -18,3 +19,9 @@ http://dbpedia.org/class/yago/14July115200493
 http://dbpedia.org/class/yago/1530s115148787
 http://dbpedia.org/class/yago/16PF106475933
 http://dbpedia.org/class/yago/1750s115149933
+
+-LOG QueryRDFRelTable
+-STATEMENT MATCH (s:taxonomy_RESOURCE)-[p:taxonomy_TRIPLES]->(o:taxonomy_RESOURCE) RETURN s._IRI, o._IRI ORDER BY s._IRI LIMIT 2
+---- 2
+http://dbpedia.org/class/yago/'hood108641944|http://dbpedia.org/class/yago/Vicinity108641113
+http://dbpedia.org/class/yago/14July115200493|http://dbpedia.org/class/yago/LegalHoliday115199592
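
For reviewers unfamiliar with the RDF copy path, here is a minimal standalone sketch of the idea behind `appendUniqueValueToPKIndex` and the `numRows * 3` reservation in `copy_node.cpp`: every parsed triple can introduce up to three resource URIs (subject, predicate, object), and each URI is looked up by string before being appended so duplicates resolve to the existing node offset. This is not Kùzu code; `ToyPKIndex` and `getOrAppend` are invented stand-ins for `PrimaryKeyIndexBuilder`/`HashIndexBuilder`, used only to illustrate the dedup-and-count behavior the patch relies on.

```cpp
// Standalone sketch (not Kuzu code): deduplicate resource URIs while assigning
// node offsets, mirroring what the turtle copy path does conceptually.
#include <array>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

using offset_t = uint64_t;

// Stand-in for PrimaryKeyIndexBuilder: maps a string key to a node offset.
struct ToyPKIndex {
    std::unordered_map<std::string, offset_t> map;
    bool lookup(const std::string& key, offset_t& result) const {
        auto it = map.find(key);
        if (it == map.end()) {
            return false;
        }
        result = it->second;
        return true;
    }
    void append(const std::string& key, offset_t offset) { map.emplace(key, offset); }
};

// Append a URI only if it is not indexed yet; return its offset either way.
offset_t getOrAppend(ToyPKIndex& index, const std::string& uri, offset_t& nextOffset) {
    offset_t existing;
    if (index.lookup(uri, existing)) {
        return existing;
    }
    index.append(uri, nextOffset);
    return nextOffset++;
}

int main() {
    // Each triple carries up to three URIs (subject, predicate, object), which is
    // why the patch reserves roughly numRows * 3 index slots for turtle input.
    std::vector<std::array<std::string, 3>> triples = {
        {"ex:a", "rdfs:subClassOf", "ex:b"},
        {"ex:b", "rdfs:subClassOf", "ex:c"},
        {"ex:a", "rdfs:subClassOf", "ex:c"}, // all three URIs already indexed
    };
    ToyPKIndex index;
    offset_t nextOffset = 0;
    for (const auto& triple : triples) {
        for (const auto& uri : triple) {
            getOrAppend(index, uri, nextOffset);
        }
    }
    std::cout << "distinct resources: " << index.map.size() << "\n"; // prints 4
    return 0;
}
```

In the patch itself this role is played by the new `lookup(const char*, offset_t&)` overloads on `HashIndexBuilder`/`PrimaryKeyIndexBuilder` together with `CopyNode::appendUniqueValueToPKIndex`, and the over-reservation happens in `CopyNodeSharedState::initializePrimaryKey` because the hash index builder cannot rehash dynamically.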