From 1d7616d3d1c19a6dc6b80cad1d8e072aa47b44aa Mon Sep 17 00:00:00 2001 From: Benjamin Winger Date: Wed, 3 Apr 2024 15:56:10 -0400 Subject: [PATCH] Enable multi copy for node tables --- dataset/tinysnb/copy.cypher | 2 +- dataset/tinysnb/vPerson.csv | 4 - dataset/tinysnb/vPerson2.csv | 4 + src/common/exception/message.cpp | 4 - src/include/common/exception/message.h | 2 - src/include/main/client_config.h | 3 - src/include/main/settings.h | 12 -- .../operator/persistent/batch_insert.h | 3 - .../operator/persistent/node_batch_insert.h | 28 +++- src/include/storage/index/hash_index.h | 15 +- .../storage/local_storage/local_node_table.h | 3 + .../storage/local_storage/local_table.h | 5 +- .../storage/store/chunked_node_group.h | 10 +- src/include/storage/store/node_table.h | 18 +++ src/include/storage/store/node_table_data.h | 23 +++- src/main/client_context.cpp | 1 - src/main/db_config.cpp | 2 +- .../operator/persistent/CMakeLists.txt | 1 - .../operator/persistent/batch_insert.cpp | 18 --- .../operator/persistent/node_batch_insert.cpp | 130 +++++++++++++----- .../operator/persistent/rel_batch_insert.cpp | 9 +- src/storage/local_storage/local_table.cpp | 27 +++- .../storage_structure/db_file_utils.cpp | 4 +- src/storage/storage_structure/disk_array.cpp | 22 +-- src/storage/store/chunked_node_group.cpp | 8 +- src/storage/store/column_chunk.cpp | 1 + src/storage/store/node_table.cpp | 5 + src/storage/store/node_table_data.cpp | 44 ++++-- test/CMakeLists.txt | 1 + test/copy/CMakeLists.txt | 1 + test/copy/multi_copy_test.cpp | 111 +++++++++++++++ test/test_files/copy/copy_transaction.test | 12 +- test/test_files/copy/multi_copy_node.test | 20 +++ test/test_files/demo_db/demo_db_set_copy.test | 6 - .../exceptions/copy/auto_commit.test | 7 +- .../exceptions/copy/wrong_header.test | 9 -- .../tinysnb/load_from/load_from.test | 14 +- test/test_files/transaction/copy/copy.test | 16 +-- 38 files changed, 416 insertions(+), 189 deletions(-) create mode 100644 dataset/tinysnb/vPerson2.csv delete mode 100644 src/processor/operator/persistent/batch_insert.cpp create mode 100644 test/copy/CMakeLists.txt create mode 100644 test/copy/multi_copy_test.cpp create mode 100644 test/test_files/copy/multi_copy_node.test diff --git a/dataset/tinysnb/copy.cypher b/dataset/tinysnb/copy.cypher index 1e852119b3c..ad7161f3357 100644 --- a/dataset/tinysnb/copy.cypher +++ b/dataset/tinysnb/copy.cypher @@ -1,5 +1,5 @@ -CALL ENABLE_MULTI_COPY=true COPY person FROM "dataset/tinysnb/vPerson.csv" (HeaDER=true, deLim=','); +COPY person FROM "dataset/tinysnb/vPerson2.csv" (deLim=','); COPY organisation FROM "dataset/tinysnb/vOrganisation.csv"; COPY movies FROM "dataset/tinysnb/vMovies.csv"; COPY knows FROM "dataset/tinysnb/eKnows.csv"; diff --git a/dataset/tinysnb/vPerson.csv b/dataset/tinysnb/vPerson.csv index aa00a7c05a7..eb27e781f3f 100644 --- a/dataset/tinysnb/vPerson.csv +++ b/dataset/tinysnb/vPerson.csv @@ -3,7 +3,3 @@ id,fname,Gender,ISStudent,isWorker,age,eyeSight,birthdate,registerTime,lastJobDu 2,Bob,2,true,false,30,5.1,1900-01-01,2008-11-03 13:25:30.000526-02:00,10 years 5 months 13 hours 24 us,"[12,8]","[Bobby]","[[8,9],[9,10]]","[98,42,93,88]",0.99,{a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a12} 3,Carol,1,false,true,45,5.0,1940-06-22,1911-08-20 02:32:21,48 hours 24 minutes 11 seconds,"[4,5]","[Carmen,Fred]","[[8,10]]","[91,75,21,95]",1.00,a0eebc999c0b4ef8bb6d6bb9bd380a13 5,Dan,2,false,true,20,4.8,1950-7-23,2031-11-30 12:25:30Z,10 years 5 months 13 hours 24 
us,"[1,9]","[Wolfeschlegelstein,Daniel]","[[7,4],[8,8],[9]]","[76,88,99,89]",1.30,a0ee-bc99-9c0b-4ef8-bb6d-6bb9-bd38-0a14 -7,Elizabeth,1,false,true,20,4.7,1980-10-26,1976-12-23 11:21:42,48 hours 24 minutes 11 seconds,"[2]","[Ein]","[[6],[7],[8]]","[96,59,65,88]",1.463,{a0eebc99-9c0b4ef8-bb6d6bb9-bd380a15} -8,Farooq,2,true,false,25,4.5,1980-10-26,1972-07-31 13:22:30.678559,18 minutes 24 milliseconds,"[3,4,5,6,7]","[Fesdwe]","[[8]]","[80,78,34,83]",1.51,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A16 -9,Greg,2,false,false,40,4.9,1980-10-26,1976-12-23 11:21:42Z+06:40,10 years 5 months 13 hours 24 us,"[1]","[Grad]","[[10]]","[43,83,67,43]",1.6,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A17 -10,Hubert Blaine Wolfeschlegelsteinhausenbergerdorff,2,false,true,83,4.9,1990-11-27,2023-02-21 13:25:30,3 years 2 days 13 hours 2 minutes,"[10,11,12,3,4,5,6,7]","[Ad,De,Hi,Kye,Orlan]","[[7],[10],[6,7]]","[77,64,100,54]",1.323,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A18 diff --git a/dataset/tinysnb/vPerson2.csv b/dataset/tinysnb/vPerson2.csv new file mode 100644 index 00000000000..2665c1ff62a --- /dev/null +++ b/dataset/tinysnb/vPerson2.csv @@ -0,0 +1,4 @@ +7,Elizabeth,1,false,true,20,4.7,1980-10-26,1976-12-23 11:21:42,48 hours 24 minutes 11 seconds,"[2]","[Ein]","[[6],[7],[8]]","[96,59,65,88]",1.463,{a0eebc99-9c0b4ef8-bb6d6bb9-bd380a15} +8,Farooq,2,true,false,25,4.5,1980-10-26,1972-07-31 13:22:30.678559,18 minutes 24 milliseconds,"[3,4,5,6,7]","[Fesdwe]","[[8]]","[80,78,34,83]",1.51,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A16 +9,Greg,2,false,false,40,4.9,1980-10-26,1976-12-23 11:21:42Z+06:40,10 years 5 months 13 hours 24 us,"[1]","[Grad]","[[10]]","[43,83,67,43]",1.6,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A17 +10,Hubert Blaine Wolfeschlegelsteinhausenbergerdorff,2,false,true,83,4.9,1990-11-27,2023-02-21 13:25:30,3 years 2 days 13 hours 2 minutes,"[10,11,12,3,4,5,6,7]","[Ad,De,Hi,Kye,Orlan]","[[7],[10],[6,7]]","[77,64,100,54]",1.323,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A18 diff --git a/src/common/exception/message.cpp b/src/common/exception/message.cpp index a57f873d8b6..842b7b157dc 100644 --- a/src/common/exception/message.cpp +++ b/src/common/exception/message.cpp @@ -25,10 +25,6 @@ std::string ExceptionMessage::nullPKException() { return "Found NULL, which violates the non-null constraint of the primary key column."; } -std::string ExceptionMessage::notAllowCopyOnNonEmptyTableException() { - return "COPY commands can only be executed once on a table."; -} - std::string ExceptionMessage::overLargeStringPKValueException(uint64_t length) { return stringFormat("The maximum length of primary key strings is 262144 bytes. The input " "string's length was {}.", diff --git a/src/include/common/exception/message.h b/src/include/common/exception/message.h index 99028d515fc..5abda038598 100644 --- a/src/include/common/exception/message.h +++ b/src/include/common/exception/message.h @@ -13,8 +13,6 @@ struct ExceptionMessage { static std::string nonExistentPKException(const std::string& pkString); static std::string invalidPKType(const std::string& type); static std::string nullPKException(); - // Bulk insertion. - static std::string notAllowCopyOnNonEmptyTableException(); // Long string. 
static std::string overLargeStringPKValueException(uint64_t length); static std::string overLargeStringValueException(uint64_t length); diff --git a/src/include/main/client_config.h b/src/include/main/client_config.h index 7bf478a992d..ee6246bfaf9 100644 --- a/src/include/main/client_config.h +++ b/src/include/main/client_config.h @@ -25,8 +25,6 @@ struct ClientConfig { bool enableProgressBar; // time before displaying progress bar uint64_t showProgressAfter; - // If multi copy is enabled. - bool enableMultiCopy; // Semantic for recursive pattern, can be either WALK, TRAIL, ACYCLIC common::PathSemantic recursivePatternSemantic; // Scale factor for recursive pattern cardinality estimation. @@ -40,7 +38,6 @@ struct ClientConfigDefault { static constexpr bool ENABLE_SEMI_MASK = true; static constexpr bool ENABLE_PROGRESS_BAR = true; static constexpr uint64_t SHOW_PROGRESS_AFTER = 1000; - static constexpr bool ENABLE_MULTI_COPY = false; static constexpr common::PathSemantic RECURSIVE_PATTERN_SEMANTIC = common::PathSemantic::WALK; static constexpr uint32_t RECURSIVE_PATTERN_FACTOR = 1; }; diff --git a/src/include/main/settings.h b/src/include/main/settings.h index d898cc9d0ab..de008821b55 100644 --- a/src/include/main/settings.h +++ b/src/include/main/settings.h @@ -104,18 +104,6 @@ struct FileSearchPathSetting { } }; -struct EnableMultiCopySetting { - static constexpr const char* name = "enable_multi_copy"; - static constexpr const common::LogicalTypeID inputType = common::LogicalTypeID::BOOL; - static void setContext(ClientContext* context, const common::Value& parameter) { - parameter.validateType(inputType); - context->getClientConfigUnsafe()->enableMultiCopy = parameter.getValue(); - } - static common::Value getSetting(ClientContext* context) { - return common::Value(context->getClientConfig()->enableMultiCopy); - } -}; - struct RecursivePatternSemanticSetting { static constexpr const char* name = "recursive_pattern_semantic"; static constexpr const common::LogicalTypeID inputType = common::LogicalTypeID::STRING; diff --git a/src/include/processor/operator/persistent/batch_insert.h b/src/include/processor/operator/persistent/batch_insert.h index 8f675d8967a..b42d816e842 100644 --- a/src/include/processor/operator/persistent/batch_insert.h +++ b/src/include/processor/operator/persistent/batch_insert.h @@ -73,9 +73,6 @@ class BatchInsert : public Sink { inline std::shared_ptr getSharedState() const { return sharedState; } -protected: - void checkIfTableIsEmpty(); - protected: std::unique_ptr info; std::shared_ptr sharedState; diff --git a/src/include/processor/operator/persistent/node_batch_insert.h b/src/include/processor/operator/persistent/node_batch_insert.h index 43cd00704ef..a6dbd96de13 100644 --- a/src/include/processor/operator/persistent/node_batch_insert.h +++ b/src/include/processor/operator/persistent/node_batch_insert.h @@ -1,15 +1,19 @@ #pragma once #include "common/cast.h" +#include "common/types/internal_id_t.h" +#include "common/types/types.h" #include "processor/operator/aggregate/hash_aggregate.h" #include "processor/operator/call/in_query_call.h" #include "processor/operator/persistent/batch_insert.h" #include "processor/operator/persistent/index_builder.h" #include "storage/store/chunked_node_group.h" #include "storage/store/node_table.h" +#include "transaction/transaction.h" namespace kuzu { namespace processor { +struct ExecutionContext; struct NodeBatchInsertInfo final : public BatchInsertInfo { std::vector columnPositions; @@ -64,9 +68,6 @@ struct 
NodeBatchInsertSharedState final : public BatchInsertSharedState { inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; } - void appendIncompleteNodeGroup(std::unique_ptr localNodeGroup, - std::optional& indexBuilder); - inline common::offset_t getNextNodeGroupIdxWithoutLock() { return currentNodeGroupIdx++; } void calculateNumTuples(); @@ -109,12 +110,29 @@ class NodeBatchInsert final : public BatchInsert { resultSetDescriptor->copy(), children[0]->clone(), id, paramsString); } - static void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx, + static void writeAndResetNewNodeGroup(common::node_group_idx_t nodeGroupIdx, std::optional& indexBuilder, common::column_id_t pkColumnID, storage::NodeTable* table, storage::ChunkedNodeGroup* nodeGroup); + // The node group will be reset so that the only values remaining are the ones which were not + // written + void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx, ExecutionContext* context, + std::unique_ptr& nodeGroup, + std::optional& indexBuilder); + private: - void copyToNodeGroup(); + // Returns the number of nodes written from the group + uint64_t writeToExistingNodeGroup(transaction::Transaction* transaction, + common::node_group_idx_t nodeGroupIdx, std::optional& indexBuilder, + common::column_id_t pkColumnID, storage::NodeTable* table, + storage::ChunkedNodeGroup* nodeGroup); + + void appendIncompleteNodeGroup(std::unique_ptr localNodeGroup, + std::optional& indexBuilder, ExecutionContext* context); + void clearToIndex(std::unique_ptr& nodeGroup, + common::offset_t startIndexInGroup); + + void copyToNodeGroup(ExecutionContext* context); }; } // namespace processor diff --git a/src/include/storage/index/hash_index.h b/src/include/storage/index/hash_index.h index 98d8d805ad5..8129b53505c 100644 --- a/src/include/storage/index/hash_index.h +++ b/src/include/storage/index/hash_index.h @@ -12,6 +12,7 @@ #include "hash_index_slot.h" #include "storage/index/hash_index_utils.h" #include "storage/index/in_mem_hash_index.h" +#include "transaction/transaction.h" namespace kuzu { namespace common { @@ -82,9 +83,19 @@ class HashIndex final : public OnDiskHashIndex { using BufferKeyType = typename std::conditional, std::string, T>::type; - // Appends the buffer to the index. Returns the number of values successfully inserted. - // I.e. if a key fails to insert, its index will be the return value + // Appends the buffer to the index. Returns the number of values successfully inserted, + // or the index of the first value which cannot be inserted. 
size_t append(const IndexBuffer& buffer) { + // Check if values already exist in persistent storage + if (indexHeaderForWriteTrx->numEntries > 0) { + common::offset_t result; + for (size_t i = 0; i < buffer.size(); i++) { + const auto& [key, value] = buffer[i]; + if (lookupInPersistentIndex(transaction::TransactionType::WRITE, key, result)) { + return i; + } + } + } // Keep the same number of primary slots in the builder as we will eventually need when // flushing to disk, so that we know each slot to write to bulkInsertLocalStorage.reserve( diff --git a/src/include/storage/local_storage/local_node_table.h b/src/include/storage/local_storage/local_node_table.h index ed39c51db62..4e972fb187b 100644 --- a/src/include/storage/local_storage/local_node_table.h +++ b/src/include/storage/local_storage/local_node_table.h @@ -4,6 +4,7 @@ #include "common/copy_constructors.h" #include "local_table.h" +#include "storage/store/chunked_node_group.h" namespace kuzu { namespace storage { @@ -22,6 +23,8 @@ class LocalNodeNG final : public LocalNodeGroup { bool insert(std::vector nodeIDVectors, std::vector propertyVectors) override; + bool insert( + common::offset_t startNodeOffset, ChunkedNodeGroup* nodeGroup, common::offset_t numValues); bool update(std::vector nodeIDVectors, common::column_id_t columnID, common::ValueVector* propertyVector) override; bool delete_(common::ValueVector* nodeIDVector, diff --git a/src/include/storage/local_storage/local_table.h b/src/include/storage/local_storage/local_table.h index 1d0aa0f7611..582fa0cc9bf 100644 --- a/src/include/storage/local_storage/local_table.h +++ b/src/include/storage/local_storage/local_table.h @@ -2,7 +2,9 @@ #include +#include "common/types/internal_id_t.h" #include "common/vector/value_vector.h" +#include "storage/store/chunked_node_group.h" #include "storage/store/chunked_node_group_collection.h" namespace kuzu { @@ -63,6 +65,7 @@ class LocalChunkedGroupCollection { inline void append(common::offset_t offset, std::vector vectors) { offsetToRowIdx[offset] = append(vectors); } + void append(common::offset_t offset, ChunkedNodeGroup* nodeGroup, common::offset_t numValues); // Only used for rel tables. Should be moved out later. 
inline void append(common::offset_t nodeOffset, common::offset_t relOffset, std::vector vectors) { @@ -159,7 +162,7 @@ class LocalNodeGroup { KU_ASSERT(columnID < updateChunks.size()); return updateChunks[columnID]; } - LocalChunkedGroupCollection& getInsesrtChunks() { return insertChunks; } + LocalChunkedGroupCollection& getInsertChunks() { return insertChunks; } bool hasUpdatesOrDeletions() const; diff --git a/src/include/storage/store/chunked_node_group.h b/src/include/storage/store/chunked_node_group.h index 3ce4e8c6fcc..f15d571bb56 100644 --- a/src/include/storage/store/chunked_node_group.h +++ b/src/include/storage/store/chunked_node_group.h @@ -1,6 +1,7 @@ #pragma once #include "common/column_data_format.h" +#include "common/constants.h" #include "common/copy_constructors.h" #include "storage/store/column_chunk.h" @@ -40,9 +41,12 @@ class ChunkedNodeGroup { uint64_t append(const std::vector& columnVectors, common::SelectionVector& selVector, uint64_t numValuesToAppend); - common::offset_t append(ChunkedNodeGroup* other, common::offset_t offsetInOtherNodeGroup); - void write(const std::vector>& data, - common::column_id_t offsetColumnID); + // Appends up to numValuesToAppend from the other chunked node group, returning the actual + // number of values appended + common::offset_t append(ChunkedNodeGroup* other, common::offset_t offsetInOtherNodeGroup, + common::offset_t numValuesToAppend = common::StorageConstants::NODE_GROUP_SIZE); + void write( + const std::vector>& data, common::column_id_t offsetColumnID); void write(const ChunkedNodeGroup& data, common::column_id_t offsetColumnID); void finalize(uint64_t nodeGroupIdx_); diff --git a/src/include/storage/store/node_table.h b/src/include/storage/store/node_table.h index b3a16110634..2f940ca3b15 100644 --- a/src/include/storage/store/node_table.h +++ b/src/include/storage/store/node_table.h @@ -4,14 +4,17 @@ #include "common/assert.h" #include "common/cast.h" +#include "common/types/types.h" #include "storage/index/hash_index.h" #include "storage/stats/nodes_store_statistics.h" #include "storage/store/chunked_node_group.h" #include "storage/store/node_table_data.h" #include "storage/store/table.h" +#include "transaction/transaction.h" namespace kuzu { namespace storage { +class LocalNodeTable; struct NodeTableInsertState : public TableInsertState { common::ValueVector& nodeIDVector; @@ -98,13 +101,28 @@ class NodeTable final : public Table { } inline void append(ChunkedNodeGroup* nodeGroup) { tableData->append(nodeGroup); } + inline void write(common::node_group_idx_t nodeGroupIdx, common::offset_t posInDest, + ChunkedNodeGroup* nodeGroup, common::offset_t numValuesToWrite) { + tableData->write(nodeGroupIdx, posInDest, nodeGroup, numValuesToWrite); + } + void prepareCommitNodeGroup(common::node_group_idx_t nodeGroupIdx, + transaction::Transaction* transaction, storage::LocalNodeNG* localNodeGroup); void prepareCommit(transaction::Transaction* transaction, LocalTable* localTable) override; void prepareCommit() override; void prepareRollback(LocalTable* localTable) override; void checkpointInMemory() override; void rollbackInMemory() override; + inline common::node_group_idx_t getNumNodeGroups(transaction::Transaction* transaction) { + return tableData->getNumNodeGroups(transaction); + } + + inline common::offset_t getNumTuplesInNodeGroup( + transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx) { + return tableData->getNumTuplesInNodeGroup(transaction, nodeGroupIdx); + } + private: void 
updatePK(transaction::Transaction* transaction, common::column_id_t columnID, const common::ValueVector& nodeIDVector, const common::ValueVector& pkVector); diff --git a/src/include/storage/store/node_table_data.h b/src/include/storage/store/node_table_data.h index 218b5673a10..d11a32080b1 100644 --- a/src/include/storage/store/node_table_data.h +++ b/src/include/storage/store/node_table_data.h @@ -1,11 +1,14 @@ #pragma once +#include "common/types/internal_id_t.h" +#include "common/types/types.h" #include "storage/store/table_data.h" namespace kuzu { namespace storage { class LocalTableData; +class LocalNodeNG; class NodeTableData final : public TableData { public: @@ -31,19 +34,25 @@ class NodeTableData final : public TableData { // Flush the nodeGroup to disk and update metadataDAs. void append(ChunkedNodeGroup* nodeGroup) override; - void prepareLocalTableToCommit(transaction::Transaction* transaction, - LocalTableData* localTable) override; + // Write data from the given node group to the table + void write(common::node_group_idx_t nodeGroupIdx, common::offset_t posInDest, + ChunkedNodeGroup* nodeGroup, common::offset_t numValuesToWrite); + + void prepareLocalNodeGroupToCommit(common::node_group_idx_t nodeGroupIdx, + transaction::Transaction* transaction, LocalNodeNG* localNodeGroup); + void prepareLocalTableToCommit( + transaction::Transaction* transaction, LocalTableData* localTable) override; inline common::node_group_idx_t getNumNodeGroups( transaction::Transaction* transaction) const override { return columns[0]->getNumNodeGroups(transaction); } -private: - void append(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, - LocalNodeGroup* localNodeGroup); - void merge(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, - LocalNodeGroup* nodeGroup); + inline common::offset_t getNumTuplesInNodeGroup( + transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx) const { + KU_ASSERT(nodeGroupIdx < getNumNodeGroups(transaction)); + return columns[0]->getMetadata(nodeGroupIdx, transaction->getType()).numValues; + } }; } // namespace storage diff --git a/src/main/client_context.cpp b/src/main/client_context.cpp index 43c62348ddf..98a4f115e22 100644 --- a/src/main/client_context.cpp +++ b/src/main/client_context.cpp @@ -58,7 +58,6 @@ ClientContext::ClientContext(Database* database) : database{database} { config.varLengthMaxDepth = ClientConfigDefault::VAR_LENGTH_MAX_DEPTH; config.enableProgressBar = ClientConfigDefault::ENABLE_PROGRESS_BAR; config.showProgressAfter = ClientConfigDefault::SHOW_PROGRESS_AFTER; - config.enableMultiCopy = ClientConfigDefault::ENABLE_MULTI_COPY; config.recursivePatternSemantic = ClientConfigDefault::RECURSIVE_PATTERN_SEMANTIC; config.recursivePatternCardinalityScaleFactor = ClientConfigDefault::RECURSIVE_PATTERN_FACTOR; } diff --git a/src/main/db_config.cpp b/src/main/db_config.cpp index c689727f5bf..b43adb17aad 100644 --- a/src/main/db_config.cpp +++ b/src/main/db_config.cpp @@ -16,7 +16,7 @@ static ConfigurationOption options[] = { // NOLINT(cert-err58-cpp): GET_CONFIGURATION(VarLengthExtendMaxDepthSetting), GET_CONFIGURATION(EnableSemiMaskSetting), GET_CONFIGURATION(HomeDirectorySetting), GET_CONFIGURATION(FileSearchPathSetting), GET_CONFIGURATION(ProgressBarSetting), GET_CONFIGURATION(ProgressBarTimerSetting), - GET_CONFIGURATION(EnableMultiCopySetting), GET_CONFIGURATION(RecursivePatternSemanticSetting), + GET_CONFIGURATION(RecursivePatternSemanticSetting), 
GET_CONFIGURATION(RecursivePatternFactorSetting)}; ConfigurationOption* DBConfig::getOptionByName(const std::string& optionName) { diff --git a/src/processor/operator/persistent/CMakeLists.txt b/src/processor/operator/persistent/CMakeLists.txt index c9bf131dcef..291a3987ceb 100644 --- a/src/processor/operator/persistent/CMakeLists.txt +++ b/src/processor/operator/persistent/CMakeLists.txt @@ -3,7 +3,6 @@ add_subdirectory(writer/parquet) add_library(kuzu_processor_operator_persistent OBJECT - batch_insert.cpp node_batch_insert.cpp copy_rdf.cpp rel_batch_insert.cpp diff --git a/src/processor/operator/persistent/batch_insert.cpp b/src/processor/operator/persistent/batch_insert.cpp deleted file mode 100644 index 1511e42769a..00000000000 --- a/src/processor/operator/persistent/batch_insert.cpp +++ /dev/null @@ -1,18 +0,0 @@ -#include "processor/operator/persistent/batch_insert.h" - -#include "common/exception/copy.h" -#include "common/exception/message.h" - -using namespace kuzu::common; - -namespace kuzu { -namespace processor { - -void BatchInsert::checkIfTableIsEmpty() { - if (sharedState->table->getNumTuples(&transaction::DUMMY_READ_TRANSACTION) != 0) { - throw CopyException(ExceptionMessage::notAllowCopyOnNonEmptyTableException()); - } -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/persistent/node_batch_insert.cpp b/src/processor/operator/persistent/node_batch_insert.cpp index 7b0d7bfa865..06e29a94b1e 100644 --- a/src/processor/operator/persistent/node_batch_insert.cpp +++ b/src/processor/operator/persistent/node_batch_insert.cpp @@ -1,9 +1,17 @@ #include "processor/operator/persistent/node_batch_insert.h" +#include "common/cast.h" +#include "common/constants.h" #include "common/string_format.h" +#include "common/types/internal_id_t.h" #include "common/types/types.h" #include "function/table/scan_functions.h" +#include "processor/execution_context.h" +#include "processor/operator/persistent/index_builder.h" #include "processor/result/factorized_table.h" +#include "storage/local_storage/local_node_table.h" +#include "storage/store/chunked_node_group.h" +#include "storage/store/node_table.h" using namespace kuzu::catalog; using namespace kuzu::common; @@ -27,33 +35,44 @@ void NodeBatchInsertSharedState::initPKIndex(kuzu::processor::ExecutionContext* globalIndexBuilder = IndexBuilder(std::make_shared(pkIndex)); } -void NodeBatchInsertSharedState::appendIncompleteNodeGroup( - std::unique_ptr localNodeGroup, std::optional& indexBuilder) { - std::unique_lock xLck{mtx}; - if (!sharedNodeGroup) { - sharedNodeGroup = std::move(localNodeGroup); +void NodeBatchInsert::appendIncompleteNodeGroup(std::unique_ptr localNodeGroup, + std::optional& indexBuilder, ExecutionContext* context) { + std::unique_lock xLck{sharedState->mtx}; + auto nodeSharedState = + ku_dynamic_cast(sharedState.get()); + if (!nodeSharedState->sharedNodeGroup) { + nodeSharedState->sharedNodeGroup = std::move(localNodeGroup); return; } auto numNodesAppended = - sharedNodeGroup->append(localNodeGroup.get(), 0 /* offsetInNodeGroup */); - if (sharedNodeGroup->isFull()) { - auto nodeGroupIdx = getNextNodeGroupIdxWithoutLock(); - auto nodeTable = ku_dynamic_cast(table); - NodeBatchInsert::writeAndResetNodeGroup(nodeGroupIdx, indexBuilder, pkColumnIdx, nodeTable, - sharedNodeGroup.get()); + nodeSharedState->sharedNodeGroup->append(localNodeGroup.get(), 0 /* offsetInNodeGroup */); + if (nodeSharedState->sharedNodeGroup->isFull()) { + node_group_idx_t nodeGroupIdx = 
nodeSharedState->getNextNodeGroupIdxWithoutLock(); + writeAndResetNodeGroup(nodeGroupIdx, context, nodeSharedState->sharedNodeGroup, + indexBuilder); } if (numNodesAppended < localNodeGroup->getNumRows()) { - sharedNodeGroup->append(localNodeGroup.get(), numNodesAppended); + nodeSharedState->sharedNodeGroup->append(localNodeGroup.get(), numNodesAppended); } } void NodeBatchInsert::initGlobalStateInternal(ExecutionContext* context) { - checkIfTableIsEmpty(); auto nodeSharedState = ku_dynamic_cast(sharedState.get()); if (nodeSharedState->pkType.getLogicalTypeID() != LogicalTypeID::SERIAL) { nodeSharedState->initPKIndex(context); } + // Set initial node group index, which should be the last one available on disk which is not + // full, or the next index. + auto nodeTable = ku_dynamic_cast(nodeSharedState->table); + auto numExistingNodeGroups = nodeTable->getNumNodeGroups(context->clientContext->getTx()); + if (numExistingNodeGroups > 0 && + nodeTable->getNumTuplesInNodeGroup(context->clientContext->getTx(), + numExistingNodeGroups - 1) < StorageConstants::NODE_GROUP_SIZE) { + nodeSharedState->currentNodeGroupIdx = numExistingNodeGroups - 1; + } else { + nodeSharedState->currentNodeGroupIdx = numExistingNodeGroups; + } } void NodeBatchInsert::initLocalStateInternal(ResultSet* resultSet, ExecutionContext*) { @@ -105,15 +124,12 @@ void NodeBatchInsert::executeInternal(ExecutionContext* context) { while (children[0]->getNextTuple(context)) { auto originalSelVector = nodeLocalState->columnState->selVector; - copyToNodeGroup(); + copyToNodeGroup(context); nodeLocalState->columnState->selVector = std::move(originalSelVector); } if (nodeLocalState->nodeGroup->getNumRows() > 0) { - auto nodeSharedState = - ku_dynamic_cast( - sharedState.get()); - nodeSharedState->appendIncompleteNodeGroup(std::move(nodeLocalState->nodeGroup), - nodeLocalState->localIndexBuilder); + appendIncompleteNodeGroup(std::move(nodeLocalState->nodeGroup), + nodeLocalState->localIndexBuilder, context); } if (nodeLocalState->localIndexBuilder) { KU_ASSERT(token); @@ -122,7 +138,7 @@ void NodeBatchInsert::executeInternal(ExecutionContext* context) { } } -void NodeBatchInsert::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, +void NodeBatchInsert::writeAndResetNewNodeGroup(node_group_idx_t nodeGroupIdx, std::optional& indexBuilder, column_id_t pkColumnID, NodeTable* table, ChunkedNodeGroup* nodeGroup) { nodeGroup->finalize(nodeGroupIdx); @@ -135,48 +151,88 @@ void NodeBatchInsert::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, nodeGroup->resetToEmpty(); } -void NodeBatchInsertSharedState::calculateNumTuples() { - numRows.store(StorageUtils::getStartOffsetOfNodeGroup(getCurNodeGroupIdx())); - if (sharedNodeGroup) { - numRows += sharedNodeGroup->getNumRows(); +common::offset_t NodeBatchInsert::writeToExistingNodeGroup(transaction::Transaction* transaction, + node_group_idx_t nodeGroupIdx, std::optional& indexBuilder, + column_id_t pkColumnID, NodeTable* table, ChunkedNodeGroup* nodeGroup) { + auto numExistingTuplesInChunk = table->getNumTuplesInNodeGroup(transaction, nodeGroupIdx); + auto valuesToWrite = std::min(StorageConstants::NODE_GROUP_SIZE - numExistingTuplesInChunk, + nodeGroup->getNumRows()); + auto nodeOffset = + StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx) + numExistingTuplesInChunk; + if (indexBuilder) { + indexBuilder->insert(nodeGroup->getColumnChunkUnsafe(pkColumnID), nodeOffset, + valuesToWrite); } + auto nodeInfo = ku_dynamic_cast(info.get()); + LocalNodeNG 
localNodeGroup(StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx), + nodeInfo->columnTypes); + auto& insertChunks = localNodeGroup.getInsertChunks(); + insertChunks.append(numExistingTuplesInChunk, nodeGroup, valuesToWrite); + table->prepareCommitNodeGroup(nodeGroupIdx, transaction, &localNodeGroup); + return valuesToWrite; +} + +void NodeBatchInsert::clearToIndex(std::unique_ptr& nodeGroup, + common::offset_t startIndexInGroup) { + // Create a new chunked node group and move the unwritten values to it + // TODO(bmwinger): Can probably re-use the chunk and shift the values + auto oldNodeGroup = std::move(nodeGroup); + auto nodeInfo = ku_dynamic_cast(info.get()); + nodeGroup = NodeGroupFactory::createNodeGroup(ColumnDataFormat::REGULAR, nodeInfo->columnTypes, + info->compressionEnabled); + nodeGroup->append(oldNodeGroup.get(), startIndexInGroup); } -void NodeBatchInsert::copyToNodeGroup() { +void NodeBatchInsert::copyToNodeGroup(ExecutionContext* context) { auto numAppendedTuples = 0ul; auto nodeLocalState = ku_dynamic_cast(localState.get()); + auto numTuplesToAppend = nodeLocalState->columnState->getNumSelectedValues(); auto nodeSharedState = ku_dynamic_cast(sharedState.get()); - auto nodeTable = ku_dynamic_cast(sharedState->table); - auto numTuplesToAppend = nodeLocalState->columnState->getNumSelectedValues(); while (numAppendedTuples < numTuplesToAppend) { auto numAppendedTuplesInNodeGroup = nodeLocalState->nodeGroup->append(nodeLocalState->columnVectors, *nodeLocalState->columnState->selVector, numTuplesToAppend - numAppendedTuples); numAppendedTuples += numAppendedTuplesInNodeGroup; if (nodeLocalState->nodeGroup->isFull()) { - node_group_idx_t nodeGroupIdx; - nodeGroupIdx = nodeSharedState->getNextNodeGroupIdx(); - writeAndResetNodeGroup(nodeGroupIdx, nodeLocalState->localIndexBuilder, - nodeSharedState->pkColumnIdx, nodeTable, nodeLocalState->nodeGroup.get()); + writeAndResetNodeGroup(nodeSharedState->getNextNodeGroupIdx(), context, + nodeLocalState->nodeGroup, nodeLocalState->localIndexBuilder); } if (numAppendedTuples < numTuplesToAppend) { nodeLocalState->columnState->slice((offset_t)numAppendedTuplesInNodeGroup); } } + sharedState->incrementNumRows(numAppendedTuples); +} + +void NodeBatchInsert::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, + ExecutionContext* context, std::unique_ptr& nodeGroup, + std::optional& indexBuilder) { + auto nodeSharedState = + ku_dynamic_cast(sharedState.get()); + auto nodeTable = ku_dynamic_cast(sharedState->table); + + if (nodeGroupIdx >= nodeTable->getNumNodeGroups(context->clientContext->getTx())) { + writeAndResetNewNodeGroup(nodeGroupIdx, indexBuilder, nodeSharedState->pkColumnIdx, + nodeTable, nodeGroup.get()); + } else { + KU_ASSERT(nodeGroupIdx == nodeTable->getNumNodeGroups(context->clientContext->getTx()) - 1); + auto valuesWritten = writeToExistingNodeGroup(context->clientContext->getTx(), nodeGroupIdx, + indexBuilder, nodeSharedState->pkColumnIdx, nodeTable, nodeGroup.get()); + clearToIndex(nodeGroup, valuesWritten); + } } void NodeBatchInsert::finalize(ExecutionContext* context) { auto nodeSharedState = ku_dynamic_cast(sharedState.get()); - nodeSharedState->calculateNumTuples(); nodeSharedState->updateNumTuplesForTable(); if (nodeSharedState->sharedNodeGroup) { - auto nodeGroupIdx = nodeSharedState->getNextNodeGroupIdx(); - auto nodeTable = ku_dynamic_cast(nodeSharedState->table); - NodeBatchInsert::writeAndResetNodeGroup(nodeGroupIdx, nodeSharedState->globalIndexBuilder, - nodeSharedState->pkColumnIdx, nodeTable, 
nodeSharedState->sharedNodeGroup.get()); + while (nodeSharedState->sharedNodeGroup->getNumRows() > 0) { + writeAndResetNodeGroup(nodeSharedState->getNextNodeGroupIdx(), context, + nodeSharedState->sharedNodeGroup, nodeSharedState->globalIndexBuilder); + } } if (nodeSharedState->globalIndexBuilder) { nodeSharedState->globalIndexBuilder->finalize(context); @@ -184,7 +240,7 @@ void NodeBatchInsert::finalize(ExecutionContext* context) { // Batch Insert wal record needs to be logged after PrimaryKeyIndex::prepareCommit // so that the wal pages get flushed before the index is re-initialized. sharedState->logBatchInsertWALRecord(); - auto outputMsg = stringFormat("{} number of tuples has been copied to table: {}.", + auto outputMsg = stringFormat("{} tuples have been copied to the {} table.", sharedState->getNumRows(), info->tableEntry->getName()); FactorizedTableUtils::appendStringToTable(sharedState->fTable.get(), outputMsg, context->clientContext->getMemoryManager()); diff --git a/src/processor/operator/persistent/rel_batch_insert.cpp b/src/processor/operator/persistent/rel_batch_insert.cpp index 43631d1275d..258f5d00473 100644 --- a/src/processor/operator/persistent/rel_batch_insert.cpp +++ b/src/processor/operator/persistent/rel_batch_insert.cpp @@ -14,10 +14,7 @@ using namespace kuzu::storage; namespace kuzu { namespace processor { -void RelBatchInsert::initGlobalStateInternal(ExecutionContext* context) { - if (!context->clientContext->getClientConfig()->enableMultiCopy) { - checkIfTableIsEmpty(); - } +void RelBatchInsert::initGlobalStateInternal(ExecutionContext* /*context*/) { sharedState->logBatchInsertWALRecord(); } @@ -87,7 +84,7 @@ void RelBatchInsert::mergeNodeGroup(ExecutionContext* context, const RelBatchIns auto localNG = std::make_unique(nodeGroupStartOffset, relInfo.columnTypes); auto& partition = partitionerSharedState.getPartitionBuffer(relInfo.partitioningIdx, localState.nodeGroupIdx); - auto& insertChunks = localNG->getInsesrtChunks(); + auto& insertChunks = localNG->getInsertChunks(); auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(localState.nodeGroupIdx); auto numRels = 0u; for (auto& chunkedGroup : partition.getChunkedGroups()) { @@ -226,7 +223,7 @@ void RelBatchInsert::finalize(ExecutionContext* context) { KU_ASSERT( relInfo->partitioningIdx == partitionerSharedState->partitioningBuffers.size() - 1); sharedState->updateNumTuplesForTable(); - auto outputMsg = stringFormat("{} number of tuples has been copied to table {}.", + auto outputMsg = stringFormat("{} tuples have been copied to the {} table.", sharedState->getNumRows(), info->tableEntry->getName()); FactorizedTableUtils::appendStringToTable(sharedState->fTable.get(), outputMsg, context->clientContext->getMemoryManager()); diff --git a/src/storage/local_storage/local_table.cpp b/src/storage/local_storage/local_table.cpp index 7a63d825dde..ed7ebee8107 100644 --- a/src/storage/local_storage/local_table.cpp +++ b/src/storage/local_storage/local_table.cpp @@ -1,5 +1,7 @@ #include "storage/local_storage/local_table.h" +#include "common/types/internal_id_t.h" + using namespace kuzu::common; namespace kuzu { @@ -103,8 +105,29 @@ row_idx_t LocalChunkedGroupCollection::append(std::vector vectors) return numRows++; } -bool LocalTableData::insert(std::vector nodeIDVectors, - std::vector propertyVectors) { +void LocalChunkedGroupCollection::append( + offset_t offset, ChunkedNodeGroup* nodeGroup, offset_t numValues) { + KU_ASSERT(nodeGroup->getNumColumns() == dataTypes.size()); + offset_t appended = 0; + do { + 
if (chunkedGroups.getNumChunkedGroups() == 0 || + chunkedGroups.getChunkedGroup(chunkedGroups.getNumChunkedGroups() - 1)->getNumRows() == + ChunkedNodeGroupCollection::CHUNK_CAPACITY) { + chunkedGroups.merge(std::make_unique(dataTypes, + false /*enableCompression*/, ChunkedNodeGroupCollection::CHUNK_CAPACITY)); + } + auto lastChunkGroup = + chunkedGroups.getChunkedGroupUnsafe(chunkedGroups.getNumChunkedGroups() - 1); + auto appendedInChunk = lastChunkGroup->append(nodeGroup, appended, numValues - appended); + for (size_t i = 0; i < appendedInChunk; i++) { + offsetToRowIdx[offset + appended + i] = numRows++; + } + appended += appendedInChunk; + } while (appended < numValues); +} + +bool LocalTableData::insert( + std::vector nodeIDVectors, std::vector propertyVectors) { KU_ASSERT(nodeIDVectors.size() >= 1); auto localNodeGroup = getOrCreateLocalNodeGroup(nodeIDVectors[0]); return localNodeGroup->insert(nodeIDVectors, propertyVectors); diff --git a/src/storage/storage_structure/db_file_utils.cpp b/src/storage/storage_structure/db_file_utils.cpp index 372308bb02d..b5ce3ea4dd4 100644 --- a/src/storage/storage_structure/db_file_utils.cpp +++ b/src/storage/storage_structure/db_file_utils.cpp @@ -35,8 +35,10 @@ WALPageIdxAndFrame DBFileUtils::createWALVersionIfNecessaryAndPinPage(page_idx_t } fileHandle.setWALPageIdxNoLock(originalPageIdx /* pageIdxInOriginalFile */, pageIdxInWAL); - wal.getShadowingFH().setLockedPageDirty(pageIdxInWAL); } + // The wal page existing already does not mean that it's already dirty + // It may have been flushed to disk to free memory and then read again + wal.getShadowingFH().setLockedPageDirty(pageIdxInWAL); } catch (Exception& e) { fileHandle.releaseWALPageIdxLock(originalPageIdx); throw; diff --git a/src/storage/storage_structure/disk_array.cpp b/src/storage/storage_structure/disk_array.cpp index 4f009f64507..1e580476c02 100644 --- a/src/storage/storage_structure/disk_array.cpp +++ b/src/storage/storage_structure/disk_array.cpp @@ -158,35 +158,21 @@ void BaseDiskArrayInternal::update(uint64_t idx, std::span val) { } uint64_t BaseDiskArrayInternal::pushBack(std::span val) { - std::unique_lock xLck{diskArraySharedMtx}; - hasTransactionalUpdates = true; - return pushBackNoLock(val); + auto it = iter_mut(val.size()); + auto originalNumElements = getNumElementsNoLock(TransactionType::WRITE); + it.pushBack(val); + return originalNumElements; } uint64_t BaseDiskArrayInternal::resize(uint64_t newNumElements, std::span defaultVal) { auto it = iter_mut(defaultVal.size()); auto originalNumElements = getNumElementsNoLock(TransactionType::WRITE); - hasTransactionalUpdates = true; while (it.size() < newNumElements) { it.pushBack(defaultVal); } return originalNumElements; } -uint64_t BaseDiskArrayInternal::pushBackNoLock(std::span val) { - uint64_t elementIdx = headerForWriteTrx.numElements; - auto apCursor = getAPIdxAndOffsetInAP(elementIdx); - auto [apPageIdx, isNewlyAdded] = - getAPPageIdxAndAddAPToPIPIfNecessaryForWriteTrxNoLock(&headerForWriteTrx, apCursor.pageIdx); - // Now do the push back. 
- lastAPPageIdx = apPageIdx; - updatePage(apPageIdx, isNewlyAdded, [&apCursor, &val](uint8_t* frame) -> void { - memcpy(frame + apCursor.elemPosInPage, val.data(), val.size()); - }); - headerForWriteTrx.numElements++; - return elementIdx; -} - void BaseDiskArrayInternal::setNextPIPPageIDxOfPIPNoLock(DiskArrayHeader* updatedDiskArrayHeader, uint64_t pipIdxOfPreviousPIP, page_idx_t nextPIPPageIdx) { // This happens if the first pip is being inserted, in which case we need to change the header. diff --git a/src/storage/store/chunked_node_group.cpp b/src/storage/store/chunked_node_group.cpp index f3291c90fb5..b0b6e87c90b 100644 --- a/src/storage/store/chunked_node_group.cpp +++ b/src/storage/store/chunked_node_group.cpp @@ -2,6 +2,7 @@ #include "common/assert.h" #include "common/constants.h" +#include "common/types/internal_id_t.h" #include "storage/store/column.h" using namespace kuzu::common; @@ -116,10 +117,11 @@ uint64_t ChunkedNodeGroup::append(const std::vector& columnVectors return numValuesToAppendInChunk; } -offset_t ChunkedNodeGroup::append(ChunkedNodeGroup* other, offset_t offsetInOtherNodeGroup) { +offset_t ChunkedNodeGroup::append( + ChunkedNodeGroup* other, offset_t offsetInOtherNodeGroup, offset_t numValues) { KU_ASSERT(other->chunks.size() == chunks.size()); - auto numNodesToAppend = std::min(other->numRows - offsetInOtherNodeGroup, - StorageConstants::NODE_GROUP_SIZE - numRows); + auto numNodesToAppend = std::min(std::min(numValues, other->numRows - offsetInOtherNodeGroup), + chunks[0]->getCapacity() - numRows); for (auto i = 0u; i < chunks.size(); i++) { chunks[i]->append(other->chunks[i].get(), offsetInOtherNodeGroup, numNodesToAppend); } diff --git a/src/storage/store/column_chunk.cpp b/src/storage/store/column_chunk.cpp index f309df9b8ce..f0c4a5daef1 100644 --- a/src/storage/store/column_chunk.cpp +++ b/src/storage/store/column_chunk.cpp @@ -363,6 +363,7 @@ void ColumnChunk::copyVectorToBuffer(ValueVector* vector, offset_t startPosInChu } void ColumnChunk::setNumValues(uint64_t numValues_) { + KU_ASSERT(numValues_ <= capacity); numValues = numValues_; if (nullChunk) { nullChunk->setNumValues(numValues_); diff --git a/src/storage/store/node_table.cpp b/src/storage/store/node_table.cpp index 4a85880909a..156f51b6bbe 100644 --- a/src/storage/store/node_table.cpp +++ b/src/storage/store/node_table.cpp @@ -154,6 +154,11 @@ void NodeTable::addColumn(Transaction* transaction, const Property& property, wal->addToUpdatedTables(tableID); } +void NodeTable::prepareCommitNodeGroup(node_group_idx_t nodeGroupIdx, + transaction::Transaction* transaction, LocalNodeNG* localNodeGroup) { + tableData->prepareLocalNodeGroupToCommit(nodeGroupIdx, transaction, localNodeGroup); +} + void NodeTable::prepareCommit(Transaction* transaction, LocalTable* localTable) { if (pkIndex) { pkIndex->prepareCommit(); diff --git a/src/storage/store/node_table_data.cpp b/src/storage/store/node_table_data.cpp index 4dbaed168cd..0701ef6cf21 100644 --- a/src/storage/store/node_table_data.cpp +++ b/src/storage/store/node_table_data.cpp @@ -1,9 +1,11 @@ #include "storage/store/node_table_data.h" #include "common/cast.h" +#include "common/types/types.h" #include "storage/local_storage/local_node_table.h" #include "storage/local_storage/local_table.h" #include "storage/stats/nodes_store_statistics.h" +#include "transaction/transaction.h" using namespace kuzu::catalog; using namespace kuzu::common; @@ -68,21 +70,35 @@ void NodeTableData::append(ChunkedNodeGroup* nodeGroup) { } } -void 
NodeTableData::prepareLocalTableToCommit(Transaction* transaction, - LocalTableData* localTable) { - for (auto& [nodeGroupIdx, localNodeGroup] : localTable->nodeGroups) { - for (auto columnID = 0u; columnID < columns.size(); columnID++) { - auto column = columns[columnID].get(); - auto localInsertChunk = localNodeGroup->getInsesrtChunks().getLocalChunk(columnID); - auto localUpdateChunk = localNodeGroup->getUpdateChunks(columnID).getLocalChunk(0); - if (localInsertChunk.empty() && localUpdateChunk.empty()) { - continue; - } - auto localNodeNG = ku_dynamic_cast(localNodeGroup.get()); - column->prepareCommitForChunk(transaction, nodeGroupIdx, localInsertChunk, - localNodeNG->getInsertInfoRef(), localUpdateChunk, - localNodeNG->getUpdateInfoRef(columnID), {} /* deleteInfo */); +void NodeTableData::write(common::node_group_idx_t nodeGroupIdx, common::offset_t posInDest, + ChunkedNodeGroup* nodeGroup, common::offset_t numValuesToWrite) { + for (auto columnID = 0u; columnID < columns.size(); columnID++) { + auto& columnChunk = nodeGroup->getColumnChunkUnsafe(columnID); + KU_ASSERT(columnID < columns.size()); + columns[columnID]->write(nodeGroupIdx, posInDest, &columnChunk, 0, numValuesToWrite); + } +} + +void NodeTableData::prepareLocalNodeGroupToCommit( + node_group_idx_t nodeGroupIdx, Transaction* transaction, LocalNodeNG* localNodeGroup) { + for (auto columnID = 0u; columnID < columns.size(); columnID++) { + auto column = columns[columnID].get(); + auto localInsertChunk = localNodeGroup->getInsertChunks().getLocalChunk(columnID); + auto localUpdateChunk = localNodeGroup->getUpdateChunks(columnID).getLocalChunk(0); + if (localInsertChunk.empty() && localUpdateChunk.empty()) { + continue; } + column->prepareCommitForChunk(transaction, nodeGroupIdx, localInsertChunk, + localNodeGroup->getInsertInfoRef(), localUpdateChunk, + localNodeGroup->getUpdateInfoRef(columnID), {} /* deleteInfo */); + } +} + +void NodeTableData::prepareLocalTableToCommit( + Transaction* transaction, LocalTableData* localTable) { + for (auto& [nodeGroupIdx, localNodeGroup] : localTable->nodeGroups) { + prepareLocalNodeGroupToCommit(nodeGroupIdx, transaction, + ku_dynamic_cast(localNodeGroup.get())); } } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9988e06d509..93e02b75443 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -18,3 +18,4 @@ add_subdirectory(runner) add_subdirectory(storage) add_subdirectory(transaction) add_subdirectory(util_tests) +add_subdirectory(copy) diff --git a/test/copy/CMakeLists.txt b/test/copy/CMakeLists.txt new file mode 100644 index 00000000000..87ba8aaed8b --- /dev/null +++ b/test/copy/CMakeLists.txt @@ -0,0 +1 @@ +add_kuzu_test(copy_tests multi_copy_test.cpp) diff --git a/test/copy/multi_copy_test.cpp b/test/copy/multi_copy_test.cpp new file mode 100644 index 00000000000..9bc9f507459 --- /dev/null +++ b/test/copy/multi_copy_test.cpp @@ -0,0 +1,111 @@ +#include +#include +#include +#include +#include + +#include "common/constants.h" +#include "common/file_system/local_file_system.h" +#include "common/string_format.h" +#include "graph_test/graph_test.h" + +namespace kuzu { +namespace testing { + +class MultiCopyTest : public EmptyDBTest { +public: + void SetUp() override { + EmptyDBTest::SetUp(); + createDBAndConn(); + auto result = conn->query("CREATE NODE TABLE Test(id int32, primary key(id))"); + ASSERT_TRUE(result->isSuccess()) << result->toString(); + } + void copy(size_t values) { + auto tempDir = TestHelper::getTempDir(getTestGroupAndName()); + auto filePath = 
common::LocalFileSystem::joinPath(tempDir, "tmp.csv"); + std::filesystem::create_directories(tempDir); + std::ofstream file(filePath); + for (size_t i = 0; i < values; i++) { + file << (totalTuples + i) << std::endl; + } + file.close(); + auto result = conn->query(common::stringFormat("COPY Test FROM '{}'", filePath)); + ASSERT_TRUE(result->isSuccess()) << result->toString(); + totalTuples += values; + } + + void validate() { + auto countResult = conn->query("MATCH (t:Test) RETURN COUNT(*)"); + ASSERT_TRUE(countResult->isSuccess()) << countResult->toString(); + ASSERT_EQ(countResult->getNumTuples(), 1); + ASSERT_EQ(countResult->getNext()->getValue(0)->getValue(), totalTuples); + + std::random_device dev; + std::mt19937 rng(dev()); + std::uniform_int_distribution dist(0, totalTuples - 1); + // Sample and check up to 1000 random elements in the range + for (size_t i = 0; i < std::min(static_cast(1000), totalTuples); i++) { + auto index = dist(rng); + auto result = conn->query( + common::stringFormat("MATCH (t:Test) WHERE t.id = {} RETURN t.id", index)); + ASSERT_TRUE(result->isSuccess()) << result->toString(); + ASSERT_EQ(result->getNumTuples(), 1) << "ID " << index << " is missing"; + ASSERT_EQ(result->getNext()->getValue(0)->getValue(), index); + } + } + +private: + size_t totalTuples = 0; +}; + +// Tests that multiple copies that only add to the existing node group succeed +// This is also covered by the tinysnb dataset +TEST_F(MultiCopyTest, OneNodeGroup) { + copy(150); + copy(175); + copy(1); + copy(5); + validate(); +} + +// Tests that a second copy that does not modify any existing node groups succeeds +TEST_F(MultiCopyTest, FullFirstNodeGroup) { + copy(common::StorageConstants::NODE_GROUP_SIZE); + copy(common::StorageConstants::NODE_GROUP_SIZE); + copy(common::StorageConstants::NODE_GROUP_SIZE); + copy(common::StorageConstants::NODE_GROUP_SIZE); + copy(common::StorageConstants::NODE_GROUP_SIZE * 1.5); + validate(); +} + +// Tries to test that appendIncompleteNodeGroup works when it fills the shared node group but the +// shared node group needs to write to an existing node group. The actual results may vary depending +// on how the work is divided between threads. This assumes that two test threads handle half of the +// work each, so that each thread finishes with appendIncompleteNodeGroup and the second one fills +// the shared group. 
+TEST_F(MultiCopyTest, PartialGroupSecondCopyWrite) { + copy(common::StorageConstants::NODE_GROUP_SIZE / 2); + copy(common::StorageConstants::NODE_GROUP_SIZE); + validate(); +} + +// Tests that a second copy that modifies the existing node group and copies more than one node +// group succeeds +TEST_F(MultiCopyTest, PartialFirstNodeGroup) { + copy(common::StorageConstants::NODE_GROUP_SIZE / 2); + copy(common::StorageConstants::NODE_GROUP_SIZE * 2); + copy(common::StorageConstants::NODE_GROUP_SIZE * 2); + validate(); +} + +// Tests that a second copy that copies a large number of node groups succeeds +TEST_F(MultiCopyTest, MultipleNodeGroups) { + copy(common::StorageConstants::NODE_GROUP_SIZE * 10.1); + copy(common::StorageConstants::NODE_GROUP_SIZE * 20.8); + copy(1); + copy(common::StorageConstants::NODE_GROUP_SIZE * 3); + validate(); +} + +} // namespace testing +} // namespace kuzu diff --git a/test/test_files/copy/copy_transaction.test b/test/test_files/copy/copy_transaction.test index 2b6eb3a8b51..b0f73406155 100644 --- a/test/test_files/copy/copy_transaction.test +++ b/test/test_files/copy/copy_transaction.test @@ -4,7 +4,7 @@ -- -CASE CopyNodeCommitNormalExecution --STATEMENT CREATE NODE TABLE person +-STATEMENT CREATE NODE TABLE person (ID INT64, fName STRING, gender INT64, isStudent BOOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration INTERVAL, workedHours INT64[], @@ -15,6 +15,8 @@ ---- 0 -STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv" (HEADER=true); ---- ok +-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson2.csv" (HEADER=false); +---- ok -STATEMENT MATCH (p:person) return p.age ---- 8 20 @@ -27,7 +29,7 @@ 83 -CASE CopyRelCommitNormalExecution --STATEMENT CREATE NODE TABLE person +-STATEMENT CREATE NODE TABLE person (ID INT64, fName STRING, gender INT64, isStudent BOOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration INTERVAL, workedHours INT64[], @@ -36,15 +38,15 @@ ---- ok -STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv" (HEADER=true); ---- ok --STATEMENT CREATE REL TABLE knows +-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson2.csv" (HEADER=false); +---- ok +-STATEMENT CREATE REL TABLE knows (FROM person TO person, date DATE, meetTime TIMESTAMP, validInterval INTERVAL, comments STRING[], summary STRUCT(locations STRING[], transfer STRUCT(day DATE, amount INT64[])), notes UNION(firstmet DATE, type INT16, comment STRING), someMap MAP(STRING, STRING), MANY_MANY); ---- ok --STATEMENT CALL ENABLE_MULTI_COPY=true; ----- ok -STATEMENT COPY knows FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/eKnows.csv" ---- ok -STATEMENT COPY knows FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/eKnows_2.csv" diff --git a/test/test_files/copy/multi_copy_node.test b/test/test_files/copy/multi_copy_node.test new file mode 100644 index 00000000000..87c198f6a40 --- /dev/null +++ b/test/test_files/copy/multi_copy_node.test @@ -0,0 +1,20 @@ +-GROUP MultiCopyNode +-DATASET CSV empty +-- + +-CASE CopyLargeInt +-STATEMENT create node table test(id int64, primary key(id)); +---- ok +-STATEMENT COPY test from "${KUZU_ROOT_DIRECTORY}/dataset/large-serial/serialtable0.csv"; +---- ok +-STATEMENT COPY test from "${KUZU_ROOT_DIRECTORY}/dataset/large-serial/serialtable1.csv"; +---- ok +-STATEMENT COPY test from "${KUZU_ROOT_DIRECTORY}/dataset/large-serial/serialtable2.csv"; +---- ok 
+-STATEMENT COPY test from "${KUZU_ROOT_DIRECTORY}/dataset/large-serial/serialtable3.csv"; +---- ok +-STATEMENT COPY test from "${KUZU_ROOT_DIRECTORY}/dataset/large-serial/serialtable4.csv"; +---- ok +-STATEMENT match (t:test) return count(*); +---- 1 +200000 diff --git a/test/test_files/demo_db/demo_db_set_copy.test b/test/test_files/demo_db/demo_db_set_copy.test index 8320ea6bc3a..970d19ddc91 100644 --- a/test/test_files/demo_db/demo_db_set_copy.test +++ b/test/test_files/demo_db/demo_db_set_copy.test @@ -59,9 +59,3 @@ Guelph|0 ---- 2 1999 1999 - --CASE CopyRelToNonEmptyTableErrorTest - --STATEMENT COPY Follows FROM "${KUZU_ROOT_DIRECTORY}/dataset/demo-db/csv/follows.csv" ----- error -Copy exception: COPY commands can only be executed once on a table. diff --git a/test/test_files/exceptions/copy/auto_commit.test b/test/test_files/exceptions/copy/auto_commit.test index 0a967a772fa..05567d1e59f 100644 --- a/test/test_files/exceptions/copy/auto_commit.test +++ b/test/test_files/exceptions/copy/auto_commit.test @@ -17,9 +17,12 @@ DDL, Copy, createMacro statements can only run in the AUTO_COMMIT mode. Please c ---- ok -STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv" (HEADER=TRUE, DELIM=',') ---- 1 -8 number of tuples has been copied to table: person. +4 tuples have been copied to the person table. +-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson2.csv" (HEADER=FALSE, DELIM=',') +---- 1 +4 tuples have been copied to the person table. -STATEMENT create rel table knows (FROM person TO person, date DATE, meetTime TIMESTAMP, validInterval INTERVAL, comments STRING[], summary STRUCT(locations STRING[], transfer STRUCT(day DATE, amount INT64[])), notes UNION(firstmet DATE, type INT16, comment STRING), someMap MAP(STRING, STRING), MANY_MANY) ---- ok -STATEMENT COPY knows FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/eKnows.csv" ---- 1 -6 number of tuples has been copied to table knows. +6 tuples have been copied to the knows table. diff --git a/test/test_files/exceptions/copy/wrong_header.test b/test/test_files/exceptions/copy/wrong_header.test index 0e79a2707d4..b9b0351b76c 100644 --- a/test/test_files/exceptions/copy/wrong_header.test +++ b/test/test_files/exceptions/copy/wrong_header.test @@ -97,15 +97,6 @@ Copy exception: Error in file ${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv -STATEMENT COPY knows FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/wrong-header/eKnowsWrongColumnName.csv" (HEADER=true) ---- ok --CASE CopyToNonEmptyTableErrors --STATEMENT create node table person (ID INT64, fName STRING, PRIMARY KEY (ID)); ----- ok --STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/wrong-header/vPersonWrongColumnName.csv" (HEADER=true) ----- ok --STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/wrong-header/vPersonWrongColumnName.csv" (HEADER=true) ----- error -Copy exception: COPY commands can only be executed once on a table. - -CASE MissingColumnErrors -STATEMENT create node table person (ID INT64, fName STRING, PRIMARY KEY (ID)) ---- ok diff --git a/test/test_files/tinysnb/load_from/load_from.test b/test/test_files/tinysnb/load_from/load_from.test index b5e8a7cb7ea..6123071f7ff 100644 --- a/test/test_files/tinysnb/load_from/load_from.test +++ b/test/test_files/tinysnb/load_from/load_from.test @@ -76,11 +76,14 @@ Binder exception: Column `dateColumn` type mismatch. Expected INT32 but got DATE Binder exception: Number of columns mismatch. Expected 1 but got 11. 
-STATEMENT LOAD WITH HEADERS (ID INt64, fName StRING, gender INT64, isStudent BoOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration interval, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, u UUID) FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv" (HEADER=True) RETURN fName, gender, birthdate; ----- 8 +---- 4 Alice|1|1900-01-01 Bob|2|1900-01-01 Carol|1|1940-06-22 Dan|2|1950-07-23 +-STATEMENT LOAD WITH HEADERS (ID INt64, fName StRING, gender INT64, isStudent BoOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration interval, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, u UUID) FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson2.csv" (HEADER=FALSE) + RETURN fName, gender, birthdate; +---- 4 Elizabeth|1|1980-10-26 Farooq|2|1980-10-26 Greg|2|1980-10-26 @@ -90,7 +93,7 @@ Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|2|1990-11-27 RETURN *, concat(fName, 'aa'); ---- error Copy exception: Error in file ${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv on line 2: Conversion exception: Cast failed. Could not convert "[Aida]" to INT64. --STATEMENT LOAD WITH HEADERS (ID StRING, fName StRING, gender StRING, isStudent StRING, isWorker StRING, age StRING, eyeSight StRING, birthdate StRING, registerTime StRING, lastJobDuration StRING, workedHours StRING, usedNames StRING, courseScoresPerTerm StRING, grades StRING, height StRING, u UUID) FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv" (HEADER=True) +-STATEMENT LOAD WITH HEADERS (ID StRING, fName StRING, gender StRING, isStudent StRING, isWorker StRING, age StRING, eyeSight StRING, birthdate StRING, registerTime StRING, lastJobDuration StRING, workedHours StRING, usedNames StRING, courseScoresPerTerm StRING, grades StRING, height StRING, u UUID) FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson2.csv" (HEADER=FALSE) WHERE ID = '10' RETURN *, concat(fName, 'aa'); ---- 1 @@ -102,12 +105,12 @@ Copy exception: Error in file ${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv MATCH (a:person) WHERE a.ID <= 2 RETURN COUNT(*); ---- 1 -16 +8 -STATEMENT MATCH (a:person) WHERE a.ID <= 2 LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv" (HEADER=True) WITH * RETURN COUNT(*); ---- 1 -16 +8 -STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/eWorkAt.csv" (HEADER=False) RETURN * ---- 3 3|4|2015|[3.8,2.5]|8.2 @@ -118,10 +121,9 @@ Copy exception: Error in file ${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv" (HEADER=True) WITH * WHERE id = a RETURN a, b, fname; ----- 3 +---- 2 3|4|Carol 5|6|Dan -7|6|Elizabeth -STATEMENT MATCH (a:person) WHERE a.ID < 2 LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv" (HEADER=True) WHERE fname = "Alice" or fname = "Bob" AND a.fName = fname diff --git a/test/test_files/transaction/copy/copy.test b/test/test_files/transaction/copy/copy.test index 01e99f54c0d..7bb90820fd8 100644 --- a/test/test_files/transaction/copy/copy.test +++ b/test/test_files/transaction/copy/copy.test @@ -11,17 +11,13 @@ ---- ok -STATEMENT MATCH (p:person) return count(p); ---- 1 -8 +4 -STATEMENT MATCH (p:person) return p.age; ----- 8 +---- 4 20 -20 -25 30 35 -40 45 -83 -CASE CopyNodeCommitRecovery -STATEMENT CREATE NODE TABLE person (ID INT64, fName STRING, gender INT64, isStudent BOOLEAN, isWorker BOOLEAN, age INT64, eyeSight 
DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration INTERVAL, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, u UUID, PRIMARY KEY (ID)); @@ -33,17 +29,13 @@ -RELOADDB -STATEMENT MATCH (p:person) return count(p); ---- 1 -8 +4 -STATEMENT MATCH (p:person) return p.age; ----- 8 -20 +---- 4 20 -25 30 35 -40 45 -83 -CASE CopyRelCommit -STATEMENT CREATE NODE TABLE person (ID INT64, fName STRING, gender INT64, isStudent BOOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration INTERVAL, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, u UUID, PRIMARY KEY (ID));
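
Usage sketch (an illustration, not part of the patch above): with this change, repeated COPY statements against the same node table append to the last partially filled node group instead of raising "COPY commands can only be executed once on a table." The snippet below shows the behaviour through Kuzu's public C++ API, in the same style as the new test/copy/multi_copy_test.cpp; the database directory and CSV file names are hypothetical, and the "main/kuzu.h" include follows the in-tree convention rather than anything introduced by this patch.

// Illustration only: multi-COPY into a single node table via the public C++ API.
// Assumptions: kuzu::main::Database/Connection as exercised by the new multi_copy_test.cpp,
// a hypothetical database directory "/tmp/multi_copy_demo", and CSV files part1.csv/part2.csv.
#include <iostream>
#include <memory>

#include "main/kuzu.h"

int main() {
    using namespace kuzu::main;
    auto database = std::make_unique<Database>("/tmp/multi_copy_demo");
    auto connection = std::make_unique<Connection>(database.get());
    connection->query("CREATE NODE TABLE Person(id INT64, name STRING, PRIMARY KEY(id))");
    // First COPY populates node groups from scratch.
    connection->query("COPY Person FROM 'part1.csv'");
    // Second COPY now succeeds as well, appending into the last, partially filled node group.
    connection->query("COPY Person FROM 'part2.csv'");
    // Both files' rows are visible in the table.
    auto result = connection->query("MATCH (p:Person) RETURN COUNT(*)");
    std::cout << result->toString() << std::endl;
    return 0;
}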