From ca5d82ad8d24e5fdab562e822c8b1b606aec28e5 Mon Sep 17 00:00:00 2001 From: Guodong Jin Date: Tue, 27 Feb 2024 15:17:03 +0800 Subject: [PATCH] refactor: unify CopyNode and CopyRel operator --- .../catalog_entry/table_catalog_entry.h | 1 + src/include/common/enums/table_type.h | 1 + src/include/processor/operator/index_lookup.h | 10 +- src/include/processor/operator/partitioner.h | 13 +- .../operator/persistent/batch_insert.h | 86 ++++++++ .../processor/operator/persistent/copy_node.h | 139 ------------- .../processor/operator/persistent/copy_rel.h | 121 ----------- .../operator/persistent/node_batch_insert.h | 117 +++++++++++ .../operator/persistent/rel_batch_insert.h | 87 ++++++++ .../processor/operator/physical_operator.h | 3 +- src/include/processor/plan_mapper.h | 7 +- src/include/storage/store/table.h | 6 + src/include/storage/wal/wal.h | 2 +- src/include/storage/wal/wal_record.h | 10 +- src/main/database.cpp | 1 - src/processor/map/map_copy_from.cpp | 90 ++++---- src/processor/operator/index_lookup.cpp | 21 +- src/processor/operator/partitioner.cpp | 2 + .../operator/persistent/CMakeLists.txt | 5 +- .../operator/persistent/batch_insert.cpp | 18 ++ .../operator/persistent/copy_node.cpp | 186 ----------------- .../operator/persistent/node_batch_insert.cpp | 195 ++++++++++++++++++ .../{copy_rel.cpp => rel_batch_insert.cpp} | 108 +++++----- src/processor/operator/physical_operator.cpp | 6 +- src/storage/store/node_group.cpp | 2 +- src/storage/wal/wal.cpp | 4 +- src/storage/wal/wal_record.cpp | 4 +- src/storage/wal_replayer.cpp | 7 +- 28 files changed, 656 insertions(+), 596 deletions(-) create mode 100644 src/include/processor/operator/persistent/batch_insert.h delete mode 100644 src/include/processor/operator/persistent/copy_node.h delete mode 100644 src/include/processor/operator/persistent/copy_rel.h create mode 100644 src/include/processor/operator/persistent/node_batch_insert.h create mode 100644 src/include/processor/operator/persistent/rel_batch_insert.h create mode 100644 src/processor/operator/persistent/batch_insert.cpp delete mode 100644 src/processor/operator/persistent/copy_node.cpp create mode 100644 src/processor/operator/persistent/node_batch_insert.cpp rename src/processor/operator/persistent/{copy_rel.cpp => rel_batch_insert.cpp} (59%) diff --git a/src/include/catalog/catalog_entry/table_catalog_entry.h b/src/include/catalog/catalog_entry/table_catalog_entry.h index eaf505bd36..e3c0ba6d28 100644 --- a/src/include/catalog/catalog_entry/table_catalog_entry.h +++ b/src/include/catalog/catalog_entry/table_catalog_entry.h @@ -28,6 +28,7 @@ class TableCatalogEntry : public CatalogEntry { std::string getComment() const { return comment; } void setComment(std::string newComment) { comment = std::move(newComment); } virtual bool isParent(common::table_id_t tableID) = 0; + // TODO(Guodong/Ziyi): This function should be removed. Instead we should use CatalogEntryType. virtual common::TableType getTableType() const = 0; //===--------------------------------------------------------------------===// diff --git a/src/include/common/enums/table_type.h b/src/include/common/enums/table_type.h index 6a52941746..eab71d76fd 100644 --- a/src/include/common/enums/table_type.h +++ b/src/include/common/enums/table_type.h @@ -6,6 +6,7 @@ namespace kuzu { namespace common { +// TODO(Guodong/Ziyi/Xiyang): Should we remove this and instead use `CatalogEntryType`? 
enum class TableType : uint8_t { UNKNOWN = 0, NODE = 1, diff --git a/src/include/processor/operator/index_lookup.h b/src/include/processor/operator/index_lookup.h index b8fd63fae6..a2795e3af2 100644 --- a/src/include/processor/operator/index_lookup.h +++ b/src/include/processor/operator/index_lookup.h @@ -1,6 +1,5 @@ #pragma once -#include "processor/operator/persistent/copy_node.h" #include "processor/operator/physical_operator.h" namespace kuzu { @@ -9,23 +8,24 @@ class PrimaryKeyIndex; } // namespace storage namespace processor { +struct BatchInsertSharedState; struct IndexLookupInfo { std::unique_ptr pkDataType; storage::PrimaryKeyIndex* index; // NULL if the PK data type is SERIAL. // In copy rdf, we need to perform lookup before primary key is persist on disk. So we need to // use index builder. - std::shared_ptr copyNodeSharedState; + std::shared_ptr batchInsertSharedState; DataPos keyVectorPos; DataPos resultVectorPos; IndexLookupInfo(std::unique_ptr pkDataType, storage::PrimaryKeyIndex* index, const DataPos& keyVectorPos, const DataPos& resultVectorPos) - : pkDataType{std::move(pkDataType)}, index{index}, copyNodeSharedState{nullptr}, + : pkDataType{std::move(pkDataType)}, index{index}, batchInsertSharedState{nullptr}, keyVectorPos{keyVectorPos}, resultVectorPos{resultVectorPos} {} IndexLookupInfo(const IndexLookupInfo& other) : pkDataType{other.pkDataType->copy()}, index{other.index}, - copyNodeSharedState{other.copyNodeSharedState}, keyVectorPos{other.keyVectorPos}, + batchInsertSharedState{other.batchInsertSharedState}, keyVectorPos{other.keyVectorPos}, resultVectorPos{other.resultVectorPos} {} inline std::unique_ptr copy() { @@ -40,7 +40,7 @@ class IndexLookup : public PhysicalOperator { : PhysicalOperator{PhysicalOperatorType::INDEX_LOOKUP, std::move(child), id, paramsString}, infos{std::move(infos)} {} - void setCopyNodeSharedState(std::shared_ptr sharedState); + void setBatchInsertSharedState(std::shared_ptr sharedState); bool getNextTuplesInternal(ExecutionContext* context) final; diff --git a/src/include/processor/operator/partitioner.h b/src/include/processor/operator/partitioner.h index 36edda2284..65800727cc 100644 --- a/src/include/processor/operator/partitioner.h +++ b/src/include/processor/operator/partitioner.h @@ -1,10 +1,12 @@ #pragma once #include "common/data_chunk/data_chunk_collection.h" -#include "processor/operator/persistent/copy_node.h" #include "processor/operator/sink.h" namespace kuzu { +namespace storage { +class NodeTable; +} namespace processor { using partitioner_func_t = @@ -23,8 +25,9 @@ struct PartitioningBuffer { void merge(std::unique_ptr localPartitioningStates); }; -// NOTE: Currently, Partitioner is tightly coupled with CopyRel. We should generalize it later when -// necessary. Here, each partition is essentially a node group. +// NOTE: Currently, Partitioner is tightly coupled with RelBatchInsert. We should generalize it +// later when necessary. Here, each partition is essentially a node group. +struct BatchInsertSharedState; struct PartitionerSharedState { std::mutex mtx; storage::MemoryManager* mm; @@ -37,7 +40,7 @@ struct PartitionerSharedState { std::vector> partitioningBuffers; common::partition_idx_t nextPartitionIdx = 0; // In copy rdf, we need to access num nodes before it is available in statistics. 
- std::vector<std::shared_ptr<CopyNodeSharedState>> copyNodeSharedStates;
+ std::vector<std::shared_ptr<BatchInsertSharedState>> nodeBatchInsertSharedStates;
explicit PartitionerSharedState(storage::MemoryManager* mm) : mm{mm} {}
@@ -102,7 +105,7 @@ class Partitioner : public Sink {
std::vector<common::partition_idx_t> numPartitions, storage::MemoryManager* mm);
private:
- // TODO: For now, CopyRel will guarantee all data are inside one data chunk. Should be
+ // TODO: For now, RelBatchInsert will guarantee all data are inside one data chunk. Should be
// generalized to resultSet later if needed.
void copyDataToPartitions(
common::partition_idx_t partitioningIdx, common::DataChunk* chunkToCopyFrom);
diff --git a/src/include/processor/operator/persistent/batch_insert.h b/src/include/processor/operator/persistent/batch_insert.h
new file mode 100644
index 0000000000..ccc380a160
--- /dev/null
+++ b/src/include/processor/operator/persistent/batch_insert.h
@@ -0,0 +1,86 @@
+#pragma once
+
+#include "processor/operator/sink.h"
+#include "storage/store/table.h"
+
+namespace kuzu {
+namespace processor {
+
+struct BatchInsertInfo {
+ catalog::TableCatalogEntry* tableEntry;
+ bool compressionEnabled;
+
+ BatchInsertInfo(catalog::TableCatalogEntry* tableEntry, bool compressionEnabled)
+ : tableEntry{tableEntry}, compressionEnabled{compressionEnabled} {}
+ virtual ~BatchInsertInfo() = default;
+
+ BatchInsertInfo(const BatchInsertInfo& other) = delete;
+
+ inline virtual std::unique_ptr<BatchInsertInfo> copy() const = 0;
+};
+
+struct BatchInsertSharedState {
+ std::mutex mtx;
+ std::atomic<common::row_idx_t> numRows;
+ storage::Table* table;
+ std::shared_ptr<FactorizedTable> fTable;
+ storage::WAL* wal;
+
+ BatchInsertSharedState(
+ storage::Table* table, std::shared_ptr<FactorizedTable> fTable, storage::WAL* wal)
+ : numRows{0}, table{table}, fTable{std::move(fTable)}, wal{wal} {};
+ BatchInsertSharedState(const BatchInsertSharedState& other) = delete;
+
+ virtual ~BatchInsertSharedState() = default;
+
+ std::unique_ptr<BatchInsertSharedState> copy() const {
+ auto result = std::make_unique<BatchInsertSharedState>(table, fTable, wal);
+ result->numRows.store(numRows.load());
+ return result;
+ }
+
+ inline void incrementNumRows(common::row_idx_t numRowsToIncrement) {
+ numRows.fetch_add(numRowsToIncrement);
+ }
+ inline common::row_idx_t getNumRows() { return numRows.load(); }
+ // NOLINTNEXTLINE(readability-make-member-function-const): Semantically non-const.
+ inline void logBatchInsertWALRecord() { + wal->logCopyTableRecord(table->getTableID()); + wal->flushAllPages(); + } + inline void setNumTuplesForTable() { table->setNumTuples(getNumRows()); } +}; + +struct BatchInsertLocalState { + std::unique_ptr nodeGroup; + + virtual ~BatchInsertLocalState() = default; +}; + +class BatchInsert : public Sink { +public: + BatchInsert(std::unique_ptr info, + std::shared_ptr sharedState, + std::unique_ptr resultSetDescriptor, uint32_t id, + const std::string& paramsString) + : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::BATCH_INSERT, id, + paramsString}, + info{std::move(info)}, sharedState{std::move(sharedState)} {} + + ~BatchInsert() override = default; + + std::unique_ptr clone() override = 0; + + inline std::shared_ptr getSharedState() const { return sharedState; } + +protected: + void checkIfTableIsEmpty(); + +protected: + std::unique_ptr info; + std::shared_ptr sharedState; + std::unique_ptr localState; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/copy_node.h b/src/include/processor/operator/persistent/copy_node.h deleted file mode 100644 index 6627879f76..0000000000 --- a/src/include/processor/operator/persistent/copy_node.h +++ /dev/null @@ -1,139 +0,0 @@ -#pragma once - -#include - -#include "processor/operator/aggregate/hash_aggregate.h" -#include "processor/operator/call/in_query_call.h" -#include "processor/operator/persistent/index_builder.h" -#include "processor/operator/sink.h" -#include "storage/store/node_group.h" -#include "storage/store/node_table.h" -#include "storage/store/rel_table.h" - -namespace kuzu { -namespace processor { - -class CopyNodeSharedState { - friend class PlanMapper; - friend class CopyNode; - -public: - CopyNodeSharedState() - : readerSharedState{nullptr}, distinctSharedState{nullptr}, currentNodeGroupIdx{0}, - sharedNodeGroup{nullptr} {}; - - void init(ExecutionContext* context); - - inline common::offset_t getNextNodeGroupIdx() { - std::unique_lock lck{mtx}; - return getNextNodeGroupIdxWithoutLock(); - } - - inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; } - - void appendIncompleteNodeGroup(std::unique_ptr localNodeGroup, - std::optional& indexBuilder); - - void finalize(ExecutionContext* context); - -private: - inline common::offset_t getNextNodeGroupIdxWithoutLock() { return currentNodeGroupIdx++; } - - void calculateNumTuples(); - -public: - std::shared_ptr pkIndex; - // Number of tuples loaded. - uint64_t numTuples; - // Table storing result message. - std::shared_ptr fTable; - -private: - std::mutex mtx; - storage::WAL* wal; - storage::NodeTable* table; - std::vector> columnTypes; - // Primary key info - common::vector_idx_t pkColumnIdx; - common::LogicalType pkType; - std::optional globalIndexBuilder = std::nullopt; - - InQueryCallSharedState* readerSharedState; - HashAggregateSharedState* distinctSharedState; - - uint64_t currentNodeGroupIdx; - // The sharedNodeGroup is to accumulate left data within local node groups in CopyNode ops. 
- std::unique_ptr sharedNodeGroup; -}; - -struct CopyNodeInfo { - std::vector columnPositions; - storage::NodeTable* table; - std::unordered_set fwdRelTables; - std::unordered_set bwdRelTables; - std::string tableName; - bool containsSerial; - bool compressionEnabled; - - CopyNodeInfo(std::vector columnPositions, storage::NodeTable* table, - std::unordered_set fwdRelTables, - std::unordered_set bwdRelTables, std::string tableName, - bool containsSerial, bool compressionEnabled) - : columnPositions{std::move(columnPositions)}, table{table}, fwdRelTables{std::move( - fwdRelTables)}, - bwdRelTables{std::move(bwdRelTables)}, tableName{std::move(tableName)}, - containsSerial{containsSerial}, compressionEnabled{compressionEnabled} {} - CopyNodeInfo(const CopyNodeInfo& other) = default; - - inline std::unique_ptr copy() const { - return std::make_unique(*this); - } -}; - -class CopyNode : public Sink { -public: - CopyNode(std::shared_ptr sharedState, std::unique_ptr info, - std::unique_ptr resultSetDescriptor, - std::unique_ptr child, uint32_t id, const std::string& paramsString) - : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child), - id, paramsString}, - sharedState{std::move(sharedState)}, info{std::move(info)} {} - - inline std::shared_ptr getSharedState() const { return sharedState; } - - inline bool canParallel() const final { return !info->containsSerial; } - - void initGlobalStateInternal(ExecutionContext* context) final; - - void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final; - - void executeInternal(ExecutionContext* context) override; - - void finalize(ExecutionContext* context) override; - - inline std::unique_ptr clone() final { - return std::make_unique(sharedState, info->copy(), resultSetDescriptor->copy(), - children[0]->clone(), id, paramsString); - } - - static void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx, - std::optional& indexBuilder, common::column_id_t pkColumnID, - storage::NodeTable* table, storage::NodeGroup* nodeGroup); - -private: - void copyToNodeGroup(); - -protected: - std::shared_ptr sharedState; - std::unique_ptr info; - - std::optional localIndexBuilder; - - common::DataChunkState* columnState; - std::vector> nullColumnVectors; - std::vector columnVectors; - std::unique_ptr localNodeGroup; -}; - -} // namespace processor -} // namespace kuzu diff --git a/src/include/processor/operator/persistent/copy_rel.h b/src/include/processor/operator/persistent/copy_rel.h deleted file mode 100644 index 2523354af6..0000000000 --- a/src/include/processor/operator/persistent/copy_rel.h +++ /dev/null @@ -1,121 +0,0 @@ -#pragma once - -#include "catalog/catalog_entry/rel_table_catalog_entry.h" -#include "common/enums/rel_direction.h" -#include "processor/operator/partitioner.h" -#include "processor/operator/sink.h" -#include "storage/stats/rels_store_statistics.h" -#include "storage/store/node_group.h" -#include "storage/store/rel_table.h" -#include "storage/wal/wal.h" - -namespace kuzu { -namespace processor { - -struct CopyRelInfo { - catalog::RelTableCatalogEntry* relTableEntry; - common::vector_idx_t partitioningIdx; - common::RelDataDirection dataDirection; - - storage::WAL* wal; - bool compressionEnabled; - - CopyRelInfo(catalog::RelTableCatalogEntry* relTableEntry, common::vector_idx_t partitioningIdx, - common::RelDataDirection dataDirection, storage::WAL* wal, bool compressionEnabled) - : relTableEntry{relTableEntry}, partitioningIdx{partitioningIdx}, - dataDirection{dataDirection}, 
wal{wal}, compressionEnabled{compressionEnabled} {} - CopyRelInfo(const CopyRelInfo& other) - : relTableEntry{other.relTableEntry}, partitioningIdx{other.partitioningIdx}, - dataDirection{other.dataDirection}, wal{other.wal}, compressionEnabled{ - other.compressionEnabled} {} - - inline std::unique_ptr copy() { return std::make_unique(*this); } -}; - -struct CopyRelSharedState { - common::table_id_t tableID; - storage::RelTable* table; - std::vector> columnTypes; - storage::RelsStoreStats* relsStatistics; - std::shared_ptr fTable; - std::atomic numRows; - - CopyRelSharedState(common::table_id_t tableID, storage::RelTable* table, - std::vector> columnTypes, - storage::RelsStoreStats* relsStatistics, storage::MemoryManager* memoryManager); - - void logCopyRelWALRecord(storage::WAL* wal); - inline void incrementNumRows(common::row_idx_t numRowsToIncrement) { - numRows.fetch_add(numRowsToIncrement); - } - inline void updateRelsStatistics() { relsStatistics->setNumTuplesForTable(tableID, numRows); } - inline common::offset_t getNextRelOffset(transaction::Transaction* transaction) const { - return relsStatistics->getRelStatistics(tableID, transaction)->getNextRelOffset(); - } -}; - -struct CopyRelLocalState { - common::partition_idx_t currentPartition = common::INVALID_PARTITION_IDX; - std::unique_ptr nodeGroup; -}; - -class CopyRel : public Sink { -public: - CopyRel(std::unique_ptr info, - std::shared_ptr partitionerSharedState, - std::shared_ptr sharedState, - std::unique_ptr resultSetDescriptor, uint32_t id, - const std::string& paramsString) - : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_REL, id, paramsString}, - info{std::move(info)}, partitionerSharedState{std::move(partitionerSharedState)}, - sharedState{std::move(sharedState)} {} - - inline std::shared_ptr getSharedState() const { return sharedState; } - - inline bool isSource() const final { return true; } - - void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) final; - void initGlobalStateInternal(ExecutionContext* context) final; - - void executeInternal(ExecutionContext* context) final; - void finalize(ExecutionContext* context) final; - - inline std::unique_ptr clone() final { - return std::make_unique(info->copy(), partitionerSharedState, sharedState, - resultSetDescriptor->copy(), id, paramsString); - } - -private: - inline bool isCopyAllowed() const { - return sharedState->getNextRelOffset(transaction::Transaction::getDummyWriteTrx().get()) == - 0; - } - - void prepareCSRNodeGroup(common::DataChunkCollection* partition, - common::offset_t startNodeOffset, common::vector_idx_t offsetVectorIdx, - common::offset_t numNodes); - - static common::length_t getGapSize(common::length_t length); - static std::vector populateStartCSROffsetsAndLengths( - storage::CSRHeaderChunks& csrHeader, common::offset_t numNodes, - common::DataChunkCollection* partition, common::vector_idx_t offsetVectorIdx); - static void populateEndCSROffsets( - storage::CSRHeaderChunks& csrHeader, std::vector& gaps); - static void setOffsetToWithinNodeGroup( - common::ValueVector* vector, common::offset_t startOffset); - static void setOffsetFromCSROffsets( - common::ValueVector* offsetVector, storage::ColumnChunk* offsetChunk); - - // We only check rel multiplcity constraint (MANY_ONE, ONE_ONE) for now. 
- std::optional<common::offset_t> checkRelMultiplicityConstraint(
- const storage::CSRHeaderChunks& csrHeader);
-
-private:
- std::unique_ptr<CopyRelInfo> info;
- std::shared_ptr<PartitionerSharedState> partitionerSharedState;
- std::shared_ptr<CopyRelSharedState> sharedState;
- std::unique_ptr<CopyRelLocalState> localState;
-};
-
-} // namespace processor
-} // namespace kuzu
diff --git a/src/include/processor/operator/persistent/node_batch_insert.h b/src/include/processor/operator/persistent/node_batch_insert.h
new file mode 100644
index 0000000000..b1a4c14f64
--- /dev/null
+++ b/src/include/processor/operator/persistent/node_batch_insert.h
@@ -0,0 +1,117 @@
+#pragma once
+
+#include "processor/operator/aggregate/hash_aggregate.h"
+#include "processor/operator/call/in_query_call.h"
+#include "processor/operator/persistent/batch_insert.h"
+#include "processor/operator/persistent/index_builder.h"
+#include "storage/store/node_group.h"
+#include "storage/store/node_table.h"
+
+namespace kuzu {
+namespace processor {
+
+struct NodeBatchInsertInfo final : public BatchInsertInfo {
+ std::vector<DataPos> columnPositions;
+ bool containSerial = false;
+ std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
+
+ NodeBatchInsertInfo(catalog::TableCatalogEntry* tableEntry, bool compressionEnabled,
+ std::vector<DataPos> columnPositions, bool containSerial,
+ std::vector<std::unique_ptr<common::LogicalType>> columnTypes)
+ : BatchInsertInfo{tableEntry, compressionEnabled}, columnPositions{columnPositions},
+ containSerial{containSerial}, columnTypes{std::move(columnTypes)} {}
+
+ NodeBatchInsertInfo(const NodeBatchInsertInfo& other)
+ : BatchInsertInfo{other.tableEntry, other.compressionEnabled},
+ columnPositions{other.columnPositions}, containSerial{other.containSerial},
+ columnTypes{common::LogicalType::copy(other.columnTypes)} {}
+
+ inline std::unique_ptr<BatchInsertInfo> copy() const override {
+ return std::make_unique<NodeBatchInsertInfo>(*this);
+ }
+};
+
+struct NodeBatchInsertSharedState final : public BatchInsertSharedState {
+ // Primary key info
+ std::shared_ptr<PrimaryKeyIndexBuilder> pkIndex;
+ common::vector_idx_t pkColumnIdx;
+ common::LogicalType pkType;
+ std::optional<IndexBuilder> globalIndexBuilder = std::nullopt;
+
+ InQueryCallSharedState* readerSharedState;
+ HashAggregateSharedState* distinctSharedState;
+
+ uint64_t currentNodeGroupIdx;
+ // The sharedNodeGroup accumulates leftover data from local node groups in NodeBatchInsert
+ // ops.
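// A minimal, standard-library-only model of the accumulation protocol behind
// this member (it mirrors appendIncompleteNodeGroup in node_batch_insert.cpp
// below; Rows and kCapacity are illustrative stand-ins, not the NodeGroup API):
//
//   #include <algorithm>
//   #include <cstddef>
//   #include <memory>
//   #include <mutex>
//   #include <vector>
//
//   struct Rows { std::vector<int> data; };   // stands in for a NodeGroup
//   constexpr std::size_t kCapacity = 4;      // illustrative node-group size
//
//   std::mutex mtx;
//   std::unique_ptr<Rows> shared;             // the "sharedNodeGroup"
//
//   void writeAndReset(Rows& r) { /* flush to table */ r.data.clear(); }
//
//   // Merge a partial local group into the shared one under the mutex;
//   // flush the shared group whenever it fills up to a full node group.
//   void appendIncomplete(std::unique_ptr<Rows> local) {
//       std::unique_lock lck{mtx};
//       if (!shared) { shared = std::move(local); return; }
//       std::size_t appended = 0;
//       while (appended < local->data.size()) {
//           auto n = std::min(kCapacity - shared->data.size(),
//               local->data.size() - appended);
//           shared->data.insert(shared->data.end(), local->data.begin() + appended,
//               local->data.begin() + appended + n);
//           appended += n;
//           if (shared->data.size() == kCapacity) writeAndReset(*shared);
//       }
//   }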
+ std::unique_ptr sharedNodeGroup; + + NodeBatchInsertSharedState( + storage::Table* table, std::shared_ptr fTable, storage::WAL* wal) + : BatchInsertSharedState{table, fTable, wal}, readerSharedState{nullptr}, + distinctSharedState{nullptr}, currentNodeGroupIdx{0}, sharedNodeGroup{nullptr} {}; + + void initPKIndex(ExecutionContext* context); + + inline common::offset_t getNextNodeGroupIdx() { + std::unique_lock lck{mtx}; + return getNextNodeGroupIdxWithoutLock(); + } + + inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; } + + void appendIncompleteNodeGroup(std::unique_ptr localNodeGroup, + std::optional& indexBuilder); + + inline common::offset_t getNextNodeGroupIdxWithoutLock() { return currentNodeGroupIdx++; } + + void calculateNumTuples(); +}; + +struct NodeBatchInsertLocalState final : public BatchInsertLocalState { + std::optional localIndexBuilder; + + common::DataChunkState* columnState; + std::vector> nullColumnVectors; + std::vector columnVectors; +}; + +class NodeBatchInsert final : public BatchInsert { +public: + NodeBatchInsert(std::unique_ptr info, + std::shared_ptr sharedState, + std::unique_ptr resultSetDescriptor, + std::unique_ptr child, uint32_t id, const std::string& paramsString) + : BatchInsert{std::move(info), std::move(sharedState), std::move(resultSetDescriptor), id, + paramsString} { + children.push_back(std::move(child)); + } + + inline bool canParallel() const override { + auto nodeInfo = common::ku_dynamic_cast(info.get()); + return !nodeInfo->containSerial; + } + + void initGlobalStateInternal(ExecutionContext* context) override; + + void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; + + void executeInternal(ExecutionContext* context) override; + + void finalize(ExecutionContext* context) override; + + inline std::unique_ptr clone() override { + return std::make_unique(info->copy(), sharedState, + resultSetDescriptor->copy(), children[0]->clone(), id, paramsString); + } + + static void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx, + std::optional& indexBuilder, common::column_id_t pkColumnID, + storage::NodeTable* table, storage::NodeGroup* nodeGroup); + +private: + void copyToNodeGroup(); +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/rel_batch_insert.h b/src/include/processor/operator/persistent/rel_batch_insert.h new file mode 100644 index 0000000000..ed10d2091f --- /dev/null +++ b/src/include/processor/operator/persistent/rel_batch_insert.h @@ -0,0 +1,87 @@ +#pragma once + +#include "common/enums/rel_direction.h" +#include "processor/operator/partitioner.h" +#include "processor/operator/persistent/batch_insert.h" +#include "storage/store/node_group.h" + +namespace kuzu { +namespace processor { + +struct RelBatchInsertInfo final : public BatchInsertInfo { + common::RelDataDirection direction; + uint64_t partitioningIdx; + common::vector_idx_t offsetVectorIdx; + std::vector> columnTypes; + + RelBatchInsertInfo(catalog::TableCatalogEntry* tableEntry, bool compressionEnabled, + common::RelDataDirection direction, uint64_t partitioningIdx, + common::vector_idx_t offsetVectorIdx, + std::vector> columnTypes) + : BatchInsertInfo{tableEntry, compressionEnabled}, direction{direction}, + partitioningIdx{partitioningIdx}, offsetVectorIdx{offsetVectorIdx}, columnTypes{std::move( + columnTypes)} {} + RelBatchInsertInfo(const RelBatchInsertInfo& other) + : BatchInsertInfo{other.tableEntry, other.compressionEnabled}, 
direction{other.direction},
+ partitioningIdx{other.partitioningIdx}, offsetVectorIdx{other.offsetVectorIdx},
+ columnTypes{common::LogicalType::copy(other.columnTypes)} {}
+
+ inline std::unique_ptr<BatchInsertInfo> copy() const override {
+ return std::make_unique<RelBatchInsertInfo>(*this);
+ }
+};
+
+struct RelBatchInsertLocalState final : public BatchInsertLocalState {
+ common::partition_idx_t nodeGroupIdx = common::INVALID_NODE_GROUP_IDX;
+};
+
+class RelBatchInsert final : public BatchInsert {
+public:
+ RelBatchInsert(std::unique_ptr<BatchInsertInfo> info,
+ std::shared_ptr<PartitionerSharedState> partitionerSharedState,
+ std::shared_ptr<BatchInsertSharedState> sharedState,
+ std::unique_ptr<ResultSetDescriptor> resultSetDescriptor, uint32_t id,
+ const std::string& paramsString)
+ : BatchInsert{std::move(info), std::move(sharedState), std::move(resultSetDescriptor), id,
+ paramsString},
+ partitionerSharedState{std::move(partitionerSharedState)} {}
+
+ inline bool isSource() const override { return true; }
+
+ void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) override;
+ void initGlobalStateInternal(ExecutionContext* context) override;
+
+ void executeInternal(ExecutionContext* context) override;
+ void finalize(ExecutionContext* context) override;
+
+ inline std::unique_ptr<PhysicalOperator> clone() override {
+ return std::make_unique<RelBatchInsert>(info->copy(), partitionerSharedState, sharedState,
+ resultSetDescriptor->copy(), id, paramsString);
+ }
+
+private:
+ void prepareCSRNodeGroup(common::DataChunkCollection* partition,
+ common::offset_t startNodeOffset, common::vector_idx_t offsetVectorIdx,
+ common::offset_t numNodes);
+
+ static common::length_t getGapSize(common::length_t length);
+ static std::vector<common::offset_t> populateStartCSROffsetsAndLengths(
+ storage::CSRHeaderChunks& csrHeader, common::offset_t numNodes,
+ common::DataChunkCollection* partition, common::vector_idx_t offsetVectorIdx);
+ static void populateEndCSROffsets(
+ storage::CSRHeaderChunks& csrHeader, std::vector<common::offset_t>& gaps);
+ static void setOffsetToWithinNodeGroup(
+ common::ValueVector* vector, common::offset_t startOffset);
+ static void setOffsetFromCSROffsets(
+ common::ValueVector* offsetVector, storage::ColumnChunk* offsetChunk);
+
+ // We only check rel multiplicity constraint (MANY_ONE, ONE_ONE) for now.
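// A sketch of what this check amounts to, assuming the CSR header yields one
// list length per bound node in the node group (standard-library stand-ins,
// not the CSRHeaderChunks API):
//
//   #include <cstdint>
//   #include <optional>
//   #include <vector>
//
//   // For MANY_ONE / ONE_ONE rel tables, each bound node may have at most one
//   // rel, i.e. every CSR list length must be <= 1. Returns the first
//   // offending node offset, if any.
//   std::optional<uint64_t> findMultiplicityViolation(
//       const std::vector<uint64_t>& csrLengths) {
//       for (uint64_t offset = 0; offset < csrLengths.size(); offset++) {
//           if (csrLengths[offset] > 1) {
//               return offset;
//           }
//       }
//       return std::nullopt;
//   }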
+ std::optional checkRelMultiplicityConstraint( + const storage::CSRHeaderChunks& csrHeader); + +private: + std::shared_ptr partitionerSharedState; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/physical_operator.h b/src/include/processor/operator/physical_operator.h index 6fdf17eb29..55f5e26d34 100644 --- a/src/include/processor/operator/physical_operator.h +++ b/src/include/processor/operator/physical_operator.h @@ -11,13 +11,12 @@ enum class PhysicalOperatorType : uint8_t { ADD_PROPERTY, AGGREGATE, AGGREGATE_SCAN, + BATCH_INSERT, COMMENT_ON, CREATE_MACRO, STANDALONE_CALL, IN_QUERY_CALL, - COPY_NODE, COPY_RDF, - COPY_REL, COPY_TO, CREATE_NODE_TABLE, CREATE_REL_TABLE, diff --git a/src/include/processor/plan_mapper.h b/src/include/processor/plan_mapper.h index 9669ac1311..cf193bf988 100644 --- a/src/include/processor/plan_mapper.h +++ b/src/include/processor/plan_mapper.h @@ -27,7 +27,7 @@ class NodeInsertExecutor; class RelInsertExecutor; class NodeSetExecutor; class RelSetExecutor; -struct CopyRelSharedState; +struct BatchInsertSharedState; struct PartitionerSharedState; class PlanMapper { @@ -109,8 +109,9 @@ class PlanMapper { std::unique_ptr createCopyRel( std::shared_ptr partitionerSharedState, - std::shared_ptr sharedState, planner::LogicalCopyFrom* copyFrom, - common::RelDataDirection direction); + std::shared_ptr sharedState, planner::LogicalCopyFrom* copyFrom, + common::RelDataDirection direction, + std::vector> columnTypes); std::unique_ptr createResultCollector(common::AccumulateType accumulateType, const binder::expression_vector& expressions, planner::Schema* schema, diff --git a/src/include/storage/store/table.h b/src/include/storage/store/table.h index 399fcbab8f..be35656ad5 100644 --- a/src/include/storage/store/table.h +++ b/src/include/storage/store/table.h @@ -21,6 +21,12 @@ class Table { inline common::TableType getTableType() const { return tableType; } inline common::table_id_t getTableID() const { return tableID; } + inline common::row_idx_t getNumTuples() const { + return tablesStatistics->getNumTuplesForTable(tableID); + } + inline void setNumTuples(uint64_t numTuples) { + tablesStatistics->setNumTuplesForTable(tableID, numTuples); + } virtual void read(transaction::Transaction* transaction, TableReadState& readState, common::ValueVector* inNodeIDVector, diff --git a/src/include/storage/wal/wal.h b/src/include/storage/wal/wal.h index 03c94e08a6..042c97cb80 100644 --- a/src/include/storage/wal/wal.h +++ b/src/include/storage/wal/wal.h @@ -110,7 +110,7 @@ class WAL : public BaseWALAndWALIterator { void logOverflowFileNextBytePosRecord(DBFileID dbFileID, uint64_t prevNextByteToWriteTo); - void logCopyTableRecord(common::table_id_t tableID, common::TableType tableType); + void logCopyTableRecord(common::table_id_t tableID); void logDropTableRecord(common::table_id_t tableID); diff --git a/src/include/storage/wal/wal_record.h b/src/include/storage/wal/wal_record.h index 492c7ae176..ef11cd2431 100644 --- a/src/include/storage/wal/wal_record.h +++ b/src/include/storage/wal/wal_record.h @@ -147,16 +147,12 @@ struct DiskOverflowFileNextBytePosRecord { struct CopyTableRecord { common::table_id_t tableID; - common::TableType tableType; CopyTableRecord() = default; - explicit CopyTableRecord(common::table_id_t tableID, common::TableType tableType) - : tableID{tableID}, tableType{tableType} {} + explicit CopyTableRecord(common::table_id_t tableID) : tableID{tableID} {} - inline bool operator==(const CopyTableRecord& rhs) const { 
- return tableID == rhs.tableID && tableType == rhs.tableType; - } + inline bool operator==(const CopyTableRecord& rhs) const { return tableID == rhs.tableID; } }; struct TableStatisticsRecord { @@ -240,7 +236,7 @@ struct WALRecord { common::table_id_t resourceTripleTableID, common::table_id_t literalTripleTableID); static WALRecord newOverflowFileNextBytePosRecord( DBFileID dbFileID, uint64_t prevNextByteToWriteTo_); - static WALRecord newCopyTableRecord(common::table_id_t tableID, common::TableType tableType); + static WALRecord newCopyTableRecord(common::table_id_t tableID); static WALRecord newDropTableRecord(common::table_id_t tableID); static WALRecord newDropPropertyRecord( common::table_id_t tableID, common::property_id_t propertyID); diff --git a/src/main/database.cpp b/src/main/database.cpp index d3349e99a3..7a5bc11080 100644 --- a/src/main/database.cpp +++ b/src/main/database.cpp @@ -12,7 +12,6 @@ #include "common/logging_level_utils.h" #include "common/utils.h" #include "extension/extension.h" -#include "function/scalar_function.h" #include "main/db_config.h" #include "processor/processor.h" #include "spdlog/spdlog.h" diff --git a/src/processor/map/map_copy_from.cpp b/src/processor/map/map_copy_from.cpp index 4e29b502ae..11af63835a 100644 --- a/src/processor/map/map_copy_from.cpp +++ b/src/processor/map/map_copy_from.cpp @@ -6,9 +6,9 @@ #include "processor/operator/call/in_query_call.h" #include "processor/operator/index_lookup.h" #include "processor/operator/partitioner.h" -#include "processor/operator/persistent/copy_node.h" #include "processor/operator/persistent/copy_rdf.h" -#include "processor/operator/persistent/copy_rel.h" +#include "processor/operator/persistent/node_batch_insert.h" +#include "processor/operator/persistent/rel_batch_insert.h" #include "processor/plan_mapper.h" using namespace kuzu::binder; @@ -25,17 +25,17 @@ std::unique_ptr PlanMapper::mapCopyFrom(LogicalOperator* logic switch (copyFrom->getInfo()->tableEntry->getTableType()) { case TableType::NODE: { auto op = mapCopyNodeFrom(logicalOperator); - auto copy = ku_dynamic_cast(op.get()); + auto copy = ku_dynamic_cast(op.get()); auto table = copy->getSharedState()->fTable; return createFactorizedTableScanAligned(copyFrom->getOutExprs(), copyFrom->getSchema(), table, DEFAULT_VECTOR_CAPACITY /* maxMorselSize */, std::move(op)); } case TableType::REL: { auto ops = mapCopyRelFrom(logicalOperator); - auto copy = ku_dynamic_cast(ops[0].get()); - auto table = copy->getSharedState()->fTable; + auto relBatchInsert = ku_dynamic_cast(ops[0].get()); + auto fTable = relBatchInsert->getSharedState()->fTable; auto scan = createFactorizedTableScanAligned(copyFrom->getOutExprs(), copyFrom->getSchema(), - table, DEFAULT_VECTOR_CAPACITY /* maxMorselSize */, std::move(ops[0])); + fTable, DEFAULT_VECTOR_CAPACITY /* maxMorselSize */, std::move(ops[0])); for (auto i = 1u; i < ops.size(); ++i) { scan->addChild(std::move(ops[i])); } @@ -105,7 +105,9 @@ std::unique_ptr PlanMapper::mapCopyNodeFrom(LogicalOperator* l ku_dynamic_cast(copyFromInfo->tableEntry); // Map reader. 
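// The reader mapped below doubles as a row-count source: the mapper records
// either the InQueryCall scan state or a distinct hash aggregate's state on
// the shared state, and initPKIndex (node_batch_insert.cpp, further down)
// reads whichever was set to bulk-reserve the primary-key index. A
// standard-library-only model of that selection (names are illustrative):
//
//   #include <cstdint>
//
//   struct RowCountSource {
//       const uint64_t* scanNumRows = nullptr;       // reader scan estimate
//       const uint64_t* distinctNumTuples = nullptr; // distinct aggregate count
//       uint64_t rowsToReserve() const {
//           return scanNumRows != nullptr ? *scanNumRows : *distinctNumTuples;
//       }
//   };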
auto prevOperator = mapOperator(copyFrom->getChild(0).get()); - auto sharedState = std::make_shared(); + auto nodeTable = storageManager.getNodeTable(nodeTableEntry->getTableID()); + auto sharedState = std::make_shared( + nodeTable, getSingleStringColumnFTable(), storageManager.getWAL()); if (prevOperator->getOperatorType() == PhysicalOperatorType::IN_QUERY_CALL) { auto inQueryCall = ku_dynamic_cast(prevOperator.get()); sharedState->readerSharedState = inQueryCall->getSharedState(); @@ -115,13 +117,9 @@ std::unique_ptr PlanMapper::mapCopyNodeFrom(LogicalOperator* l sharedState->distinctSharedState = hashScan->getSharedState().get(); } // Map copy node. - auto nodeTable = storageManager.getNodeTable(nodeTableEntry->getTableID()); - sharedState->wal = storageManager.getWAL(); - sharedState->table = nodeTable; auto pk = nodeTableEntry->getPrimaryKey(); sharedState->pkColumnIdx = nodeTableEntry->getColumnID(pk->getPropertyID()); sharedState->pkType = *pk->getDataType(); - sharedState->fTable = getSingleStringColumnFTable(); std::vector columnNames; logical_types_t columnTypes; getNodeColumnsInCopyOrder(nodeTableEntry, columnNames, columnTypes); @@ -134,21 +132,12 @@ std::unique_ptr PlanMapper::mapCopyNodeFrom(LogicalOperator* l } auto inputColumns = copyFromInfo->fileScanInfo->columns; inputColumns.push_back(copyFromInfo->fileScanInfo->offset); - std::unordered_set fwdRelTables; - std::unordered_set bwdRelTables; - for (auto relTableID : nodeTableEntry->getFwdRelTableIDSet()) { - fwdRelTables.insert(storageManager.getRelTable(relTableID)); - } - for (auto relTableID : nodeTableEntry->getBwdRelTableIDSet()) { - bwdRelTables.insert(storageManager.getRelTable(relTableID)); - } auto columnPositions = getColumnDataPositions(columnNamesExcludingSerial, inputColumns, *outFSchema); - sharedState->columnTypes = std::move(columnTypes); - auto info = std::make_unique(std::move(columnPositions), nodeTable, fwdRelTables, - bwdRelTables, nodeTableEntry->getName(), copyFromInfo->containsSerial, - storageManager.compressionEnabled()); - return std::make_unique(sharedState, std::move(info), + auto info = + std::make_unique(nodeTableEntry, storageManager.compressionEnabled(), + std::move(columnPositions), copyFromInfo->containsSerial, std::move(columnTypes)); + return std::make_unique(std::move(info), sharedState, std::make_unique(copyFrom->getSchema()), std::move(prevOperator), getOperatorID(), copyFrom->getExpressionsForPrinting()); } @@ -178,17 +167,18 @@ std::unique_ptr PlanMapper::mapPartitioner(LogicalOperator* lo std::unique_ptr PlanMapper::createCopyRel( std::shared_ptr partitionerSharedState, - std::shared_ptr sharedState, LogicalCopyFrom* copyFrom, - RelDataDirection direction) { + std::shared_ptr sharedState, LogicalCopyFrom* copyFrom, + RelDataDirection direction, std::vector> columnTypes) { auto copyFromInfo = copyFrom->getInfo(); auto outFSchema = copyFrom->getSchema(); - auto relTableEntry = - ku_dynamic_cast(copyFromInfo->tableEntry); auto partitioningIdx = direction == RelDataDirection::FWD ? 0 : 1; - auto copyRelInfo = std::make_unique(relTableEntry, partitioningIdx, direction, - storageManager.getWAL(), storageManager.compressionEnabled()); - return std::make_unique(std::move(copyRelInfo), std::move(partitionerSharedState), - std::move(sharedState), std::make_unique(outFSchema), getOperatorID(), + auto offsetVectorIdx = direction == RelDataDirection::FWD ? 
0 : 1; + auto relBatchInsertInfo = std::make_unique(copyFromInfo->tableEntry, + storageManager.compressionEnabled(), direction, partitioningIdx, offsetVectorIdx, + std::move(columnTypes)); + return std::make_unique(std::move(relBatchInsertInfo), + std::move(partitionerSharedState), std::move(sharedState), + std::make_unique(outFSchema), getOperatorID(), copyFrom->getExpressionsForPrinting()); } @@ -210,13 +200,13 @@ physical_op_vector_t PlanMapper::mapCopyRelFrom(LogicalOperator* logicalOperator for (auto& property : relTableEntry->getPropertiesRef()) { columnTypes.push_back(property.getDataType()->copy()); } - auto copyRelSharedState = std::make_shared(relTableEntry->getTableID(), - storageManager.getRelTable(relTableEntry->getTableID()), std::move(columnTypes), - storageManager.getRelsStatistics(), memoryManager); - auto copyRelFWD = - createCopyRel(partitionerSharedState, copyRelSharedState, copyFrom, RelDataDirection::FWD); - auto copyRelBWD = - createCopyRel(partitionerSharedState, copyRelSharedState, copyFrom, RelDataDirection::BWD); + auto batchInsertSharedState = std::make_shared( + storageManager.getRelTable(relTableEntry->getTableID()), getSingleStringColumnFTable(), + storageManager.getWAL()); + auto copyRelFWD = createCopyRel(partitionerSharedState, batchInsertSharedState, copyFrom, + RelDataDirection::FWD, LogicalType::copy(columnTypes)); + auto copyRelBWD = createCopyRel(partitionerSharedState, batchInsertSharedState, copyFrom, + RelDataDirection::BWD, std::move(columnTypes)); physical_op_vector_t result; result.push_back(std::move(copyRelBWD)); result.push_back(std::move(copyRelFWD)); @@ -239,25 +229,29 @@ std::unique_ptr PlanMapper::mapCopyRdfFrom(LogicalOperator* lo } auto rChild = mapCopyNodeFrom(logicalRChild); - KU_ASSERT(rChild->getOperatorType() == PhysicalOperatorType::COPY_NODE); - auto rCopy = ku_dynamic_cast(rChild.get()); + KU_ASSERT(rChild->getOperatorType() == PhysicalOperatorType::BATCH_INSERT); + auto rCopy = ku_dynamic_cast(rChild.get()); auto lChild = mapCopyNodeFrom(logicalLChild); - auto lCopy = ku_dynamic_cast(lChild.get()); + auto lCopy = ku_dynamic_cast(lChild.get()); auto rrrChildren = mapCopyRelFrom(logicalRRRChild); KU_ASSERT(rrrChildren[2]->getOperatorType() == PhysicalOperatorType::PARTITIONER); auto rrrPartitioner = ku_dynamic_cast(rrrChildren[2].get()); - rrrPartitioner->getSharedState()->copyNodeSharedStates.push_back(rCopy->getSharedState()); - rrrPartitioner->getSharedState()->copyNodeSharedStates.push_back(rCopy->getSharedState()); + rrrPartitioner->getSharedState()->nodeBatchInsertSharedStates.push_back( + rCopy->getSharedState()); + rrrPartitioner->getSharedState()->nodeBatchInsertSharedStates.push_back( + rCopy->getSharedState()); KU_ASSERT(rrrChildren[2]->getChild(0)->getOperatorType() == PhysicalOperatorType::INDEX_LOOKUP); auto rrrLookup = ku_dynamic_cast(rrrChildren[2]->getChild(0)); - rrrLookup->setCopyNodeSharedState(rCopy->getSharedState()); + rrrLookup->setBatchInsertSharedState(rCopy->getSharedState()); auto rrlChildren = mapCopyRelFrom(logicalRRLChild); auto rrLPartitioner = ku_dynamic_cast(rrlChildren[2].get()); - rrLPartitioner->getSharedState()->copyNodeSharedStates.push_back(rCopy->getSharedState()); - rrLPartitioner->getSharedState()->copyNodeSharedStates.push_back(lCopy->getSharedState()); + rrLPartitioner->getSharedState()->nodeBatchInsertSharedStates.push_back( + rCopy->getSharedState()); + rrLPartitioner->getSharedState()->nodeBatchInsertSharedStates.push_back( + lCopy->getSharedState()); 
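// Each mapCopyRelFrom call above returns three pipelines that mapCopyFrom
// stitches together; roughly (a sketch of the wiring, not an EXPLAIN dump):
//
//   FactorizedTableScan (result message)
//   |-- ops[0]: RelBatchInsert (BWD) --+ share one BatchInsertSharedState
//   |-- ops[1]: RelBatchInsert (FWD) --+ and one PartitionerSharedState
//   |-- ops[2]: Partitioner -- IndexLookup -- reader pipeline
//
// Both RelBatchInsert operators are sources (isSource() == true): they pull
// whole partitions from the shared Partitioner state instead of tuples from
// a child operator.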
KU_ASSERT(rrlChildren[2]->getChild(0)->getOperatorType() == PhysicalOperatorType::INDEX_LOOKUP); auto rrlLookup = ku_dynamic_cast(rrlChildren[2]->getChild(0)); - rrlLookup->setCopyNodeSharedState(rCopy->getSharedState()); + rrlLookup->setBatchInsertSharedState(rCopy->getSharedState()); auto sharedState = std::make_shared(); auto fTable = getSingleStringColumnFTable(); sharedState->fTable = fTable; diff --git a/src/processor/operator/index_lookup.cpp b/src/processor/operator/index_lookup.cpp index 4919c1969f..1fa7b89e1d 100644 --- a/src/processor/operator/index_lookup.cpp +++ b/src/processor/operator/index_lookup.cpp @@ -5,6 +5,7 @@ #include "common/types/ku_string.h" #include "common/types/types.h" #include "common/vector/value_vector.h" +#include "processor/operator/persistent/node_batch_insert.h" #include "storage/index/hash_index.h" #include "transaction/transaction.h" @@ -35,10 +36,10 @@ std::unique_ptr IndexLookup::clone() { std::move(copiedInfos), children[0]->clone(), getOperatorID(), paramsString); } -void IndexLookup::setCopyNodeSharedState(std::shared_ptr sharedState) { +void IndexLookup::setBatchInsertSharedState(std::shared_ptr sharedState) { for (auto& info : infos) { - KU_ASSERT(info->copyNodeSharedState == nullptr); - info->copyNodeSharedState = sharedState; + KU_ASSERT(info->batchInsertSharedState == nullptr); + info->batchInsertSharedState = sharedState; } } @@ -65,7 +66,7 @@ void IndexLookup::checkNullKeys(ValueVector* keyVector) { void stringPKFillOffsetArraysFromVector(transaction::Transaction* transaction, const IndexLookupInfo& info, ValueVector* keyVector, offset_t* offsets) { auto numKeys = keyVector->state->selVector->selectedSize; - if (info.copyNodeSharedState == nullptr) { + if (info.batchInsertSharedState == nullptr) { for (auto i = 0u; i < numKeys; i++) { auto key = keyVector->getValue(keyVector->state->selVector->selectedPositions[i]); @@ -74,10 +75,13 @@ void stringPKFillOffsetArraysFromVector(transaction::Transaction* transaction, } } } else { + auto nodeBatchInsertSharedState = + ku_dynamic_cast( + info.batchInsertSharedState.get()); for (auto i = 0u; i < numKeys; i++) { auto key = keyVector->getValue(keyVector->state->selVector->selectedPositions[i]); - if (!info.copyNodeSharedState->pkIndex->lookup(key.getAsStringView(), offsets[i])) { + if (!nodeBatchInsertSharedState->pkIndex->lookup(key.getAsStringView(), offsets[i])) { throw RuntimeException(ExceptionMessage::nonExistentPKException(key.getAsString())); } } @@ -88,7 +92,7 @@ template void primitivePKFillOffsetArraysFromVector(transaction::Transaction* transaction, const IndexLookupInfo& info, ValueVector* keyVector, offset_t* offsets) { auto numKeys = keyVector->state->selVector->selectedSize; - if (info.copyNodeSharedState == nullptr) { + if (info.batchInsertSharedState == nullptr) { for (auto i = 0u; i < numKeys; i++) { auto pos = keyVector->state->selVector->selectedPositions[i]; auto key = keyVector->getValue(pos); @@ -98,10 +102,13 @@ void primitivePKFillOffsetArraysFromVector(transaction::Transaction* transaction } } } else { + auto nodeBatchInsertSharedState = + ku_dynamic_cast( + info.batchInsertSharedState.get()); for (auto i = 0u; i < numKeys; i++) { auto pos = keyVector->state->selVector->selectedPositions[i]; auto key = keyVector->getValue(pos); - if (!info.copyNodeSharedState->pkIndex->lookup(key, offsets[i])) { + if (!nodeBatchInsertSharedState->pkIndex->lookup(key, offsets[i])) { throw RuntimeException( ExceptionMessage::nonExistentPKException(TypeUtils::toString(key))); } diff --git 
a/src/processor/operator/partitioner.cpp b/src/processor/operator/partitioner.cpp index 68b33d09bb..8db8df79e2 100644 --- a/src/processor/operator/partitioner.cpp +++ b/src/processor/operator/partitioner.cpp @@ -1,5 +1,7 @@ #include "processor/operator/partitioner.h" +#include "storage/store/node_table.h" + using namespace kuzu::common; using namespace kuzu::storage; diff --git a/src/processor/operator/persistent/CMakeLists.txt b/src/processor/operator/persistent/CMakeLists.txt index 3c8a9693d8..c9bf131dce 100644 --- a/src/processor/operator/persistent/CMakeLists.txt +++ b/src/processor/operator/persistent/CMakeLists.txt @@ -3,9 +3,10 @@ add_subdirectory(writer/parquet) add_library(kuzu_processor_operator_persistent OBJECT - copy_node.cpp + batch_insert.cpp + node_batch_insert.cpp copy_rdf.cpp - copy_rel.cpp + rel_batch_insert.cpp copy_to.cpp copy_to_csv.cpp copy_to_parquet.cpp diff --git a/src/processor/operator/persistent/batch_insert.cpp b/src/processor/operator/persistent/batch_insert.cpp new file mode 100644 index 0000000000..37e36cd345 --- /dev/null +++ b/src/processor/operator/persistent/batch_insert.cpp @@ -0,0 +1,18 @@ +#include "processor/operator/persistent/batch_insert.h" + +#include "common/exception/copy.h" +#include "common/exception/message.h" + +using namespace kuzu::common; + +namespace kuzu { +namespace processor { + +void BatchInsert::checkIfTableIsEmpty() { + if (sharedState->table->getNumTuples() != 0) { + throw CopyException(ExceptionMessage::notAllowCopyOnNonEmptyTableException()); + } +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp deleted file mode 100644 index 122c8f3de1..0000000000 --- a/src/processor/operator/persistent/copy_node.cpp +++ /dev/null @@ -1,186 +0,0 @@ -#include "processor/operator/persistent/copy_node.h" - -#include "common/exception/copy.h" -#include "common/exception/message.h" -#include "common/string_format.h" -#include "common/types/types.h" -#include "function/table/scan_functions.h" -#include "processor/result/factorized_table.h" - -using namespace kuzu::catalog; -using namespace kuzu::common; -using namespace kuzu::storage; - -namespace kuzu { -namespace processor { - -void CopyNodeSharedState::init(ExecutionContext* context) { - wal->logCopyTableRecord(table->getTableID(), TableType::NODE); - wal->flushAllPages(); - if (pkType.getLogicalTypeID() != LogicalTypeID::SERIAL) { - auto indexFName = StorageUtils::getNodeIndexFName(context->clientContext->getVFSUnsafe(), - wal->getDirectory(), table->getTableID(), FileVersionType::ORIGINAL); - pkIndex = std::make_unique( - indexFName, pkType.getPhysicalType(), context->clientContext->getVFSUnsafe()); - uint64_t numRows; - if (readerSharedState != nullptr) { - KU_ASSERT(distinctSharedState == nullptr); - auto scanSharedState = reinterpret_cast( - readerSharedState->funcState.get()); - numRows = scanSharedState->numRows; - } else { - numRows = distinctSharedState->getFactorizedTable()->getNumTuples(); - } - pkIndex->bulkReserve(numRows); - globalIndexBuilder = IndexBuilder(std::make_shared(pkIndex.get())); - } -} - -void CopyNodeSharedState::appendIncompleteNodeGroup( - std::unique_ptr localNodeGroup, std::optional& indexBuilder) { - std::unique_lock xLck{mtx}; - if (!sharedNodeGroup) { - sharedNodeGroup = std::move(localNodeGroup); - return; - } - auto numNodesAppended = - sharedNodeGroup->append(localNodeGroup.get(), 0 /* offsetInNodeGroup */); - if (sharedNodeGroup->isFull()) { - 
auto nodeGroupIdx = getNextNodeGroupIdxWithoutLock(); - CopyNode::writeAndResetNodeGroup( - nodeGroupIdx, indexBuilder, pkColumnIdx, table, sharedNodeGroup.get()); - } - if (numNodesAppended < localNodeGroup->getNumRows()) { - sharedNodeGroup->append(localNodeGroup.get(), numNodesAppended); - } -} - -void CopyNodeSharedState::finalize(ExecutionContext* context) { - calculateNumTuples(); - if (sharedNodeGroup) { - auto nodeGroupIdx = getNextNodeGroupIdx(); - CopyNode::writeAndResetNodeGroup( - nodeGroupIdx, globalIndexBuilder, pkColumnIdx, table, sharedNodeGroup.get()); - } - table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable(table->getTableID(), numTuples); - if (globalIndexBuilder) { - globalIndexBuilder->finalize(context); - } -} - -static bool isEmptyTable(NodeTable* nodeTable) { - auto nodesStatistics = nodeTable->getNodeStatisticsAndDeletedIDs(); - return nodesStatistics->getNodeStatisticsAndDeletedIDs(nodeTable->getTableID()) - ->getNumTuples() == 0; -} - -void CopyNode::initGlobalStateInternal(ExecutionContext* context) { - if (!isEmptyTable(info->table)) { - throw CopyException(ExceptionMessage::notAllowCopyOnNonEmptyTableException()); - } - sharedState->init(context); -} - -void CopyNode::initLocalStateInternal(ResultSet* resultSet, ExecutionContext*) { - std::shared_ptr state; - for (auto& pos : info->columnPositions) { - if (pos.isValid()) { - state = resultSet->getValueVector(pos)->state; - } - } - - // NOLINTBEGIN(bugprone-unchecked-optional-access) - if (sharedState->globalIndexBuilder) { - localIndexBuilder = sharedState->globalIndexBuilder.value().clone(); - } - // NOLINTEND(bugprone-unchecked-optional-access) - - KU_ASSERT(state != nullptr); - for (auto i = 0u; i < info->columnPositions.size(); ++i) { - auto pos = info->columnPositions[i]; - if (pos.isValid()) { - columnVectors.push_back(resultSet->getValueVector(pos).get()); - } else { - auto columnType = sharedState->columnTypes[i].get(); - auto nullVector = std::make_shared(*columnType); - nullVector->setState(state); - nullVector->setAllNull(); - nullColumnVectors.push_back(nullVector); - columnVectors.push_back(nullVector.get()); - } - } - localNodeGroup = NodeGroupFactory::createNodeGroup( - ColumnDataFormat::REGULAR, sharedState->columnTypes, info->compressionEnabled); - columnState = state.get(); -} - -void CopyNode::executeInternal(ExecutionContext* context) { - std::optional token; - if (localIndexBuilder) { - token = localIndexBuilder->getProducerToken(); - } - - while (children[0]->getNextTuple(context)) { - auto originalSelVector = columnState->selVector; - copyToNodeGroup(); - columnState->selVector = std::move(originalSelVector); - } - if (localNodeGroup->getNumRows() > 0) { - sharedState->appendIncompleteNodeGroup(std::move(localNodeGroup), localIndexBuilder); - } - if (localIndexBuilder) { - KU_ASSERT(token); - token->quit(); - localIndexBuilder->finishedProducing(); - } -} - -void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, - std::optional& indexBuilder, column_id_t pkColumnID, NodeTable* table, - NodeGroup* nodeGroup) { - nodeGroup->finalize(nodeGroupIdx); - if (indexBuilder) { - auto nodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); - indexBuilder->insert( - nodeGroup->getColumnChunk(pkColumnID), nodeOffset, nodeGroup->getNumRows()); - } - table->append(nodeGroup); - nodeGroup->resetToEmpty(); -} - -void CopyNodeSharedState::calculateNumTuples() { - numTuples = StorageUtils::getStartOffsetOfNodeGroup(getCurNodeGroupIdx()); - if (sharedNodeGroup) { - 
numTuples += sharedNodeGroup->getNumRows(); - } -} - -void CopyNode::finalize(ExecutionContext* context) { - sharedState->finalize(context); - auto outputMsg = stringFormat("{} number of tuples has been copied to table: {}.", - sharedState->numTuples, info->tableName.c_str()); - FactorizedTableUtils::appendStringToTable( - sharedState->fTable.get(), outputMsg, context->clientContext->getMemoryManager()); -} - -void CopyNode::copyToNodeGroup() { - auto numAppendedTuples = 0ul; - auto numTuplesToAppend = columnState->getNumSelectedValues(); - while (numAppendedTuples < numTuplesToAppend) { - auto numAppendedTuplesInNodeGroup = localNodeGroup->append( - columnVectors, columnState, numTuplesToAppend - numAppendedTuples); - numAppendedTuples += numAppendedTuplesInNodeGroup; - if (localNodeGroup->isFull()) { - node_group_idx_t nodeGroupIdx; - nodeGroupIdx = sharedState->getNextNodeGroupIdx(); - writeAndResetNodeGroup(nodeGroupIdx, localIndexBuilder, sharedState->pkColumnIdx, - sharedState->table, localNodeGroup.get()); - } - if (numAppendedTuples < numTuplesToAppend) { - columnState->slice((offset_t)numAppendedTuplesInNodeGroup); - } - } -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/persistent/node_batch_insert.cpp b/src/processor/operator/persistent/node_batch_insert.cpp new file mode 100644 index 0000000000..ecfdffdcda --- /dev/null +++ b/src/processor/operator/persistent/node_batch_insert.cpp @@ -0,0 +1,195 @@ +#include "processor/operator/persistent/node_batch_insert.h" + +#include "common/string_format.h" +#include "common/types/types.h" +#include "function/table/scan_functions.h" +#include "processor/result/factorized_table.h" + +using namespace kuzu::catalog; +using namespace kuzu::common; +using namespace kuzu::storage; + +namespace kuzu { +namespace processor { + +void NodeBatchInsertSharedState::initPKIndex(kuzu::processor::ExecutionContext* context) { + KU_ASSERT(pkType.getLogicalTypeID() != LogicalTypeID::SERIAL); + auto indexFName = StorageUtils::getNodeIndexFName(context->clientContext->getVFSUnsafe(), + wal->getDirectory(), table->getTableID(), FileVersionType::ORIGINAL); + pkIndex = std::make_unique( + indexFName, pkType.getPhysicalType(), context->clientContext->getVFSUnsafe()); + uint64_t numRows; + if (readerSharedState != nullptr) { + KU_ASSERT(distinctSharedState == nullptr); + auto scanSharedState = + reinterpret_cast(readerSharedState->funcState.get()); + numRows = scanSharedState->numRows; + } else { + numRows = distinctSharedState->getFactorizedTable()->getNumTuples(); + } + pkIndex->bulkReserve(numRows); + globalIndexBuilder = IndexBuilder(std::make_shared(pkIndex.get())); +} + +void NodeBatchInsertSharedState::appendIncompleteNodeGroup( + std::unique_ptr localNodeGroup, std::optional& indexBuilder) { + std::unique_lock xLck{mtx}; + if (!sharedNodeGroup) { + sharedNodeGroup = std::move(localNodeGroup); + return; + } + auto numNodesAppended = + sharedNodeGroup->append(localNodeGroup.get(), 0 /* offsetInNodeGroup */); + if (sharedNodeGroup->isFull()) { + auto nodeGroupIdx = getNextNodeGroupIdxWithoutLock(); + auto nodeTable = ku_dynamic_cast(table); + NodeBatchInsert::writeAndResetNodeGroup( + nodeGroupIdx, indexBuilder, pkColumnIdx, nodeTable, sharedNodeGroup.get()); + } + if (numNodesAppended < localNodeGroup->getNumRows()) { + sharedNodeGroup->append(localNodeGroup.get(), numNodesAppended); + } +} + +void NodeBatchInsert::initGlobalStateInternal(ExecutionContext* context) { + checkIfTableIsEmpty(); + 
sharedState->logBatchInsertWALRecord(); + auto nodeSharedState = + ku_dynamic_cast(sharedState.get()); + if (nodeSharedState->pkType.getLogicalTypeID() != LogicalTypeID::SERIAL) { + nodeSharedState->initPKIndex(context); + } +} + +void NodeBatchInsert::initLocalStateInternal(ResultSet* resultSet, ExecutionContext*) { + std::shared_ptr state; + auto nodeInfo = ku_dynamic_cast(info.get()); + for (auto& pos : nodeInfo->columnPositions) { + if (pos.isValid()) { + state = resultSet->getValueVector(pos)->state; + } + } + + auto nodeSharedState = + ku_dynamic_cast(sharedState.get()); + localState = std::make_unique(); + auto nodeLocalState = + ku_dynamic_cast(localState.get()); + // NOLINTBEGIN(bugprone-unchecked-optional-access) + if (nodeSharedState->globalIndexBuilder) { + nodeLocalState->localIndexBuilder = nodeSharedState->globalIndexBuilder.value().clone(); + } + // NOLINTEND(bugprone-unchecked-optional-access) + + KU_ASSERT(state != nullptr); + for (auto i = 0u; i < nodeInfo->columnPositions.size(); ++i) { + auto pos = nodeInfo->columnPositions[i]; + if (pos.isValid()) { + nodeLocalState->columnVectors.push_back(resultSet->getValueVector(pos).get()); + } else { + auto columnType = nodeInfo->columnTypes[i].get(); + auto nullVector = std::make_shared(*columnType); + nullVector->setState(state); + nullVector->setAllNull(); + nodeLocalState->nullColumnVectors.push_back(nullVector); + nodeLocalState->columnVectors.push_back(nullVector.get()); + } + } + nodeLocalState->nodeGroup = NodeGroupFactory::createNodeGroup( + ColumnDataFormat::REGULAR, nodeInfo->columnTypes, info->compressionEnabled); + nodeLocalState->columnState = state.get(); +} + +void NodeBatchInsert::executeInternal(ExecutionContext* context) { + std::optional token; + auto nodeLocalState = + ku_dynamic_cast(localState.get()); + if (nodeLocalState->localIndexBuilder) { + token = nodeLocalState->localIndexBuilder->getProducerToken(); + } + + while (children[0]->getNextTuple(context)) { + auto originalSelVector = nodeLocalState->columnState->selVector; + copyToNodeGroup(); + nodeLocalState->columnState->selVector = std::move(originalSelVector); + } + if (nodeLocalState->nodeGroup->getNumRows() > 0) { + auto nodeSharedState = + ku_dynamic_cast( + sharedState.get()); + nodeSharedState->appendIncompleteNodeGroup( + std::move(nodeLocalState->nodeGroup), nodeLocalState->localIndexBuilder); + } + if (nodeLocalState->localIndexBuilder) { + KU_ASSERT(token); + token->quit(); + nodeLocalState->localIndexBuilder->finishedProducing(); + } +} + +void NodeBatchInsert::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, + std::optional& indexBuilder, column_id_t pkColumnID, NodeTable* table, + NodeGroup* nodeGroup) { + nodeGroup->finalize(nodeGroupIdx); + if (indexBuilder) { + auto nodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); + indexBuilder->insert( + nodeGroup->getColumnChunk(pkColumnID), nodeOffset, nodeGroup->getNumRows()); + } + table->append(nodeGroup); + nodeGroup->resetToEmpty(); +} + +void NodeBatchInsertSharedState::calculateNumTuples() { + numRows.store(StorageUtils::getStartOffsetOfNodeGroup(getCurNodeGroupIdx())); + if (sharedNodeGroup) { + numRows += sharedNodeGroup->getNumRows(); + } +} + +void NodeBatchInsert::copyToNodeGroup() { + auto numAppendedTuples = 0ul; + auto nodeLocalState = + ku_dynamic_cast(localState.get()); + auto nodeSharedState = + ku_dynamic_cast(sharedState.get()); + auto nodeTable = ku_dynamic_cast(sharedState->table); + auto numTuplesToAppend = 
+    while (numAppendedTuples < numTuplesToAppend) {
+        auto numAppendedTuplesInNodeGroup =
+            nodeLocalState->nodeGroup->append(nodeLocalState->columnVectors,
+                nodeLocalState->columnState, numTuplesToAppend - numAppendedTuples);
+        numAppendedTuples += numAppendedTuplesInNodeGroup;
+        if (nodeLocalState->nodeGroup->isFull()) {
+            // The local node group is full; flush it and keep appending into the reset group.
+            auto nodeGroupIdx = nodeSharedState->getNextNodeGroupIdx();
+            writeAndResetNodeGroup(nodeGroupIdx, nodeLocalState->localIndexBuilder,
+                nodeSharedState->pkColumnIdx, nodeTable, nodeLocalState->nodeGroup.get());
+        }
+        if (numAppendedTuples < numTuplesToAppend) {
+            nodeLocalState->columnState->slice((offset_t)numAppendedTuplesInNodeGroup);
+        }
+    }
+}
+
+void NodeBatchInsert::finalize(ExecutionContext* context) {
+    auto nodeSharedState =
+        ku_dynamic_cast<BatchInsertSharedState*, NodeBatchInsertSharedState*>(sharedState.get());
+    nodeSharedState->calculateNumTuples();
+    nodeSharedState->setNumTuplesForTable();
+    if (nodeSharedState->sharedNodeGroup) {
+        auto nodeGroupIdx = nodeSharedState->getNextNodeGroupIdx();
+        auto nodeTable = ku_dynamic_cast<Table*, NodeTable*>(nodeSharedState->table);
+        NodeBatchInsert::writeAndResetNodeGroup(nodeGroupIdx, nodeSharedState->globalIndexBuilder,
+            nodeSharedState->pkColumnIdx, nodeTable, nodeSharedState->sharedNodeGroup.get());
+    }
+    if (nodeSharedState->globalIndexBuilder) {
+        nodeSharedState->globalIndexBuilder->finalize(context);
+    }
+    auto outputMsg = stringFormat("{} tuples have been copied to table: {}.",
+        sharedState->getNumRows(), info->tableEntry->getName());
+    FactorizedTableUtils::appendStringToTable(
+        sharedState->fTable.get(), outputMsg, context->clientContext->getMemoryManager());
+}
+} // namespace processor
+} // namespace kuzu
diff --git a/src/processor/operator/persistent/copy_rel.cpp b/src/processor/operator/persistent/rel_batch_insert.cpp
similarity index 59%
rename from src/processor/operator/persistent/copy_rel.cpp
rename to src/processor/operator/persistent/rel_batch_insert.cpp
index 0bdc443015..0b83ab14a9 100644
--- a/src/processor/operator/persistent/copy_rel.cpp
+++ b/src/processor/operator/persistent/rel_batch_insert.cpp
@@ -1,4 +1,4 @@
-#include "processor/operator/persistent/copy_rel.h"
+#include "processor/operator/persistent/rel_batch_insert.h"
 
 #include "common/exception/copy.h"
 #include "common/exception/message.h"
@@ -12,72 +12,58 @@ using namespace kuzu::storage;
 namespace kuzu {
 namespace processor {
 
-CopyRelSharedState::CopyRelSharedState(table_id_t tableID, RelTable* table,
-    std::vector<std::unique_ptr<LogicalType>> columnTypes, RelsStoreStats* relsStatistics,
-    MemoryManager* memoryManager)
-    : tableID{tableID}, table{table}, columnTypes{std::move(columnTypes)},
-      relsStatistics{relsStatistics}, numRows{0} {
-    auto ftTableSchema = std::make_unique<FactorizedTableSchema>();
-    ftTableSchema->appendColumn(
-        std::make_unique<ColumnSchema>(false /* flat */, 0 /* dataChunkPos */,
-            LogicalTypeUtils::getRowLayoutSize(LogicalType{LogicalTypeID::STRING})));
-    fTable = std::make_shared<FactorizedTable>(memoryManager, std::move(ftTableSchema));
+void RelBatchInsert::initGlobalStateInternal(ExecutionContext* /*context*/) {
+    checkIfTableIsEmpty();
+    sharedState->logBatchInsertWALRecord();
 }
 
-// NOLINTNEXTLINE(readability-make-member-function-const): Semantically non-const.
-void CopyRelSharedState::logCopyRelWALRecord(WAL* wal) {
-    wal->logCopyTableRecord(tableID, TableType::REL);
-    wal->flushAllPages();
-}
-
-void CopyRel::initLocalStateInternal(ResultSet* /*resultSet_*/, ExecutionContext* /*context*/) {
-    localState = std::make_unique<CopyRelLocalState>();
+void RelBatchInsert::initLocalStateInternal(
+    ResultSet* /*resultSet_*/, ExecutionContext* /*context*/) {
+    localState = std::make_unique<RelBatchInsertLocalState>();
+    auto relInfo = ku_dynamic_cast<BatchInsertInfo*, RelBatchInsertInfo*>(info.get());
     localState->nodeGroup = NodeGroupFactory::createNodeGroup(
-        ColumnDataFormat::CSR, sharedState->columnTypes, info->compressionEnabled);
-}
-
-void CopyRel::initGlobalStateInternal(ExecutionContext* /*context*/) {
-    if (!isCopyAllowed()) {
-        throw CopyException(ExceptionMessage::notAllowCopyOnNonEmptyTableException());
-    }
-    sharedState->logCopyRelWALRecord(info->wal);
+        ColumnDataFormat::CSR, relInfo->columnTypes, relInfo->compressionEnabled);
 }
 
-void CopyRel::executeInternal(ExecutionContext* /*context*/) {
+void RelBatchInsert::executeInternal(ExecutionContext* /*context*/) {
+    auto relInfo = ku_dynamic_cast<BatchInsertInfo*, RelBatchInsertInfo*>(info.get());
+    auto relTable = ku_dynamic_cast<Table*, RelTable*>(sharedState->table);
+    auto relLocalState =
+        ku_dynamic_cast<BatchInsertLocalState*, RelBatchInsertLocalState*>(localState.get());
     while (true) {
-        localState->currentPartition =
-            partitionerSharedState->getNextPartition(info->partitioningIdx);
-        if (localState->currentPartition == INVALID_PARTITION_IDX) {
+        relLocalState->nodeGroupIdx =
+            partitionerSharedState->getNextPartition(relInfo->partitioningIdx);
+        if (relLocalState->nodeGroupIdx == INVALID_PARTITION_IDX) {
             break;
         }
         // Read the whole partition and write it into the node group.
         auto partitioningBuffer = partitionerSharedState->getPartitionBuffer(
-            info->partitioningIdx, localState->currentPartition);
-        auto startNodeOffset =
-            StorageUtils::getStartOffsetOfNodeGroup(localState->currentPartition);
-        auto offsetVectorIdx = info->dataDirection == RelDataDirection::FWD ? 0 : 1;
+            relInfo->partitioningIdx, relLocalState->nodeGroupIdx);
+        auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(relLocalState->nodeGroupIdx);
         for (auto dataChunk : partitioningBuffer->getChunks()) {
             setOffsetToWithinNodeGroup(
                 dataChunk->getValueVector(relInfo->offsetVectorIdx).get(), startNodeOffset);
         }
         // Calculate the number of source nodes in this node group. This will
         // be used to set the number of values in the node group.
         auto numNodes = std::min(StorageConstants::NODE_GROUP_SIZE,
-            partitionerSharedState->maxNodeOffsets[info->partitioningIdx] - startNodeOffset + 1);
-        prepareCSRNodeGroup(partitioningBuffer, startNodeOffset, offsetVectorIdx, numNodes);
+            partitionerSharedState->maxNodeOffsets[relInfo->partitioningIdx] - startNodeOffset + 1);
+        prepareCSRNodeGroup(
+            partitioningBuffer, startNodeOffset, relInfo->offsetVectorIdx, numNodes);
         for (auto dataChunk : partitioningBuffer->getChunks()) {
-            localState->nodeGroup->write(dataChunk, offsetVectorIdx);
+            localState->nodeGroup->write(dataChunk, relInfo->offsetVectorIdx);
         }
-        localState->nodeGroup->finalize(localState->currentPartition);
+        localState->nodeGroup->finalize(relLocalState->nodeGroupIdx);
         // Flush node group to table.
-        sharedState->table->append(localState->nodeGroup.get(), info->dataDirection);
+        relTable->append(localState->nodeGroup.get(), relInfo->direction);
         sharedState->incrementNumRows(localState->nodeGroup->getNumRows());
         localState->nodeGroup->resetToEmpty();
     }
 }
 
-void CopyRel::prepareCSRNodeGroup(DataChunkCollection* partition, common::offset_t startNodeOffset,
-    vector_idx_t offsetVectorIdx, offset_t numNodes) {
+void RelBatchInsert::prepareCSRNodeGroup(DataChunkCollection* partition,
+    common::offset_t startNodeOffset, vector_idx_t offsetVectorIdx, offset_t numNodes) {
+    auto relInfo = ku_dynamic_cast<BatchInsertInfo*, RelBatchInsertInfo*>(info.get());
     auto csrNodeGroup = ku_dynamic_cast<NodeGroup*, CSRNodeGroup*>(localState->nodeGroup.get());
     auto& csrHeader = csrNodeGroup->getCSRHeader();
     csrHeader.setNumValues(numNodes);
@@ -86,8 +72,8 @@ void CopyRel::prepareCSRNodeGroup(DataChunkCollection* partition, common::offset
     auto invalid = checkRelMultiplicityConstraint(csrHeader);
     if (invalid.has_value()) {
         throw CopyException(ExceptionMessage::violateRelMultiplicityConstraint(
-            info->relTableEntry->getName(), std::to_string(invalid.value() + startNodeOffset),
-            RelDataDirectionUtils::relDirectionToString(info->dataDirection)));
+            info->tableEntry->getName(), std::to_string(invalid.value() + startNodeOffset),
+            RelDataDirectionUtils::relDirectionToString(relInfo->direction)));
     }
     // Resize CSR data column chunks.
     offset_t csrChunkCapacity =
@@ -100,14 +86,15 @@ void CopyRel::prepareCSRNodeGroup(DataChunkCollection* partition, common::offset
     populateEndCSROffsets(csrHeader, gaps);
 }
 
-void CopyRel::populateEndCSROffsets(CSRHeaderChunks& csrHeader, std::vector<offset_t>& gaps) {
+void RelBatchInsert::populateEndCSROffsets(
+    CSRHeaderChunks& csrHeader, std::vector<offset_t>& gaps) {
     auto csrOffsets = (offset_t*)csrHeader.offset->getData();
     for (auto i = 0u; i < csrHeader.offset->getNumValues(); i++) {
         csrOffsets[i] += gaps[i];
     }
 }
 
-length_t CopyRel::getGapSize(length_t length) {
+length_t RelBatchInsert::getGapSize(length_t length) {
     // We intentionally leave a gap for empty CSR lists to accommodate future insertions.
     // Also, for MANY_ONE and ONE_ONE relationships, we should always keep each CSR list at size 1.
     return length == 0 ?
@@ -116,7 +103,7 @@ length_t CopyRel::getGapSize(length_t length) {
         length;
 }
 
-std::vector<offset_t> CopyRel::populateStartCSROffsetsAndLengths(CSRHeaderChunks& csrHeader,
+std::vector<offset_t> RelBatchInsert::populateStartCSROffsetsAndLengths(CSRHeaderChunks& csrHeader,
     offset_t numNodes, DataChunkCollection* partition, vector_idx_t offsetVectorIdx) {
     KU_ASSERT(numNodes == csrHeader.length->getNumValues() &&
               numNodes == csrHeader.offset->getNumValues());
@@ -147,7 +134,7 @@ std::vector<offset_t> CopyRel::populateStartCSROffsetsAndLengths(CSRHeaderChunks
     return gaps;
 }
 
-void CopyRel::setOffsetToWithinNodeGroup(ValueVector* vector, offset_t startOffset) {
+void RelBatchInsert::setOffsetToWithinNodeGroup(ValueVector* vector, offset_t startOffset) {
     KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INT64 &&
               vector->state->selVector->isUnfiltered());
     auto offsets = (offset_t*)vector->getData();
@@ -156,7 +143,7 @@ void CopyRel::setOffsetToWithinNodeGroup(ValueVector* vector, offset_t startOffs
     }
 }
 
-void CopyRel::setOffsetFromCSROffsets(ValueVector* offsetVector, ColumnChunk* offsetChunk) {
+void RelBatchInsert::setOffsetFromCSROffsets(ValueVector* offsetVector, ColumnChunk* offsetChunk) {
     KU_ASSERT(offsetVector->dataType.getPhysicalType() == PhysicalTypeID::INT64 &&
               offsetVector->state->selVector->isUnfiltered());
     for (auto i = 0u; i < offsetVector->state->selVector->selectedSize; i++) {
@@ -167,9 +154,13 @@ void CopyRel::setOffsetFromCSROffsets(ValueVector* offsetVector, ColumnChunk* of
     }
 }
 
-std::optional<offset_t> CopyRel::checkRelMultiplicityConstraint(
+std::optional<offset_t> RelBatchInsert::checkRelMultiplicityConstraint(
     const storage::CSRHeaderChunks& csrHeader) {
-    if (!info->relTableEntry->isSingleMultiplicity(info->dataDirection)) {
+    auto relInfo = ku_dynamic_cast<BatchInsertInfo*, RelBatchInsertInfo*>(info.get());
+    auto relTableEntry =
+        ku_dynamic_cast<catalog::TableCatalogEntry*, catalog::RelTableCatalogEntry*>(
+            info->tableEntry);
+    if (!relTableEntry->isSingleMultiplicity(relInfo->direction)) {
         return std::nullopt;
     }
     for (auto i = 0u; i < csrHeader.length->getNumValues(); i++) {
@@ -180,17 +171,20 @@ std::optional<offset_t> CopyRel::checkRelMultiplicityConstraint(
     return std::nullopt;
 }
 
-void CopyRel::finalize(ExecutionContext* context) {
-    if (info->partitioningIdx == partitionerSharedState->partitioningBuffers.size() - 1) {
-        sharedState->updateRelsStatistics();
+void RelBatchInsert::finalize(ExecutionContext* context) {
+    auto relInfo = ku_dynamic_cast<BatchInsertInfo*, RelBatchInsertInfo*>(info.get());
+    if (relInfo->direction == RelDataDirection::BWD) {
+        KU_ASSERT(
+            relInfo->partitioningIdx == partitionerSharedState->partitioningBuffers.size() - 1);
+        sharedState->setNumTuplesForTable();
         auto outputMsg = stringFormat("{} tuples have been copied to table: {}.",
-            sharedState->numRows.load(), info->relTableEntry->getName());
+            sharedState->getNumRows(), info->tableEntry->getName());
         FactorizedTableUtils::appendStringToTable(
             sharedState->fTable.get(), outputMsg, context->clientContext->getMemoryManager());
     }
     sharedState->numRows.store(0);
     partitionerSharedState->resetState();
-    partitionerSharedState->partitioningBuffers[info->partitioningIdx].reset();
+    partitionerSharedState->partitioningBuffers[relInfo->partitioningIdx].reset();
 }
 
 } // namespace processor
diff --git a/src/processor/operator/physical_operator.cpp b/src/processor/operator/physical_operator.cpp
index 89cc930a63..625c095303 100644
--- a/src/processor/operator/physical_operator.cpp
+++ b/src/processor/operator/physical_operator.cpp
@@ -13,16 +13,14 @@ std::string PhysicalOperatorUtils::operatorTypeToString(PhysicalOperatorType ope
         return "AGGREGATE";
     case PhysicalOperatorType::AGGREGATE_SCAN:
         return "AGGREGATE_SCAN";
+    case PhysicalOperatorType::BATCH_INSERT:
+        return "BATCH_INSERT";
     case PhysicalOperatorType::STANDALONE_CALL:
         return "STANDALONE_CALL";
     case PhysicalOperatorType::COPY_TO:
         return "COPY_TO";
-    case PhysicalOperatorType::COPY_NODE:
-        return "COPY_NODE";
     case PhysicalOperatorType::COPY_RDF:
         return "COPY_RDF";
-    case PhysicalOperatorType::COPY_REL:
-        return "COPY_REL";
     case PhysicalOperatorType::CREATE_MACRO:
         return "CREATE_MACRO";
     case PhysicalOperatorType::READER:
diff --git a/src/storage/store/node_group.cpp b/src/storage/store/node_group.cpp
index dbc116d1de..d50c64a0d5 100644
--- a/src/storage/store/node_group.cpp
+++ b/src/storage/store/node_group.cpp
@@ -179,7 +179,7 @@ length_t CSRHeaderChunks::getCSRLength(offset_t nodeOffset) const {
 CSRNodeGroup::CSRNodeGroup(
     const std::vector<std::unique_ptr<LogicalType>>& columnTypes, bool enableCompression)
     // By default, initialize all column chunks except for the csrOffsetChunk to empty, as they
-    // should be resized after csr offset calculation (e.g., during CopyRel).
+    // should be resized after CSR offset calculation (e.g., during RelBatchInsert).
     : NodeGroup{columnTypes, enableCompression, 0 /* capacity */} {
     csrHeaderChunks = CSRHeaderChunks(enableCompression);
 }
diff --git a/src/storage/wal/wal.cpp b/src/storage/wal/wal.cpp
index df10a2041e..29255101d6 100644
--- a/src/storage/wal/wal.cpp
+++ b/src/storage/wal/wal.cpp
@@ -92,9 +92,9 @@ void WAL::logOverflowFileNextBytePosRecord(DBFileID dbFileID, uint64_t prevNextB
     addNewWALRecordNoLock(walRecord);
 }
 
-void WAL::logCopyTableRecord(table_id_t tableID, TableType tableType) {
+void WAL::logCopyTableRecord(table_id_t tableID) {
     lock_t lck{mtx};
-    WALRecord walRecord = WALRecord::newCopyTableRecord(tableID, tableType);
+    WALRecord walRecord = WALRecord::newCopyTableRecord(tableID);
     addToUpdatedTables(tableID);
     addNewWALRecordNoLock(walRecord);
 }
diff --git a/src/storage/wal/wal_record.cpp b/src/storage/wal/wal_record.cpp
index 0c5f031edb..54879e7911 100644
--- a/src/storage/wal/wal_record.cpp
+++ b/src/storage/wal/wal_record.cpp
@@ -194,10 +194,10 @@ WALRecord WALRecord::newOverflowFileNextBytePosRecord(
     return retVal;
 }
 
-WALRecord WALRecord::newCopyTableRecord(table_id_t tableID, TableType tableType) {
+WALRecord WALRecord::newCopyTableRecord(table_id_t tableID) {
     WALRecord retVal;
     retVal.recordType = WALRecordType::COPY_TABLE_RECORD;
-    retVal.copyTableRecord = CopyTableRecord(tableID, tableType);
+    retVal.copyTableRecord = CopyTableRecord(tableID);
     return retVal;
 }
diff --git a/src/storage/wal_replayer.cpp b/src/storage/wal_replayer.cpp
index 33346dd95f..011eaa868b 100644
--- a/src/storage/wal_replayer.cpp
+++ b/src/storage/wal_replayer.cpp
@@ -229,9 +229,10 @@ void WALReplayer::replayCopyTableRecord(const kuzu::storage::WALRecord& walRecor
     // files have been changed during checkpoint. So the in-memory
     // fileHandles are obsolete and should be reconstructed (e.g., since the numPages
     // have likely changed, they need to reconstruct their page locks).
-    if (walRecord.copyTableRecord.tableType == TableType::NODE) {
-        auto nodeTableEntry = ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(
-            catalog->getTableCatalogEntry(&DUMMY_READ_TRANSACTION, tableID));
+    auto catalogEntry = catalog->getTableCatalogEntry(&DUMMY_READ_TRANSACTION, tableID);
+    if (catalogEntry->getType() == CatalogEntryType::NODE_TABLE_ENTRY) {
+        auto nodeTableEntry =
+            ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(catalogEntry);
         storageManager->getNodeTable(tableID)->initializePKIndex(
             nodeTableEntry, false /* readOnly */, vfs);
     }
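
A note on the NodeBatchInsert append loop introduced above: each thread appends its
input chunk into a local node group, flushes the group whenever it fills up, and
resumes from the first row that did not fit (the selection-vector slice). The sketch
below mirrors that shape on a toy buffer. It is illustrative only: the names, the
tiny capacity, and the plain std::vector stand-ins are assumptions, not kuzu's
actual NodeGroup API.

    // batch_append_sketch.cpp -- toy model of the append/flush/slice loop.
    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    constexpr std::size_t kNodeGroupCapacity = 4; // real node groups are much larger

    struct ToyNodeGroup {
        std::vector<int> rows;
        // Append as many of the `count` values starting at `start` as fit;
        // return how many were actually taken (mirrors NodeGroup::append).
        std::size_t append(const std::vector<int>& src, std::size_t start, std::size_t count) {
            auto numToAppend = std::min(count, kNodeGroupCapacity - rows.size());
            rows.insert(rows.end(), src.begin() + start, src.begin() + start + numToAppend);
            return numToAppend;
        }
        bool isFull() const { return rows.size() == kNodeGroupCapacity; }
        void flushAndReset() { // stands in for writeAndResetNodeGroup
            std::cout << "flushed node group of " << rows.size() << " rows\n";
            rows.clear();
        }
    };

    int main() {
        std::vector<int> batch = {1, 2, 3, 4, 5, 6, 7, 8, 9}; // one incoming chunk
        ToyNodeGroup group;
        std::size_t appended = 0;
        while (appended < batch.size()) {
            // Append what fits, flush when full, then resume from the first
            // row that did not fit -- the equivalent of columnState->slice().
            appended += group.append(batch, appended, batch.size() - appended);
            if (group.isFull()) {
                group.flushAndReset();
            }
        }
        // A trailing, partially filled group is handed to the shared state
        // (appendIncompleteNodeGroup) so leftovers from all threads get merged.
        std::cout << "leftover rows in local group: " << group.rows.size() << "\n";
        return 0;
    }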
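A note on the CSR gap logic in RelBatchInsert: each source node's relationship list
is laid out back-to-back inside a node group, with slack reserved after each list so
later inserts do not immediately force a re-layout. The sketch below walks through
that layout on plain vectors. It is illustrative only: the 30% slack factor and the
gap of 1 for empty lists are assumptions (the real rule lives in getGapSize, whose
body is elided between the hunks above), and the offset bookkeeping is simplified
from populateStartCSROffsetsAndLengths / populateEndCSROffsets.

    // csr_gap_sketch.cpp -- toy model of CSR lists with per-list gaps.
    #include <cmath>
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    using offset_t = uint64_t;
    using length_t = uint64_t;

    // Assumed gap rule: empty lists still reserve one slot; non-empty lists
    // get roughly 30% slack. The operator's real rule is getGapSize.
    static length_t gapSize(length_t length) {
        return length == 0 ? 1 : static_cast<length_t>(std::ceil(length * 0.3));
    }

    int main() {
        // Relationship counts for four source nodes in one node group.
        std::vector<length_t> lengths = {2, 0, 5, 1};
        offset_t cursor = 0;
        for (std::size_t i = 0; i < lengths.size(); i++) {
            offset_t start = cursor;            // list i begins here
            offset_t end = start + lengths[i];  // its data ends here
            length_t gap = gapSize(lengths[i]); // slack reserved after the list
            cursor = end + gap;                 // list i+1 begins after the gap
            std::cout << "node " << i << ": data [" << start << ", " << end
                      << "), gap " << gap << ", next starts at " << cursor << "\n";
        }
        // Total capacity needed for the CSR data chunks, analogous to
        // csrChunkCapacity in prepareCSRNodeGroup.
        std::cout << "chunk capacity: " << cursor << "\n";
        return 0;
    }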