From 67cbaca20d10c2507841e983fc9df8fb0d3b87a2 Mon Sep 17 00:00:00 2001 From: Keenan Gugeler Date: Fri, 6 Oct 2023 20:03:20 -0400 Subject: [PATCH] reader: remove counting of blocks This allows us to move toward removing all upfront counting. None of the readers actually depend on upfront counting of the number of blocks. --- .../persistent/reader/csv/base_csv_reader.h | 1 + .../operator/persistent/reader_functions.h | 41 +++---- .../operator/persistent/reader_state.h | 34 ++--- src/processor/operator/persistent/reader.cpp | 8 +- .../persistent/reader/csv/base_csv_reader.cpp | 18 +++ .../persistent/reader/npy/npy_reader.cpp | 15 ++- .../operator/persistent/reader_functions.cpp | 116 ++++++------------ .../operator/persistent/reader_state.cpp | 65 +++++++--- 8 files changed, 140 insertions(+), 158 deletions(-) diff --git a/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h index 23a1bc280c..97001938ed 100644 --- a/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h @@ -22,6 +22,7 @@ class BaseCSVReader { virtual uint64_t parseBlock(common::block_idx_t blockIdx, common::DataChunk& resultChunk) = 0; uint64_t countRows(); + bool isEOF() const; protected: template diff --git a/src/include/processor/operator/persistent/reader_functions.h b/src/include/processor/operator/persistent/reader_functions.h index 57bd0ede75..ac71424f85 100644 --- a/src/include/processor/operator/persistent/reader_functions.h +++ b/src/include/processor/operator/persistent/reader_functions.h @@ -18,30 +18,32 @@ struct ReaderFunctionData { virtual ~ReaderFunctionData() = default; - virtual inline bool emptyBlockImpliesDone() const { return false; } + // Called after receiving an empty block from readFunc. + virtual inline bool doneAfterEmptyBlock() const { return true; } + + // Called to determine if the current block has more data. virtual inline bool hasMoreToRead() const { return false; } }; -struct RelCSVReaderFunctionData : public ReaderFunctionData { +struct RelCSVReaderFunctionData final : public ReaderFunctionData { std::shared_ptr reader = nullptr; - - inline bool emptyBlockImpliesDone() const override { return true; } }; -struct SerialCSVReaderFunctionData : public ReaderFunctionData { +struct SerialCSVReaderFunctionData final : public ReaderFunctionData { std::unique_ptr reader = nullptr; }; -struct ParallelCSVReaderFunctionData : public ReaderFunctionData { +struct ParallelCSVReaderFunctionData final : public ReaderFunctionData { std::unique_ptr reader = nullptr; - // NOTE: It is *critical* that `emptyBlockImpliesDone` is false for Parallel CSV Reader! + // NOTE: It is *critical* that `doneAfterEmptyBlock` is false for Parallel CSV Reader! // Otherwise, when the parallel CSV reader gets a block that resides in the middle of a header // or a long line, it will return zero and cause rows to not be loaded! + inline bool doneAfterEmptyBlock() const override { return reader->isEOF(); } inline bool hasMoreToRead() const override { return reader->hasMoreToRead(); } }; -struct RelParquetReaderFunctionData : public ReaderFunctionData { +struct RelParquetReaderFunctionData final : public ReaderFunctionData { std::unique_ptr reader = nullptr; }; @@ -60,23 +62,18 @@ struct RDFReaderFunctionData : public ReaderFunctionData { std::unique_ptr reader = nullptr; }; -struct FileBlocksInfo { - common::row_idx_t numRows = 0; - common::block_idx_t numBlocks = 0; -}; - using validate_func_t = std::function; using init_reader_data_func_t = std::function; -using count_blocks_func_t = std::function( +using count_rows_func_t = std::function; using read_rows_func_t = std::function; struct ReaderFunctions { static validate_func_t getValidateFunc(common::FileType fileType); - static count_blocks_func_t getCountBlocksFunc( + static count_rows_func_t getCountRowsFunc( const common::ReaderConfig& config, common::TableType tableType); static init_reader_data_func_t getInitDataFunc( const common::ReaderConfig& config, common::TableType tableType); @@ -90,19 +87,17 @@ struct ReaderFunctions { } static void validateNPYFiles(const common::ReaderConfig& config); - static std::vector countRowsNoOp( - const common::ReaderConfig& config, storage::MemoryManager* memoryManager); - static std::vector countRowsInSerialCSVFile( + static common::row_idx_t countRowsNoOp( const common::ReaderConfig& config, storage::MemoryManager* memoryManager); - static std::vector countRowsInParallelCSVFile( + static common::row_idx_t countRowsInCSVFile( const common::ReaderConfig& config, storage::MemoryManager* memoryManager); - static std::vector countRowsInRelParquetFile( + static common::row_idx_t countRowsInRelParquetFile( const common::ReaderConfig& config, storage::MemoryManager* memoryManager); - static std::vector countRowsInParquetFile( + static common::row_idx_t countRowsInParquetFile( const common::ReaderConfig& config, storage::MemoryManager* memoryManager); - static std::vector countRowsInNPYFile( + static common::row_idx_t countRowsInNPYFile( const common::ReaderConfig& config, storage::MemoryManager* memoryManager); - static std::vector countRowsInRDFFile( + static common::row_idx_t countRowsInRDFFile( const common::ReaderConfig& config, storage::MemoryManager* memoryManager); static void initRelCSVReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx, diff --git a/src/include/processor/operator/persistent/reader_state.h b/src/include/processor/operator/persistent/reader_state.h index 1f59834852..d777a1384d 100644 --- a/src/include/processor/operator/persistent/reader_state.h +++ b/src/include/processor/operator/persistent/reader_state.h @@ -40,46 +40,30 @@ class ReaderSharedState { }; explicit ReaderSharedState(std::unique_ptr readerConfig) - : readerConfig{std::move(readerConfig)}, numRows{0}, currFileIdx{0}, currBlockIdx{0}, - currRowIdx{0} {} + : readerConfig{std::move(readerConfig)}, numRows{common::INVALID_ROW_IDX}, currFileIdx{0}, + currBlockIdx{0}, currRowIdx{0} {} - void initialize(common::TableType tableType); + void initialize(storage::MemoryManager* memoryManager, common::TableType tableType); void validate() const; - void countBlocks(storage::MemoryManager* memoryManager); + void countRows(storage::MemoryManager* memoryManager); - inline void moveToNextFile() { - currFileIdx += - (readerConfig->fileType == common::FileType::NPY ? readerConfig->filePaths.size() : 1); - currBlockIdx = 0; - } + // Signal that we are done the given file. + // No-op if we have already moved to the next file. + template + void doneFile(common::vector_idx_t fileIdx); template std::unique_ptr getMorsel(); inline common::row_idx_t& getNumRowsRef() { return std::ref(numRows); } -private: - template - inline void lockForParallel() { - if constexpr (READ_MODE == ReadMode::PARALLEL) { - mtx.lock(); - } - } - template - inline void unlockForParallel() { - if constexpr (READ_MODE == ReadMode::PARALLEL) { - mtx.unlock(); - } - } - public: validate_func_t validateFunc; init_reader_data_func_t initFunc; - count_blocks_func_t countBlocksFunc; + count_rows_func_t countRowsFunc; std::shared_ptr readFuncData; common::row_idx_t numRows; - std::vector fileInfos; common::vector_idx_t currFileIdx; common::block_idx_t currBlockIdx; diff --git a/src/processor/operator/persistent/reader.cpp b/src/processor/operator/persistent/reader.cpp index aa0b385483..8a286bc9f3 100644 --- a/src/processor/operator/persistent/reader.cpp +++ b/src/processor/operator/persistent/reader.cpp @@ -10,9 +10,9 @@ namespace kuzu { namespace processor { void Reader::initGlobalStateInternal(ExecutionContext* context) { - sharedState->initialize(info->tableType); + sharedState->initialize(context->memoryManager, info->tableType); sharedState->validate(); - sharedState->countBlocks(context->memoryManager); + sharedState->countRows(context->memoryManager); } void Reader::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { @@ -80,8 +80,8 @@ void Reader::readNextDataChunk() { readFunc(*readFuncData, morsel->blockIdx, dataChunk.get()); if (dataChunk->state->selVector->selectedSize > 0) { leftArrowArrays.appendFromDataChunk(dataChunk.get()); - } else if (readFuncData->emptyBlockImpliesDone()) { - sharedState->moveToNextFile(); + } else if (readFuncData->doneAfterEmptyBlock()) { + sharedState->doneFile(morsel->fileIdx); } } unlockForSerial(); diff --git a/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp index 4273731d20..9388e8d352 100644 --- a/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp @@ -119,6 +119,24 @@ uint64_t BaseCSVReader::countRows() { goto in_quotes; } +bool BaseCSVReader::isEOF() const { + uint64_t offset = getFileOffset(); + uint64_t end = lseek(fd, 0, SEEK_END); + if (end == -1) { + // LCOV_EXCL_START + throw CopyException(StringUtils::string_format( + "Could not seek to end of file {}: {}", filePath, posixErrMessage())); + // LCOV_EXCL_END + } + if (lseek(fd, offset, SEEK_SET) == -1) { + // LCOV_EXCL_START + throw CopyException(StringUtils::string_format( + "Could not reset position of file {}: {}", filePath, posixErrMessage())); + // LCOV_EXCL_END + } + return offset >= end; +} + template void BaseCSVReader::addValue(Driver& driver, uint64_t rowNum, column_id_t columnIdx, std::string_view strVal, std::vector& escapePositions) { diff --git a/src/processor/operator/persistent/reader/npy/npy_reader.cpp b/src/processor/operator/persistent/reader/npy/npy_reader.cpp index 030bd9436d..9f235cf6e7 100644 --- a/src/processor/operator/persistent/reader/npy/npy_reader.cpp +++ b/src/processor/operator/persistent/reader/npy/npy_reader.cpp @@ -209,11 +209,16 @@ void NpyReader::validate(const LogicalType& type_, offset_t numRows) { void NpyReader::readBlock(block_idx_t blockIdx, common::ValueVector* vectorToRead) const { uint64_t rowNumber = DEFAULT_VECTOR_CAPACITY * blockIdx; - auto rowPointer = getPointerToRow(rowNumber); - auto numRowsToRead = std::min(DEFAULT_VECTOR_CAPACITY, getNumRows() - rowNumber); - memcpy( - vectorToRead->getData(), rowPointer, numRowsToRead * vectorToRead->getNumBytesPerValue()); - vectorToRead->state->selVector->selectedSize = numRowsToRead; + auto numRows = getNumRows(); + if (rowNumber >= numRows) { + vectorToRead->state->selVector->selectedSize = 0; + } else { + auto rowPointer = getPointerToRow(rowNumber); + auto numRowsToRead = std::min(DEFAULT_VECTOR_CAPACITY, getNumRows() - rowNumber); + memcpy(vectorToRead->getData(), rowPointer, + numRowsToRead * vectorToRead->getNumBytesPerValue()); + vectorToRead->state->selVector->selectedSize = numRowsToRead; + } } NpyMultiFileReader::NpyMultiFileReader(const std::vector& filePaths) { diff --git a/src/processor/operator/persistent/reader_functions.cpp b/src/processor/operator/persistent/reader_functions.cpp index 9c0a7797b2..82757d7474 100644 --- a/src/processor/operator/persistent/reader_functions.cpp +++ b/src/processor/operator/persistent/reader_functions.cpp @@ -24,16 +24,11 @@ validate_func_t ReaderFunctions::getValidateFunc(FileType fileType) { } } -count_blocks_func_t ReaderFunctions::getCountBlocksFunc( +count_rows_func_t ReaderFunctions::getCountRowsFunc( const ReaderConfig& config, TableType tableType) { switch (config.fileType) { case FileType::CSV: { - if (tableType == TableType::REL) { - return countRowsNoOp; - } else { - return config.csvReaderConfig->parallel ? countRowsInParallelCSVFile : - countRowsInSerialCSVFile; - } + return tableType == TableType::REL ? countRowsNoOp : countRowsInCSVFile; } case FileType::PARQUET: { switch (tableType) { @@ -174,98 +169,50 @@ void ReaderFunctions::validateNPYFiles(const common::ReaderConfig& config) { } } -std::vector ReaderFunctions::countRowsNoOp( +row_idx_t ReaderFunctions::countRowsNoOp( const common::ReaderConfig& config, MemoryManager* memoryManager) { - std::vector fileInfos( - config.getNumFiles(), {INVALID_ROW_IDX, INVALID_BLOCK_IDX}); - return fileInfos; + return INVALID_ROW_IDX; } -std::vector ReaderFunctions::countRowsInSerialCSVFile( +row_idx_t ReaderFunctions::countRowsInCSVFile( const common::ReaderConfig& config, storage::MemoryManager* memoryManager) { - std::vector fileInfos; - fileInfos.reserve(config.getNumFiles()); + row_idx_t numRows = 0; for (const auto& path : config.filePaths) { auto reader = make_unique(path, config); - row_idx_t numRowsInFile = reader->countRows(); - block_idx_t numBlocks = - (numRowsInFile + DEFAULT_VECTOR_CAPACITY - 1) / DEFAULT_VECTOR_CAPACITY; - FileBlocksInfo fileBlocksInfo{numRowsInFile, numBlocks}; - fileInfos.push_back(fileBlocksInfo); + numRows += reader->countRows(); } - return fileInfos; -} - -std::vector ReaderFunctions::countRowsInParallelCSVFile( - const common::ReaderConfig& config, storage::MemoryManager* memoryManager) { - std::vector fileInfos; - fileInfos.reserve(config.getNumFiles()); - for (const auto& path : config.filePaths) { - int fd = open(path.c_str(), O_RDONLY); - if (fd == -1) { - // LCOV_EXCL_START - throw CopyException( - StringUtils::string_format("Failed to open file {}: {}", path, posixErrMessage())); - // LCOV_EXCL_END - } - uint64_t length = lseek(fd, 0, SEEK_END); - close(fd); - if (length == -1) { - // LCOV_EXCL_START - throw CopyException(StringUtils::string_format( - "Failed to seek to end of file {}: {}", path, posixErrMessage())); - // LCOV_EXCL_END - } - auto reader = make_unique(path, config); - row_idx_t numRowsInFile = reader->countRows(); - block_idx_t numBlocks = - (length + CopyConstants::PARALLEL_BLOCK_SIZE - 1) / CopyConstants::PARALLEL_BLOCK_SIZE; - FileBlocksInfo fileBlocksInfo{numRowsInFile, numBlocks}; - fileInfos.push_back(fileBlocksInfo); - } - return fileInfos; + return numRows; } -std::vector ReaderFunctions::countRowsInRelParquetFile( +row_idx_t ReaderFunctions::countRowsInRelParquetFile( const common::ReaderConfig& config, MemoryManager* memoryManager) { - std::vector fileInfos; - fileInfos.reserve(config.getNumFiles()); + row_idx_t numRows = 0; for (const auto& path : config.filePaths) { std::unique_ptr reader = TableCopyUtils::createParquetReader(path, config); - auto metadata = reader->parquet_reader()->metadata(); - FileBlocksInfo fileBlocksInfo{ - (row_idx_t)metadata->num_rows(), (block_idx_t)metadata->num_row_groups()}; - fileInfos.push_back(fileBlocksInfo); + numRows += (row_idx_t)reader->parquet_reader()->metadata()->num_rows(); } - return fileInfos; + return numRows; } -std::vector ReaderFunctions::countRowsInParquetFile( +row_idx_t ReaderFunctions::countRowsInParquetFile( const common::ReaderConfig& config, MemoryManager* memoryManager) { - std::vector fileInfos; - fileInfos.reserve(config.filePaths.size()); + row_idx_t numRows = 0; for (const auto& path : config.filePaths) { auto reader = std::make_unique(path, memoryManager); - auto numRows = reader->getMetadata()->num_rows; - FileBlocksInfo fileBlocksInfo{ - (row_idx_t)numRows, (block_idx_t)reader->getMetadata()->row_groups.size()}; - fileInfos.push_back(fileBlocksInfo); + numRows += reader->getMetadata()->num_rows; } - return fileInfos; + return numRows; } -std::vector ReaderFunctions::countRowsInNPYFile( +row_idx_t ReaderFunctions::countRowsInNPYFile( const common::ReaderConfig& config, MemoryManager* memoryManager) { assert(config.getNumFiles() != 0); auto reader = make_unique(config.filePaths[0]); - auto numRows = reader->getNumRows(); - auto numBlocks = - (block_idx_t)((numRows + DEFAULT_VECTOR_CAPACITY - 1) / DEFAULT_VECTOR_CAPACITY); - return {{numRows, numBlocks}}; + return reader->getNumRows(); } -std::vector ReaderFunctions::countRowsInRDFFile( +row_idx_t ReaderFunctions::countRowsInRDFFile( const common::ReaderConfig& config, MemoryManager* memoryManager) { assert(config.getNumFiles() == 1); auto reader = make_unique(config.filePaths[0]); @@ -273,19 +220,16 @@ std::vector ReaderFunctions::countRowsInRDFFile( dataChunk->insert(0, std::make_unique(LogicalTypeID::STRING, memoryManager)); dataChunk->insert(1, std::make_unique(LogicalTypeID::STRING, memoryManager)); dataChunk->insert(2, std::make_unique(LogicalTypeID::STRING, memoryManager)); - row_idx_t numRowsInFile = 0; - block_idx_t numBlocks = 0; + row_idx_t numRows = 0; while (true) { dataChunk->resetAuxiliaryBuffer(); auto numRowsRead = reader->read(dataChunk.get()); if (numRowsRead == 0) { break; } - numRowsInFile += numRowsRead; - numBlocks++; + numRows += numRowsRead; } - FileBlocksInfo fileBlocksInfo{numRowsInFile, numBlocks}; - return {fileBlocksInfo}; + return numRows; } void ReaderFunctions::initRelCSVReadData(ReaderFunctionData& funcData, vector_idx_t fileIdx, @@ -382,6 +326,10 @@ void ReaderFunctions::readRowsFromParallelCSVFile( void ReaderFunctions::readRowsFromRelParquetFile(const ReaderFunctionData& functionData, block_idx_t blockIdx, common::DataChunk* dataChunkToRead) { auto& readerData = reinterpret_cast(functionData); + if (blockIdx >= readerData.reader->num_row_groups()) { + dataChunkToRead->state->selVector->selectedSize = 0; + return; + } std::shared_ptr table; TableCopyUtils::throwCopyExceptionIfNotOK( readerData.reader->RowGroup(static_cast(blockIdx))->ReadTable(&table)); @@ -396,9 +344,15 @@ void ReaderFunctions::readRowsFromRelParquetFile(const ReaderFunctionData& funct void ReaderFunctions::readRowsFromParquetFile(const ReaderFunctionData& functionData, block_idx_t blockIdx, common::DataChunk* dataChunkToRead) { auto& readerData = reinterpret_cast(functionData); - if (blockIdx != UINT64_MAX && - (readerData.state->groupIdxList.empty() || readerData.state->groupIdxList[0] != blockIdx)) { - readerData.reader->initializeScan(*readerData.state, {blockIdx}); + if (blockIdx != UINT64_MAX) { + if (blockIdx >= readerData.reader->getMetadata()->row_groups.size()) { + dataChunkToRead->state->selVector->selectedSize = 0; + return; + } + if (readerData.state->groupIdxList.empty() || + readerData.state->groupIdxList[0] != blockIdx) { + readerData.reader->initializeScan(*readerData.state, {blockIdx}); + } } readerData.reader->scan(*readerData.state, *dataChunkToRead); } diff --git a/src/processor/operator/persistent/reader_state.cpp b/src/processor/operator/persistent/reader_state.cpp index 3fd5a5a8dd..f9686a2c74 100644 --- a/src/processor/operator/persistent/reader_state.cpp +++ b/src/processor/operator/persistent/reader_state.cpp @@ -64,42 +64,67 @@ void LeftArrowArrays::appendToDataChunk(common::DataChunk* dataChunk, uint64_t n dataChunk->state->selVector->selectedSize = numRowsToAppend; } -void ReaderSharedState::initialize(TableType tableType) { +void ReaderSharedState::initialize(MemoryManager* memoryManager, TableType tableType) { validateFunc = ReaderFunctions::getValidateFunc(readerConfig->fileType); initFunc = ReaderFunctions::getInitDataFunc(*readerConfig, tableType); - countBlocksFunc = ReaderFunctions::getCountBlocksFunc(*readerConfig, tableType); + countRowsFunc = ReaderFunctions::getCountRowsFunc(*readerConfig, tableType); readFuncData = ReaderFunctions::getReadFuncData(*readerConfig, tableType); + + // Initialize for readers that share readFuncData. + if (readerConfig->fileType == FileType::CSV && !readerConfig->csvParallelRead(tableType)) { + initFunc(*readFuncData, 0 /* fileIdx */, *readerConfig, memoryManager); + } } void ReaderSharedState::validate() const { validateFunc(*readerConfig); } -void ReaderSharedState::countBlocks(MemoryManager* memoryManager) { - initFunc(*readFuncData, 0 /* fileIdx */, *readerConfig, memoryManager); - fileInfos = countBlocksFunc(*readerConfig, memoryManager); - for (auto& fileInfo : fileInfos) { - numRows += fileInfo.numRows; +void ReaderSharedState::countRows(MemoryManager* memoryManager) { + numRows = countRowsFunc(*readerConfig, memoryManager); +} + +template +static void lockForParallel(std::mutex& mtx) { + if constexpr (READ_MODE == ReaderSharedState::ReadMode::PARALLEL) { + mtx.lock(); + } +} + +template +static void unlockForParallel(std::mutex& mtx) { + if constexpr (READ_MODE == ReaderSharedState::ReadMode::PARALLEL) { + mtx.unlock(); + } +} + +template +void ReaderSharedState::doneFile(vector_idx_t fileIdx) { + lockForParallel(mtx); + if (fileIdx == currFileIdx) { + currFileIdx += + (readerConfig->fileType == common::FileType::NPY ? readerConfig->filePaths.size() : 1); + currBlockIdx = 0; } + unlockForParallel(mtx); } +template void ReaderSharedState::doneFile( + vector_idx_t fileIdx); +template void ReaderSharedState::doneFile( + vector_idx_t fileIdx); + template std::unique_ptr ReaderSharedState::getMorsel() { std::unique_ptr morsel; - lockForParallel(); - while (true) { - if (currFileIdx >= readerConfig->getNumFiles()) { - // No more blocks. - morsel = std::make_unique(); - break; - } - if (currBlockIdx < fileInfos[currFileIdx].numBlocks) { - morsel = std::make_unique(currFileIdx, currBlockIdx++); - break; - } - moveToNextFile(); + lockForParallel(mtx); + if (currFileIdx >= readerConfig->getNumFiles()) { + // No more blocks. + morsel = std::make_unique(); + } else { + morsel = std::make_unique(currFileIdx, currBlockIdx++); } - unlockForParallel(); + unlockForParallel(mtx); return morsel; }