diff --git a/src/include/storage/copier/read_file_state.h b/src/include/storage/copier/read_file_state.h deleted file mode 100644 index d876be4230..0000000000 --- a/src/include/storage/copier/read_file_state.h +++ /dev/null @@ -1,114 +0,0 @@ -#pragma once - -#include "storage/copier/npy_reader.h" -#include "storage/copier/table_copy_utils.h" - -namespace kuzu { -namespace storage { - -class ReadFileMorsel { -public: - ReadFileMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, - common::row_idx_t rowsRead, std::string filePath, common::row_idx_t rowIdxInFile) - : rowIdx{rowIdx}, blockIdx{blockIdx}, rowsRead{rowsRead}, filePath{std::move(filePath)}, - rowIdxInFile{rowIdxInFile} {}; - virtual ~ReadFileMorsel() = default; - -public: - // When reading from multiple files, rowIdx is accumulated. - common::row_idx_t rowIdx; - common::block_idx_t blockIdx; - common::row_idx_t rowsRead; - std::string filePath; - // Row idx in the current file. Equal to `rowIdx` when reading from only a single file. - common::row_idx_t rowIdxInFile; -}; - -// For CSV file, we need to read in streaming mode, so we need to read one batch at a time. -class ReadSerialMorsel : public ReadFileMorsel { -public: - ReadSerialMorsel(common::offset_t startRowIdx, std::string filePath, - common::row_idx_t rowIdxInFile, common::row_idx_t rowsRead, - std::shared_ptr recordTable) - : ReadFileMorsel{startRowIdx, common::INVALID_BLOCK_IDX, rowsRead, std::move(filePath), - rowIdxInFile}, - recordTable{std::move(recordTable)} {} - - std::shared_ptr recordTable; -}; - -class ReadFileSharedState { -public: - ReadFileSharedState(std::vector filePaths, common::CSVReaderConfig csvReaderConfig, - catalog::TableSchema* tableSchema) - : numRows{0}, rowsRead{0}, tableSchema{tableSchema}, filePaths{std::move(filePaths)}, - csvReaderConfig{csvReaderConfig}, currRowIdx{0}, currBlockIdx{0}, currFileIdx{0}, - currRowIdxInCurrFile{1}, leftOverData{nullptr} {}; - virtual ~ReadFileSharedState() = default; - - virtual void countNumRows() = 0; - virtual std::unique_ptr getMorsel() = 0; - virtual std::unique_ptr getMorselSerial() = 0; - -public: - std::mutex mtx; - common::row_idx_t numRows; - catalog::TableSchema* tableSchema; - std::vector filePaths; - common::CSVReaderConfig csvReaderConfig; - std::unordered_map fileBlockInfos; - common::row_idx_t currRowIdx; - common::block_idx_t currBlockIdx; - common::vector_idx_t currFileIdx; - common::row_idx_t currRowIdxInCurrFile; - common::row_idx_t rowsRead; - std::shared_ptr leftOverData; -}; - -// For CSV file, we need to read in streaming mode, so we need to keep the reader in the shared -// state. -class ReadCSVSharedState : public ReadFileSharedState { -public: - ReadCSVSharedState(std::vector filePaths, common::CSVReaderConfig csvReaderConfig, - catalog::TableSchema* tableSchema) - : ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {}; - - void countNumRows() final; - std::unique_ptr getMorsel() final; - std::unique_ptr getMorselSerial() final; - -private: - std::shared_ptr reader; -}; - -class ReadParquetSharedState : public ReadFileSharedState { -public: - explicit ReadParquetSharedState(std::vector filePaths, - common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema) - : ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {} - -private: - void countNumRows() override; - - std::unique_ptr getMorsel() final; - std::unique_ptr getMorselSerial() final; - - std::shared_ptr reader; -}; - -class ReadNPYSharedState : public ReadFileSharedState { -public: - ReadNPYSharedState(std::vector filePaths, common::CSVReaderConfig csvReaderConfig, - catalog::NodeTableSchema* tableSchema) - : ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {} - -private: - std::unique_ptr getMorsel() final; - std::unique_ptr getMorselSerial() final; - void countNumRows() final; - - std::shared_ptr reader; -}; - -} // namespace storage -} // namespace kuzu diff --git a/src/storage/copier/reader_state.cpp b/src/storage/copier/reader_state.cpp index 5079fe001f..9044559239 100644 --- a/src/storage/copier/reader_state.cpp +++ b/src/storage/copier/reader_state.cpp @@ -237,14 +237,13 @@ std::unique_ptr ReaderSharedState::getParallelMorsel() { } std::unique_ptr ReaderSharedState::getMorselOfNextBlock() { + if (currFileIdx >= blockInfos.size()) { + return std::make_unique(currFileIdx, INVALID_BLOCK_IDX, INVALID_ROW_IDX); + } auto numBlocksInFile = blockInfos[currFileIdx].numRowsPerBlock.size(); if (currBlockIdx >= numBlocksInFile) { currFileIdx += fileType == CopyDescription::FileType::NPY ? filePaths.size() : 1; currBlockIdx = 0; - if (currFileIdx >= filePaths.size()) { - // End of all files. - return std::make_unique(); - } } return std::make_unique(currFileIdx, currBlockIdx++, currRowIdx); }