diff --git a/src/common/copier_config/copier_config.cpp b/src/common/copier_config/copier_config.cpp index 338a0c4d9f..68ba578bf9 100644 --- a/src/common/copier_config/copier_config.cpp +++ b/src/common/copier_config/copier_config.cpp @@ -11,17 +11,13 @@ namespace common { CopyDescription::CopyDescription( const std::vector& filePaths, CSVReaderConfig csvReaderConfig, FileType fileType) : filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} { - if (fileType == FileType::CSV) { - this->csvReaderConfig = std::make_unique(csvReaderConfig); - } + this->csvReaderConfig = std::make_unique(csvReaderConfig); } CopyDescription::CopyDescription(const CopyDescription& copyDescription) : filePaths{copyDescription.filePaths}, csvReaderConfig{nullptr}, fileType{copyDescription.fileType} { - if (fileType == FileType::CSV) { - this->csvReaderConfig = std::make_unique(*copyDescription.csvReaderConfig); - } + this->csvReaderConfig = std::make_unique(*copyDescription.csvReaderConfig); } std::string CopyDescription::getFileTypeName(FileType fileType) { diff --git a/src/include/processor/operator/copy/copy_node.h b/src/include/processor/operator/copy/copy_node.h index eccc93e944..bb3f14ab68 100644 --- a/src/include/processor/operator/copy/copy_node.h +++ b/src/include/processor/operator/copy/copy_node.h @@ -50,7 +50,7 @@ class CopyNode : public Sink { id, paramsString}, sharedState{std::move(sharedState)}, copyNodeDataInfo{std::move(copyNodeDataInfo)}, copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal}, - rowIdxVector{nullptr} { + rowIdxVector{nullptr}, filePathVector{nullptr} { auto tableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID()); copyStates.resize(tableSchema->getNumProperties()); for (auto i = 0u; i < tableSchema->getNumProperties(); i++) { diff --git a/src/include/processor/operator/copy/read_csv.h b/src/include/processor/operator/copy/read_csv.h index 9a28e78971..c927cea572 100644 --- a/src/include/processor/operator/copy/read_csv.h +++ b/src/include/processor/operator/copy/read_csv.h @@ -5,46 +5,18 @@ namespace kuzu { namespace processor { -// For CSV file, we need to read in streaming mode, so we need to read one batch at a time. -class ReadCSVMorsel : public ReadFileMorsel { -public: - ReadCSVMorsel(common::row_idx_t rowIdx, std::string filePath, common::row_idx_t rowIdxInFile, - std::shared_ptr recordBatch) - : ReadFileMorsel{rowIdx, common::INVALID_BLOCK_IDX, common::INVALID_ROW_IDX, - std::move(filePath), rowIdxInFile}, - recordBatch{std::move(recordBatch)} {} - - std::shared_ptr recordBatch; -}; - -class ReadCSVSharedState : public ReadFileSharedState { -public: - ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig, std::vector filePaths, - catalog::TableSchema* tableSchema) - : ReadFileSharedState{std::move(filePaths), tableSchema}, csvReaderConfig{csvReaderConfig} { - } - -private: - void countNumRows() override; - - std::unique_ptr getMorsel() override; - -private: - common::CSVReaderConfig csvReaderConfig; - std::shared_ptr reader; -}; - class ReadCSV : public ReadFile { public: ReadCSV(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos, - std::vector arrowColumnPoses, std::shared_ptr sharedState, - uint32_t id, const std::string& paramsString) + std::vector arrowColumnPoses, + std::shared_ptr sharedState, uint32_t id, + const std::string& paramsString) : ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses), std::move(sharedState), PhysicalOperatorType::READ_CSV, id, paramsString} {} inline std::shared_ptr readTuples( - std::unique_ptr morsel) override { - auto csvMorsel = reinterpret_cast(morsel.get()); + std::unique_ptr morsel) override { + auto csvMorsel = reinterpret_cast(morsel.get()); return csvMorsel->recordBatch; } diff --git a/src/include/processor/operator/copy/read_file.h b/src/include/processor/operator/copy/read_file.h index 4535211cf7..508bf87bb0 100644 --- a/src/include/processor/operator/copy/read_file.h +++ b/src/include/processor/operator/copy/read_file.h @@ -1,61 +1,16 @@ #pragma once #include "processor/operator/physical_operator.h" -#include "storage/copier/table_copy_utils.h" +#include "storage/copier/read_file_state.h" namespace kuzu { namespace processor { -class ReadFileMorsel { -public: - ReadFileMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, - common::row_idx_t numRows, std::string filePath, common::row_idx_t rowIdxInFile) - : rowIdx{rowIdx}, blockIdx{blockIdx}, numRows{numRows}, filePath{std::move(filePath)}, - rowIdxInFile{rowIdxInFile} {}; - - virtual ~ReadFileMorsel() = default; - -public: - // When reading from multiple files, rowIdx is accumulated. - common::row_idx_t rowIdx; - common::block_idx_t blockIdx; - common::row_idx_t numRows; - std::string filePath; - // Row idx in the current file. Equal to `rowIdx` when reading from only a single file. - common::row_idx_t rowIdxInFile; -}; - -class ReadFileSharedState { -public: - explicit ReadFileSharedState( - std::vector filePaths, catalog::TableSchema* tableSchema) - : currRowIdx{0}, curBlockIdx{0}, filePaths{std::move(filePaths)}, curFileIdx{0}, - tableSchema{tableSchema}, numRows{0}, currRowIdxInCurrFile{1} {} - - virtual ~ReadFileSharedState() = default; - - virtual void countNumRows() = 0; - - virtual std::unique_ptr getMorsel() = 0; - -public: - common::row_idx_t numRows; - catalog::TableSchema* tableSchema; - -protected: - std::mutex mtx; - common::row_idx_t currRowIdx; - std::unordered_map fileBlockInfos; - common::block_idx_t curBlockIdx; - std::vector filePaths; - common::vector_idx_t curFileIdx; - common::row_idx_t currRowIdxInCurrFile; -}; - class ReadFile : public PhysicalOperator { public: ReadFile(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos, - std::vector arrowColumnPoses, std::shared_ptr sharedState, + std::vector arrowColumnPoses, + std::shared_ptr sharedState, PhysicalOperatorType operatorType, uint32_t id, const std::string& paramsString) : PhysicalOperator{operatorType, id, paramsString}, rowIdxVectorPos{rowIdxVectorPos}, filePathVectorPos{filePathVectorPos}, arrowColumnPoses{std::move(arrowColumnPoses)}, @@ -70,12 +25,12 @@ class ReadFile : public PhysicalOperator { inline bool isSource() const override { return true; } virtual std::shared_ptr readTuples( - std::unique_ptr morsel) = 0; + std::unique_ptr morsel) = 0; bool getNextTuplesInternal(ExecutionContext* context) override; protected: - std::shared_ptr sharedState; + std::shared_ptr sharedState; DataPos rowIdxVectorPos; DataPos filePathVectorPos; std::vector arrowColumnPoses; diff --git a/src/include/processor/operator/copy/read_npy.h b/src/include/processor/operator/copy/read_npy.h index b32682ab7e..aaf6b3e135 100644 --- a/src/include/processor/operator/copy/read_npy.h +++ b/src/include/processor/operator/copy/read_npy.h @@ -7,41 +7,18 @@ namespace kuzu { namespace processor { -class ReadNPYMorsel : public ReadFileMorsel { -public: - ReadNPYMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, common::row_idx_t numRows, - std::string filePath, common::vector_idx_t curFileIdx, common::row_idx_t rowIdxInFile) - : ReadFileMorsel{rowIdx, blockIdx, numRows, std::move(filePath), rowIdxInFile}, - columnIdx{curFileIdx} {} - - inline common::vector_idx_t getColumnIdx() const { return columnIdx; } - -private: - common::vector_idx_t columnIdx; -}; - -class ReadNPYSharedState : public ReadFileSharedState { -public: - ReadNPYSharedState(catalog::NodeTableSchema* tableSchema, std::vector filePaths) - : ReadFileSharedState{std::move(filePaths), tableSchema} {} - - std::unique_ptr getMorsel() final; - -private: - void countNumRows() final; -}; - class ReadNPY : public ReadFile { public: ReadNPY(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos, std::vector arrowColumnPoses, const DataPos& columnIdxPos, - std::shared_ptr sharedState, uint32_t id, + std::shared_ptr sharedState, uint32_t id, const std::string& paramsString) : ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses), std::move(sharedState), PhysicalOperatorType::READ_NPY, id, paramsString}, columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {} - std::shared_ptr readTuples(std::unique_ptr morsel) final; + std::shared_ptr readTuples( + std::unique_ptr morsel) final; bool getNextTuplesInternal(ExecutionContext* context) final; diff --git a/src/include/processor/operator/copy/read_parquet.h b/src/include/processor/operator/copy/read_parquet.h index c58eb0f7f3..4afa72fae6 100644 --- a/src/include/processor/operator/copy/read_parquet.h +++ b/src/include/processor/operator/copy/read_parquet.h @@ -5,27 +5,17 @@ namespace kuzu { namespace processor { -class ReadParquetSharedState : public ReadFileSharedState { -public: - explicit ReadParquetSharedState( - std::vector filePaths, catalog::TableSchema* tableSchema) - : ReadFileSharedState{std::move(filePaths), tableSchema} {} - -private: - void countNumRows() override; - - std::unique_ptr getMorsel() override; -}; - class ReadParquet : public ReadFile { public: ReadParquet(const DataPos& offsetVectorPos, const DataPos& filePathVectorPos, - std::vector arrowColumnPoses, std::shared_ptr sharedState, - uint32_t id, const std::string& paramsString) + std::vector arrowColumnPoses, + std::shared_ptr sharedState, uint32_t id, + const std::string& paramsString) : ReadFile{offsetVectorPos, filePathVectorPos, std::move(arrowColumnPoses), std::move(sharedState), PhysicalOperatorType::READ_PARQUET, id, paramsString} {} - std::shared_ptr readTuples(std::unique_ptr morsel) override; + std::shared_ptr readTuples( + std::unique_ptr morsel) override; inline std::unique_ptr clone() override { return std::make_unique( diff --git a/src/include/storage/copier/read_file_state.h b/src/include/storage/copier/read_file_state.h new file mode 100644 index 0000000000..b8e1d031ce --- /dev/null +++ b/src/include/storage/copier/read_file_state.h @@ -0,0 +1,116 @@ +#pragma once + +#include "storage/copier/table_copy_utils.h" + +namespace kuzu { +namespace storage { + +class ReadFileMorsel { +public: + ReadFileMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, + common::row_idx_t numRows, std::string filePath, common::row_idx_t rowIdxInFile) + : rowIdx{rowIdx}, blockIdx{blockIdx}, numRows{numRows}, filePath{std::move(filePath)}, + rowIdxInFile{rowIdxInFile} {}; + virtual ~ReadFileMorsel() = default; + +public: + // When reading from multiple files, rowIdx is accumulated. + common::row_idx_t rowIdx; + common::block_idx_t blockIdx; + common::row_idx_t numRows; + std::string filePath; + // Row idx in the current file. Equal to `rowIdx` when reading from only a single file. + common::row_idx_t rowIdxInFile; +}; + +// For CSV file, we need to read in streaming mode, so we need to read one batch at a time. +class ReadCSVMorsel : public ReadFileMorsel { +public: + ReadCSVMorsel(common::offset_t startRowIdx, std::string filePath, + common::row_idx_t rowIdxInFile, std::shared_ptr recordBatch) + : ReadFileMorsel{startRowIdx, common::INVALID_BLOCK_IDX, common::INVALID_ROW_IDX, + std::move(filePath), rowIdxInFile}, + recordBatch{std::move(recordBatch)} {} + + std::shared_ptr recordBatch; +}; + +class ReadNPYMorsel : public ReadFileMorsel { +public: + ReadNPYMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, common::row_idx_t numRows, + std::string filePath, common::vector_idx_t curFileIdx, common::row_idx_t rowIdxInFile) + : ReadFileMorsel{rowIdx, blockIdx, numRows, std::move(filePath), rowIdxInFile}, + columnIdx{curFileIdx} {} + + inline common::vector_idx_t getColumnIdx() const { return columnIdx; } + +private: + common::vector_idx_t columnIdx; +}; + +class ReadFileSharedState { +public: + ReadFileSharedState(std::vector filePaths, common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema) + : numRows{0}, tableSchema{tableSchema}, filePaths{std::move(filePaths)}, + csvReaderConfig{csvReaderConfig}, currRowIdx{0}, currBlockIdx{0}, currFileIdx{0}, + currRowIdxInCurrFile{1} {}; + virtual ~ReadFileSharedState() = default; + + virtual void countNumRows() = 0; + virtual std::unique_ptr getMorsel() = 0; + +public: + std::mutex mtx; + common::row_idx_t numRows; + catalog::TableSchema* tableSchema; + std::vector filePaths; + common::CSVReaderConfig csvReaderConfig; + std::unordered_map fileBlockInfos; + common::row_idx_t currRowIdx; + common::block_idx_t currBlockIdx; + common::vector_idx_t currFileIdx; + common::row_idx_t currRowIdxInCurrFile; +}; + +// For CSV file, we need to read in streaming mode, so we need to keep the reader in the shared +// state. +class ReadCSVSharedState : public ReadFileSharedState { +public: + ReadCSVSharedState(std::vector filePaths, common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema) + : ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {}; + + void countNumRows() final; + std::unique_ptr getMorsel() final; + +private: + std::shared_ptr reader; +}; + +class ReadParquetSharedState : public ReadFileSharedState { +public: + explicit ReadParquetSharedState(std::vector filePaths, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema) + : ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {} + +private: + void countNumRows() override; + + std::unique_ptr getMorsel() override; +}; + +class ReadNPYSharedState : public ReadFileSharedState { +public: + ReadNPYSharedState(std::vector filePaths, common::CSVReaderConfig csvReaderConfig, + catalog::NodeTableSchema* tableSchema) + : ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {} + + std::unique_ptr getMorsel() final; + +private: + void countNumRows() final; +}; + +} // namespace storage +} // namespace kuzu diff --git a/src/include/storage/copier/rel_copier.h b/src/include/storage/copier/rel_copier.h index 9ad2056a00..d07946708b 100644 --- a/src/include/storage/copier/rel_copier.h +++ b/src/include/storage/copier/rel_copier.h @@ -1,5 +1,6 @@ #pragma once +#include "storage/copier/read_file_state.h" #include "storage/in_mem_storage_structure/in_mem_column.h" #include "storage/in_mem_storage_structure/in_mem_lists.h" #include "storage/index/hash_index.h" @@ -13,11 +14,13 @@ class DirectedInMemRelData; class RelCopier { public: - RelCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, - catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, - DirectedInMemRelData* bwdRelData, std::vector pkIndexes) + RelCopier(std::shared_ptr sharedState, + const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, + DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, + std::vector pkIndexes) : sharedState{std::move(sharedState)}, copyDesc{copyDesc}, schema{schema}, - fwdRelData{fwdRelData}, bwdRelData{bwdRelData}, pkIndexes{std::move(pkIndexes)} { + fwdRelData{fwdRelData}, bwdRelData{bwdRelData}, numRows{0}, pkIndexes{ + std::move(pkIndexes)} { fwdCopyStates.resize(schema->getNumProperties()); for (auto i = 0u; i < schema->getNumProperties(); i++) { fwdCopyStates[i] = std::make_unique(schema->properties[i].dataType); @@ -35,10 +38,10 @@ class RelCopier { virtual std::unique_ptr clone() const = 0; virtual void finalize() = 0; - inline std::shared_ptr getSharedState() const { return sharedState; } + inline std::shared_ptr getSharedState() const { return sharedState; } protected: - virtual void executeInternal(std::unique_ptr morsel) { + virtual void executeInternal(std::unique_ptr morsel) { throw common::CopyException("RelCopier::executeInternal not implemented"); } @@ -65,11 +68,12 @@ class RelCopier { const std::shared_ptr& type, const uint8_t* data, uint64_t length); protected: - std::shared_ptr sharedState; + std::shared_ptr sharedState; common::CopyDescription copyDesc; catalog::RelTableSchema* schema; DirectedInMemRelData* fwdRelData; DirectedInMemRelData* bwdRelData; + common::row_idx_t numRows; std::vector pkIndexes; std::vector> fwdCopyStates; std::vector> bwdCopyStates; @@ -77,7 +81,7 @@ class RelCopier { class RelListsCounterAndColumnCopier : public RelCopier { protected: - RelListsCounterAndColumnCopier(std::shared_ptr sharedState, + RelListsCounterAndColumnCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, std::vector pkIndexes) @@ -95,7 +99,7 @@ class RelListsCounterAndColumnCopier : public RelCopier { class ParquetRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCopier { public: - ParquetRelListsCounterAndColumnsCopier(std::shared_ptr sharedState, + ParquetRelListsCounterAndColumnsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, std::vector pkIndexes) @@ -108,7 +112,7 @@ class ParquetRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCo } private: - void executeInternal(std::unique_ptr morsel) final; + void executeInternal(std::unique_ptr morsel) final; private: std::unique_ptr reader; @@ -117,7 +121,7 @@ class ParquetRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCo class CSVRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCopier { public: - CSVRelListsCounterAndColumnsCopier(std::shared_ptr sharedState, + CSVRelListsCounterAndColumnsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, std::vector pkIndexes) @@ -130,12 +134,12 @@ class CSVRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCopier } private: - void executeInternal(std::unique_ptr morsel) final; + void executeInternal(std::unique_ptr morsel) final; }; class RelListsCopier : public RelCopier { protected: - RelListsCopier(std::shared_ptr sharedState, + RelListsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, std::vector pkIndexes) @@ -148,7 +152,7 @@ class RelListsCopier : public RelCopier { class ParquetRelListsCopier : public RelListsCopier { public: - ParquetRelListsCopier(std::shared_ptr sharedState, + ParquetRelListsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, std::vector pkIndexes) @@ -161,7 +165,7 @@ class ParquetRelListsCopier : public RelListsCopier { } private: - void executeInternal(std::unique_ptr morsel) final; + void executeInternal(std::unique_ptr morsel) final; private: std::unique_ptr reader; @@ -170,7 +174,7 @@ class ParquetRelListsCopier : public RelListsCopier { class CSVRelListsCopier : public RelListsCopier { public: - CSVRelListsCopier(std::shared_ptr sharedState, + CSVRelListsCopier(std::shared_ptr sharedState, const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData, std::vector pkIndexes) @@ -183,7 +187,7 @@ class CSVRelListsCopier : public RelListsCopier { } private: - void executeInternal(std::unique_ptr morsel) final; + void executeInternal(std::unique_ptr morsel) final; }; class RelCopyTask : public common::Task { diff --git a/src/include/storage/copier/rel_copy_executor.h b/src/include/storage/copier/rel_copy_executor.h index 461ba5e815..c4a1deabee 100644 --- a/src/include/storage/copier/rel_copy_executor.h +++ b/src/include/storage/copier/rel_copy_executor.h @@ -52,8 +52,9 @@ class RelCopyExecutor { std::unique_ptr initializeDirectedInMemRelData( common::RelDataDirection direction); - void countRelListsSizeAndPopulateColumns(processor::ExecutionContext* executionContext); - void populateRelLists(processor::ExecutionContext* executionContext); + common::row_idx_t countRelListsSizeAndPopulateColumns( + processor::ExecutionContext* executionContext); + common::row_idx_t populateRelLists(processor::ExecutionContext* executionContext); std::unique_ptr createRelCopier(RelCopierType relCopierType); @@ -61,10 +62,8 @@ class RelCopyExecutor { common::CopyDescription& copyDescription; WAL* wal; std::string outputDirectory; - std::unordered_map fileBlockInfos; common::TaskScheduler& taskScheduler; catalog::RelTableSchema* tableSchema; - common::row_idx_t numRows; RelsStatistics* relsStatistics; storage::NodesStore& nodesStore; storage::RelTable* table; diff --git a/src/include/storage/copier/table_copy_utils.h b/src/include/storage/copier/table_copy_utils.h index f1fa7c3914..a439941148 100644 --- a/src/include/storage/copier/table_copy_utils.h +++ b/src/include/storage/copier/table_copy_utils.h @@ -21,83 +21,12 @@ namespace kuzu { namespace storage { struct FileBlockInfo { - FileBlockInfo( - common::offset_t startRowIdx, uint64_t numBlocks, std::vector numRowsPerBlock) - : startRowIdx{startRowIdx}, numBlocks{numBlocks}, numRowsPerBlock{ - std::move(numRowsPerBlock)} {} - common::offset_t startRowIdx; + FileBlockInfo(uint64_t numBlocks, std::vector numRowsPerBlock) + : numBlocks{numBlocks}, numRowsPerBlock{std::move(numRowsPerBlock)} {} uint64_t numBlocks; std::vector numRowsPerBlock; }; -class CopyMorsel { -public: - CopyMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, common::row_idx_t numRows, - std::string filePath, common::row_idx_t rowIdxInFile) - : rowIdx{rowIdx}, blockIdx{blockIdx}, numRows{numRows}, filePath{std::move(filePath)}, - rowIdxInFile{rowIdxInFile} {}; - virtual ~CopyMorsel() = default; - -public: - common::row_idx_t rowIdx; - common::block_idx_t blockIdx; - common::row_idx_t numRows; - std::string filePath; - common::row_idx_t rowIdxInFile; -}; - -// For CSV file, we need to read in streaming mode, so we need to read one batch at a time. -class CSVCopyMorsel : public CopyMorsel { -public: - CSVCopyMorsel(common::offset_t startRowIdx, std::string filePath, - common::row_idx_t rowIdxInFile, std::shared_ptr recordBatch) - : CopyMorsel{startRowIdx, common::INVALID_BLOCK_IDX, common::INVALID_ROW_IDX, - std::move(filePath), rowIdxInFile}, - recordBatch{std::move(recordBatch)} {} - - std::shared_ptr recordBatch; -}; - -class CopySharedState { -public: - CopySharedState(std::vector filePaths, - std::unordered_map fileBlockInfos) - : filePaths{std::move(filePaths)}, fileIdx{0}, blockIdx{0}, numRows{0}, - fileBlockInfos{std::move(fileBlockInfos)}, currRowIdxInFile{1} {}; - virtual ~CopySharedState() = default; - - virtual std::unique_ptr getMorsel(); - -public: - std::vector filePaths; - common::field_idx_t fileIdx; - common::row_idx_t numRows; - common::row_idx_t currRowIdxInFile; - -protected: - std::mutex mtx; - std::unordered_map fileBlockInfos; - common::block_idx_t blockIdx; -}; - -// For CSV file, we need to read in streaming mode, so we need to keep the reader in the shared -// state. -class CSVCopySharedState : public CopySharedState { -public: - CSVCopySharedState(std::vector filePaths, - std::unordered_map fileBlockInfos, - common::CSVReaderConfig* csvReaderConfig, catalog::TableSchema* tableSchema) - : CopySharedState{std::move(filePaths), std::move(fileBlockInfos)}, - csvReaderConfig{csvReaderConfig}, tableSchema{tableSchema} {}; - - std::unique_ptr getMorsel() final; - -private: - common::CSVReaderConfig* csvReaderConfig; - catalog::TableSchema* tableSchema; - std::shared_ptr reader; -}; - class TableCopyUtils { public: static void throwCopyExceptionIfNotOK(const arrow::Status& status); diff --git a/src/processor/mapper/map_copy.cpp b/src/processor/mapper/map_copy.cpp index bbe6978caf..7c3243af67 100644 --- a/src/processor/mapper/map_copy.cpp +++ b/src/processor/mapper/map_copy.cpp @@ -10,6 +10,7 @@ #include "processor/operator/table_scan/factorized_table_scan.h" using namespace kuzu::planner; +using namespace kuzu::storage; namespace kuzu { namespace processor { @@ -48,21 +49,23 @@ std::unique_ptr PlanMapper::mapLogicalCopyNodeToPhysical(Logic switch (copy->getCopyDescription().fileType) { case (common::CopyDescription::FileType::CSV): { readFileSharedState = - std::make_shared(*copy->getCopyDescription().csvReaderConfig, - copy->getCopyDescription().filePaths, nodeTableSchema); + std::make_shared(copy->getCopyDescription().filePaths, + *copy->getCopyDescription().csvReaderConfig, nodeTableSchema); readFile = std::make_unique(rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting()); } break; case (common::CopyDescription::FileType::PARQUET): { - readFileSharedState = std::make_shared( - copy->getCopyDescription().filePaths, nodeTableSchema); + readFileSharedState = + std::make_shared(copy->getCopyDescription().filePaths, + *copy->getCopyDescription().csvReaderConfig, nodeTableSchema); readFile = std::make_unique(rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting()); } break; case (common::CopyDescription::FileType::NPY): { - readFileSharedState = std::make_shared( - nodeTableSchema, copy->getCopyDescription().filePaths); + readFileSharedState = + std::make_shared(copy->getCopyDescription().filePaths, + *copy->getCopyDescription().csvReaderConfig, nodeTableSchema); readFile = std::make_unique(rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, columnIdxPos, readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting()); } break; diff --git a/src/processor/operator/copy/CMakeLists.txt b/src/processor/operator/copy/CMakeLists.txt index 50538414a8..a4b2fd055f 100644 --- a/src/processor/operator/copy/CMakeLists.txt +++ b/src/processor/operator/copy/CMakeLists.txt @@ -5,7 +5,6 @@ add_library(kuzu_processor_operator_copy copy_node.cpp copy_npy_node.cpp read_file.cpp - read_csv.cpp read_parquet.cpp read_npy.cpp) diff --git a/src/processor/operator/copy/read_csv.cpp b/src/processor/operator/copy/read_csv.cpp deleted file mode 100644 index 227b037a34..0000000000 --- a/src/processor/operator/copy/read_csv.cpp +++ /dev/null @@ -1,61 +0,0 @@ -#include "processor/operator/copy/read_csv.h" - -namespace kuzu { -namespace processor { - -void ReadCSVSharedState::countNumRows() { - for (auto& filePath : filePaths) { - auto csvStreamingReader = - storage::TableCopyUtils::createCSVReader(filePath, &csvReaderConfig, tableSchema); - std::shared_ptr currBatch; - uint64_t numBlocks = 0; - std::vector numRowsPerBlock; - auto startRowIdx = numRows; - while (true) { - storage::TableCopyUtils::throwCopyExceptionIfNotOK( - csvStreamingReader->ReadNext(&currBatch)); - if (currBatch == nullptr) { - break; - } - ++numBlocks; - auto currNumRows = currBatch->num_rows(); - numRowsPerBlock.push_back(currNumRows); - numRows += currNumRows; - } - fileBlockInfos.emplace( - filePath, storage::FileBlockInfo{startRowIdx, numBlocks, numRowsPerBlock}); - } -} - -std::unique_ptr ReadCSVSharedState::getMorsel() { - std::unique_lock lck{mtx}; - while (true) { - if (curFileIdx >= filePaths.size()) { - // No more files to read. - return nullptr; - } - auto filePath = filePaths[curFileIdx]; - if (!reader) { - reader = - storage::TableCopyUtils::createCSVReader(filePath, &csvReaderConfig, tableSchema); - } - std::shared_ptr recordBatch; - storage::TableCopyUtils::throwCopyExceptionIfNotOK(reader->ReadNext(&recordBatch)); - if (recordBatch == nullptr) { - // No more blocks to read in this file. - curFileIdx++; - currRowIdxInCurrFile = 1; - reader.reset(); - continue; - } - auto numRowsInBatch = recordBatch->num_rows(); - auto result = std::make_unique( - currRowIdx, filePath, currRowIdxInCurrFile, std::move(recordBatch)); - currRowIdx += numRowsInBatch; - currRowIdxInCurrFile += numRowsInBatch; - return result; - } -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/copy/read_npy.cpp b/src/processor/operator/copy/read_npy.cpp index 05380a0bb8..33e6f4dc6b 100644 --- a/src/processor/operator/copy/read_npy.cpp +++ b/src/processor/operator/copy/read_npy.cpp @@ -3,56 +3,11 @@ #include "common/constants.h" #include "storage/in_mem_storage_structure/in_mem_column_chunk.h" +using namespace kuzu::storage; + namespace kuzu { namespace processor { -void ReadNPYSharedState::countNumRows() { - uint8_t idx = 0; - uint64_t firstFileRows; - for (auto& filePath : filePaths) { - std::unique_ptr reader = make_unique(filePath); - numRows = reader->getNumRows(); - if (idx == 0) { - firstFileRows = numRows; - } - auto tableType = tableSchema->getProperty(idx).dataType; - reader->validate(tableType, firstFileRows, tableSchema->tableName); - auto numBlocks = (uint64_t)((numRows + common::CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY) / - common::CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); - std::vector numRowsPerBlock( - numBlocks, common::CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); - fileBlockInfos.emplace(filePath, storage::FileBlockInfo{0, numBlocks, numRowsPerBlock}); - idx++; - } -} - -std::unique_ptr ReadNPYSharedState::getMorsel() { - std::unique_lock lck{mtx}; - while (true) { - if (curFileIdx >= filePaths.size()) { - // No more files to read. - return nullptr; - } - auto filePath = filePaths[curFileIdx]; - auto fileBlockInfo = fileBlockInfos.at(filePath); - if (curBlockIdx >= fileBlockInfo.numBlocks) { - // No more blocks to read in this file. - curFileIdx++; - currRowIdxInCurrFile = 1; - curBlockIdx = 0; - currRowIdx = 0; - continue; - } - auto numRowsInBlock = fileBlockInfo.numRowsPerBlock[curBlockIdx]; - auto result = std::make_unique( - currRowIdx, curBlockIdx, numRowsInBlock, filePath, curFileIdx, currRowIdxInCurrFile); - currRowIdx += numRowsInBlock; - currRowIdxInCurrFile += numRowsInBlock; - curBlockIdx++; - return result; - } -} - std::shared_ptr ReadNPY::readTuples(std::unique_ptr morsel) { assert(!morsel->filePath.empty()); if (!reader || reader->getFilePath() != morsel->filePath) { diff --git a/src/processor/operator/copy/read_parquet.cpp b/src/processor/operator/copy/read_parquet.cpp index 041ea4e9a3..d57cf15552 100644 --- a/src/processor/operator/copy/read_parquet.cpp +++ b/src/processor/operator/copy/read_parquet.cpp @@ -1,51 +1,10 @@ #include "processor/operator/copy/read_parquet.h" +using namespace kuzu::storage; + namespace kuzu { namespace processor { -void ReadParquetSharedState::countNumRows() { - for (auto& filePath : filePaths) { - std::unique_ptr reader = - storage::TableCopyUtils::createParquetReader(filePath, tableSchema); - auto metadata = reader->parquet_reader()->metadata(); - uint64_t numBlocks = metadata->num_row_groups(); - std::vector numLinesPerBlock(numBlocks); - auto startRowIdx = numRows; - for (auto blockIdx = 0; blockIdx < numBlocks; ++blockIdx) { - numLinesPerBlock[blockIdx] = metadata->RowGroup(blockIdx)->num_rows(); - } - fileBlockInfos.emplace( - filePath, storage::FileBlockInfo{startRowIdx, numBlocks, numLinesPerBlock}); - numRows += metadata->num_rows(); - } -} - -std::unique_ptr ReadParquetSharedState::getMorsel() { - std::unique_lock lck{mtx}; - while (true) { - if (curFileIdx >= filePaths.size()) { - // No more files to read. - return nullptr; - } - auto filePath = filePaths[curFileIdx]; - auto fileBlockInfo = fileBlockInfos.at(filePath); - if (curBlockIdx >= fileBlockInfo.numBlocks) { - // No more blocks to read in this file. - curFileIdx++; - curBlockIdx = 0; - currRowIdxInCurrFile = 1; - continue; - } - auto numRowsInBlock = fileBlockInfo.numRowsPerBlock[curBlockIdx]; - auto result = std::make_unique( - currRowIdx, curBlockIdx, numRowsInBlock, filePath, currRowIdxInCurrFile); - currRowIdx += numRowsInBlock; - currRowIdxInCurrFile += numRowsInBlock; - curBlockIdx++; - return result; - } -} - std::shared_ptr ReadParquet::readTuples( std::unique_ptr morsel) { assert(!morsel->filePath.empty()); diff --git a/src/storage/copier/CMakeLists.txt b/src/storage/copier/CMakeLists.txt index e74f0ba805..3c52bd0e01 100644 --- a/src/storage/copier/CMakeLists.txt +++ b/src/storage/copier/CMakeLists.txt @@ -1,6 +1,7 @@ add_library(kuzu_storage_in_mem_csv_copier OBJECT npy_reader.cpp + read_file_state.cpp rel_copier.cpp rel_copy_executor.cpp table_copy_utils.cpp) diff --git a/src/storage/copier/read_file_state.cpp b/src/storage/copier/read_file_state.cpp new file mode 100644 index 0000000000..1fcc4ad40a --- /dev/null +++ b/src/storage/copier/read_file_state.cpp @@ -0,0 +1,153 @@ +#include "storage/copier/read_file_state.h" + +#include "storage/copier/npy_reader.h" + +using namespace kuzu::common; + +namespace kuzu { +namespace storage { + +void ReadCSVSharedState::countNumRows() { + for (auto& filePath : filePaths) { + auto csvStreamingReader = + TableCopyUtils::createCSVReader(filePath, &csvReaderConfig, tableSchema); + std::shared_ptr currBatch; + uint64_t numBlocks = 0; + std::vector numRowsPerBlock; + while (true) { + TableCopyUtils::throwCopyExceptionIfNotOK(csvStreamingReader->ReadNext(&currBatch)); + if (currBatch == nullptr) { + break; + } + ++numBlocks; + auto currNumRows = currBatch->num_rows(); + numRowsPerBlock.push_back(currNumRows); + numRows += currNumRows; + } + fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numRowsPerBlock}); + } +} + +// TODO(Guodong): Refactor duplicated between the three getMorsel() functions. +std::unique_ptr ReadCSVSharedState::getMorsel() { + std::unique_lock lck{mtx}; + while (true) { + if (currFileIdx >= filePaths.size()) { + // No more files to read. + return nullptr; + } + auto filePath = filePaths[currFileIdx]; + if (!reader) { + reader = TableCopyUtils::createCSVReader(filePath, &csvReaderConfig, tableSchema); + } + std::shared_ptr recordBatch; + TableCopyUtils::throwCopyExceptionIfNotOK(reader->ReadNext(&recordBatch)); + if (recordBatch == nullptr) { + // No more blocks to read in this file. + currFileIdx++; + currBlockIdx = 0; + currRowIdxInCurrFile = 1; + reader.reset(); + continue; + } + auto numRowsInBatch = recordBatch->num_rows(); + auto result = std::make_unique( + currRowIdx, filePath, currRowIdxInCurrFile, std::move(recordBatch)); + currRowIdx += numRowsInBatch; + currRowIdxInCurrFile += numRowsInBatch; + currBlockIdx++; + return result; + } +} + +void ReadParquetSharedState::countNumRows() { + for (auto& filePath : filePaths) { + std::unique_ptr reader = + TableCopyUtils::createParquetReader(filePath, tableSchema); + auto metadata = reader->parquet_reader()->metadata(); + uint64_t numBlocks = metadata->num_row_groups(); + std::vector numLinesPerBlock(numBlocks); + for (auto blockIdx = 0; blockIdx < numBlocks; ++blockIdx) { + numLinesPerBlock[blockIdx] = metadata->RowGroup(blockIdx)->num_rows(); + } + fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numLinesPerBlock}); + numRows += metadata->num_rows(); + } +} + +// TODO(Guodong): Refactor duplicated between the three getMorsel() functions. +std::unique_ptr ReadParquetSharedState::getMorsel() { + std::unique_lock lck{mtx}; + while (true) { + if (currFileIdx >= filePaths.size()) { + // No more files to read. + return nullptr; + } + auto filePath = filePaths[currFileIdx]; + auto fileBlockInfo = fileBlockInfos.at(filePath); + if (currBlockIdx >= fileBlockInfo.numBlocks) { + // No more blocks to read in this file. + currFileIdx++; + currBlockIdx = 0; + currRowIdxInCurrFile = 1; + continue; + } + auto numRowsInBlock = fileBlockInfo.numRowsPerBlock[currBlockIdx]; + auto result = std::make_unique( + currRowIdx, currBlockIdx, numRowsInBlock, filePath, currRowIdxInCurrFile); + currRowIdx += numRowsInBlock; + currRowIdxInCurrFile += numRowsInBlock; + currBlockIdx++; + return result; + } +} + +void ReadNPYSharedState::countNumRows() { + uint8_t idx = 0; + uint64_t firstFileRows; + for (auto& filePath : filePaths) { + auto reader = make_unique(filePath); + numRows = reader->getNumRows(); + if (idx == 0) { + firstFileRows = numRows; + } + auto tableType = tableSchema->getProperty(idx).dataType; + reader->validate(tableType, firstFileRows, tableSchema->tableName); + auto numBlocks = (uint64_t)((numRows + CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY) / + CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); + std::vector numRowsPerBlock(numBlocks, CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); + fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numRowsPerBlock}); + idx++; + } +} + +// TODO(Guodong): Refactor duplicated between the three getMorsel() functions. +std::unique_ptr ReadNPYSharedState::getMorsel() { + std::unique_lock lck{mtx}; + while (true) { + if (currFileIdx >= filePaths.size()) { + // No more files to read. + return nullptr; + } + auto filePath = filePaths[currFileIdx]; + auto fileBlockInfo = fileBlockInfos.at(filePath); + if (currBlockIdx >= fileBlockInfo.numBlocks) { + // No more blocks to read in this file. + currFileIdx++; + currRowIdxInCurrFile = 1; + currBlockIdx = 0; + currRowIdx = 0; + continue; + } + auto numRowsInBlock = fileBlockInfo.numRowsPerBlock[currBlockIdx]; + auto result = std::make_unique( + currRowIdx, currBlockIdx, numRowsInBlock, filePath, currFileIdx, currRowIdxInCurrFile); + currRowIdx += numRowsInBlock; + currRowIdxInCurrFile += numRowsInBlock; + currBlockIdx++; + return result; + } +} + +} // namespace storage +} // namespace kuzu diff --git a/src/storage/copier/rel_copier.cpp b/src/storage/copier/rel_copier.cpp index b2b4accb9f..d5df99a6a9 100644 --- a/src/storage/copier/rel_copier.cpp +++ b/src/storage/copier/rel_copier.cpp @@ -32,6 +32,10 @@ void RelCopier::execute(ExecutionContext* executionContext) { } executeInternal(std::move(morsel)); } + { + std::unique_lock xLck{sharedState->mtx}; + sharedState->numRows += numRows; + } } void RelCopier::copyRelColumnsOrCountRelListsSize(row_idx_t rowIdx, arrow::RecordBatch* recordBatch, @@ -123,14 +127,14 @@ void RelCopier::copyRelColumns(offset_t rowIdx, arrow::RecordBatch* recordBatch, auto adjColumnChunk = relData->columns->adjColumnChunk.get(); checkViolationOfRelColumn(direction, boundPKOffsets); adjColumnChunk->copyArrowArray(*adjPKOffsets, nullptr, boundPKOffsets); - auto numRows = recordBatch->num_rows(); + auto numRowsInBatch = recordBatch->num_rows(); std::vector relIDs; - relIDs.resize(numRows); - for (auto i = 0u; i < numRows; i++) { + relIDs.resize(numRowsInBatch); + for (auto i = 0u; i < numRowsInBatch; i++) { relIDs[i] = rowIdx + i; } auto relIDArray = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)relIDs.data(), numRows); + std::make_shared(), (uint8_t*)relIDs.data(), numRowsInBatch); relData->columns->propertyColumnChunks[0]->copyArrowArray( *relIDArray, copyStates[0].get(), boundPKOffsets); // Skip the two pk columns in arrow record batch. @@ -294,7 +298,8 @@ void RelListsCounterAndColumnCopier::buildRelListsHeaders( } } -void ParquetRelListsCounterAndColumnsCopier::executeInternal(std::unique_ptr morsel) { +void ParquetRelListsCounterAndColumnsCopier::executeInternal( + std::unique_ptr morsel) { assert(!morsel->filePath.empty()); if (!reader || filePath != morsel->filePath) { reader = TableCopyUtils::createParquetReader(morsel->filePath, schema); @@ -306,42 +311,44 @@ void ParquetRelListsCounterAndColumnsCopier::executeInternal(std::unique_ptr recordBatch; TableCopyUtils::throwCopyExceptionIfNotOK(batchReader.ReadNext(&recordBatch)); - auto numTuples = recordBatch->num_rows(); + auto numRowsInBatch = recordBatch->num_rows(); std::vector boundPKOffsets, adjPKOffsets; - boundPKOffsets.resize(numTuples); - adjPKOffsets.resize(numTuples); + boundPKOffsets.resize(numRowsInBatch); + adjPKOffsets.resize(numRowsInBatch); indexLookup(recordBatch->column(0).get(), schema->srcPKDataType, pkIndexes[0], (offset_t*)boundPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); indexLookup(recordBatch->column(1).get(), schema->dstPKDataType, pkIndexes[1], (offset_t*)adjPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); std::vector> pkOffsetsArrays(2); pkOffsetsArrays[0] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)boundPKOffsets.data(), numTuples); + std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); pkOffsetsArrays[1] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)adjPKOffsets.data(), numTuples); + std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); copyRelColumnsOrCountRelListsSize(morsel->rowIdx, recordBatch.get(), FWD, pkOffsetsArrays); copyRelColumnsOrCountRelListsSize(morsel->rowIdx, recordBatch.get(), BWD, pkOffsetsArrays); + numRows += numRowsInBatch; } -void CSVRelListsCounterAndColumnsCopier::executeInternal(std::unique_ptr morsel) { +void CSVRelListsCounterAndColumnsCopier::executeInternal(std::unique_ptr morsel) { assert(!morsel->filePath.empty()); - auto csvRelCopyMorsel = reinterpret_cast(morsel.get()); + auto csvRelCopyMorsel = reinterpret_cast(morsel.get()); auto recordBatch = csvRelCopyMorsel->recordBatch; - auto numTuples = recordBatch->num_rows(); + auto numRowsInBatch = recordBatch->num_rows(); std::vector boundPKOffsets, adjPKOffsets; - boundPKOffsets.resize(numTuples); - adjPKOffsets.resize(numTuples); + boundPKOffsets.resize(numRowsInBatch); + adjPKOffsets.resize(numRowsInBatch); indexLookup(recordBatch->column(0).get(), schema->srcPKDataType, pkIndexes[0], boundPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); indexLookup(recordBatch->column(1).get(), schema->dstPKDataType, pkIndexes[1], adjPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); std::vector> pkOffsets(2); pkOffsets[0] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)boundPKOffsets.data(), numTuples); + std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); pkOffsets[1] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)adjPKOffsets.data(), numTuples); + std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); copyRelColumnsOrCountRelListsSize(morsel->rowIdx, recordBatch.get(), FWD, pkOffsets); copyRelColumnsOrCountRelListsSize(morsel->rowIdx, recordBatch.get(), BWD, pkOffsets); + numRows += numRowsInBatch; } void RelListsCopier::finalize() { @@ -359,7 +366,7 @@ void RelListsCopier::finalize() { } } -void ParquetRelListsCopier::executeInternal(std::unique_ptr morsel) { +void ParquetRelListsCopier::executeInternal(std::unique_ptr morsel) { assert(!morsel->filePath.empty()); if (!reader || filePath != morsel->filePath) { reader = TableCopyUtils::createParquetReader(morsel->filePath, schema); @@ -371,50 +378,52 @@ void ParquetRelListsCopier::executeInternal(std::unique_ptr morsel) arrow::TableBatchReader batchReader(*table); std::shared_ptr recordBatch; TableCopyUtils::throwCopyExceptionIfNotOK(batchReader.ReadNext(&recordBatch)); - auto numTuples = recordBatch->num_rows(); + auto numRowsInBatch = recordBatch->num_rows(); std::vector boundPKOffsets, adjPKOffsets; - boundPKOffsets.resize(numTuples); - adjPKOffsets.resize(numTuples); + boundPKOffsets.resize(numRowsInBatch); + adjPKOffsets.resize(numRowsInBatch); indexLookup(recordBatch->column(0).get(), schema->srcPKDataType, pkIndexes[0], boundPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); indexLookup(recordBatch->column(1).get(), schema->dstPKDataType, pkIndexes[1], adjPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); std::vector> pkOffsets(2); pkOffsets[0] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)boundPKOffsets.data(), numTuples); + std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); pkOffsets[1] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)adjPKOffsets.data(), numTuples); + std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); if (!fwdRelData->isColumns) { copyRelLists(morsel->rowIdx, recordBatch.get(), FWD, pkOffsets); } if (!bwdRelData->isColumns) { copyRelLists(morsel->rowIdx, recordBatch.get(), BWD, pkOffsets); } + numRows += numRowsInBatch; } -void CSVRelListsCopier::executeInternal(std::unique_ptr morsel) { +void CSVRelListsCopier::executeInternal(std::unique_ptr morsel) { assert(!morsel->filePath.empty()); - auto csvRelCopyMorsel = reinterpret_cast(morsel.get()); + auto csvRelCopyMorsel = reinterpret_cast(morsel.get()); auto recordBatch = csvRelCopyMorsel->recordBatch; - auto numTuples = recordBatch->num_rows(); + auto numRowsInBatch = recordBatch->num_rows(); std::vector boundPKOffsets, adjPKOffsets; - boundPKOffsets.resize(numTuples); - adjPKOffsets.resize(numTuples); + boundPKOffsets.resize(numRowsInBatch); + adjPKOffsets.resize(numRowsInBatch); indexLookup(recordBatch->column(0).get(), schema->srcPKDataType, pkIndexes[0], boundPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); indexLookup(recordBatch->column(1).get(), schema->dstPKDataType, pkIndexes[1], adjPKOffsets.data(), morsel->filePath, morsel->rowIdxInFile); std::vector> pkOffsets(2); pkOffsets[0] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)boundPKOffsets.data(), numTuples); + std::make_shared(), (uint8_t*)boundPKOffsets.data(), numRowsInBatch); pkOffsets[1] = createArrowPrimitiveArray( - std::make_shared(), (uint8_t*)adjPKOffsets.data(), numTuples); + std::make_shared(), (uint8_t*)adjPKOffsets.data(), numRowsInBatch); if (!fwdRelData->isColumns) { copyRelLists(morsel->rowIdx, recordBatch.get(), FWD, pkOffsets); } if (!bwdRelData->isColumns) { copyRelLists(morsel->rowIdx, recordBatch.get(), BWD, pkOffsets); } + numRows += numRowsInBatch; } void RelCopyTask::run() { diff --git a/src/storage/copier/rel_copy_executor.cpp b/src/storage/copier/rel_copy_executor.cpp index a30eeb4c5c..a5ac0e4d92 100644 --- a/src/storage/copier/rel_copy_executor.cpp +++ b/src/storage/copier/rel_copy_executor.cpp @@ -12,8 +12,8 @@ RelCopyExecutor::RelCopyExecutor(CopyDescription& copyDescription, WAL* wal, TaskScheduler& taskScheduler, NodesStore& nodesStore, RelTable* table, RelTableSchema* tableSchema, RelsStatistics* relsStatistics) : copyDescription{copyDescription}, wal{wal}, outputDirectory{std::move(wal->getDirectory())}, - taskScheduler{taskScheduler}, tableSchema{tableSchema}, numRows{0}, - nodesStore{nodesStore}, table{table}, relsStatistics{relsStatistics} { + taskScheduler{taskScheduler}, tableSchema{tableSchema}, nodesStore{nodesStore}, table{table}, + relsStatistics{relsStatistics} { // Initialize rel data. fwdRelData = initializeDirectedInMemRelData(FWD); bwdRelData = initializeDirectedInMemRelData(BWD); @@ -77,37 +77,40 @@ offset_t RelCopyExecutor::copy(processor::ExecutionContext* executionContext) { wal->logCopyRelRecord(table->getRelTableID()); // We assume that COPY is a single-statement transaction, thus COPY rel is the only wal record. wal->flushAllPages(); - countRelListsSizeAndPopulateColumns(executionContext); + auto numRows = countRelListsSizeAndPopulateColumns(executionContext); if (!tableSchema->isSingleMultiplicityInDirection(FWD) || !tableSchema->isSingleMultiplicityInDirection(BWD)) { - populateRelLists(executionContext); + auto numPopulatedRelLists = populateRelLists(executionContext); + assert(numPopulatedRelLists == numRows); } relsStatistics->setNumTuplesForTable(tableSchema->tableID, numRows); return numRows; } -void RelCopyExecutor::countRelListsSizeAndPopulateColumns( +row_idx_t RelCopyExecutor::countRelListsSizeAndPopulateColumns( processor::ExecutionContext* executionContext) { auto relCopier = createRelCopier(RelCopierType::REL_COLUMN_COPIER_AND_LIST_COUNTER); auto sharedState = relCopier->getSharedState(); auto task = std::make_shared(std::move(relCopier), executionContext); taskScheduler.scheduleTaskAndWaitOrError(task, executionContext); - numRows = sharedState->numRows; + return sharedState->numRows; } -void RelCopyExecutor::populateRelLists(processor::ExecutionContext* executionContext) { +row_idx_t RelCopyExecutor::populateRelLists(processor::ExecutionContext* executionContext) { auto relCopier = createRelCopier(RelCopierType::REL_LIST_COPIER); + auto sharedState = relCopier->getSharedState(); auto task = std::make_shared(std::move(relCopier), executionContext); taskScheduler.scheduleTaskAndWaitOrError(task, executionContext); + return sharedState->numRows; } std::unique_ptr RelCopyExecutor::createRelCopier(RelCopierType relCopierType) { - std::shared_ptr sharedState; + std::shared_ptr sharedState; std::unique_ptr relCopier; switch (copyDescription.fileType) { case CopyDescription::FileType::CSV: { - sharedState = std::make_shared(copyDescription.filePaths, - fileBlockInfos, copyDescription.csvReaderConfig.get(), tableSchema); + sharedState = std::make_shared( + copyDescription.filePaths, *copyDescription.csvReaderConfig, tableSchema); switch (relCopierType) { case RelCopierType::REL_COLUMN_COPIER_AND_LIST_COUNTER: { relCopier = std::make_unique(sharedState, @@ -120,8 +123,11 @@ std::unique_ptr RelCopyExecutor::createRelCopier(RelCopierType relCop } } break; case CopyDescription::FileType::PARQUET: { + std::unordered_map fileBlockInfos; TableCopyUtils::countNumLines(copyDescription, tableSchema, fileBlockInfos); - sharedState = std::make_shared(copyDescription.filePaths, fileBlockInfos); + sharedState = std::make_shared( + copyDescription.filePaths, *copyDescription.csvReaderConfig, tableSchema); + sharedState->fileBlockInfos = std::move(fileBlockInfos); switch (relCopierType) { case RelCopierType::REL_COLUMN_COPIER_AND_LIST_COUNTER: { relCopier = std::make_unique(sharedState, diff --git a/src/storage/copier/table_copy_utils.cpp b/src/storage/copier/table_copy_utils.cpp index f042083ca7..e83ef9f4b0 100644 --- a/src/storage/copier/table_copy_utils.cpp +++ b/src/storage/copier/table_copy_utils.cpp @@ -11,61 +11,6 @@ using namespace kuzu::common; namespace kuzu { namespace storage { -std::unique_ptr CopySharedState::getMorsel() { - std::unique_lock lck{mtx}; - while (true) { - if (fileIdx >= filePaths.size()) { - // No more files to read. - return nullptr; - } - auto filePath = filePaths[fileIdx]; - auto fileBlockInfo = fileBlockInfos.at(filePath); - if (blockIdx >= fileBlockInfo.numBlocks) { - // No more blocks to read in this file. - fileIdx++; - blockIdx = 0; - currRowIdxInFile = 1; - continue; - } - auto numRowsInBlock = fileBlockInfo.numRowsPerBlock[blockIdx]; - auto result = std::make_unique( - numRows, blockIdx, numRowsInBlock, filePath, currRowIdxInFile); - numRows += numRowsInBlock; - currRowIdxInFile += numRowsInBlock; - blockIdx++; - return result; - } -} - -std::unique_ptr CSVCopySharedState::getMorsel() { - std::unique_lock lck{mtx}; - while (true) { - if (fileIdx >= filePaths.size()) { - // No more files to read. - return nullptr; - } - auto filePath = filePaths[fileIdx]; - if (!reader) { - reader = TableCopyUtils::createCSVReader(filePath, csvReaderConfig, tableSchema); - } - std::shared_ptr recordBatch; - TableCopyUtils::throwCopyExceptionIfNotOK(reader->ReadNext(&recordBatch)); - if (recordBatch == nullptr) { - // No more blocks to read in this file. - fileIdx++; - currRowIdxInFile = 1; - reader.reset(); - continue; - } - auto numRowsInBatch = recordBatch->num_rows(); - auto result = std::make_unique( - numRows, filePath, currRowIdxInFile, std::move(recordBatch)); - numRows += numRowsInBatch; - currRowIdxInFile += numRowsInBatch; - return result; - } -} - row_idx_t TableCopyUtils::countNumLines(CopyDescription& copyDescription, catalog::TableSchema* tableSchema, std::unordered_map& fileBlockInfos) { @@ -97,7 +42,6 @@ row_idx_t TableCopyUtils::countNumLinesCSV(CopyDescription& copyDescription, std::shared_ptr currBatch; uint64_t numBlocks = 0; std::vector numLinesPerBlock; - auto startNodeOffset = numRows; while (true) { throwCopyExceptionIfNotOK(csvStreamingReader->ReadNext(&currBatch)); if (currBatch == nullptr) { @@ -108,8 +52,7 @@ row_idx_t TableCopyUtils::countNumLinesCSV(CopyDescription& copyDescription, numLinesPerBlock.push_back(currNumRows); numRows += currNumRows; } - fileBlockInfos.emplace( - filePath, FileBlockInfo{startNodeOffset, numBlocks, numLinesPerBlock}); + fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numLinesPerBlock}); } return numRows; } @@ -124,12 +67,10 @@ row_idx_t TableCopyUtils::countNumLinesParquet(CopyDescription& copyDescription, auto metadata = reader->parquet_reader()->metadata(); uint64_t numBlocks = metadata->num_row_groups(); std::vector numLinesPerBlock(numBlocks); - auto startNodeOffset = numRows; for (auto blockIdx = 0; blockIdx < numBlocks; ++blockIdx) { numLinesPerBlock[blockIdx] = metadata->RowGroup(blockIdx)->num_rows(); } - fileBlockInfos.emplace( - filePath, FileBlockInfo{startNodeOffset, numBlocks, numLinesPerBlock}); + fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numLinesPerBlock}); numRows += metadata->num_rows(); } return numRows; @@ -156,8 +97,7 @@ offset_t TableCopyUtils::countNumLinesNpy(CopyDescription& copyDescription, numNodesInFile - blockIdx * CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); numLinesPerBlock[blockIdx] = numLines; } - fileBlockInfos.emplace( - filePath, FileBlockInfo{0 /* start node offset */, numBlocks, numLinesPerBlock}); + fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numLinesPerBlock}); } return numRows; } @@ -230,7 +170,7 @@ std::unique_ptr TableCopyUtils::createParquetReader( reader->parquet_reader()->metadata()->schema()->group_node()->field_count(); if (expectedNumColumns != actualNumColumns) { // Note: Some parquet files may contain an index column. - throw common::CopyException(common::StringUtils::string_format( + throw CopyException(StringUtils::string_format( "Unmatched number of columns in parquet file. Expect: {}, got: {}.", expectedNumColumns, actualNumColumns)); } @@ -282,8 +222,7 @@ std::unique_ptr TableCopyUtils::getArrowVarList(const std::string& l, int auto value = convertStringToValue(element, *childDataType, copyDescription); values.push_back(std::move(value)); } - auto numBytesOfOverflow = - values.size() * storage::StorageUtils::getDataTypeSize(*childDataType); + auto numBytesOfOverflow = values.size() * StorageUtils::getDataTypeSize(*childDataType); if (numBytesOfOverflow >= BufferPoolConstants::PAGE_4KB_SIZE) { throw CopyException(StringUtils::string_format( "Maximum num bytes of a LIST is {}. Input list's num bytes is {}.", @@ -299,7 +238,7 @@ std::unique_ptr TableCopyUtils::getArrowFixedList(const std::string& int64_t to, const LogicalType& dataType, const CopyDescription& copyDescription) { assert(dataType.getLogicalTypeID() == LogicalTypeID::FIXED_LIST); auto split = getListElementPos(l, from, to, copyDescription); - auto listVal = std::make_unique(storage::StorageUtils::getDataTypeSize(dataType)); + auto listVal = std::make_unique(StorageUtils::getDataTypeSize(dataType)); auto childDataType = FixedListType::getChildType(&dataType); uint64_t numElementsRead = 0; for (auto pair : split) {