From ab919c09aae297fc8931fd630b7f58080f0e5181 Mon Sep 17 00:00:00 2001 From: Guodong Jin Date: Mon, 26 Jun 2023 00:27:22 +0800 Subject: [PATCH] fix issue 1704 --- dataset/primary-key-tests/eFollows.csv | 2 + .../primary-key-tests/int-pk-tests/eKnows.csv | 1 - .../int-pk-tests/vPerson.csv | 2 - .../string-pk-tests/eKnows.csv | 1 - .../string-pk-tests/vPerson.csv | 2 - dataset/primary-key-tests/vOrg.csv | 1 + dataset/primary-key-tests/vPerson.csv | 2 + src/catalog/catalog.cpp | 24 ++-- src/include/catalog/catalog.h | 6 +- src/include/catalog/catalog_structs.h | 26 ++--- .../processor/operator/copy/read_csv.h | 12 +- src/include/storage/copier/rel_copier.h | 44 ++++---- .../storage/copier/rel_copy_executor.h | 6 +- src/processor/operator/copy/copy_rel.cpp | 3 +- .../operator/ddl/create_rel_table.cpp | 8 +- src/storage/copier/rel_copier.cpp | 19 ++-- src/storage/copier/rel_copy_executor.cpp | 41 ++----- src/storage/copier/table_copy_utils.cpp | 8 ++ test/runner/e2e_ddl_test.cpp | 105 ------------------ test/test_files/copy/copy_pk_basic.test | 44 ++++++++ .../copy_pk_long_string.test} | 2 +- .../copy_pk_long_string_parquet.test} | 2 +- ...opy_serial_pk.test => copy_pk_serial.test} | 2 +- 23 files changed, 146 insertions(+), 217 deletions(-) create mode 100644 dataset/primary-key-tests/eFollows.csv delete mode 100644 dataset/primary-key-tests/int-pk-tests/eKnows.csv delete mode 100644 dataset/primary-key-tests/int-pk-tests/vPerson.csv delete mode 100644 dataset/primary-key-tests/string-pk-tests/eKnows.csv delete mode 100644 dataset/primary-key-tests/string-pk-tests/vPerson.csv create mode 100644 dataset/primary-key-tests/vOrg.csv create mode 100644 dataset/primary-key-tests/vPerson.csv create mode 100644 test/test_files/copy/copy_pk_basic.test rename test/test_files/{long_string_pk/long_string_pk.test => copy/copy_pk_long_string.test} (89%) rename test/test_files/{long_string_pk/long_string_pk_parquet.test => copy/copy_pk_long_string_parquet.test} (87%) rename test/test_files/copy/{copy_serial_pk.test => copy_pk_serial.test} (85%) diff --git a/dataset/primary-key-tests/eFollows.csv b/dataset/primary-key-tests/eFollows.csv new file mode 100644 index 00000000000..0b7aa099c8e --- /dev/null +++ b/dataset/primary-key-tests/eFollows.csv @@ -0,0 +1,2 @@ +100,10020 +101,10020 diff --git a/dataset/primary-key-tests/int-pk-tests/eKnows.csv b/dataset/primary-key-tests/int-pk-tests/eKnows.csv deleted file mode 100644 index 93c12d94e70..00000000000 --- a/dataset/primary-key-tests/int-pk-tests/eKnows.csv +++ /dev/null @@ -1 +0,0 @@ -0,1 diff --git a/dataset/primary-key-tests/int-pk-tests/vPerson.csv b/dataset/primary-key-tests/int-pk-tests/vPerson.csv deleted file mode 100644 index 1ac32b5cb5e..00000000000 --- a/dataset/primary-key-tests/int-pk-tests/vPerson.csv +++ /dev/null @@ -1,2 +0,0 @@ -0,Foo,1 -1,Bar,0 diff --git a/dataset/primary-key-tests/string-pk-tests/eKnows.csv b/dataset/primary-key-tests/string-pk-tests/eKnows.csv deleted file mode 100644 index 35cfb5915a8..00000000000 --- a/dataset/primary-key-tests/string-pk-tests/eKnows.csv +++ /dev/null @@ -1 +0,0 @@ -Alice,Bob diff --git a/dataset/primary-key-tests/string-pk-tests/vPerson.csv b/dataset/primary-key-tests/string-pk-tests/vPerson.csv deleted file mode 100644 index 1418fe2148b..00000000000 --- a/dataset/primary-key-tests/string-pk-tests/vPerson.csv +++ /dev/null @@ -1,2 +0,0 @@ -Alice,1,Bob -Bob,2,Alice diff --git a/dataset/primary-key-tests/vOrg.csv b/dataset/primary-key-tests/vOrg.csv new file mode 100644 index 00000000000..8a0934b514a --- /dev/null +++ b/dataset/primary-key-tests/vOrg.csv @@ -0,0 +1 @@ +10020 diff --git a/dataset/primary-key-tests/vPerson.csv b/dataset/primary-key-tests/vPerson.csv new file mode 100644 index 00000000000..455e4c66504 --- /dev/null +++ b/dataset/primary-key-tests/vPerson.csv @@ -0,0 +1,2 @@ +100,Foo,10 +101,Bar,11 diff --git a/src/catalog/catalog.cpp b/src/catalog/catalog.cpp index da92718c86a..9b500c73743 100644 --- a/src/catalog/catalog.cpp +++ b/src/catalog/catalog.cpp @@ -147,7 +147,9 @@ uint64_t SerDeser::serializeValue( offset = SerDeser::serializeValue((const TableSchema&)value, fileInfo, offset); offset = SerDeser::serializeValue(value.relMultiplicity, fileInfo, offset); offset = SerDeser::serializeValue(value.srcTableID, fileInfo, offset); - return SerDeser::serializeValue(value.dstTableID, fileInfo, offset); + offset = SerDeser::serializeValue(value.dstTableID, fileInfo, offset); + offset = SerDeser::serializeValue(value.srcPKDataType, fileInfo, offset); + return SerDeser::serializeValue(value.dstPKDataType, fileInfo, offset); } template<> @@ -156,7 +158,9 @@ uint64_t SerDeser::deserializeValue( offset = SerDeser::deserializeValue((TableSchema&)value, fileInfo, offset); offset = SerDeser::deserializeValue(value.relMultiplicity, fileInfo, offset); offset = SerDeser::deserializeValue(value.srcTableID, fileInfo, offset); - return SerDeser::deserializeValue(value.dstTableID, fileInfo, offset); + offset = SerDeser::deserializeValue(value.dstTableID, fileInfo, offset); + offset = SerDeser::deserializeValue(value.srcPKDataType, fileInfo, offset); + return SerDeser::deserializeValue(value.dstPKDataType, fileInfo, offset); } } // namespace common @@ -205,7 +209,8 @@ table_id_t CatalogContent::addNodeTableSchema( } table_id_t CatalogContent::addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity, - std::vector properties, table_id_t srcTableID, table_id_t dstTableID) { + std::vector properties, table_id_t srcTableID, table_id_t dstTableID, + LogicalType srcPKDataType, LogicalType dstPKDataType) { table_id_t tableID = assignNextTableID(); nodeTableSchemas[srcTableID]->addFwdRelTableID(tableID); nodeTableSchemas[dstTableID]->addBwdRelTableID(tableID); @@ -216,8 +221,9 @@ table_id_t CatalogContent::addRelTableSchema(std::string tableName, RelMultiplic properties[i].propertyID = i; properties[i].tableID = tableID; } - auto relTableSchema = std::make_unique(std::move(tableName), tableID, - relMultiplicity, std::move(properties), srcTableID, dstTableID); + auto relTableSchema = + std::make_unique(std::move(tableName), tableID, relMultiplicity, + std::move(properties), srcTableID, dstTableID, srcPKDataType, dstPKDataType); relTableNameToIDMap[relTableSchema->tableName] = tableID; relTableSchemas[tableID] = std::move(relTableSchema); return tableID; @@ -401,11 +407,11 @@ table_id_t Catalog::addNodeTableSchema( } table_id_t Catalog::addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity, - const std::vector& propertyDefinitions, table_id_t srcTableID, - table_id_t dstTableID) { + const std::vector& propertyDefinitions, table_id_t srcTableID, table_id_t dstTableID, + LogicalType srcPKDataType, LogicalType dstPKDataType) { initCatalogContentForWriteTrxIfNecessary(); - auto tableID = catalogContentForWriteTrx->addRelTableSchema( - std::move(tableName), relMultiplicity, propertyDefinitions, srcTableID, dstTableID); + auto tableID = catalogContentForWriteTrx->addRelTableSchema(std::move(tableName), + relMultiplicity, propertyDefinitions, srcTableID, dstTableID, srcPKDataType, dstPKDataType); wal->logRelTableRecord(tableID); return tableID; } diff --git a/src/include/catalog/catalog.h b/src/include/catalog/catalog.h index 8dbd94c3546..9a68d8bdd81 100644 --- a/src/include/catalog/catalog.h +++ b/src/include/catalog/catalog.h @@ -40,7 +40,8 @@ class CatalogContent { common::table_id_t addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity, std::vector properties, common::table_id_t srcTableID, - common::table_id_t dstTableID); + common::table_id_t dstTableID, common::LogicalType srcPKDataType, + common::LogicalType dstPKDataType); inline bool containNodeTable(common::table_id_t tableID) const { return nodeTableSchemas.contains(tableID); @@ -192,7 +193,8 @@ class Catalog { common::table_id_t addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity, const std::vector& propertyDefinitions, common::table_id_t srcTableID, - common::table_id_t dstTableID); + common::table_id_t dstTableID, common::LogicalType srcPKDataType, + common::LogicalType dstPKDataType); void dropTableSchema(common::table_id_t tableID); diff --git a/src/include/catalog/catalog_structs.h b/src/include/catalog/catalog_structs.h index 1b12ee9d5a7..7bee73eafc3 100644 --- a/src/include/catalog/catalog_structs.h +++ b/src/include/catalog/catalog_structs.h @@ -122,22 +122,17 @@ struct RelTableSchema : TableSchema { RelTableSchema() : TableSchema{"", common::INVALID_TABLE_ID, false /* isNodeTable */, {} /* properties */}, relMultiplicity{MANY_MANY}, srcTableID{common::INVALID_TABLE_ID}, - dstTableID{common::INVALID_TABLE_ID} {} + dstTableID{common::INVALID_TABLE_ID}, srcPKDataType{common::LogicalType{ + common::LogicalTypeID::ANY}}, + dstPKDataType{common::LogicalType{common::LogicalTypeID::ANY}} {} RelTableSchema(std::string tableName, common::table_id_t tableID, RelMultiplicity relMultiplicity, std::vector properties, - common::table_id_t srcTableID, common::table_id_t dstTableID) + common::table_id_t srcTableID, common::table_id_t dstTableID, + common::LogicalType srcPKDataType, common::LogicalType dstPKDataType) : TableSchema{std::move(tableName), tableID, false /* isNodeTable */, std::move(properties)}, - relMultiplicity{relMultiplicity}, srcTableID{srcTableID}, dstTableID{dstTableID} {} - - inline Property& getRelIDDefinition() { - for (auto& property : properties) { - if (property.name == common::InternalKeyword::ID) { - return property; - } - } - throw common::InternalException("Cannot find internal rel ID definition."); - } + relMultiplicity{relMultiplicity}, srcTableID{srcTableID}, dstTableID{dstTableID}, + srcPKDataType{std::move(srcPKDataType)}, dstPKDataType{std::move(dstPKDataType)} {} inline bool isSingleMultiplicityInDirection(common::RelDataDirection direction) const { return relMultiplicity == ONE_ONE || @@ -145,11 +140,6 @@ struct RelTableSchema : TableSchema { (direction == common::RelDataDirection::FWD ? MANY_ONE : ONE_MANY); } - inline uint32_t getNumUserDefinedProperties() const { - // Note: the first column stores the relID property. - return properties.size() - 1; - } - inline bool isSrcOrDstTable(common::table_id_t tableID) const { return srcTableID == tableID || dstTableID == tableID; } @@ -165,6 +155,8 @@ struct RelTableSchema : TableSchema { RelMultiplicity relMultiplicity; common::table_id_t srcTableID; common::table_id_t dstTableID; + common::LogicalType srcPKDataType; + common::LogicalType dstPKDataType; }; } // namespace catalog diff --git a/src/include/processor/operator/copy/read_csv.h b/src/include/processor/operator/copy/read_csv.h index 4a014f3a778..2e4a5a206e7 100644 --- a/src/include/processor/operator/copy/read_csv.h +++ b/src/include/processor/operator/copy/read_csv.h @@ -18,9 +18,9 @@ class ReadCSVMorsel : public ReadFileMorsel { class ReadCSVSharedState : public ReadFileSharedState { public: - ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig, - catalog::NodeTableSchema* tableSchema, std::vector filePaths) - : ReadFileSharedState{std::move(filePaths)}, csvReaderConfig{std::move(csvReaderConfig)}, + ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema, + std::vector filePaths) + : ReadFileSharedState{std::move(filePaths)}, csvReaderConfig{csvReaderConfig}, tableSchema{tableSchema} {} private: @@ -30,16 +30,16 @@ class ReadCSVSharedState : public ReadFileSharedState { private: common::CSVReaderConfig csvReaderConfig; - catalog::NodeTableSchema* tableSchema; + catalog::TableSchema* tableSchema; std::shared_ptr reader; }; class ReadCSV : public ReadFile { public: - ReadCSV(std::vector arrowColumnPoses, DataPos offsetVectorPos, + ReadCSV(std::vector arrowColumnPoses, const DataPos& offsetVectorPos, std::shared_ptr sharedState, uint32_t id, const std::string& paramsString) - : ReadFile{std::move(arrowColumnPoses), std::move(offsetVectorPos), std::move(sharedState), + : ReadFile{std::move(arrowColumnPoses), offsetVectorPos, std::move(sharedState), PhysicalOperatorType::READ_CSV, id, paramsString} {} inline std::shared_ptr readTuples( diff --git a/src/include/storage/copier/rel_copier.h b/src/include/storage/copier/rel_copier.h index 1f46372b28a..459c1276c67 100644 --- a/src/include/storage/copier/rel_copier.h +++ b/src/include/storage/copier/rel_copier.h @@ -15,11 +15,9 @@ class RelCopier { public: RelCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, - DirectedInMemRelData* bwdRelData, std::vector pkColumnTypes, - std::vector pkIndexes) + DirectedInMemRelData* bwdRelData, std::vector pkIndexes) : sharedState{std::move(sharedState)}, copyDesc{copyDesc}, schema{schema}, - fwdRelData{fwdRelData}, bwdRelData{bwdRelData}, - pkColumnTypes{std::move(pkColumnTypes)}, pkIndexes{std::move(pkIndexes)} {} + fwdRelData{fwdRelData}, bwdRelData{bwdRelData}, pkIndexes{std::move(pkIndexes)} {} virtual ~RelCopier() = default; void execute(processor::ExecutionContext* executionContext); @@ -61,20 +59,18 @@ class RelCopier { catalog::RelTableSchema* schema; DirectedInMemRelData* fwdRelData; DirectedInMemRelData* bwdRelData; - std::vector pkColumnTypes; // src and dst node pk columns. std::vector pkIndexes; }; class RelListsCounterAndColumnCopier : public RelCopier { -public: +protected: RelListsCounterAndColumnCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkColumnTypes, std::vector pkIndexes) + std::vector pkIndexes) : RelCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData, - std::move(pkColumnTypes), std::move(pkIndexes)} {} + std::move(pkIndexes)} {} -protected: void finalize() override; static void buildRelListsHeaders( @@ -89,13 +85,13 @@ class ParquetRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCo ParquetRelListsCounterAndColumnsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkColumnTypes, std::vector pkIndexes) + std::vector pkIndexes) : RelListsCounterAndColumnCopier{std::move(sharedState), copyDesc, schema, fwdRelData, - bwdRelData, std::move(pkColumnTypes), std::move(pkIndexes)} {} + bwdRelData, std::move(pkIndexes)} {} std::unique_ptr clone() const final { return std::make_unique( - sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes); + sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes); } private: @@ -111,13 +107,13 @@ class CSVRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCopier CSVRelListsCounterAndColumnsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkColumnTypes, std::vector pkIndexes) + std::vector pkIndexes) : RelListsCounterAndColumnCopier{std::move(sharedState), copyDesc, schema, fwdRelData, - bwdRelData, std::move(pkColumnTypes), std::move(pkIndexes)} {} + bwdRelData, std::move(pkIndexes)} {} std::unique_ptr clone() const final { return std::make_unique( - sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes); + sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes); } private: @@ -125,13 +121,13 @@ class CSVRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCopier }; class RelListsCopier : public RelCopier { -public: +protected: RelListsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkColumnTypes, std::vector pkIndexes) + std::vector pkIndexes) : RelCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData, - std::move(pkColumnTypes), std::move(pkIndexes)} {} + std::move(pkIndexes)} {} private: void finalize() final; @@ -142,13 +138,13 @@ class ParquetRelListsCopier : public RelListsCopier { ParquetRelListsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkColumnTypes, std::vector pkIndexes) + std::vector pkIndexes) : RelListsCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData, - std::move(pkColumnTypes), std::move(pkIndexes)} {} + std::move(pkIndexes)} {} std::unique_ptr clone() const final { return std::make_unique( - sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes); + sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes); } private: @@ -164,13 +160,13 @@ class CSVRelListsCopier : public RelListsCopier { CSVRelListsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, - std::vector pkColumnTypes, std::vector pkIndexes) + std::vector pkIndexes) : RelListsCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData, - std::move(pkColumnTypes), std::move(pkIndexes)} {} + std::move(pkIndexes)} {} std::unique_ptr clone() const final { return std::make_unique( - sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes); + sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes); } private: diff --git a/src/include/storage/copier/rel_copy_executor.h b/src/include/storage/copier/rel_copy_executor.h index 1017885c98c..c436ed08fac 100644 --- a/src/include/storage/copier/rel_copy_executor.h +++ b/src/include/storage/copier/rel_copy_executor.h @@ -41,8 +41,9 @@ class DirectedInMemRelData { class RelCopyExecutor { public: RelCopyExecutor(common::CopyDescription& copyDescription, WAL* wal, - common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, - storage::NodesStore& nodesStore, storage::RelTable* table, RelsStatistics* relsStatistics); + common::TaskScheduler& taskScheduler, storage::NodesStore& nodesStore, + storage::RelTable* table, catalog::RelTableSchema* tableSchema, + RelsStatistics* relsStatistics); common::offset_t copy(processor::ExecutionContext* executionContext); @@ -62,7 +63,6 @@ class RelCopyExecutor { std::string outputDirectory; std::unordered_map fileBlockInfos; common::TaskScheduler& taskScheduler; - catalog::Catalog& catalog; catalog::RelTableSchema* tableSchema; uint64_t numTuples; RelsStatistics* relsStatistics; diff --git a/src/processor/operator/copy/copy_rel.cpp b/src/processor/operator/copy/copy_rel.cpp index 6cf160b57d8..aea59f74a4e 100644 --- a/src/processor/operator/copy/copy_rel.cpp +++ b/src/processor/operator/copy/copy_rel.cpp @@ -9,8 +9,9 @@ namespace processor { uint64_t CopyRel::executeInternal( kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) { + auto tableSchema = catalog->getReadOnlyVersion()->getRelTableSchema(tableID); auto relCopier = std::make_unique( - copyDescription, wal, *taskScheduler, *catalog, nodesStore, table, relsStatistics); + copyDescription, wal, *taskScheduler, nodesStore, table, tableSchema, relsStatistics); auto numRelsCopied = relCopier->copy(executionContext); return numRelsCopied; } diff --git a/src/processor/operator/ddl/create_rel_table.cpp b/src/processor/operator/ddl/create_rel_table.cpp index 6e869301e4d..6b297cfc9b3 100644 --- a/src/processor/operator/ddl/create_rel_table.cpp +++ b/src/processor/operator/ddl/create_rel_table.cpp @@ -8,8 +8,12 @@ namespace kuzu { namespace processor { void CreateRelTable::executeDDLInternal() { - auto newRelTableID = - catalog->addRelTableSchema(tableName, relMultiplicity, properties, srcTableID, dstTableID); + auto srcPKDataType = + catalog->getReadOnlyVersion()->getNodeTableSchema(srcTableID)->getPrimaryKey().dataType; + auto dstPKDataType = + catalog->getReadOnlyVersion()->getNodeTableSchema(dstTableID)->getPrimaryKey().dataType; + auto newRelTableID = catalog->addRelTableSchema(tableName, relMultiplicity, properties, + srcTableID, dstTableID, srcPKDataType, dstPKDataType); relsStatistics->addTableStatistic(catalog->getWriteVersion()->getRelTableSchema(newRelTableID)); } diff --git a/src/storage/copier/rel_copier.cpp b/src/storage/copier/rel_copier.cpp index 45d2918d8eb..c34e81ba536 100644 --- a/src/storage/copier/rel_copier.cpp +++ b/src/storage/copier/rel_copier.cpp @@ -264,9 +264,9 @@ void ParquetRelListsCounterAndColumnsCopier::executeInternal(std::unique_ptr boundPKOffsets, adjPKOffsets; boundPKOffsets.resize(numTuples); adjPKOffsets.resize(numTuples); - indexLookup(recordBatch->column(0).get(), pkColumnTypes[0], pkIndexes[0], + indexLookup(recordBatch->column(0).get(), schema->srcPKDataType, pkIndexes[0], (offset_t*)boundPKOffsets.data()); - indexLookup(recordBatch->column(1).get(), pkColumnTypes[1], pkIndexes[1], + indexLookup(recordBatch->column(1).get(), schema->dstPKDataType, pkIndexes[1], (offset_t*)adjPKOffsets.data()); std::vector> pkOffsetsArrays(2); pkOffsetsArrays[0] = createArrowPrimitiveArray( @@ -286,8 +286,9 @@ void CSVRelListsCounterAndColumnsCopier::executeInternal(std::unique_ptrcolumn(0).get(), pkColumnTypes[0], pkIndexes[0], boundPKOffsets.data()); - indexLookup(recordBatch->column(1).get(), pkColumnTypes[1], pkIndexes[1], adjPKOffsets.data()); + recordBatch->column(0).get(), schema->srcPKDataType, pkIndexes[0], boundPKOffsets.data()); + indexLookup( + recordBatch->column(1).get(), schema->dstPKDataType, pkIndexes[1], adjPKOffsets.data()); std::vector> pkOffsets(2); pkOffsets[0] = createArrowPrimitiveArray( std::make_shared(), (uint8_t*)boundPKOffsets.data(), numTuples); @@ -329,8 +330,9 @@ void ParquetRelListsCopier::executeInternal(std::unique_ptr morsel) boundPKOffsets.resize(numTuples); adjPKOffsets.resize(numTuples); indexLookup( - recordBatch->column(0).get(), pkColumnTypes[0], pkIndexes[0], boundPKOffsets.data()); - indexLookup(recordBatch->column(1).get(), pkColumnTypes[1], pkIndexes[1], adjPKOffsets.data()); + recordBatch->column(0).get(), schema->srcPKDataType, pkIndexes[0], boundPKOffsets.data()); + indexLookup( + recordBatch->column(1).get(), schema->dstPKDataType, pkIndexes[1], adjPKOffsets.data()); std::vector> pkOffsets(2); pkOffsets[0] = createArrowPrimitiveArray( std::make_shared(), (uint8_t*)boundPKOffsets.data(), numTuples); @@ -353,8 +355,9 @@ void CSVRelListsCopier::executeInternal(std::unique_ptr morsel) { boundPKOffsets.resize(numTuples); adjPKOffsets.resize(numTuples); indexLookup( - recordBatch->column(0).get(), pkColumnTypes[0], pkIndexes[0], boundPKOffsets.data()); - indexLookup(recordBatch->column(1).get(), pkColumnTypes[1], pkIndexes[1], adjPKOffsets.data()); + recordBatch->column(0).get(), schema->srcPKDataType, pkIndexes[0], boundPKOffsets.data()); + indexLookup( + recordBatch->column(1).get(), schema->dstPKDataType, pkIndexes[1], adjPKOffsets.data()); std::vector> pkOffsets(2); pkOffsets[0] = createArrowPrimitiveArray( std::make_shared(), (uint8_t*)boundPKOffsets.data(), numTuples); diff --git a/src/storage/copier/rel_copy_executor.cpp b/src/storage/copier/rel_copy_executor.cpp index 5378a592e6e..427d364e4fc 100644 --- a/src/storage/copier/rel_copy_executor.cpp +++ b/src/storage/copier/rel_copy_executor.cpp @@ -9,12 +9,11 @@ namespace kuzu { namespace storage { RelCopyExecutor::RelCopyExecutor(CopyDescription& copyDescription, WAL* wal, - TaskScheduler& taskScheduler, Catalog& catalog, NodesStore& nodesStore, RelTable* table, - RelsStatistics* relsStatistics) + TaskScheduler& taskScheduler, NodesStore& nodesStore, RelTable* table, + RelTableSchema* tableSchema, RelsStatistics* relsStatistics) : copyDescription{copyDescription}, wal{wal}, outputDirectory{std::move(wal->getDirectory())}, - taskScheduler{taskScheduler}, catalog{catalog}, - tableSchema{catalog.getReadOnlyVersion()->getRelTableSchema(table->getRelTableID())}, - numTuples{0}, nodesStore{nodesStore}, table{table}, relsStatistics{relsStatistics} { + taskScheduler{taskScheduler}, tableSchema{tableSchema}, numTuples{0}, + nodesStore{nodesStore}, table{table}, relsStatistics{relsStatistics} { // Initialize rel data. fwdRelData = initializeDirectedInMemRelData(FWD); bwdRelData = initializeDirectedInMemRelData(BWD); @@ -105,33 +104,19 @@ void RelCopyExecutor::populateRelLists(processor::ExecutionContext* executionCon std::unique_ptr RelCopyExecutor::createRelCopier(RelCopierType relCopierType) { std::shared_ptr sharedState; std::unique_ptr relCopier; - std::vector pkColumnTypes{2}; - pkColumnTypes[0] = catalog.getReadOnlyVersion() - ->getNodeTableSchema(tableSchema->getBoundTableID(FWD)) - ->getPrimaryKey() - .dataType; - pkColumnTypes[1] = catalog.getReadOnlyVersion() - ->getNodeTableSchema(tableSchema->getBoundTableID(BWD)) - ->getPrimaryKey() - .dataType; switch (copyDescription.fileType) { case CopyDescription::FileType::CSV: { sharedState = std::make_shared(copyDescription.filePaths, fileBlockInfos, copyDescription.csvReaderConfig.get(), tableSchema); switch (relCopierType) { case RelCopierType::REL_COLUMN_COPIER_AND_LIST_COUNTER: { - relCopier = - std::make_unique(sharedState, copyDescription, - tableSchema, fwdRelData.get(), bwdRelData.get(), pkColumnTypes, pkIndexes); + relCopier = std::make_unique(sharedState, + copyDescription, tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes); } break; case RelCopierType::REL_LIST_COPIER: { relCopier = std::make_unique(std::move(sharedState), copyDescription, - tableSchema, fwdRelData.get(), bwdRelData.get(), pkColumnTypes, pkIndexes); + tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes); } break; - default: { - throw NotImplementedException( - "Unsupported RelCopierType in RelCopyExecutor::createRelCopier."); - } } } break; case CopyDescription::FileType::PARQUET: { @@ -140,18 +125,12 @@ std::unique_ptr RelCopyExecutor::createRelCopier(RelCopierType relCop switch (relCopierType) { case RelCopierType::REL_COLUMN_COPIER_AND_LIST_COUNTER: { relCopier = std::make_unique(sharedState, - copyDescription, tableSchema, fwdRelData.get(), bwdRelData.get(), pkColumnTypes, - pkIndexes); + copyDescription, tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes); } break; case RelCopierType::REL_LIST_COPIER: { - relCopier = - std::make_unique(std::move(sharedState), copyDescription, - tableSchema, fwdRelData.get(), bwdRelData.get(), pkColumnTypes, pkIndexes); + relCopier = std::make_unique(std::move(sharedState), + copyDescription, tableSchema, fwdRelData.get(), bwdRelData.get(), pkIndexes); } break; - default: { - throw NotImplementedException( - "Unsupported RelCopierType in RelCopyExecutor::createRelCopier."); - } } } break; default: { diff --git a/src/storage/copier/table_copy_utils.cpp b/src/storage/copier/table_copy_utils.cpp index d6a2d01b974..d71354ff8e6 100644 --- a/src/storage/copier/table_copy_utils.cpp +++ b/src/storage/copier/table_copy_utils.cpp @@ -192,6 +192,13 @@ std::shared_ptr TableCopyUtils::createCSVReader( // Only the empty string is treated as NULL. csvConvertOptions.null_values = {""}; csvConvertOptions.quoted_strings_can_be_null = false; + if (!tableSchema->isNodeTable) { + auto relTableSchema = (RelTableSchema*)tableSchema; + csvConvertOptions.column_types[std::string(Property::REL_FROM_PROPERTY_NAME)] = + toArrowDataType(relTableSchema->srcPKDataType); + csvConvertOptions.column_types[std::string(Property::REL_TO_PROPERTY_NAME)] = + toArrowDataType(relTableSchema->dstPKDataType); + } for (auto& property : tableSchema->properties) { if (property.name == Property::REL_FROM_PROPERTY_NAME || property.name == Property::REL_TO_PROPERTY_NAME) { @@ -345,6 +352,7 @@ std::shared_ptr TableCopyUtils::toArrowDataType(const LogicalTy case LogicalTypeID::BOOL: { return arrow::boolean(); } + case LogicalTypeID::SERIAL: case LogicalTypeID::INT64: { return arrow::int64(); } diff --git a/test/runner/e2e_ddl_test.cpp b/test/runner/e2e_ddl_test.cpp index 1da81987194..fd37f5a7540 100644 --- a/test/runner/e2e_ddl_test.cpp +++ b/test/runner/e2e_ddl_test.cpp @@ -12,95 +12,6 @@ using namespace kuzu::testing; namespace kuzu { namespace testing { -class PrimaryKeyTest : public EmptyDBTest { -public: - void SetUp() override { - EmptyDBTest::SetUp(); - createDBAndConn(); - } -}; - -class IntPrimaryKeyTest : public PrimaryKeyTest { -public: - std::string getInputDir() override { - return TestHelper::appendKuzuRootPath("dataset/primary-key-tests/int-pk-tests/"); - } - - void testPrimaryKey(std::string pkColName) { - conn->query("CREATE NODE TABLE Person(firstIntCol INT64, name STRING, secondIntCol INT64, " - "PRIMARY KEY (" + - pkColName + "))"); - conn->query("CREATE REL TABLE Knows(From Person TO Person)"); - conn->query( - "COPY Person FROM \"" + - TestHelper::appendKuzuRootPath("dataset/primary-key-tests/int-pk-tests/vPerson.csv\"")); - conn->query( - "COPY Knows FROM \"" + - TestHelper::appendKuzuRootPath("dataset/primary-key-tests/int-pk-tests/eKnows.csv\"")); - auto tuple = conn->query("MATCH (a:Person)-[e:Knows]->(b:Person) WHERE a.firstIntCol = 0" - "RETURN COUNT(*)") - ->getNext(); - // Edge is from 0->1, and when the primary key is firstIntCol, we expect to find 1 result. - // If key is secondIntCol we expect to find 0 result. - if (pkColName == "firstIntCol") { - ASSERT_EQ(tuple->getValue(0)->getValue(), 1); - } else { - ASSERT_EQ(tuple->getValue(0)->getValue(), 0); - } - tuple = conn->query("MATCH (a:Person)-[e:Knows]->(b:Person) WHERE a.firstIntCol = 1 " - "RETURN COUNT(*)") - ->getNext(); - // Edge is from 0->1, and when the primary key is firstIntCol, we expect to find 0 result. - // If key is secondIntCol we expect to find 1 result. - if (pkColName == "firstIntCol") { - ASSERT_EQ(tuple->getValue(0)->getValue(), 0); - } else { - ASSERT_EQ(tuple->getValue(0)->getValue(), 1); - } - } -}; - -class StringPrimaryKeyTest : public PrimaryKeyTest { -public: - std::string getInputDir() override { - return TestHelper::appendKuzuRootPath("dataset/primary-key-tests/string-pk-tests/"); - } - - void testPrimaryKey(std::string pkColName) { - conn->query("CREATE NODE TABLE Person(firstStrCol STRING, age INT64, secondStrCol STRING, " - "PRIMARY KEY (" + - pkColName + "))"); - conn->query("CREATE REL TABLE Knows(From Person TO Person)"); - conn->query( - "COPY Person FROM \"" + TestHelper::appendKuzuRootPath( - "dataset/primary-key-tests/string-pk-tests/vPerson.csv\"")); - conn->query( - "COPY Knows FROM \"" + TestHelper::appendKuzuRootPath( - "dataset/primary-key-tests/string-pk-tests/eKnows.csv\"")); - auto tuple = - conn->query("MATCH (a:Person)-[e:Knows]->(b:Person) WHERE a.firstStrCol = \"Alice\" " - "RETURN COUNT(*)") - ->getNext(); - // Edge is from "Alice"->"Bob", and when the primary key is firstStrCol, we expect to find 1 - // result. If key is secondStrCol we expect to find 0 result. - if (pkColName == "firstStrCol") { - ASSERT_EQ(tuple->getValue(0)->getValue(), 1); - } else { - ASSERT_EQ(tuple->getValue(0)->getValue(), 0); - } - tuple = conn->query("MATCH (a:Person)-[e:Knows]->(b:Person) WHERE a.firstStrCol = \"Bob\" " - "RETURN COUNT(*)") - ->getNext(); - // Edge is from "Alice"->"Bob", and when the primary key is firstStrCol, we expect to find 0 - // result. If key is secondStrCol we expect to find 1 result. - if (pkColName == "firstStrCol") { - ASSERT_EQ(tuple->getValue(0)->getValue(), 0); - } else { - ASSERT_EQ(tuple->getValue(0)->getValue(), 1); - } - } -}; - class TinySnbDDLTest : public DBTest { public: @@ -584,22 +495,6 @@ class TinySnbDDLTest : public DBTest { table_id_t studyAtTableID; }; -TEST_F(StringPrimaryKeyTest, PrimaryKeyFirstColumn) { - testPrimaryKey("firstStrCol"); -} - -TEST_F(StringPrimaryKeyTest, PrimaryKeySecondColumn) { - testPrimaryKey("secondStrCol"); -} - -TEST_F(IntPrimaryKeyTest, PrimaryKeyFirstColumn) { - testPrimaryKey("firstIntCol"); -} - -TEST_F(IntPrimaryKeyTest, PrimaryKeySecondColumn) { - testPrimaryKey("secondIntCol"); -} - TEST_F(TinySnbDDLTest, DDLStatementWithActiveTransactionError) { ddlStatementsInsideActiveTransactionErrorTest( "CREATE NODE TABLE UNIVERSITY(NAME STRING, WEBSITE " diff --git a/test/test_files/copy/copy_pk_basic.test b/test/test_files/copy/copy_pk_basic.test new file mode 100644 index 00000000000..a4ae81bd36f --- /dev/null +++ b/test/test_files/copy/copy_pk_basic.test @@ -0,0 +1,44 @@ +-GROUP CopyNodeInitRelTablesTest +-DATASET CSV empty + +-- + +-DEFINE_STATEMENT_BLOCK CREATE_REL_AND_COPY_NODES_AND_VALIDATE [ +-STATEMENT create rel table follows (FROM person TO org) +---- ok +-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/primary-key-tests/vPerson.csv"; +---- ok +-STATEMENT COPY org FROM "${KUZU_ROOT_DIRECTORY}/dataset/primary-key-tests/vOrg.csv"; +---- ok +-STATEMENT COPY follows FROM "${KUZU_ROOT_DIRECTORY}/dataset/primary-key-tests/eFollows.csv"; +---- ok +-STATEMENT MATCH (p:person)-[:follows]->(o:org) return p.fName, o.ID +---- 2 +Foo|10020 +Bar|10020 +] + +-CASE CopyIntPK + +-STATEMENT create node table person (ID INT64, fName STRING, age INT64, PRIMARY KEY (ID)); +---- ok +-STATEMENT create node table org (ID INT64, PRIMARY KEY (ID)); +---- ok +-INSERT_STATEMENT_BLOCK CREATE_REL_AND_COPY_NODES_AND_VALIDATE + + +-CASE CopyStringPK + +-STATEMENT create node table person (ID STRING, fName STRING, age INT64, PRIMARY KEY (ID)); +---- ok +-STATEMENT create node table org (ID STRING, PRIMARY KEY (ID)); +---- ok +-INSERT_STATEMENT_BLOCK CREATE_REL_AND_COPY_NODES_AND_VALIDATE + +-CASE CopyMixedStringAndIntPK + +-STATEMENT create node table person (ID INT64, fName STRING, age INT64, PRIMARY KEY (ID)); +---- ok +-STATEMENT create node table org (ID STRING, PRIMARY KEY (ID)); +---- ok +-INSERT_STATEMENT_BLOCK CREATE_REL_AND_COPY_NODES_AND_VALIDATE diff --git a/test/test_files/long_string_pk/long_string_pk.test b/test/test_files/copy/copy_pk_long_string.test similarity index 89% rename from test/test_files/long_string_pk/long_string_pk.test rename to test/test_files/copy/copy_pk_long_string.test index a7ae40d06d9..db21f71e5dc 100644 --- a/test/test_files/long_string_pk/long_string_pk.test +++ b/test/test_files/copy/copy_pk_long_string.test @@ -3,7 +3,7 @@ -- --CASE LongStringPKTest +-CASE CopyLongStringPK -LOG LongStringPKTest -STATEMENT MATCH (a:Person)-[e:Knows]->(b:Person) WHERE a.name = "AAAAAAAAAAAAAAAAAAAA" RETURN COUNT(*) diff --git a/test/test_files/long_string_pk/long_string_pk_parquet.test b/test/test_files/copy/copy_pk_long_string_parquet.test similarity index 87% rename from test/test_files/long_string_pk/long_string_pk_parquet.test rename to test/test_files/copy/copy_pk_long_string_parquet.test index d71febed817..c70f0e7058e 100644 --- a/test/test_files/long_string_pk/long_string_pk_parquet.test +++ b/test/test_files/copy/copy_pk_long_string_parquet.test @@ -3,7 +3,7 @@ -- --CASE LongStringPKTestParquet +-CASE CopyPLongStringPKParquet -LOG LongStringPKTestParquet -STATEMENT MATCH (a:Person)-[e:Knows]->(b:Person) WHERE a.name = "AAAAAAAAAAAAAAAAAAAA" RETURN COUNT(*) diff --git a/test/test_files/copy/copy_serial_pk.test b/test/test_files/copy/copy_pk_serial.test similarity index 85% rename from test/test_files/copy/copy_serial_pk.test rename to test/test_files/copy/copy_pk_serial.test index bd38799019a..d82070d6626 100644 --- a/test/test_files/copy/copy_serial_pk.test +++ b/test/test_files/copy/copy_pk_serial.test @@ -3,7 +3,7 @@ -- --CASE CopySerialPKTest +-CASE CopySerialPK -STATEMENT MATCH (:person)-[e:knows]->(:person) RETURN COUNT(*) ---- 1 14