diff --git a/src/include/common/constants.h b/src/include/common/constants.h index 555e7417d4..60e0172389 100644 --- a/src/include/common/constants.h +++ b/src/include/common/constants.h @@ -7,7 +7,7 @@ namespace kuzu { namespace common { -constexpr char KUZU_VERSION[] = "v0.4.0"; +constexpr char KUZU_VERSION[] = "v0.0.8.5"; constexpr uint64_t DEFAULT_VECTOR_CAPACITY_LOG_2 = 11; constexpr uint64_t DEFAULT_VECTOR_CAPACITY = (uint64_t)1 << DEFAULT_VECTOR_CAPACITY_LOG_2; @@ -118,9 +118,6 @@ struct CopyConstants { // Size (in bytes) of the chunks to be read in Node/Rel Copier static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23; - // Number of rows per block for npy files - static constexpr uint64_t NUM_ROWS_PER_BLOCK_FOR_NPY = 2048; - // Default configuration for csv file parsing static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = { "ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"}; diff --git a/src/include/common/data_chunk/sel_vector.h b/src/include/common/data_chunk/sel_vector.h index 88916e6239..f5d2b9c808 100644 --- a/src/include/common/data_chunk/sel_vector.h +++ b/src/include/common/data_chunk/sel_vector.h @@ -37,7 +37,10 @@ class SelectionVector { public: sel_t* selectedPositions; - sel_t selectedSize; + // TODO: type of `selectedSize` was changed from `sel_t` to `uint64_t`, which should be reverted + // when we removed arrow array in ValueVector. Currently, we need to keep size of arrow array, + // which could be larger than MAX of `sel_t`. + uint64_t selectedSize; private: std::unique_ptr selectedPositionsBuffer; diff --git a/src/include/processor/operator/persistent/copy_node.h b/src/include/processor/operator/persistent/copy_node.h index 45f95f4625..b18b18aadd 100644 --- a/src/include/processor/operator/persistent/copy_node.h +++ b/src/include/processor/operator/persistent/copy_node.h @@ -52,7 +52,6 @@ class CopyNodeSharedState { struct CopyNodeInfo { std::vector dataColumnPoses; - DataPos nodeOffsetPos; common::CopyDescription copyDesc; storage::NodeTable* table; storage::RelsStore* relsStore; @@ -71,7 +70,6 @@ class CopyNode : public Sink { for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) { dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get()); } - nodeOffsetVector = resultSet->getValueVector(copyNodeInfo.nodeOffsetPos).get(); localNodeGroup = std::make_unique(sharedState->tableSchema, &sharedState->copyDesc); } @@ -111,7 +109,6 @@ class CopyNode : public Sink { storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); private: - common::ValueVector* nodeOffsetVector; std::shared_ptr sharedState; CopyNodeInfo copyNodeInfo; std::vector dataColumnVectors; diff --git a/src/include/processor/operator/persistent/reader.h b/src/include/processor/operator/persistent/reader.h index dd90f0746f..d2ce5ab6c1 100644 --- a/src/include/processor/operator/persistent/reader.h +++ b/src/include/processor/operator/persistent/reader.h @@ -51,9 +51,21 @@ class Reader : public PhysicalOperator { bool getNextTuplesInternal(ExecutionContext* context) final; private: - void getNextNodeGroupInSerial(); - void getNextNodeGroupInParallel(); - void readNextNodeGroupInParallel(); + template + void readNextDataChunk(); + + template + inline void lockForSerial() { + if constexpr (READ_MODE == ReaderSharedState::ReadMode::SERIAL) { + sharedState->mtx.lock(); + } + } + template + inline void unlockForSerial() { + if constexpr (READ_MODE == ReaderSharedState::ReadMode::SERIAL) { + sharedState->mtx.unlock(); + } + } private: std::unique_ptr info; @@ -66,8 +78,7 @@ class Reader : public PhysicalOperator { read_rows_func_t readFunc; init_reader_data_func_t initFunc; - // For parallel reading. - std::unique_ptr readFuncData; + std::shared_ptr readFuncData; }; } // namespace processor diff --git a/src/include/processor/operator/persistent/reader_functions.h b/src/include/processor/operator/persistent/reader_functions.h new file mode 100644 index 0000000000..aa9054056c --- /dev/null +++ b/src/include/processor/operator/persistent/reader_functions.h @@ -0,0 +1,113 @@ +#pragma once + +#include "processor/operator/persistent/csv_reader.h" +#include "storage/copier/npy_reader.h" +#include "storage/copier/table_copy_utils.h" + +namespace kuzu { +namespace processor { + +// TODO(Xiyang): Move functors to system built-in functions. +struct ReaderFunctionData { + common::CSVReaderConfig csvReaderConfig; + catalog::TableSchema* tableSchema; + common::vector_idx_t fileIdx; + + virtual ~ReaderFunctionData() = default; +}; + +struct RelCSVReaderFunctionData : public ReaderFunctionData { + std::shared_ptr reader = nullptr; +}; + +struct NodeCSVReaderFunctionData : public ReaderFunctionData { + std::unique_ptr reader = nullptr; +}; + +struct ParquetReaderFunctionData : public ReaderFunctionData { + std::unique_ptr reader = nullptr; +}; + +struct NPYReaderFunctionData : public ReaderFunctionData { + std::unique_ptr reader = nullptr; +}; + +struct FileBlocksInfo { + common::row_idx_t numRows = 0; + common::block_idx_t numBlocks = 0; +}; + +using validate_func_t = + std::function& paths, catalog::TableSchema* tableSchema)>; +using init_reader_data_func_t = std::function& paths, common::vector_idx_t fileIdx, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema)>; +using count_blocks_func_t = std::function( + const std::vector& paths, common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema, storage::MemoryManager* memoryManager)>; +using read_rows_func_t = std::function; + +struct ReaderFunctions { + static validate_func_t getValidateFunc(common::CopyDescription::FileType fileType); + static count_blocks_func_t getCountBlocksFunc( + common::CopyDescription::FileType fileType, common::TableType tableType); + static init_reader_data_func_t getInitDataFunc( + common::CopyDescription::FileType fileType, common::TableType tableType); + static read_rows_func_t getReadRowsFunc( + common::CopyDescription::FileType fileType, common::TableType tableType); + static std::shared_ptr getReadFuncData( + common::CopyDescription::FileType fileType, common::TableType tableType); + + static inline void validateCSVFiles( + const std::vector& paths, catalog::TableSchema* tableSchema) { + // DO NOTHING. + } + static inline void validateParquetFiles( + const std::vector& paths, catalog::TableSchema* tableSchema) { + // DO NOTHING. + } + static void validateNPYFiles( + const std::vector& paths, catalog::TableSchema* tableSchema); + + static std::vector countRowsInRelCSVFile(const std::vector& paths, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema, + storage::MemoryManager* memoryManager); + static std::vector countRowsInNodeCSVFile(const std::vector& paths, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema, + storage::MemoryManager* memoryManager); + static std::vector countRowsInParquetFile(const std::vector& paths, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema, + storage::MemoryManager* memoryManager); + static std::vector countRowsInNPYFile(const std::vector& paths, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema, + storage::MemoryManager* memoryManager); + + static void initRelCSVReadData(ReaderFunctionData& funcData, + const std::vector& paths, common::vector_idx_t fileIdx, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema); + static void initNodeCSVReadData(ReaderFunctionData& funcData, + const std::vector& paths, common::vector_idx_t fileIdx, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema); + static void initParquetReadData(ReaderFunctionData& funcData, + const std::vector& paths, common::vector_idx_t fileIdx, + common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema); + static void initNPYReadData(ReaderFunctionData& funcData, const std::vector& paths, + common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema); + + static void readRowsFromRelCSVFile(const ReaderFunctionData& funcData, + common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead); + static void readRowsFromNodeCSVFile(const ReaderFunctionData& funcData, + common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead); + static void readRowsFromParquetFile(const ReaderFunctionData& funcData, + common::block_idx_t blockIdx, common::DataChunk* vectorsToRead); + static void readRowsFromNPYFile(const ReaderFunctionData& funcData, + common::block_idx_t blockIdx, common::DataChunk* vectorsToRead); + + static std::unique_ptr getDataChunkToRead( + catalog::TableSchema* tableSchema, storage::MemoryManager* memoryManager); +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/reader_state.h b/src/include/processor/operator/persistent/reader_state.h index 5ba4201f36..3a5cd1c8f5 100644 --- a/src/include/processor/operator/persistent/reader_state.h +++ b/src/include/processor/operator/persistent/reader_state.h @@ -1,80 +1,18 @@ #pragma once #include "common/data_chunk/data_chunk.h" -#include "csv_reader.h" -#include "storage/copier/npy_reader.h" -#include "storage/copier/table_copy_utils.h" +#include "processor/operator/persistent/reader_functions.h" namespace kuzu { namespace processor { -struct ReaderFunctionData { - ReaderFunctionData(common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema, - common::vector_idx_t fileIdx) - : csvReaderConfig{csvReaderConfig}, tableSchema{tableSchema}, fileIdx{fileIdx} {} - virtual ~ReaderFunctionData() = default; - - common::CSVReaderConfig csvReaderConfig; - catalog::TableSchema* tableSchema; - common::vector_idx_t fileIdx; -}; - -struct RelCSVReaderFunctionData : public ReaderFunctionData { - RelCSVReaderFunctionData(common::CSVReaderConfig csvReaderConfig, - catalog::TableSchema* tableSchema, common::vector_idx_t fileIdx, - std::shared_ptr reader) - : ReaderFunctionData{csvReaderConfig, tableSchema, fileIdx}, reader{std::move(reader)} {} - - std::shared_ptr reader; -}; - -struct NodeCSVReaderFunctionData : public ReaderFunctionData { - NodeCSVReaderFunctionData(common::CSVReaderConfig csvReaderConfig, - catalog::TableSchema* tableSchema, common::vector_idx_t fileIdx, - std::unique_ptr csvReader) - : ReaderFunctionData{csvReaderConfig, tableSchema, fileIdx}, csvReader{ - std::move(csvReader)} {} - - std::unique_ptr csvReader; -}; - -struct ParquetReaderFunctionData : public ReaderFunctionData { - ParquetReaderFunctionData(common::CSVReaderConfig csvReaderConfig, - catalog::TableSchema* tableSchema, common::vector_idx_t fileIdx, - std::unique_ptr reader) - : ReaderFunctionData{csvReaderConfig, tableSchema, fileIdx}, reader{std::move(reader)} {} - - std::unique_ptr reader; -}; - -struct NPYReaderFunctionData : public ReaderFunctionData { - NPYReaderFunctionData(common::CSVReaderConfig csvReaderConfig, - catalog::TableSchema* tableSchema, common::vector_idx_t fileIdx, - std::unique_ptr reader) - : ReaderFunctionData{csvReaderConfig, tableSchema, fileIdx}, reader{std::move(reader)} {} - - std::unique_ptr reader; -}; - -struct FileBlocksInfo { - common::row_idx_t numRows = 0; - std::vector numRowsPerBlock; -}; - struct ReaderMorsel { - ReaderMorsel() - : fileIdx{common::INVALID_VECTOR_IDX}, blockIdx{common::INVALID_BLOCK_IDX}, - rowIdx{common::INVALID_ROW_IDX} {} - - ReaderMorsel( - common::vector_idx_t fileIdx, common::block_idx_t blockIdx, common::row_idx_t rowIdx) - : fileIdx{fileIdx}, blockIdx{blockIdx}, rowIdx{rowIdx} {} - - virtual ~ReaderMorsel() = default; - common::vector_idx_t fileIdx; common::block_idx_t blockIdx; - common::row_idx_t rowIdx; + + ReaderMorsel() : fileIdx{common::INVALID_VECTOR_IDX}, blockIdx{common::INVALID_BLOCK_IDX} {} + ReaderMorsel(common::vector_idx_t fileIdx, common::block_idx_t blockIdx) + : fileIdx{fileIdx}, blockIdx{blockIdx} {} }; class LeftArrowArrays { @@ -92,106 +30,49 @@ class LeftArrowArrays { std::vector leftArrays; }; -using validate_func_t = - std::function& paths, catalog::TableSchema* tableSchema)>; -using init_reader_data_func_t = std::function( - const std::vector& paths, common::vector_idx_t fileIdx, - common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema)>; -using count_blocks_func_t = std::function( - const std::vector& paths, common::CSVReaderConfig csvReaderConfig, - catalog::TableSchema* tableSchema, storage::MemoryManager*)>; -using read_rows_func_t = std::function; - -struct ReaderFunctions { - static validate_func_t getValidateFunc(common::CopyDescription::FileType fileType); - static count_blocks_func_t getCountBlocksFunc( - common::CopyDescription::FileType fileType, common::TableType tableType); - static init_reader_data_func_t getInitDataFunc( - common::CopyDescription::FileType fileType, common::TableType tableType); - static read_rows_func_t getReadRowsFunc( - common::CopyDescription::FileType fileType, common::TableType tableType); - - static inline void validateCSVFiles( - const std::vector& paths, catalog::TableSchema* tableSchema) { - // DO NOTHING. - } - static inline void validateParquetFiles( - const std::vector& paths, catalog::TableSchema* tableSchema) { - // DO NOTHING. - } - static void validateNPYFiles( - const std::vector& paths, catalog::TableSchema* tableSchema); - - static std::vector countRowsInRelCSVFile(const std::vector& paths, - common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema, - storage::MemoryManager* memoryManager); - static std::vector countRowsInNodeCSVFile(const std::vector& paths, - common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema, - storage::MemoryManager* memoryManager); - static std::vector countRowsInParquetFile(const std::vector& paths, - common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema, - storage::MemoryManager* memoryManager); - static std::vector countRowsInNPYFile(const std::vector& paths, - common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema, - storage::MemoryManager* memoryManager); - - static std::unique_ptr initRelCSVReadData( - const std::vector& paths, common::vector_idx_t fileIdx, - common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema); - static std::unique_ptr initNodeCSVReadData( - const std::vector& paths, common::vector_idx_t fileIdx, - common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema); - static std::unique_ptr initParquetReadData( - const std::vector& paths, common::vector_idx_t fileIdx, - common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema); - static std::unique_ptr initNPYReadData( - const std::vector& paths, common::vector_idx_t fileIdx, - common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema); - - static void readRowsFromRelCSVFile(const ReaderFunctionData& functionData, - common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead); - static void readRowsFromNodeCSVFile(const ReaderFunctionData& functionData, - common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead); - static void readRowsFromParquetFile(const ReaderFunctionData& functionData, - common::block_idx_t blockIdx, common::DataChunk* vectorsToRead); - static void readRowsFromNPYFile(const ReaderFunctionData& functionData, - common::block_idx_t blockIdx, common::DataChunk* vectorsToRead); - - static std::unique_ptr getDataChunkToRead( - catalog::TableSchema* tableSchema, storage::MemoryManager* memoryManager); -}; - class ReaderSharedState { friend class Reader; public: + enum class ReadMode : uint8_t { + PARALLEL = 0, + SERIAL = 1, + }; + ReaderSharedState( std::unique_ptr copyDescription, catalog::TableSchema* tableSchema) : copyDescription{std::move(copyDescription)}, tableSchema{tableSchema}, numRows{0}, - currFileIdx{0}, currBlockIdx{0}, currRowIdx{0} { - validateFunc = ReaderFunctions::getValidateFunc(this->copyDescription->fileType); - initFunc = ReaderFunctions::getInitDataFunc( - this->copyDescription->fileType, tableSchema->tableType); - countBlocksFunc = ReaderFunctions::getCountBlocksFunc( - this->copyDescription->fileType, tableSchema->tableType); - readFunc = ReaderFunctions::getReadRowsFunc( - this->copyDescription->fileType, tableSchema->tableType); - } + currFileIdx{0}, currBlockIdx{0}, currRowIdx{0} {} - void validate(); + void initialize(); + void validate() const; void countBlocks(storage::MemoryManager* memoryManager); - std::unique_ptr getSerialMorsel(common::DataChunk* vectorsToRead); - std::unique_ptr getParallelMorsel(); + inline void moveToNextFile() { + currFileIdx += (copyDescription->fileType == common::CopyDescription::FileType::NPY ? + copyDescription->filePaths.size() : + 1); + currBlockIdx = 0; + } + + template + std::unique_ptr getMorsel(); - inline void lock() { mtx.lock(); } - inline void unlock() { mtx.unlock(); } inline common::row_idx_t& getNumRowsRef() { return std::ref(numRows); } private: - std::unique_ptr getMorselOfNextBlock(); - void readNextBlock(common::DataChunk* dataChunk); + template + inline void lockForParallel() { + if constexpr (READ_MODE == ReadMode::PARALLEL) { + mtx.lock(); + } + } + template + inline void unlockForParallel() { + if constexpr (READ_MODE == ReadMode::PARALLEL) { + mtx.unlock(); + } + } public: std::mutex mtx; @@ -203,14 +84,14 @@ class ReaderSharedState { init_reader_data_func_t initFunc; count_blocks_func_t countBlocksFunc; read_rows_func_t readFunc; - std::unique_ptr readFuncData; + std::shared_ptr readFuncData; common::row_idx_t numRows; - std::vector blockInfos; + std::vector fileInfos; common::vector_idx_t currFileIdx; common::block_idx_t currBlockIdx; - common::row_idx_t currRowIdx; + std::atomic currRowIdx; private: LeftArrowArrays leftArrowArrays; diff --git a/src/include/processor/plan_mapper.h b/src/include/processor/plan_mapper.h index f9a34d585c..a3b4875673 100644 --- a/src/include/processor/plan_mapper.h +++ b/src/include/processor/plan_mapper.h @@ -98,7 +98,7 @@ class PlanMapper { std::unique_ptr createReader(common::CopyDescription* copyDesc, catalog::TableSchema* tableSchema, planner::Schema* outSchema, const std::vector>& dataColumnExpressions, - const std::shared_ptr& offsetExpression, bool preserveOrder); + const std::shared_ptr& offsetExpression, bool readingInSerial); std::unique_ptr createIndexLookup(catalog::RelTableSchema* tableSchema, const std::vector& dataPoses, const DataPos& boundOffsetDataPos, const DataPos& nbrOffsetDataPos, std::unique_ptr readerOp); diff --git a/src/include/storage/copier/table_copy_utils.h b/src/include/storage/copier/table_copy_utils.h index 1cdb3f0a76..529972aab1 100644 --- a/src/include/storage/copier/table_copy_utils.h +++ b/src/include/storage/copier/table_copy_utils.h @@ -20,13 +20,6 @@ namespace kuzu { namespace storage { -struct FileBlockInfo { - FileBlockInfo(uint64_t numBlocks, std::vector numRowsPerBlock) - : numBlocks{numBlocks}, numRowsPerBlock{std::move(numRowsPerBlock)} {} - uint64_t numBlocks; - std::vector numRowsPerBlock; -}; - struct StructFieldIdxAndValue { StructFieldIdxAndValue(common::struct_field_idx_t fieldIdx, std::string fieldValue) : fieldIdx{fieldIdx}, fieldValue{std::move(fieldValue)} {} @@ -52,10 +45,6 @@ class TableCopyUtils { static std::unique_ptr createParquetReader( const std::string& filePath, catalog::TableSchema* tableSchema); - static common::row_idx_t countNumLines(common::CopyDescription& copyDescription, - catalog::TableSchema* tableSchema, - std::unordered_map& fileBlockInfos); - static std::vector> splitByDelimiter(const std::string& l, int64_t from, int64_t to, const common::CSVReaderConfig& csvReaderConfig); @@ -74,15 +63,6 @@ class TableCopyUtils { uint64_t length); private: - static common::row_idx_t countNumLinesCSV(common::CopyDescription& copyDescription, - catalog::TableSchema* tableSchema, - std::unordered_map& fileBlockInfos); - static common::row_idx_t countNumLinesParquet(common::CopyDescription& copyDescription, - catalog::TableSchema* tableSchema, - std::unordered_map& fileBlockInfos); - static common::row_idx_t countNumLinesNpy(common::CopyDescription& copyDescription, - catalog::TableSchema* tableSchema, - std::unordered_map& fileBlockInfos); static std::unique_ptr convertStringToValue(std::string element, const common::LogicalType& type, const common::CSVReaderConfig& csvReaderConfig); static std::vector getColumnNamesToRead(catalog::TableSchema* tableSchema); diff --git a/src/processor/map/map_copy_from.cpp b/src/processor/map/map_copy_from.cpp index b07b4a728f..80a3465787 100644 --- a/src/processor/map/map_copy_from.cpp +++ b/src/processor/map/map_copy_from.cpp @@ -33,7 +33,7 @@ std::unique_ptr PlanMapper::mapCopyFrom(LogicalOperator* logic std::unique_ptr PlanMapper::createReader(CopyDescription* copyDesc, TableSchema* tableSchema, Schema* outSchema, const std::vector>& dataColumnExpressions, - const std::shared_ptr& offsetExpression, bool preserveOrder) { + const std::shared_ptr& offsetExpression, bool containsSerial) { auto readerSharedState = std::make_shared(copyDesc->copy(), tableSchema); std::vector dataColumnsPos; dataColumnsPos.reserve(dataColumnExpressions.size()); @@ -41,7 +41,7 @@ std::unique_ptr PlanMapper::createReader(CopyDescription* copy dataColumnsPos.emplace_back(outSchema->getExpressionPos(*expression)); } auto nodeOffsetPos = DataPos(outSchema->getExpressionPos(*offsetExpression)); - auto readInfo = std::make_unique(nodeOffsetPos, dataColumnsPos, preserveOrder); + auto readInfo = std::make_unique(nodeOffsetPos, dataColumnsPos, containsSerial); return std::make_unique( std::move(readInfo), readerSharedState, getOperatorID(), tableSchema->tableName); } @@ -62,9 +62,9 @@ std::unique_ptr PlanMapper::mapCopyNodeFrom( auto copyNodeSharedState = std::make_shared(readerOp->getSharedState()->getNumRowsRef(), tableSchema, nodeTable, *copyFromInfo->copyDesc, memoryManager); - CopyNodeInfo copyNodeDataInfo{readerInfo->dataColumnsPos, readerInfo->nodeOffsetPos, - *copyFromInfo->copyDesc, nodeTable, &storageManager.getRelsStore(), catalog, - storageManager.getWAL(), copyFromInfo->containsSerial}; + CopyNodeInfo copyNodeDataInfo{readerInfo->dataColumnsPos, *copyFromInfo->copyDesc, nodeTable, + &storageManager.getRelsStore(), catalog, storageManager.getWAL(), + copyFromInfo->containsSerial}; auto copyNode = std::make_unique(copyNodeSharedState, copyNodeDataInfo, std::make_unique(copyFrom->getSchema()), std::move(reader), getOperatorID(), copyFrom->getExpressionsForPrinting()); diff --git a/src/processor/operator/persistent/CMakeLists.txt b/src/processor/operator/persistent/CMakeLists.txt index 02585cc06b..8b61616f64 100644 --- a/src/processor/operator/persistent/CMakeLists.txt +++ b/src/processor/operator/persistent/CMakeLists.txt @@ -14,6 +14,7 @@ add_library(kuzu_processor_operator_persistent insert_executor.cpp merge.cpp reader.cpp + reader_functions.cpp reader_state.cpp set.cpp set_executor.cpp) diff --git a/src/processor/operator/persistent/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp index f4ff5f12d2..7cdd5221f9 100644 --- a/src/processor/operator/persistent/copy_node.cpp +++ b/src/processor/operator/persistent/copy_node.cpp @@ -50,8 +50,7 @@ CopyNode::CopyNode(std::shared_ptr sharedState, CopyNodeInf std::unique_ptr child, uint32_t id, const std::string& paramsString) : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child), id, paramsString}, - nodeOffsetVector{nullptr}, sharedState{std::move(sharedState)}, copyNodeInfo{std::move( - copyNodeInfo)} {} + sharedState{std::move(sharedState)}, copyNodeInfo{std::move(copyNodeInfo)} {} void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr localNodeGroup) { std::unique_lock xLck{mtx}; diff --git a/src/processor/operator/persistent/reader.cpp b/src/processor/operator/persistent/reader.cpp index 7943a05510..9f760f0f09 100644 --- a/src/processor/operator/persistent/reader.cpp +++ b/src/processor/operator/persistent/reader.cpp @@ -10,6 +10,7 @@ namespace kuzu { namespace processor { void Reader::initGlobalStateInternal(ExecutionContext* context) { + sharedState->initialize(); sharedState->validate(); sharedState->countBlocks(context->memoryManager); } @@ -25,49 +26,62 @@ void Reader::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont readFunc = ReaderFunctions::getReadRowsFunc( sharedState->copyDescription->fileType, sharedState->tableSchema->getTableType()); nodeOffsetVector = resultSet->getValueVector(info->nodeOffsetPos).get(); + assert(!sharedState->copyDescription->filePaths.empty()); + switch (sharedState->copyDescription->fileType) { + case CopyDescription::FileType::CSV: { + readFuncData = sharedState->readFuncData; + } break; + default: { + readFuncData = ReaderFunctions::getReadFuncData( + sharedState->copyDescription->fileType, sharedState->tableSchema->getTableType()); + initFunc(*readFuncData, sharedState->copyDescription->filePaths, 0, + *sharedState->copyDescription->csvReaderConfig, sharedState->tableSchema); + } + } } bool Reader::getNextTuplesInternal(ExecutionContext* context) { sharedState->copyDescription->fileType == common::CopyDescription::FileType::CSV ? - getNextNodeGroupInSerial() : - getNextNodeGroupInParallel(); + readNextDataChunk() : + readNextDataChunk(); return dataChunk->state->selVector->selectedSize != 0; } -void Reader::getNextNodeGroupInSerial() { - auto morsel = sharedState->getSerialMorsel(dataChunk.get()); - if (morsel->fileIdx == INVALID_VECTOR_IDX) { - return; - } - nodeOffsetVector->setValue( - nodeOffsetVector->state->selVector->selectedPositions[0], morsel->rowIdx); -} - -void Reader::getNextNodeGroupInParallel() { - readNextNodeGroupInParallel(); - if (leftArrowArrays.getLeftNumRows() == 0) { +template +void Reader::readNextDataChunk() { + lockForSerial(); + while (true) { + if (leftArrowArrays.getLeftNumRows() > 0) { + auto numLeftToAppend = + std::min(leftArrowArrays.getLeftNumRows(), DEFAULT_VECTOR_CAPACITY); + leftArrowArrays.appendToDataChunk(dataChunk.get(), numLeftToAppend); + auto currRowIdx = sharedState->currRowIdx.fetch_add(numLeftToAppend); + nodeOffsetVector->setValue( + nodeOffsetVector->state->selVector->selectedPositions[0], currRowIdx); + break; + } dataChunk->state->selVector->selectedSize = 0; - } else { - int64_t numRowsToReturn = - std::min(leftArrowArrays.getLeftNumRows(), DEFAULT_VECTOR_CAPACITY); - leftArrowArrays.appendToDataChunk(dataChunk.get(), numRowsToReturn); - } -} - -void Reader::readNextNodeGroupInParallel() { - if (leftArrowArrays.getLeftNumRows() == 0) { - auto morsel = sharedState->getParallelMorsel(); + auto morsel = sharedState->getMorsel(); if (morsel->fileIdx == INVALID_VECTOR_IDX) { - return; + // No more files to read. + break; } - if (!readFuncData || morsel->fileIdx != readFuncData->fileIdx) { - readFuncData = initFunc(sharedState->copyDescription->filePaths, morsel->fileIdx, + if (morsel->fileIdx != readFuncData->fileIdx) { + initFunc(*readFuncData, sharedState->copyDescription->filePaths, morsel->fileIdx, *sharedState->copyDescription->csvReaderConfig, sharedState->tableSchema); } readFunc(*readFuncData, morsel->blockIdx, dataChunk.get()); - leftArrowArrays.appendFromDataChunk(dataChunk.get()); + if (dataChunk->state->selVector->selectedSize > 0) { + leftArrowArrays.appendFromDataChunk(dataChunk.get()); + } else { + sharedState->moveToNextFile(); + } } + unlockForSerial(); } +template void Reader::readNextDataChunk(); +template void Reader::readNextDataChunk(); + } // namespace processor } // namespace kuzu diff --git a/src/processor/operator/persistent/reader_functions.cpp b/src/processor/operator/persistent/reader_functions.cpp new file mode 100644 index 0000000000..6108a57f5f --- /dev/null +++ b/src/processor/operator/persistent/reader_functions.cpp @@ -0,0 +1,302 @@ +#include "processor/operator/persistent/reader_functions.h" + +using namespace kuzu::common; +using namespace kuzu::catalog; +using namespace kuzu::storage; + +namespace kuzu { +namespace processor { + +validate_func_t ReaderFunctions::getValidateFunc(CopyDescription::FileType fileType) { + switch (fileType) { + case CopyDescription::FileType::CSV: + return validateCSVFiles; + case CopyDescription::FileType::PARQUET: + return validateParquetFiles; + case CopyDescription::FileType::NPY: + return validateNPYFiles; + default: + throw NotImplementedException{"ReaderFunctions::getValidateFunc"}; + } +} + +count_blocks_func_t ReaderFunctions::getCountBlocksFunc( + CopyDescription::FileType fileType, TableType tableType) { + switch (fileType) { + case CopyDescription::FileType::CSV: { + switch (tableType) { + case TableType::NODE: + return countRowsInNodeCSVFile; + case TableType::REL: + return countRowsInRelCSVFile; + default: + throw NotImplementedException{"ReaderFunctions::getCountBlocksFunc"}; + } + } + case CopyDescription::FileType::PARQUET: { + return countRowsInParquetFile; + } + case CopyDescription::FileType::NPY: { + return countRowsInNPYFile; + } + default: { + throw NotImplementedException{"ReaderFunctions::getRowsCounterFunc"}; + } + } +} + +init_reader_data_func_t ReaderFunctions::getInitDataFunc( + CopyDescription::FileType fileType, TableType tableType) { + switch (fileType) { + case CopyDescription::FileType::CSV: { + switch (tableType) { + case TableType::NODE: + return initNodeCSVReadData; + case TableType::REL: + return initRelCSVReadData; + default: + throw NotImplementedException{"ReaderFunctions::getInitDataFunc"}; + } + } + case CopyDescription::FileType::PARQUET: { + return initParquetReadData; + } + case CopyDescription::FileType::NPY: { + return initNPYReadData; + } + default: { + throw NotImplementedException{"ReaderFunctions::getInitDataFunc"}; + } + } +} + +read_rows_func_t ReaderFunctions::getReadRowsFunc( + CopyDescription::FileType fileType, common::TableType tableType) { + switch (fileType) { + case CopyDescription::FileType::CSV: { + switch (tableType) { + case TableType::NODE: + return readRowsFromNodeCSVFile; + case TableType::REL: + return readRowsFromRelCSVFile; + default: + throw NotImplementedException{"ReaderFunctions::getReadRowsFunc"}; + } + } + case CopyDescription::FileType::PARQUET: { + return readRowsFromParquetFile; + } + case CopyDescription::FileType::NPY: { + return readRowsFromNPYFile; + } + default: { + throw NotImplementedException{"ReaderFunctions::getReadRowsFunc"}; + } + } +} + +std::shared_ptr ReaderFunctions::getReadFuncData( + CopyDescription::FileType fileType, TableType tableType) { + switch (fileType) { + case CopyDescription::FileType::CSV: { + switch (tableType) { + case TableType::NODE: + return std::make_shared(); + case TableType::REL: + return std::make_shared(); + default: + throw NotImplementedException{"ReaderFunctions::getReadFuncData"}; + } + } + case CopyDescription::FileType::PARQUET: { + return std::make_shared(); + } + case CopyDescription::FileType::NPY: { + return std::make_shared(); + } + default: { + throw NotImplementedException{"ReaderFunctions::getReadFuncData"}; + } + } +} + +void ReaderFunctions::validateNPYFiles( + const std::vector& paths, TableSchema* tableSchema) { + assert(!paths.empty() && paths.size() == tableSchema->getNumProperties()); + row_idx_t numRows; + for (auto i = 0u; i < paths.size(); i++) { + auto reader = make_unique(paths[i]); + if (i == 0) { + numRows = reader->getNumRows(); + } + auto tableType = tableSchema->getProperty(i)->getDataType(); + reader->validate(*tableType, numRows, tableSchema->tableName); + } +} + +std::vector ReaderFunctions::countRowsInRelCSVFile( + const std::vector& paths, CSVReaderConfig csvReaderConfig, + TableSchema* tableSchema, MemoryManager* memoryManager) { + std::vector fileInfos(paths.size(), {INVALID_ROW_IDX, INVALID_BLOCK_IDX}); + return fileInfos; +} + +std::vector ReaderFunctions::countRowsInNodeCSVFile( + const std::vector& paths, common::CSVReaderConfig csvReaderConfig, + catalog::TableSchema* tableSchema, storage::MemoryManager* memoryManager) { + std::vector fileInfos; + fileInfos.reserve(paths.size()); + auto dataChunk = getDataChunkToRead(tableSchema, memoryManager); + // We should add a countNumRows() API to csvReader, so that it doesn't need to read data to + // valueVector when counting the csv file. + for (const auto& path : paths) { + auto reader = std::make_unique(path, csvReaderConfig, tableSchema); + row_idx_t numRowsInFile = 0; + block_idx_t numBlocks = 0; + while (true) { + dataChunk->state->selVector->selectedSize = 0; + auto numRowsRead = reader->ParseCSV(*dataChunk); + if (numRowsRead == 0) { + break; + } + numRowsInFile += numRowsRead; + numBlocks++; + } + FileBlocksInfo fileBlocksInfo{numRowsInFile, numBlocks}; + fileInfos.push_back(fileBlocksInfo); + } + return fileInfos; +} + +std::vector ReaderFunctions::countRowsInParquetFile( + const std::vector& paths, CSVReaderConfig csvReaderConfig, + TableSchema* tableSchema, MemoryManager* memoryManager) { + std::vector fileInfos; + fileInfos.reserve(paths.size()); + for (const auto& path : paths) { + std::unique_ptr reader = + TableCopyUtils::createParquetReader(path, tableSchema); + auto metadata = reader->parquet_reader()->metadata(); + FileBlocksInfo fileBlocksInfo{ + (row_idx_t)metadata->num_rows(), (block_idx_t)metadata->num_row_groups()}; + fileInfos.push_back(fileBlocksInfo); + } + return fileInfos; +} + +std::vector ReaderFunctions::countRowsInNPYFile( + const std::vector& paths, CSVReaderConfig csvReaderConfig, + TableSchema* tableSchema, MemoryManager* memoryManager) { + assert(!paths.empty()); + auto reader = make_unique(paths[0]); + auto numRows = reader->getNumRows(); + auto numBlocks = + (block_idx_t)((numRows + DEFAULT_VECTOR_CAPACITY - 1) / DEFAULT_VECTOR_CAPACITY); + return {{numRows, numBlocks}}; +} + +void ReaderFunctions::initRelCSVReadData(ReaderFunctionData& funcData, + const std::vector& paths, vector_idx_t fileIdx, CSVReaderConfig csvReaderConfig, + TableSchema* tableSchema) { + assert(fileIdx < paths.size()); + funcData.fileIdx = fileIdx; + funcData.tableSchema = tableSchema; + reinterpret_cast(funcData).reader = + TableCopyUtils::createCSVReader(paths[fileIdx], &csvReaderConfig, tableSchema); +} + +void ReaderFunctions::initNodeCSVReadData(ReaderFunctionData& funcData, + const std::vector& paths, vector_idx_t fileIdx, CSVReaderConfig csvReaderConfig, + TableSchema* tableSchema) { + assert(fileIdx < paths.size()); + funcData.fileIdx = fileIdx; + funcData.tableSchema = tableSchema; + reinterpret_cast(funcData).reader = + std::make_unique(paths[fileIdx], csvReaderConfig, tableSchema); +} + +void ReaderFunctions::initParquetReadData(ReaderFunctionData& funcData, + const std::vector& paths, vector_idx_t fileIdx, CSVReaderConfig csvReaderConfig, + TableSchema* tableSchema) { + assert(fileIdx < paths.size()); + funcData.fileIdx = fileIdx; + funcData.tableSchema = tableSchema; + reinterpret_cast(funcData).reader = + TableCopyUtils::createParquetReader(paths[fileIdx], tableSchema); +} + +void ReaderFunctions::initNPYReadData(ReaderFunctionData& funcData, + const std::vector& paths, vector_idx_t fileIdx, CSVReaderConfig csvReaderConfig, + TableSchema* tableSchema) { + funcData.fileIdx = fileIdx; + funcData.tableSchema = tableSchema; + reinterpret_cast(funcData).reader = + make_unique(paths); +} + +void ReaderFunctions::readRowsFromRelCSVFile( + const ReaderFunctionData& functionData, block_idx_t blockIdx, DataChunk* dataChunkToRead) { + auto& readerData = (RelCSVReaderFunctionData&)(functionData); + std::shared_ptr recordBatch; + TableCopyUtils::throwCopyExceptionIfNotOK(readerData.reader->ReadNext(&recordBatch)); + if (recordBatch == nullptr) { + dataChunkToRead->state->selVector->selectedSize = 0; + return; + } + for (auto i = 0u; i < dataChunkToRead->getNumValueVectors(); i++) { + ArrowColumnVector::setArrowColumn(dataChunkToRead->getValueVector(i).get(), + std::make_shared(recordBatch->column((int)i))); + } + dataChunkToRead->state->selVector->selectedSize = recordBatch->num_rows(); +} + +void ReaderFunctions::readRowsFromNodeCSVFile( + const ReaderFunctionData& functionData, block_idx_t blockIdx, DataChunk* dataChunkToRead) { + auto& readerData = reinterpret_cast(functionData); + readerData.reader->ParseCSV(*dataChunkToRead); +} + +void ReaderFunctions::readRowsFromParquetFile(const ReaderFunctionData& functionData, + block_idx_t blockIdx, common::DataChunk* dataChunkToRead) { + auto& readerData = (ParquetReaderFunctionData&)(functionData); + std::shared_ptr table; + TableCopyUtils::throwCopyExceptionIfNotOK( + readerData.reader->RowGroup(static_cast(blockIdx))->ReadTable(&table)); + assert(table); + for (auto i = 0u; i < dataChunkToRead->getNumValueVectors(); i++) { + ArrowColumnVector::setArrowColumn( + dataChunkToRead->getValueVector(i).get(), table->column((int)i)); + } + dataChunkToRead->state->selVector->selectedSize = table->num_rows(); +} + +void ReaderFunctions::readRowsFromNPYFile(const ReaderFunctionData& functionData, + common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead) { + auto& readerData = (NPYReaderFunctionData&)(functionData); + auto recordBatch = readerData.reader->readBlock(blockIdx); + for (auto i = 0u; i < dataChunkToRead->getNumValueVectors(); i++) { + ArrowColumnVector::setArrowColumn(dataChunkToRead->getValueVector(i).get(), + std::make_shared(recordBatch->column((int)i))); + } + dataChunkToRead->state->selVector->selectedSize = recordBatch->num_rows(); +} + +std::unique_ptr ReaderFunctions::getDataChunkToRead( + catalog::TableSchema* tableSchema, MemoryManager* memoryManager) { + std::vector> valueVectorsToRead; + for (auto i = 0u; i < tableSchema->getNumProperties(); i++) { + auto property = tableSchema->getProperty(i); + if (property->getDataType()->getLogicalTypeID() != LogicalTypeID::SERIAL) { + valueVectorsToRead.emplace_back(std::make_unique( + *tableSchema->getProperty(i)->getDataType(), memoryManager)); + } + } + auto dataChunk = std::make_unique(valueVectorsToRead.size()); + for (auto i = 0u; i < valueVectorsToRead.size(); i++) { + dataChunk->insert(i, std::move(valueVectorsToRead[i])); + } + return dataChunk; +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/reader_state.cpp b/src/processor/operator/persistent/reader_state.cpp index e8ba7f3b28..fd96aaf8a1 100644 --- a/src/processor/operator/persistent/reader_state.cpp +++ b/src/processor/operator/persistent/reader_state.cpp @@ -8,9 +8,8 @@ namespace kuzu { namespace processor { void LeftArrowArrays::appendFromDataChunk(common::DataChunk* dataChunk) { + leftNumRows += dataChunk->state->selVector->selectedSize; if (dataChunk->getValueVector(0)->dataType.getPhysicalType() == PhysicalTypeID::ARROW_COLUMN) { - leftNumRows += - ArrowColumnVector::getArrowColumn(dataChunk->getValueVector(0).get())->length(); leftArrays.resize(dataChunk->getNumValueVectors()); for (auto i = 0u; i < dataChunk->getNumValueVectors(); i++) { for (auto& array : @@ -18,403 +17,101 @@ void LeftArrowArrays::appendFromDataChunk(common::DataChunk* dataChunk) { leftArrays[i].push_back(array); } } - } else { - leftNumRows += dataChunk->state->selVector->selectedSize; } } void LeftArrowArrays::appendToDataChunk(common::DataChunk* dataChunk, uint64_t numRowsToAppend) { leftNumRows -= numRowsToAppend; - if (dataChunk->getValueVector(0)->dataType.getPhysicalType() == PhysicalTypeID::ARROW_COLUMN) { - int64_t numRowsAppended = 0; - auto& arrayVectorToComputeSlice = leftArrays[0]; - std::vector arrayVectors; - arrayVectors.resize(leftArrays.size()); - uint64_t arrayVectorIdx; - for (arrayVectorIdx = 0u; arrayVectorIdx < arrayVectorToComputeSlice.size(); - arrayVectorIdx++) { - if (numRowsAppended >= numRowsToAppend) { - break; + if (dataChunk->getValueVector(0)->dataType.getPhysicalType() != PhysicalTypeID::ARROW_COLUMN) { + return; + } + int64_t numRowsAppended = 0; + auto& arrayVectorToComputeSlice = leftArrays[0]; + std::vector arrayVectors; + arrayVectors.resize(leftArrays.size()); + uint64_t arrayVectorIdx; + for (arrayVectorIdx = 0u; arrayVectorIdx < arrayVectorToComputeSlice.size(); arrayVectorIdx++) { + if (numRowsAppended >= numRowsToAppend) { + break; + } else { + auto arrayToComputeSlice = arrayVectorToComputeSlice[arrayVectorIdx]; + int64_t numRowsToAppendInCurArray = arrayToComputeSlice->length(); + if (numRowsToAppend - numRowsAppended < arrayToComputeSlice->length()) { + numRowsToAppendInCurArray = (int64_t)numRowsToAppend - numRowsAppended; + for (auto j = 0u; j < leftArrays.size(); j++) { + auto vectorToSlice = leftArrays[j][arrayVectorIdx]; + leftArrays[j].push_back(vectorToSlice->Slice(numRowsToAppendInCurArray)); + arrayVectors[j].push_back(vectorToSlice->Slice(0, numRowsToAppendInCurArray)); + } } else { - auto arrayToComputeSlice = arrayVectorToComputeSlice[arrayVectorIdx]; - int64_t numRowsToAppendInCurArray = arrayToComputeSlice->length(); - if (numRowsToAppend - numRowsAppended < arrayToComputeSlice->length()) { - numRowsToAppendInCurArray = (int64_t)numRowsToAppend - numRowsAppended; - for (auto j = 0u; j < leftArrays.size(); j++) { - auto vectorToSlice = leftArrays[j][arrayVectorIdx]; - leftArrays[j].push_back(vectorToSlice->Slice(numRowsToAppendInCurArray)); - arrayVectors[j].push_back( - vectorToSlice->Slice(0, numRowsToAppendInCurArray)); - } - } else { - for (auto j = 0u; j < leftArrays.size(); j++) { - arrayVectors[j].push_back(leftArrays[j][arrayVectorIdx]); - } + for (auto j = 0u; j < leftArrays.size(); j++) { + arrayVectors[j].push_back(leftArrays[j][arrayVectorIdx]); } - numRowsAppended += numRowsToAppendInCurArray; } + numRowsAppended += numRowsToAppendInCurArray; } - for (auto& arrayVector : leftArrays) { - arrayVector.erase(arrayVector.begin(), arrayVector.begin() + arrayVectorIdx); - } - for (auto i = 0u; i < dataChunk->getNumValueVectors(); i++) { - auto chunkArray = std::make_shared(std::move(arrayVectors[i])); - ArrowColumnVector::setArrowColumn( - dataChunk->getValueVector(i).get(), std::move(chunkArray)); - } - dataChunk->state->selVector->selectedSize = numRowsToAppend; - } -} - -validate_func_t ReaderFunctions::getValidateFunc(CopyDescription::FileType fileType) { - switch (fileType) { - case CopyDescription::FileType::CSV: - return validateCSVFiles; - case CopyDescription::FileType::PARQUET: - return validateParquetFiles; - case CopyDescription::FileType::NPY: - return validateNPYFiles; - default: - throw NotImplementedException{"ReaderFunctions::getValidateFunc"}; - } -} - -count_blocks_func_t ReaderFunctions::getCountBlocksFunc( - CopyDescription::FileType fileType, TableType tableType) { - switch (fileType) { - case CopyDescription::FileType::CSV: { - switch (tableType) { - case TableType::NODE: - return countRowsInNodeCSVFile; - case TableType::REL: - return countRowsInRelCSVFile; - default: - throw NotImplementedException{"ReaderFunctions::getCountBlocksFunc"}; - } - } - case CopyDescription::FileType::PARQUET: { - return countRowsInParquetFile; - } - case CopyDescription::FileType::NPY: { - return countRowsInNPYFile; - } - default: { - throw NotImplementedException{"ReaderFunctions::getRowsCounterFunc"}; - } - } -} - -init_reader_data_func_t ReaderFunctions::getInitDataFunc( - CopyDescription::FileType fileType, TableType tableType) { - switch (fileType) { - case CopyDescription::FileType::CSV: { - switch (tableType) { - case TableType::NODE: - return initNodeCSVReadData; - case TableType::REL: - return initRelCSVReadData; - default: - throw NotImplementedException{"ReaderFunctions::getInitDataFunc"}; - } - } - case CopyDescription::FileType::PARQUET: { - return initParquetReadData; - } - case CopyDescription::FileType::NPY: { - return initNPYReadData; - } - default: { - throw NotImplementedException{"ReaderFunctions::getInitDataFunc"}; - } - } -} - -read_rows_func_t ReaderFunctions::getReadRowsFunc( - CopyDescription::FileType fileType, common::TableType tableType) { - switch (fileType) { - case CopyDescription::FileType::CSV: { - switch (tableType) { - case TableType::NODE: - return readRowsFromNodeCSVFile; - case TableType::REL: - return readRowsFromRelCSVFile; - default: - throw NotImplementedException{"ReaderFunctions::getReadRowsFunc"}; - } - } - case CopyDescription::FileType::PARQUET: { - return readRowsFromParquetFile; - } - case CopyDescription::FileType::NPY: { - return readRowsFromNPYFile; - } - default: { - throw NotImplementedException{"ReaderFunctions::getReadRowsFunc"}; - } - } -} - -void ReaderFunctions::validateNPYFiles( - const std::vector& paths, TableSchema* tableSchema) { - assert(!paths.empty() && paths.size() == tableSchema->getNumProperties()); - row_idx_t numRows; - for (auto i = 0u; i < paths.size(); i++) { - auto reader = make_unique(paths[i]); - if (i == 0) { - numRows = reader->getNumRows(); - } - auto tableType = tableSchema->getProperty(i)->getDataType(); - reader->validate(*tableType, numRows, tableSchema->tableName); } -} - -std::vector ReaderFunctions::countRowsInRelCSVFile( - const std::vector& paths, common::CSVReaderConfig csvReaderConfig, - catalog::TableSchema* tableSchema, MemoryManager* memoryManager) { - std::vector result(paths.size()); - for (auto i = 0u; i < paths.size(); i++) { - auto csvStreamingReader = - TableCopyUtils::createCSVReader(paths[i], &csvReaderConfig, tableSchema); - std::shared_ptr currBatch; - row_idx_t numRows = 0; - std::vector blocks; - while (true) { - TableCopyUtils::throwCopyExceptionIfNotOK(csvStreamingReader->ReadNext(&currBatch)); - if (currBatch == nullptr) { - break; - } - auto numRowsInBatch = currBatch->num_rows(); - numRows += numRowsInBatch; - blocks.push_back(numRowsInBatch); - } - result[i] = {numRows, blocks}; + for (auto& arrayVector : leftArrays) { + arrayVector.erase(arrayVector.begin(), arrayVector.begin() + arrayVectorIdx); } - return result; -} - -std::vector ReaderFunctions::countRowsInNodeCSVFile( - const std::vector& paths, common::CSVReaderConfig csvReaderConfig, - catalog::TableSchema* tableSchema, storage::MemoryManager* memoryManager) { - std::vector result(paths.size()); - auto dataChunk = getDataChunkToRead(tableSchema, memoryManager); - // We should add a countNumRows() API to csvReader, so that it doesn't need to read data to - // valueVector when counting the csv file. - for (auto i = 0u; i < paths.size(); i++) { - auto reader = std::make_unique(paths[i], csvReaderConfig, tableSchema); - row_idx_t numRows = 0; - std::vector blocks; - while (true) { - dataChunk->state->selVector->selectedSize = 0; - auto numRowsRead = reader->ParseCSV(*dataChunk); - if (numRowsRead == 0) { - break; - } - numRows += numRowsRead; - blocks.push_back(numRowsRead); - } - result[i] = {numRows, blocks}; - } - return result; -} - -std::vector ReaderFunctions::countRowsInParquetFile( - const std::vector& paths, CSVReaderConfig csvReaderConfig, - TableSchema* tableSchema, MemoryManager* memoryManager) { - std::vector result(paths.size()); - for (auto i = 0u; i < paths.size(); i++) { - std::unique_ptr reader = - TableCopyUtils::createParquetReader(paths[i], tableSchema); - auto metadata = reader->parquet_reader()->metadata(); - auto numBlocks = metadata->num_row_groups(); - std::vector blocks; - blocks.reserve(numBlocks); - for (auto blockIdx = 0; blockIdx < numBlocks; blockIdx++) { - blocks.push_back(metadata->RowGroup(blockIdx)->num_rows()); - } - result[i] = {(row_idx_t)metadata->num_rows(), blocks}; - } - return result; -} - -std::vector ReaderFunctions::countRowsInNPYFile( - const std::vector& paths, CSVReaderConfig csvReaderConfig, - TableSchema* tableSchema, MemoryManager* memoryManager) { - assert(!paths.empty()); - auto reader = make_unique(paths[0]); - auto numRows = reader->getNumRows(); - auto numBlocks = (block_idx_t)((numRows + CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY) / - CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); - std::vector blocks(numBlocks, CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); - return {FileBlocksInfo{numRows, blocks}}; -} - -std::unique_ptr ReaderFunctions::initRelCSVReadData( - const std::vector& paths, vector_idx_t fileIdx, CSVReaderConfig csvReaderConfig, - TableSchema* tableSchema) { - assert(fileIdx < paths.size()); - auto reader = TableCopyUtils::createCSVReader(paths[fileIdx], &csvReaderConfig, tableSchema); - return std::make_unique( - csvReaderConfig, tableSchema, fileIdx, std::move(reader)); -} - -std::unique_ptr ReaderFunctions::initNodeCSVReadData( - const std::vector& paths, vector_idx_t fileIdx, CSVReaderConfig csvReaderConfig, - TableSchema* tableSchema) { - assert(fileIdx < paths.size()); - auto reader = std::make_unique(paths[fileIdx], csvReaderConfig, tableSchema); - return std::make_unique( - csvReaderConfig, tableSchema, fileIdx, std::move(reader)); -} - -std::unique_ptr ReaderFunctions::initParquetReadData( - const std::vector& paths, vector_idx_t fileIdx, CSVReaderConfig csvReaderConfig, - TableSchema* tableSchema) { - assert(fileIdx < paths.size()); - auto reader = TableCopyUtils::createParquetReader(paths[fileIdx], tableSchema); - return std::make_unique( - csvReaderConfig, tableSchema, fileIdx, std::move(reader)); -} - -std::unique_ptr ReaderFunctions::initNPYReadData( - const std::vector& paths, vector_idx_t fileIdx, CSVReaderConfig csvReaderConfig, - TableSchema* tableSchema) { - auto reader = make_unique(paths); - return std::make_unique( - csvReaderConfig, tableSchema, fileIdx, std::move(reader)); -} - -void ReaderFunctions::readRowsFromRelCSVFile( - const ReaderFunctionData& functionData, block_idx_t blockIdx, DataChunk* dataChunkToRead) { - auto& readerData = (RelCSVReaderFunctionData&)(functionData); - std::shared_ptr recordBatch; - TableCopyUtils::throwCopyExceptionIfNotOK(readerData.reader->ReadNext(&recordBatch)); - assert(recordBatch); - for (auto i = 0u; i < dataChunkToRead->getNumValueVectors(); i++) { - ArrowColumnVector::setArrowColumn(dataChunkToRead->getValueVector(i).get(), - std::make_shared(recordBatch->column((int)i))); - } -} - -void ReaderFunctions::readRowsFromNodeCSVFile( - const ReaderFunctionData& functionData, block_idx_t blockIdx, DataChunk* dataChunkToRead) { - auto& readerData = reinterpret_cast(functionData); - readerData.csvReader->ParseCSV(*dataChunkToRead); -} - -void ReaderFunctions::readRowsFromParquetFile(const ReaderFunctionData& functionData, - block_idx_t blockIdx, common::DataChunk* dataChunkToRead) { - auto& readerData = (ParquetReaderFunctionData&)(functionData); - std::shared_ptr table; - TableCopyUtils::throwCopyExceptionIfNotOK( - readerData.reader->RowGroup(static_cast(blockIdx))->ReadTable(&table)); - assert(table); - for (auto i = 0u; i < dataChunkToRead->getNumValueVectors(); i++) { + for (auto i = 0u; i < dataChunk->getNumValueVectors(); i++) { + auto chunkArray = std::make_shared(std::move(arrayVectors[i])); ArrowColumnVector::setArrowColumn( - dataChunkToRead->getValueVector(i).get(), table->column((int)i)); + dataChunk->getValueVector(i).get(), std::move(chunkArray)); } + dataChunk->state->selVector->selectedSize = numRowsToAppend; } -void ReaderFunctions::readRowsFromNPYFile(const ReaderFunctionData& functionData, - common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead) { - auto& readerData = (NPYReaderFunctionData&)(functionData); - auto recordBatch = readerData.reader->readBlock(blockIdx); - for (auto i = 0u; i < dataChunkToRead->getNumValueVectors(); i++) { - ArrowColumnVector::setArrowColumn(dataChunkToRead->getValueVector(i).get(), - std::make_shared(recordBatch->column((int)i))); - } +void ReaderSharedState::initialize() { + validateFunc = ReaderFunctions::getValidateFunc(this->copyDescription->fileType); + initFunc = + ReaderFunctions::getInitDataFunc(this->copyDescription->fileType, tableSchema->tableType); + countBlocksFunc = ReaderFunctions::getCountBlocksFunc( + this->copyDescription->fileType, tableSchema->tableType); + readFunc = + ReaderFunctions::getReadRowsFunc(this->copyDescription->fileType, tableSchema->tableType); + readFuncData = + ReaderFunctions::getReadFuncData(this->copyDescription->fileType, tableSchema->tableType); } -std::unique_ptr ReaderFunctions::getDataChunkToRead( - catalog::TableSchema* tableSchema, MemoryManager* memoryManager) { - std::vector> valueVectorsToRead; - for (auto i = 0u; i < tableSchema->getNumProperties(); i++) { - auto property = tableSchema->getProperty(i); - if (property->getDataType()->getLogicalTypeID() != LogicalTypeID::SERIAL) { - valueVectorsToRead.emplace_back(std::make_unique( - *tableSchema->getProperty(i)->getDataType(), memoryManager)); - } - } - auto dataChunk = std::make_unique(valueVectorsToRead.size()); - for (auto i = 0u; i < valueVectorsToRead.size(); i++) { - dataChunk->insert(i, std::move(valueVectorsToRead[i])); - } - return dataChunk; -} - -void ReaderSharedState::validate() { +void ReaderSharedState::validate() const { validateFunc(copyDescription->filePaths, tableSchema); } void ReaderSharedState::countBlocks(MemoryManager* memoryManager) { - readFuncData = initFunc(copyDescription->filePaths, 0 /* fileIdx */, + initFunc(*readFuncData, copyDescription->filePaths, 0 /* fileIdx */, *copyDescription->csvReaderConfig, tableSchema); - blockInfos = countBlocksFunc( + fileInfos = countBlocksFunc( copyDescription->filePaths, *copyDescription->csvReaderConfig, tableSchema, memoryManager); - for (auto& blockInfo : blockInfos) { - numRows += blockInfo.numRows; + for (auto& fileInfo : fileInfos) { + numRows += fileInfo.numRows; } } -std::unique_ptr ReaderSharedState::getSerialMorsel(DataChunk* dataChunk) { - std::unique_lock xLck{mtx}; - readNextBlock(dataChunk); - if (leftArrowArrays.getLeftNumRows() == 0) { - dataChunk->state->selVector->selectedSize = 0; - return std::make_unique(); - } else { - auto numRowsToReturn = std::min(leftArrowArrays.getLeftNumRows(), DEFAULT_VECTOR_CAPACITY); - leftArrowArrays.appendToDataChunk(dataChunk, numRowsToReturn); - auto result = std::make_unique(currFileIdx, currBlockIdx, currRowIdx); - currRowIdx += numRowsToReturn; - return result; - } -} - -void ReaderSharedState::readNextBlock(common::DataChunk* dataChunk) { - if (leftArrowArrays.getLeftNumRows() == 0) { - auto morsel = getMorselOfNextBlock(); - if (morsel->fileIdx >= copyDescription->filePaths.size()) { - // No more blocks. - return; - } - if (morsel->fileIdx != readFuncData->fileIdx) { - readFuncData = initFunc(copyDescription->filePaths, morsel->fileIdx, - *copyDescription->csvReaderConfig, tableSchema); - } - readFunc(*readFuncData, morsel->blockIdx, dataChunk); - leftArrowArrays.appendFromDataChunk(dataChunk); - } -} - -std::unique_ptr ReaderSharedState::getParallelMorsel() { - std::unique_lock xLck{mtx}; +template +std::unique_ptr ReaderSharedState::getMorsel() { + std::unique_ptr morsel; + lockForParallel(); while (true) { - auto morsel = getMorselOfNextBlock(); - if (morsel->fileIdx >= copyDescription->filePaths.size()) { + if (currFileIdx >= copyDescription->filePaths.size()) { // No more blocks. + morsel = std::make_unique(); break; } - assert(morsel->fileIdx < blockInfos.size() && - morsel->blockIdx < blockInfos[morsel->fileIdx].numRowsPerBlock.size()); - currRowIdx += blockInfos[morsel->fileIdx].numRowsPerBlock[morsel->blockIdx]; - return morsel; + if (currBlockIdx < fileInfos[currFileIdx].numBlocks) { + morsel = std::make_unique(currFileIdx, currBlockIdx++); + break; + } + moveToNextFile(); } - return std::make_unique(); + unlockForParallel(); + return morsel; } -std::unique_ptr ReaderSharedState::getMorselOfNextBlock() { - if (currFileIdx >= blockInfos.size()) { - return std::make_unique(currFileIdx, INVALID_BLOCK_IDX, INVALID_ROW_IDX); - } - auto numBlocksInFile = blockInfos[currFileIdx].numRowsPerBlock.size(); - if (currBlockIdx >= numBlocksInFile) { - currFileIdx += copyDescription->fileType == CopyDescription::FileType::NPY ? - copyDescription->filePaths.size() : - 1; - currBlockIdx = 0; - } - return std::make_unique(currFileIdx, currBlockIdx++, currRowIdx); -} +template std::unique_ptr +ReaderSharedState::getMorsel(); +template std::unique_ptr +ReaderSharedState::getMorsel(); } // namespace processor } // namespace kuzu diff --git a/src/storage/copier/npy_reader.cpp b/src/storage/copier/npy_reader.cpp index 8e7bf0d44f..2a3e9c5fa0 100644 --- a/src/storage/copier/npy_reader.cpp +++ b/src/storage/copier/npy_reader.cpp @@ -224,11 +224,10 @@ std::shared_ptr NpyReader::getArrowType() const { } std::shared_ptr NpyReader::readBlock(block_idx_t blockIdx) const { - uint64_t rowNumber = CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY * blockIdx; + uint64_t rowNumber = DEFAULT_VECTOR_CAPACITY * blockIdx; auto rowPointer = getPointerToRow(rowNumber); auto arrowType = getArrowType(); - auto numRowsToRead = - std::min(CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY, getNumRows() - rowNumber); + auto numRowsToRead = std::min(DEFAULT_VECTOR_CAPACITY, getNumRows() - rowNumber); auto buffer = std::make_shared( rowPointer, numRowsToRead * arrowType->byte_width() * getNumElementsPerRow()); std::shared_ptr field; diff --git a/src/storage/copier/table_copy_utils.cpp b/src/storage/copier/table_copy_utils.cpp index 9db90d705e..9710859152 100644 --- a/src/storage/copier/table_copy_utils.cpp +++ b/src/storage/copier/table_copy_utils.cpp @@ -11,97 +11,6 @@ using namespace kuzu::common; namespace kuzu { namespace storage { -row_idx_t TableCopyUtils::countNumLines(CopyDescription& copyDescription, - catalog::TableSchema* tableSchema, - std::unordered_map& fileBlockInfos) { - switch (copyDescription.fileType) { - case CopyDescription::FileType::CSV: { - return countNumLinesCSV(copyDescription, tableSchema, fileBlockInfos); - } - case CopyDescription::FileType::PARQUET: { - return countNumLinesParquet(copyDescription, tableSchema, fileBlockInfos); - } - case CopyDescription::FileType::NPY: { - return countNumLinesNpy(copyDescription, tableSchema, fileBlockInfos); - } - default: { - throw CopyException{StringUtils::string_format("Unrecognized file type: {}.", - CopyDescription::getFileTypeName(copyDescription.fileType))}; - } - } -} - -row_idx_t TableCopyUtils::countNumLinesCSV(CopyDescription& copyDescription, - catalog::TableSchema* tableSchema, - std::unordered_map& fileBlockInfos) { - row_idx_t numRows = 0; - // TODO: Count each file as a task. - for (auto& filePath : copyDescription.filePaths) { - auto csvStreamingReader = - createCSVReader(filePath, copyDescription.csvReaderConfig.get(), tableSchema); - std::shared_ptr currBatch; - uint64_t numBlocks = 0; - std::vector numLinesPerBlock; - while (true) { - throwCopyExceptionIfNotOK(csvStreamingReader->ReadNext(&currBatch)); - if (currBatch == nullptr) { - break; - } - ++numBlocks; - auto currNumRows = currBatch->num_rows(); - numLinesPerBlock.push_back(currNumRows); - numRows += currNumRows; - } - fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numLinesPerBlock}); - } - return numRows; -} - -row_idx_t TableCopyUtils::countNumLinesParquet(CopyDescription& copyDescription, - catalog::TableSchema* tableSchema, - std::unordered_map& fileBlockInfos) { - row_idx_t numRows = 0; - for (auto& filePath : copyDescription.filePaths) { - std::unique_ptr reader = - createParquetReader(filePath, tableSchema); - auto metadata = reader->parquet_reader()->metadata(); - uint64_t numBlocks = metadata->num_row_groups(); - std::vector numLinesPerBlock(numBlocks); - for (auto blockIdx = 0; blockIdx < numBlocks; ++blockIdx) { - numLinesPerBlock[blockIdx] = metadata->RowGroup(blockIdx)->num_rows(); - } - fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numLinesPerBlock}); - numRows += metadata->num_rows(); - } - return numRows; -} - -offset_t TableCopyUtils::countNumLinesNpy(CopyDescription& copyDescription, - catalog::TableSchema* tableSchema, - std::unordered_map& fileBlockInfos) { - offset_t numRows = 0; - for (auto i = 0u; i < copyDescription.filePaths.size(); i++) { - auto filePath = copyDescription.filePaths[i]; - auto property = tableSchema->properties[i].get(); - auto reader = std::make_unique(filePath); - auto numNodesInFile = reader->getNumRows(); - if (i == 0) { - numRows = numNodesInFile; - } - reader->validate(*property->getDataType(), numRows, tableSchema->tableName); - auto numBlocks = (numNodesInFile + CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY - 1) / - CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY; - std::vector numLinesPerBlock(numBlocks); - for (auto blockIdx = 0; blockIdx < numBlocks; ++blockIdx) { - auto numLines = std::min(CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY, - numNodesInFile - blockIdx * CopyConstants::NUM_ROWS_PER_BLOCK_FOR_NPY); - numLinesPerBlock[blockIdx] = numLines; - } - fileBlockInfos.emplace(filePath, FileBlockInfo{numBlocks, numLinesPerBlock}); - } - return numRows; -} - static bool skipCopyForProperty(const Property& property) { return TableSchema::isReservedPropertyName(property.getName()) || property.getDataType()->getLogicalTypeID() == LogicalTypeID::SERIAL; diff --git a/test/test_files/tinysnb/function/table.test b/test/test_files/tinysnb/function/table.test index 99c0c67c82..f5c7f72760 100644 --- a/test/test_files/tinysnb/function/table.test +++ b/test/test_files/tinysnb/function/table.test @@ -79,4 +79,4 @@ height -LOG ReturnDBVersion -STATEMENT CALL db_version() RETURN version ---- 1 -v0.4.0 +v0.0.8.5