From 2e7ec66f49fbcc16d6ac21885fbed478bdb5e955 Mon Sep 17 00:00:00 2001 From: MSebanc Date: Fri, 22 Mar 2024 15:14:20 -0700 Subject: [PATCH 1/8] Added progress for csv and parquet in_query_call operator I have read and agree to the CLA of the Kuzu repository. --- src/common/task_system/progress_bar.cpp | 3 +++ src/include/function/table/scan_functions.h | 5 ++++- src/include/function/table_functions.h | 8 +++++++ .../processor/operator/call/in_query_call.h | 2 ++ .../persistent/reader/csv/base_csv_reader.h | 5 +++-- .../reader/parquet/parquet_reader.h | 1 + src/processor/operator/call/in_query_call.cpp | 4 ++++ .../persistent/reader/csv/base_csv_reader.cpp | 4 ++++ .../reader/csv/parallel_csv_reader.cpp | 21 +++++++++++++++++-- .../reader/csv/serial_csv_reader.cpp | 19 +++++++++++++++-- .../reader/parquet/parquet_reader.cpp | 13 +++++++++++- 11 files changed, 77 insertions(+), 8 deletions(-) diff --git a/src/common/task_system/progress_bar.cpp b/src/common/task_system/progress_bar.cpp index 0c53154903..51d6a5ed36 100644 --- a/src/common/task_system/progress_bar.cpp +++ b/src/common/task_system/progress_bar.cpp @@ -30,6 +30,9 @@ void ProgressBar::finishPipeline() { return; } numPipelinesFinished++; + if (printing) { + std::cout << "\033[1A\033[2K\033[1B"; + } // This ensures that the progress bar is updated back to 0% after a pipeline is finished. prevCurPipelineProgress = -0.01; updateProgress(0.0); diff --git a/src/include/function/table/scan_functions.h b/src/include/function/table/scan_functions.h index 253037dbea..d600c48343 100644 --- a/src/include/function/table/scan_functions.h +++ b/src/include/function/table/scan_functions.h @@ -33,7 +33,10 @@ struct ScanFileSharedState : public ScanSharedState { ScanFileSharedState( common::ReaderConfig readerConfig, uint64_t numRows, main::ClientContext* context) - : ScanSharedState{std::move(readerConfig), numRows}, context{context} {} + : ScanSharedState{std::move(readerConfig), numRows}, context{context}, fileSize{0}, fileOffset{0} {} + + uint64_t fileSize; + uint64_t fileOffset; }; } // namespace function diff --git a/src/include/function/table_functions.h b/src/include/function/table_functions.h index 70753b49ab..052cad076a 100644 --- a/src/include/function/table_functions.h +++ b/src/include/function/table_functions.h @@ -63,6 +63,7 @@ using table_func_init_shared_t = using table_func_init_local_t = std::function( TableFunctionInitInput&, TableFuncSharedState*, storage::MemoryManager*)>; using table_func_can_parallel_t = std::function; +using table_func_progress_t = std::function; struct TableFunction final : public Function { table_func_t tableFunc; @@ -70,6 +71,7 @@ struct TableFunction final : public Function { table_func_init_shared_t initSharedStateFunc; table_func_init_local_t initLocalStateFunc; table_func_can_parallel_t canParallelFunc = [] { return true; }; + table_func_progress_t progressFunc = [](TableFuncSharedState* /*sharedState*/) { return 0.0; }; TableFunction() : Function{}, tableFunc{nullptr}, bindFunc{nullptr}, initSharedStateFunc{nullptr}, @@ -80,6 +82,12 @@ struct TableFunction final : public Function { : Function{FunctionType::TABLE, std::move(name), std::move(inputTypes)}, tableFunc{tableFunc}, bindFunc{bindFunc}, initSharedStateFunc{initSharedFunc}, initLocalStateFunc{initLocalFunc} {} + TableFunction(std::string name, table_func_t tableFunc, table_func_bind_t bindFunc, + table_func_init_shared_t initSharedFunc, table_func_init_local_t initLocalFunc, + table_func_progress_t progressFunc, std::vector inputTypes) + : Function{FunctionType::TABLE, std::move(name), std::move(inputTypes)}, + tableFunc{tableFunc}, bindFunc{bindFunc}, initSharedStateFunc{initSharedFunc}, + initLocalStateFunc{initLocalFunc}, progressFunc{progressFunc} {} inline std::string signatureToString() const override { return common::LogicalTypeUtils::toString(parameterTypeIDs); diff --git a/src/include/processor/operator/call/in_query_call.h b/src/include/processor/operator/call/in_query_call.h index 24179e1ebd..e533223361 100644 --- a/src/include/processor/operator/call/in_query_call.h +++ b/src/include/processor/operator/call/in_query_call.h @@ -74,6 +74,8 @@ class InQueryCall : public PhysicalOperator { bool getNextTuplesInternal(ExecutionContext* context) override; + double getProgress(ExecutionContext* context) const override; + std::unique_ptr clone() override { return std::make_unique(info.copy(), sharedState, id, paramsString); } diff --git a/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h index 9a90478173..59ff9933f8 100644 --- a/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h @@ -26,6 +26,9 @@ class BaseCSVReader { uint64_t countRows(); bool isEOF() const; + uint64_t getFileSize(); + // Get the file offset of the current buffer position. + uint64_t getFileOffset() const; protected: template @@ -56,8 +59,6 @@ class BaseCSVReader { inline bool isNewLine(char c) { return c == '\n' || c == '\r'; } - // Get the file offset of the current buffer position. - uint64_t getFileOffset() const; uint64_t getLineNumber(); protected: diff --git a/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h b/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h index 9665fec268..46a34df6e6 100644 --- a/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h +++ b/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h @@ -95,6 +95,7 @@ struct ParquetScanSharedState final : public function::ScanFileSharedState { const common::ReaderConfig readerConfig, uint64_t numRows, main::ClientContext* context); std::vector> readers; + uint64_t numRowsRead; }; struct ParquetScanLocalState final : public function::TableFuncLocalState { diff --git a/src/processor/operator/call/in_query_call.cpp b/src/processor/operator/call/in_query_call.cpp index d3ddd81ce6..fb93471f56 100644 --- a/src/processor/operator/call/in_query_call.cpp +++ b/src/processor/operator/call/in_query_call.cpp @@ -65,5 +65,9 @@ bool InQueryCall::getNextTuplesInternal(ExecutionContext*) { return numTuplesScanned != 0; } +double InQueryCall::getProgress(ExecutionContext* /*context*/) const { + return info.function.progressFunc(sharedState->funcState.get()); +} + } // namespace processor } // namespace kuzu diff --git a/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp index 59e7499125..3e961821c7 100644 --- a/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp @@ -114,6 +114,10 @@ bool BaseCSVReader::isEOF() const { return getFileOffset() >= fileInfo->getFileSize(); } +uint64_t BaseCSVReader::getFileSize() { + return fileInfo->getFileSize(); +} + template void BaseCSVReader::addValue(Driver& driver, uint64_t rowNum, column_id_t columnIdx, std::string_view strVal, std::vector& escapePositions) { diff --git a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp index 4a11ad40d7..120c990456 100644 --- a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp @@ -124,6 +124,7 @@ static offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output) { auto result = parallelCSVLocalState->reader->continueBlock(outputChunk); outputChunk.state->selVector->selectedSize = result; if (result > 0) { + parallelCSVSharedState->fileOffset = parallelCSVLocalState->reader->getFileOffset(); return result; } } @@ -141,9 +142,11 @@ static offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output) { auto numRowsRead = parallelCSVLocalState->reader->parseBlock(blockIdx, outputChunk); outputChunk.state->selVector->selectedSize = numRowsRead; if (numRowsRead > 0) { + parallelCSVSharedState->fileOffset = parallelCSVLocalState->reader->getFileOffset(); return numRowsRead; } if (parallelCSVLocalState->reader->isEOF()) { + parallelCSVSharedState->fileOffset = parallelCSVLocalState->reader->getFileOffset(); parallelCSVSharedState->setFileComplete(parallelCSVLocalState->fileIdx); parallelCSVLocalState->reader = nullptr; } @@ -168,8 +171,12 @@ static std::unique_ptr initSharedState(TableFunctionInitIn auto bindData = reinterpret_cast(input.bindData); auto csvConfig = CSVReaderConfig::construct(bindData->config.options); row_idx_t numRows = 0; - return std::make_unique(bindData->config.copy(), numRows, + auto sharedState = std::make_unique(bindData->config.copy(), numRows, bindData->columnNames.size(), bindData->context, csvConfig.copy()); + auto reader = std::make_unique(sharedState->readerConfig.filePaths[0], + sharedState->csvReaderConfig.option.copy(), sharedState->numColumns, sharedState->context); + sharedState->fileSize = reader->getFileSize(); + return sharedState; } static std::unique_ptr initLocalState(TableFunctionInitInput& /*input*/, @@ -182,11 +189,21 @@ static std::unique_ptr initLocalState(TableFunctionInitInpu return localState; } +static double progressFunc(TableFuncSharedState* sharedState) { + auto state = reinterpret_cast(sharedState); + if (state->fileSize == 0) { + return 0.0; + } else if (state->fileOffset == state->fileSize) { + return 1.0; + } + return static_cast(state->fileOffset) / state->fileSize; +} + function_set ParallelCSVScan::getFunctionSet() { function_set functionSet; functionSet.push_back( std::make_unique(READ_CSV_PARALLEL_FUNC_NAME, tableFunc, bindFunc, - initSharedState, initLocalState, std::vector{LogicalTypeID::STRING})); + initSharedState, initLocalState, progressFunc, std::vector{LogicalTypeID::STRING})); return functionSet; } diff --git a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp index 1bfa025e91..e8356ee2c7 100644 --- a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp @@ -70,6 +70,7 @@ void SerialCSVScanSharedState::initReader(main::ClientContext* context) { static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output) { auto serialCSVScanSharedState = reinterpret_cast(input.sharedState); serialCSVScanSharedState->read(output.dataChunk); + serialCSVScanSharedState->fileOffset = serialCSVScanSharedState->reader->getFileOffset(); return output.dataChunk.state->selVector->selectedSize; } @@ -115,8 +116,12 @@ static std::unique_ptr initSharedState(TableFunctionInitIn auto bindData = reinterpret_cast(input.bindData); auto csvConfig = CSVReaderConfig::construct(bindData->config.options); row_idx_t numRows = 0; - return std::make_unique(bindData->config.copy(), numRows, + auto sharedState = std::make_unique(bindData->config.copy(), numRows, bindData->columnNames.size(), csvConfig.copy(), bindData->context); + auto reader = std::make_unique(sharedState->readerConfig.filePaths[0], + sharedState->csvReaderConfig.option.copy(), sharedState->numColumns, sharedState->context); + sharedState->fileSize = reader->getFileSize(); + return sharedState; } static std::unique_ptr initLocalState(TableFunctionInitInput& /*input*/, @@ -124,11 +129,21 @@ static std::unique_ptr initLocalState(TableFunctionInitInpu return std::make_unique(); } +static double progressFunc(TableFuncSharedState* sharedState) { + auto state = reinterpret_cast(sharedState); + if (state->fileSize == 0) { + return 0.0; + } else if (state->fileOffset == state->fileSize) { + return 1.0; + } + return static_cast(state->fileOffset) / state->fileSize; +} + function_set SerialCSVScan::getFunctionSet() { function_set functionSet; functionSet.push_back( std::make_unique(READ_CSV_SERIAL_FUNC_NAME, tableFunc, bindFunc, - initSharedState, initLocalState, std::vector{LogicalTypeID::STRING})); + initSharedState, initLocalState, progressFunc, std::vector{LogicalTypeID::STRING})); return functionSet; } diff --git a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp index 76f2e3cd4d..a5c06964e8 100644 --- a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp +++ b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp @@ -563,6 +563,7 @@ ParquetScanSharedState::ParquetScanSharedState( : ScanFileSharedState{std::move(readerConfig), numRows, context} { readers.push_back( std::make_unique(this->readerConfig.filePaths[fileIdx], context)); + numRowsRead = 0; } static bool parquetSharedStateNext( @@ -601,9 +602,11 @@ static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output do { parquetScanLocalState->reader->scan(*parquetScanLocalState->state, outputChunk); if (outputChunk.state->selVector->selectedSize > 0) { + parquetScanSharedState->numRowsRead += outputChunk.state->selVector->selectedSize; return outputChunk.state->selVector->selectedSize; } if (!parquetSharedStateNext(*parquetScanLocalState, *parquetScanSharedState)) { + parquetScanSharedState->numRowsRead += outputChunk.state->selVector->selectedSize; return outputChunk.state->selVector->selectedSize; } } while (true); @@ -674,11 +677,19 @@ static std::unique_ptr initLocalState( return localState; } +static double progressFunc(TableFuncSharedState* state) { + auto parquetScanSharedState = reinterpret_cast(state); + if (parquetScanSharedState->numRows == 0) { + return 0.0; + } + return parquetScanSharedState->numRowsRead / (double)parquetScanSharedState->numRows; +} + function_set ParquetScanFunction::getFunctionSet() { function_set functionSet; functionSet.push_back( std::make_unique(READ_PARQUET_FUNC_NAME, tableFunc, bindFunc, - initSharedState, initLocalState, std::vector{LogicalTypeID::STRING})); + initSharedState, initLocalState, progressFunc, std::vector{LogicalTypeID::STRING})); return functionSet; } From a71f42051cea9e7a9d9c05831789a272929fef25 Mon Sep 17 00:00:00 2001 From: MSebanc Date: Fri, 22 Mar 2024 15:21:00 -0700 Subject: [PATCH 2/8] Format fixes I have read and agree to the CLA of the Kuzu repository. --- src/include/function/table/scan_functions.h | 3 ++- .../persistent/reader/csv/parallel_csv_reader.cpp | 9 +++++---- .../operator/persistent/reader/csv/serial_csv_reader.cpp | 8 ++++---- .../persistent/reader/parquet/parquet_reader.cpp | 5 +++-- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/include/function/table/scan_functions.h b/src/include/function/table/scan_functions.h index d600c48343..c5bb792bd5 100644 --- a/src/include/function/table/scan_functions.h +++ b/src/include/function/table/scan_functions.h @@ -33,7 +33,8 @@ struct ScanFileSharedState : public ScanSharedState { ScanFileSharedState( common::ReaderConfig readerConfig, uint64_t numRows, main::ClientContext* context) - : ScanSharedState{std::move(readerConfig), numRows}, context{context}, fileSize{0}, fileOffset{0} {} + : ScanSharedState{std::move(readerConfig), numRows}, context{context}, fileSize{0}, + fileOffset{0} {} uint64_t fileSize; uint64_t fileOffset; diff --git a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp index 120c990456..3fc37487fb 100644 --- a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp @@ -171,8 +171,8 @@ static std::unique_ptr initSharedState(TableFunctionInitIn auto bindData = reinterpret_cast(input.bindData); auto csvConfig = CSVReaderConfig::construct(bindData->config.options); row_idx_t numRows = 0; - auto sharedState = std::make_unique(bindData->config.copy(), numRows, - bindData->columnNames.size(), bindData->context, csvConfig.copy()); + auto sharedState = std::make_unique(bindData->config.copy(), + numRows, bindData->columnNames.size(), bindData->context, csvConfig.copy()); auto reader = std::make_unique(sharedState->readerConfig.filePaths[0], sharedState->csvReaderConfig.option.copy(), sharedState->numColumns, sharedState->context); sharedState->fileSize = reader->getFileSize(); @@ -202,8 +202,9 @@ static double progressFunc(TableFuncSharedState* sharedState) { function_set ParallelCSVScan::getFunctionSet() { function_set functionSet; functionSet.push_back( - std::make_unique(READ_CSV_PARALLEL_FUNC_NAME, tableFunc, bindFunc, - initSharedState, initLocalState, progressFunc, std::vector{LogicalTypeID::STRING})); + std::make_unique(READ_CSV_PARALLEL_FUNC_NAME, tableFunc, + bindFunc, initSharedState, initLocalState, progressFunc, + std::vector{LogicalTypeID::STRING})); return functionSet; } diff --git a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp index e8356ee2c7..7cc7f3bb17 100644 --- a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp @@ -116,7 +116,7 @@ static std::unique_ptr initSharedState(TableFunctionInitIn auto bindData = reinterpret_cast(input.bindData); auto csvConfig = CSVReaderConfig::construct(bindData->config.options); row_idx_t numRows = 0; - auto sharedState = std::make_unique(bindData->config.copy(), numRows, + auto sharedState = std::make_unique(bindData->config.copy(), numRows, bindData->columnNames.size(), csvConfig.copy(), bindData->context); auto reader = std::make_unique(sharedState->readerConfig.filePaths[0], sharedState->csvReaderConfig.option.copy(), sharedState->numColumns, sharedState->context); @@ -141,9 +141,9 @@ static double progressFunc(TableFuncSharedState* sharedState) { function_set SerialCSVScan::getFunctionSet() { function_set functionSet; - functionSet.push_back( - std::make_unique(READ_CSV_SERIAL_FUNC_NAME, tableFunc, bindFunc, - initSharedState, initLocalState, progressFunc, std::vector{LogicalTypeID::STRING})); + functionSet.push_back(std::make_unique(READ_CSV_SERIAL_FUNC_NAME, tableFunc, + bindFunc, initSharedState, initLocalState, progressFunc, + std::vector{LogicalTypeID::STRING})); return functionSet; } diff --git a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp index a5c06964e8..d3b066ef5c 100644 --- a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp +++ b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp @@ -688,8 +688,9 @@ static double progressFunc(TableFuncSharedState* state) { function_set ParquetScanFunction::getFunctionSet() { function_set functionSet; functionSet.push_back( - std::make_unique(READ_PARQUET_FUNC_NAME, tableFunc, bindFunc, - initSharedState, initLocalState, progressFunc, std::vector{LogicalTypeID::STRING})); + std::make_unique(READ_PARQUET_FUNC_NAME, tableFunc, + bindFunc, initSharedState, initLocalState, progressFunc, + std::vector{LogicalTypeID::STRING})); return functionSet; } From 7fc41db160f713105599ed2a77e4b7c2be412146 Mon Sep 17 00:00:00 2001 From: MSebanc Date: Fri, 22 Mar 2024 15:27:01 -0700 Subject: [PATCH 3/8] format fixes I have read and agree to the CLA of the Kuzu repository. --- src/include/function/table/scan_functions.h | 2 +- .../operator/persistent/reader/csv/parallel_csv_reader.cpp | 7 +++---- .../operator/persistent/reader/parquet/parquet_reader.cpp | 7 +++---- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/include/function/table/scan_functions.h b/src/include/function/table/scan_functions.h index c5bb792bd5..72b4452267 100644 --- a/src/include/function/table/scan_functions.h +++ b/src/include/function/table/scan_functions.h @@ -34,7 +34,7 @@ struct ScanFileSharedState : public ScanSharedState { ScanFileSharedState( common::ReaderConfig readerConfig, uint64_t numRows, main::ClientContext* context) : ScanSharedState{std::move(readerConfig), numRows}, context{context}, fileSize{0}, - fileOffset{0} {} + fileOffset{0} {} uint64_t fileSize; uint64_t fileOffset; diff --git a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp index 3fc37487fb..6fcb6559ea 100644 --- a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp @@ -201,10 +201,9 @@ static double progressFunc(TableFuncSharedState* sharedState) { function_set ParallelCSVScan::getFunctionSet() { function_set functionSet; - functionSet.push_back( - std::make_unique(READ_CSV_PARALLEL_FUNC_NAME, tableFunc, - bindFunc, initSharedState, initLocalState, progressFunc, - std::vector{LogicalTypeID::STRING})); + functionSet.push_back(std::make_unique(READ_CSV_PARALLEL_FUNC_NAME, tableFunc, + bindFunc, initSharedState, initLocalState, progressFunc, + std::vector{LogicalTypeID::STRING})); return functionSet; } diff --git a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp index d3b066ef5c..914a9e5d26 100644 --- a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp +++ b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp @@ -687,10 +687,9 @@ static double progressFunc(TableFuncSharedState* state) { function_set ParquetScanFunction::getFunctionSet() { function_set functionSet; - functionSet.push_back( - std::make_unique(READ_PARQUET_FUNC_NAME, tableFunc, - bindFunc, initSharedState, initLocalState, progressFunc, - std::vector{LogicalTypeID::STRING})); + functionSet.push_back(std::make_unique(READ_PARQUET_FUNC_NAME, tableFunc, + bindFunc, initSharedState, initLocalState, progressFunc, + std::vector{LogicalTypeID::STRING})); return functionSet; } From 774b390a8c12802ce2b48e495b96632e2a0bb9bb Mon Sep 17 00:00:00 2001 From: MSebanc Date: Tue, 26 Mar 2024 14:44:51 -0700 Subject: [PATCH 4/8] Implemented feedback and added progress for python scans I have read and agree to the CLA of the Kuzu repository. --- src/common/task_system/progress_bar.cpp | 5 ++++- src/include/function/table/scan_functions.h | 7 ++---- .../reader/parquet/parquet_reader.h | 2 +- .../reader/csv/parallel_csv_reader.cpp | 20 +++++++++-------- .../reader/csv/serial_csv_reader.cpp | 22 ++++++++++++------- .../reader/parquet/parquet_reader.cpp | 16 +++++++++----- .../src_cpp/include/pandas/pandas_scan.h | 4 +++- .../python_api/src_cpp/pandas/pandas_scan.cpp | 12 +++++++++- .../src_cpp/pyarrow/pyarrow_scan.cpp | 13 +++++++++-- 9 files changed, 68 insertions(+), 33 deletions(-) diff --git a/src/common/task_system/progress_bar.cpp b/src/common/task_system/progress_bar.cpp index 51d6a5ed36..f4501e4b5e 100644 --- a/src/common/task_system/progress_bar.cpp +++ b/src/common/task_system/progress_bar.cpp @@ -40,10 +40,13 @@ void ProgressBar::finishPipeline() { void ProgressBar::updateProgress(double curPipelineProgress) { // Only update the progress bar if the progress has changed by at least 1%. - if (!trackProgress || curPipelineProgress - prevCurPipelineProgress < 0.01) { + if (!trackProgress) { return; } std::lock_guard lock(progressBarLock); + if (curPipelineProgress - prevCurPipelineProgress < 0.01) { + return; + } prevCurPipelineProgress = curPipelineProgress; if (printing) { std::cout << "\033[2A"; diff --git a/src/include/function/table/scan_functions.h b/src/include/function/table/scan_functions.h index 72b4452267..9c1baf9008 100644 --- a/src/include/function/table/scan_functions.h +++ b/src/include/function/table/scan_functions.h @@ -30,14 +30,11 @@ struct ScanSharedState : public BaseScanSharedState { struct ScanFileSharedState : public ScanSharedState { main::ClientContext* context; + uint64_t totalSize; ScanFileSharedState( common::ReaderConfig readerConfig, uint64_t numRows, main::ClientContext* context) - : ScanSharedState{std::move(readerConfig), numRows}, context{context}, fileSize{0}, - fileOffset{0} {} - - uint64_t fileSize; - uint64_t fileOffset; + : ScanSharedState{std::move(readerConfig), numRows}, context{context}, totalSize{0} {} }; } // namespace function diff --git a/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h b/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h index 46a34df6e6..00bbf6634c 100644 --- a/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h +++ b/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h @@ -95,7 +95,7 @@ struct ParquetScanSharedState final : public function::ScanFileSharedState { const common::ReaderConfig readerConfig, uint64_t numRows, main::ClientContext* context); std::vector> readers; - uint64_t numRowsRead; + uint64_t totalRowsGroups; }; struct ParquetScanLocalState final : public function::TableFuncLocalState { diff --git a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp index 6fcb6559ea..083f4a9b5f 100644 --- a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp @@ -124,7 +124,6 @@ static offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output) { auto result = parallelCSVLocalState->reader->continueBlock(outputChunk); outputChunk.state->selVector->selectedSize = result; if (result > 0) { - parallelCSVSharedState->fileOffset = parallelCSVLocalState->reader->getFileOffset(); return result; } } @@ -142,11 +141,9 @@ static offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output) { auto numRowsRead = parallelCSVLocalState->reader->parseBlock(blockIdx, outputChunk); outputChunk.state->selVector->selectedSize = numRowsRead; if (numRowsRead > 0) { - parallelCSVSharedState->fileOffset = parallelCSVLocalState->reader->getFileOffset(); return numRowsRead; } if (parallelCSVLocalState->reader->isEOF()) { - parallelCSVSharedState->fileOffset = parallelCSVLocalState->reader->getFileOffset(); parallelCSVSharedState->setFileComplete(parallelCSVLocalState->fileIdx); parallelCSVLocalState->reader = nullptr; } @@ -173,9 +170,11 @@ static std::unique_ptr initSharedState(TableFunctionInitIn row_idx_t numRows = 0; auto sharedState = std::make_unique(bindData->config.copy(), numRows, bindData->columnNames.size(), bindData->context, csvConfig.copy()); - auto reader = std::make_unique(sharedState->readerConfig.filePaths[0], - sharedState->csvReaderConfig.option.copy(), sharedState->numColumns, sharedState->context); - sharedState->fileSize = reader->getFileSize(); + for (auto filePath : sharedState->readerConfig.filePaths) { + auto reader = std::make_unique(filePath, sharedState->csvReaderConfig.option.copy(), + sharedState->numColumns, sharedState->context); + sharedState->totalSize += reader->getFileSize(); + } return sharedState; } @@ -191,12 +190,15 @@ static std::unique_ptr initLocalState(TableFunctionInitInpu static double progressFunc(TableFuncSharedState* sharedState) { auto state = reinterpret_cast(sharedState); - if (state->fileSize == 0) { + if (state->totalSize == 0) { return 0.0; - } else if (state->fileOffset == state->fileSize) { + } else if ((state->blockIdx * CopyConstants::PARALLEL_BLOCK_SIZE) * (state->fileIdx + 1) >= + state->totalSize) { return 1.0; } - return static_cast(state->fileOffset) / state->fileSize; + return static_cast( + (state->blockIdx * CopyConstants::PARALLEL_BLOCK_SIZE) * (state->fileIdx + 1)) / + state->totalSize; } function_set ParallelCSVScan::getFunctionSet() { diff --git a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp index 7cc7f3bb17..e69180ea84 100644 --- a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp @@ -70,7 +70,6 @@ void SerialCSVScanSharedState::initReader(main::ClientContext* context) { static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output) { auto serialCSVScanSharedState = reinterpret_cast(input.sharedState); serialCSVScanSharedState->read(output.dataChunk); - serialCSVScanSharedState->fileOffset = serialCSVScanSharedState->reader->getFileOffset(); return output.dataChunk.state->selVector->selectedSize; } @@ -118,9 +117,11 @@ static std::unique_ptr initSharedState(TableFunctionInitIn row_idx_t numRows = 0; auto sharedState = std::make_unique(bindData->config.copy(), numRows, bindData->columnNames.size(), csvConfig.copy(), bindData->context); - auto reader = std::make_unique(sharedState->readerConfig.filePaths[0], - sharedState->csvReaderConfig.option.copy(), sharedState->numColumns, sharedState->context); - sharedState->fileSize = reader->getFileSize(); + for (auto filePath : sharedState->readerConfig.filePaths) { + auto reader = std::make_unique(filePath, + sharedState->csvReaderConfig.option.copy(), sharedState->numColumns, sharedState->context); + sharedState->totalSize += reader->getFileSize(); + } return sharedState; } @@ -131,12 +132,17 @@ static std::unique_ptr initLocalState(TableFunctionInitInpu static double progressFunc(TableFuncSharedState* sharedState) { auto state = reinterpret_cast(sharedState); - if (state->fileSize == 0) { + if (state->totalSize == 0) { return 0.0; - } else if (state->fileOffset == state->fileSize) { - return 1.0; + } else if (state->fileIdx >= state->readerConfig.getNumFiles()) { + return 1.0; + } + uint64_t totalReadSize = 0; + for (auto i = 0u; i < state->fileIdx; i++) { + totalReadSize += state->reader->getFileSize(); } - return static_cast(state->fileOffset) / state->fileSize; + totalReadSize += state->reader->getFileOffset(); + return static_cast(totalReadSize) / state->totalSize; } function_set SerialCSVScan::getFunctionSet() { diff --git a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp index 914a9e5d26..2ed347f980 100644 --- a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp +++ b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp @@ -563,7 +563,11 @@ ParquetScanSharedState::ParquetScanSharedState( : ScanFileSharedState{std::move(readerConfig), numRows, context} { readers.push_back( std::make_unique(this->readerConfig.filePaths[fileIdx], context)); - numRowsRead = 0; + totalRowsGroups = 0; + for (auto i = 0u; i < readerConfig.getNumFiles(); i++) { + auto reader = std::make_unique(readerConfig.filePaths[i], context); + totalRowsGroups += reader->getNumRowsGroups(); + } } static bool parquetSharedStateNext( @@ -602,11 +606,9 @@ static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output do { parquetScanLocalState->reader->scan(*parquetScanLocalState->state, outputChunk); if (outputChunk.state->selVector->selectedSize > 0) { - parquetScanSharedState->numRowsRead += outputChunk.state->selVector->selectedSize; return outputChunk.state->selVector->selectedSize; } if (!parquetSharedStateNext(*parquetScanLocalState, *parquetScanSharedState)) { - parquetScanSharedState->numRowsRead += outputChunk.state->selVector->selectedSize; return outputChunk.state->selVector->selectedSize; } } while (true); @@ -679,10 +681,14 @@ static std::unique_ptr initLocalState( static double progressFunc(TableFuncSharedState* state) { auto parquetScanSharedState = reinterpret_cast(state); - if (parquetScanSharedState->numRows == 0) { + if (parquetScanSharedState->fileIdx >= parquetScanSharedState->readerConfig.getNumFiles()) { + return 1.0; + } + if (parquetScanSharedState->totalRowsGroups == 0) { return 0.0; } - return parquetScanSharedState->numRowsRead / (double)parquetScanSharedState->numRows; + return (double)(parquetScanSharedState->blockIdx * (parquetScanSharedState->fileIdx + 1)) / + parquetScanSharedState->totalRowsGroups; } function_set ParquetScanFunction::getFunctionSet() { diff --git a/tools/python_api/src_cpp/include/pandas/pandas_scan.h b/tools/python_api/src_cpp/include/pandas/pandas_scan.h index d7243c60ea..681f80f6b9 100644 --- a/tools/python_api/src_cpp/include/pandas/pandas_scan.h +++ b/tools/python_api/src_cpp/include/pandas/pandas_scan.h @@ -17,10 +17,12 @@ struct PandasScanLocalState : public function::TableFuncLocalState { }; struct PandasScanSharedState : public function::BaseScanSharedState { - explicit PandasScanSharedState(uint64_t numRows) : BaseScanSharedState{numRows}, position{0} {} + explicit PandasScanSharedState(uint64_t numRows) + : BaseScanSharedState{numRows}, position{0}, numReadRows{0} {} std::mutex lock; uint64_t position; + uint64_t numReadRows; }; struct PandasScanFunction { diff --git a/tools/python_api/src_cpp/pandas/pandas_scan.cpp b/tools/python_api/src_cpp/pandas/pandas_scan.cpp index f432ab2093..19ce1b374d 100644 --- a/tools/python_api/src_cpp/pandas/pandas_scan.cpp +++ b/tools/python_api/src_cpp/pandas/pandas_scan.cpp @@ -83,6 +83,7 @@ offset_t tableFunc( TableFuncInput& input, TableFuncOutput& output) { auto pandasScanData = reinterpret_cast(input.bindData); auto pandasLocalState = reinterpret_cast(input.localState); + auto pandasSharedState = reinterpret_cast(input.sharedState); if (pandasLocalState->start >= pandasLocalState->end) { if (!sharedStateNext(input.bindData, pandasLocalState, input.sharedState)) { @@ -97,6 +98,7 @@ offset_t tableFunc( } output.dataChunk.state->selVector->selectedSize = numValuesToOutput; pandasLocalState->start += numValuesToOutput; + pandasSharedState->numReadRows += numValuesToOutput; return numValuesToOutput; } @@ -109,9 +111,17 @@ std::vector> PandasScanFunctionData::copyC return result; } +static double progressFunc(TableFuncSharedState* sharedState) { + auto pandasSharedState = reinterpret_cast(sharedState); + if (pandasSharedState->numRows == 0) { + return 0.0; + } + return static_cast(pandasSharedState->numReadRows) / pandasSharedState->numRows; +} + static TableFunction getFunction() { return TableFunction(READ_PANDAS_FUNC_NAME, tableFunc, bindFunc, initSharedState, - initLocalState, std::vector{LogicalTypeID::POINTER}); + initLocalState, progressFunc, std::vector{LogicalTypeID::POINTER}); } function_set PandasScanFunction::getFunctionSet() { diff --git a/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp b/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp index e9739ea65a..520bb719cf 100644 --- a/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp +++ b/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp @@ -85,18 +85,27 @@ static common::offset_t tableFunc( return len; } +double progressFunc(function::TableFuncSharedState* sharedState) { + PyArrowTableScanSharedState* state = + dynamic_cast(sharedState); + if (state->chunks.size() == 0) { + return 0.0; + } + return static_cast(state->currentChunk) / state->chunks.size(); +} + function::function_set PyArrowTableScanFunction::getFunctionSet() { function_set functionSet; functionSet.push_back( std::make_unique(READ_PYARROW_FUNC_NAME, tableFunc, bindFunc, - initSharedState, initLocalState, std::vector{LogicalTypeID::POINTER})); + initSharedState, initLocalState, progressFunc, std::vector{LogicalTypeID::POINTER})); return functionSet; } TableFunction PyArrowTableScanFunction::getFunction() { return TableFunction(READ_PYARROW_FUNC_NAME, tableFunc, bindFunc, initSharedState, - initLocalState, std::vector{LogicalTypeID::POINTER}); + initLocalState, progressFunc, std::vector{LogicalTypeID::POINTER}); } } // namespace kuzu From 8c91513dce83817b62dd0eed29a076d0b0bc4d62 Mon Sep 17 00:00:00 2001 From: MSebanc Date: Tue, 26 Mar 2024 14:57:14 -0700 Subject: [PATCH 5/8] format fixes I have read and agree to the CLA of the Kuzu repository. --- .../operator/persistent/reader/csv/parallel_csv_reader.cpp | 7 ++++--- .../operator/persistent/reader/csv/serial_csv_reader.cpp | 5 +++-- .../operator/persistent/reader/parquet/parquet_reader.cpp | 6 +++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp index 083f4a9b5f..f825a36d88 100644 --- a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp @@ -171,10 +171,11 @@ static std::unique_ptr initSharedState(TableFunctionInitIn auto sharedState = std::make_unique(bindData->config.copy(), numRows, bindData->columnNames.size(), bindData->context, csvConfig.copy()); for (auto filePath : sharedState->readerConfig.filePaths) { - auto reader = std::make_unique(filePath, sharedState->csvReaderConfig.option.copy(), - sharedState->numColumns, sharedState->context); + auto reader = std::make_unique(filePath, + sharedState->csvReaderConfig.option.copy(), sharedState->numColumns, + sharedState->context); sharedState->totalSize += reader->getFileSize(); - } + } return sharedState; } diff --git a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp index e69180ea84..65fd4023dd 100644 --- a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp @@ -118,8 +118,9 @@ static std::unique_ptr initSharedState(TableFunctionInitIn auto sharedState = std::make_unique(bindData->config.copy(), numRows, bindData->columnNames.size(), csvConfig.copy(), bindData->context); for (auto filePath : sharedState->readerConfig.filePaths) { - auto reader = std::make_unique(filePath, - sharedState->csvReaderConfig.option.copy(), sharedState->numColumns, sharedState->context); + auto reader = + std::make_unique(filePath, sharedState->csvReaderConfig.option.copy(), + sharedState->numColumns, sharedState->context); sharedState->totalSize += reader->getFileSize(); } return sharedState; diff --git a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp index 2ed347f980..be0f4c0099 100644 --- a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp +++ b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp @@ -565,9 +565,9 @@ ParquetScanSharedState::ParquetScanSharedState( std::make_unique(this->readerConfig.filePaths[fileIdx], context)); totalRowsGroups = 0; for (auto i = 0u; i < readerConfig.getNumFiles(); i++) { - auto reader = std::make_unique(readerConfig.filePaths[i], context); - totalRowsGroups += reader->getNumRowsGroups(); - } + auto reader = std::make_unique(readerConfig.filePaths[i], context); + totalRowsGroups += reader->getNumRowsGroups(); + } } static bool parquetSharedStateNext( From d83119b163d0c72ea3f1bd503272cbdd8dad3eaf Mon Sep 17 00:00:00 2001 From: MSebanc Date: Tue, 26 Mar 2024 14:59:10 -0700 Subject: [PATCH 6/8] format fixes I have read and agree to the CLA of the Kuzu repository. --- .../operator/persistent/reader/csv/serial_csv_reader.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp index 65fd4023dd..e7f815552c 100644 --- a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp @@ -136,8 +136,8 @@ static double progressFunc(TableFuncSharedState* sharedState) { if (state->totalSize == 0) { return 0.0; } else if (state->fileIdx >= state->readerConfig.getNumFiles()) { - return 1.0; - } + return 1.0; + } uint64_t totalReadSize = 0; for (auto i = 0u; i < state->fileIdx; i++) { totalReadSize += state->reader->getFileSize(); From 0b2ff6cb7818fc84cecdfc2f4d691c4e691bfa7e Mon Sep 17 00:00:00 2001 From: MSebanc Date: Thu, 28 Mar 2024 13:05:41 -0700 Subject: [PATCH 7/8] Implemented feedback I have read and agree to the CLA of the Kuzu repository. --- src/common/task_system/progress_bar.cpp | 2 +- .../reader/csv/parallel_csv_reader.h | 3 +- .../persistent/reader/csv/serial_csv_reader.h | 3 +- .../reader/parquet/parquet_reader.h | 1 + .../reader/csv/parallel_csv_reader.cpp | 25 ++++++----- .../reader/csv/serial_csv_reader.cpp | 15 +++---- .../reader/parquet/parquet_reader.cpp | 44 +++++++++++-------- .../python_api/src_cpp/pandas/pandas_scan.cpp | 15 ++++--- .../src_cpp/pyarrow/pyarrow_scan.cpp | 6 +-- 9 files changed, 61 insertions(+), 53 deletions(-) diff --git a/src/common/task_system/progress_bar.cpp b/src/common/task_system/progress_bar.cpp index f4501e4b5e..e571a8a58c 100644 --- a/src/common/task_system/progress_bar.cpp +++ b/src/common/task_system/progress_bar.cpp @@ -39,11 +39,11 @@ void ProgressBar::finishPipeline() { } void ProgressBar::updateProgress(double curPipelineProgress) { - // Only update the progress bar if the progress has changed by at least 1%. if (!trackProgress) { return; } std::lock_guard lock(progressBarLock); + // Only update the progress bar if the progress has changed by at least 1%. if (curPipelineProgress - prevCurPipelineProgress < 0.01) { return; } diff --git a/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h index 2294d26113..90709b2c20 100644 --- a/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h @@ -39,11 +39,12 @@ struct ParallelCSVScanSharedState final : public function::ScanFileSharedState { explicit ParallelCSVScanSharedState(common::ReaderConfig readerConfig, uint64_t numRows, uint64_t numColumns, main::ClientContext* context, common::CSVReaderConfig csvReaderConfig) : ScanFileSharedState{std::move(readerConfig), numRows, context}, numColumns{numColumns}, - csvReaderConfig{std::move(csvReaderConfig)} {} + numBlocksReadByFiles{0}, csvReaderConfig{std::move(csvReaderConfig)} {} void setFileComplete(uint64_t completedFileIdx); uint64_t numColumns; + uint64_t numBlocksReadByFiles = 0; common::CSVReaderConfig csvReaderConfig; }; diff --git a/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h index 289d2a4007..c914a43454 100644 --- a/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h @@ -25,12 +25,13 @@ class SerialCSVReader final : public BaseCSVReader { struct SerialCSVScanSharedState final : public function::ScanFileSharedState { std::unique_ptr reader; uint64_t numColumns; + uint64_t totalReadSizeByFile; common::CSVReaderConfig csvReaderConfig; SerialCSVScanSharedState(common::ReaderConfig readerConfig, uint64_t numRows, uint64_t numColumns, common::CSVReaderConfig csvReaderConfig, main::ClientContext* context) : ScanFileSharedState{std::move(readerConfig), numRows, context}, numColumns{numColumns}, - csvReaderConfig{std::move(csvReaderConfig)} { + totalReadSizeByFile{0}, csvReaderConfig{std::move(csvReaderConfig)} { initReader(context); } diff --git a/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h b/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h index 00bbf6634c..bfe624de11 100644 --- a/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h +++ b/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h @@ -96,6 +96,7 @@ struct ParquetScanSharedState final : public function::ScanFileSharedState { std::vector> readers; uint64_t totalRowsGroups; + uint64_t numBlocksReadByFiles; }; struct ParquetScanLocalState final : public function::TableFuncLocalState { diff --git a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp index f825a36d88..11a8db8f05 100644 --- a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp @@ -109,6 +109,7 @@ bool ParallelCSVReader::finishedBlock() const { void ParallelCSVScanSharedState::setFileComplete(uint64_t completedFileIdx) { std::lock_guard guard{lock}; if (completedFileIdx == fileIdx) { + numBlocksReadByFiles += blockIdx; blockIdx = 0; fileIdx++; } @@ -116,8 +117,8 @@ void ParallelCSVScanSharedState::setFileComplete(uint64_t completedFileIdx) { static offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output) { auto& outputChunk = output.dataChunk; - auto parallelCSVLocalState = reinterpret_cast(input.localState); - auto parallelCSVSharedState = reinterpret_cast(input.sharedState); + auto parallelCSVLocalState = ku_dynamic_cast(input.localState); + auto parallelCSVSharedState = ku_dynamic_cast(input.sharedState); do { if (parallelCSVLocalState->reader != nullptr && parallelCSVLocalState->reader->hasMoreToRead()) { @@ -152,7 +153,7 @@ static offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output) { static std::unique_ptr bindFunc( main::ClientContext* /*context*/, TableFuncBindInput* input) { - auto scanInput = reinterpret_cast(input); + auto scanInput = ku_dynamic_cast(input); std::vector detectedColumnNames; std::vector detectedColumnTypes; SerialCSVScan::bindColumns(scanInput, detectedColumnNames, detectedColumnTypes); @@ -165,7 +166,7 @@ static std::unique_ptr bindFunc( } static std::unique_ptr initSharedState(TableFunctionInitInput& input) { - auto bindData = reinterpret_cast(input.bindData); + auto bindData = ku_dynamic_cast(input.bindData); auto csvConfig = CSVReaderConfig::construct(bindData->config.options); row_idx_t numRows = 0; auto sharedState = std::make_unique(bindData->config.copy(), @@ -182,7 +183,7 @@ static std::unique_ptr initSharedState(TableFunctionInitIn static std::unique_ptr initLocalState(TableFunctionInitInput& /*input*/, TableFuncSharedState* state, storage::MemoryManager* /*mm*/) { auto localState = std::make_unique(); - auto sharedState = reinterpret_cast(state); + auto sharedState = ku_dynamic_cast(state); localState->reader = std::make_unique(sharedState->readerConfig.filePaths[0], sharedState->csvReaderConfig.option.copy(), sharedState->numColumns, sharedState->context); localState->fileIdx = 0; @@ -190,16 +191,16 @@ static std::unique_ptr initLocalState(TableFunctionInitInpu } static double progressFunc(TableFuncSharedState* sharedState) { - auto state = reinterpret_cast(sharedState); + auto state = ku_dynamic_cast(sharedState); + if (state->fileIdx >= state->readerConfig.getNumFiles()) { + return 1.0; + } if (state->totalSize == 0) { return 0.0; - } else if ((state->blockIdx * CopyConstants::PARALLEL_BLOCK_SIZE) * (state->fileIdx + 1) >= - state->totalSize) { - return 1.0; } - return static_cast( - (state->blockIdx * CopyConstants::PARALLEL_BLOCK_SIZE) * (state->fileIdx + 1)) / - state->totalSize; + uint64_t totalReadSize = + (state->numBlocksReadByFiles + state->blockIdx) * CopyConstants::PARALLEL_BLOCK_SIZE; + return static_cast(totalReadSize) / state->totalSize; } function_set ParallelCSVScan::getFunctionSet() { diff --git a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp index e7f815552c..eeb5e3b4e3 100644 --- a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp @@ -55,6 +55,7 @@ void SerialCSVScanSharedState::read(DataChunk& outputChunk) { if (numRows > 0) { return; } + totalReadSizeByFile += reader->getFileSize(); fileIdx++; initReader(context); } while (true); @@ -68,7 +69,7 @@ void SerialCSVScanSharedState::initReader(main::ClientContext* context) { } static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output) { - auto serialCSVScanSharedState = reinterpret_cast(input.sharedState); + auto serialCSVScanSharedState = ku_dynamic_cast(input.sharedState); serialCSVScanSharedState->read(output.dataChunk); return output.dataChunk.state->selVector->selectedSize; } @@ -99,7 +100,7 @@ void SerialCSVScan::bindColumns(const ScanTableFuncBindInput* bindInput, static std::unique_ptr bindFunc( main::ClientContext* /*context*/, TableFuncBindInput* input) { - auto scanInput = reinterpret_cast(input); + auto scanInput = ku_dynamic_cast(input); std::vector detectedColumnNames; std::vector detectedColumnTypes; SerialCSVScan::bindColumns(scanInput, detectedColumnNames, detectedColumnTypes); @@ -112,7 +113,7 @@ static std::unique_ptr bindFunc( } static std::unique_ptr initSharedState(TableFunctionInitInput& input) { - auto bindData = reinterpret_cast(input.bindData); + auto bindData = ku_dynamic_cast(input.bindData); auto csvConfig = CSVReaderConfig::construct(bindData->config.options); row_idx_t numRows = 0; auto sharedState = std::make_unique(bindData->config.copy(), numRows, @@ -132,17 +133,13 @@ static std::unique_ptr initLocalState(TableFunctionInitInpu } static double progressFunc(TableFuncSharedState* sharedState) { - auto state = reinterpret_cast(sharedState); + auto state = ku_dynamic_cast(sharedState); if (state->totalSize == 0) { return 0.0; } else if (state->fileIdx >= state->readerConfig.getNumFiles()) { return 1.0; } - uint64_t totalReadSize = 0; - for (auto i = 0u; i < state->fileIdx; i++) { - totalReadSize += state->reader->getFileSize(); - } - totalReadSize += state->reader->getFileOffset(); + uint64_t totalReadSize = state->totalReadSizeByFile + state->reader->getFileOffset(); return static_cast(totalReadSize) / state->totalSize; } diff --git a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp index be0f4c0099..b605e53283 100644 --- a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp +++ b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp @@ -51,8 +51,8 @@ bool ParquetReader::scanInternal(ParquetReaderScanState& state, DataChunk& resul state.currentGroup++; state.groupOffset = 0; - auto& trans = - reinterpret_cast(*state.thriftFileProto->getTransport()); + auto& trans = ku_dynamic_cast( + *state.thriftFileProto->getTransport()); trans.ClearPrefetch(); state.currentGroupPrefetched = false; @@ -67,7 +67,8 @@ bool ParquetReader::scanInternal(ParquetReaderScanState& state, DataChunk& resul auto fileColIdx = colIdx; - auto rootReader = reinterpret_cast(state.rootReader.get()); + auto rootReader = + ku_dynamic_cast(state.rootReader.get()); toScanCompressedBytes += rootReader->getChildReader(fileColIdx)->getTotalCompressedSize(); } @@ -99,7 +100,7 @@ bool ParquetReader::scanInternal(ParquetReaderScanState& state, DataChunk& resul // Prefetch column-wise. for (auto colIdx = 0u; colIdx < result.getNumValueVectors(); colIdx++) { auto fileColIdx = colIdx; - auto rootReader = reinterpret_cast(state.rootReader.get()); + auto rootReader = ku_dynamic_cast(state.rootReader.get()); rootReader->getChildReader(fileColIdx) ->registerPrefetch(trans, true /* lazy fetch */); @@ -136,7 +137,7 @@ bool ParquetReader::scanInternal(ParquetReaderScanState& state, DataChunk& resul auto definePtr = (uint8_t*)state.defineBuf.ptr; auto repeatPtr = (uint8_t*)state.repeatBuf.ptr; - auto rootReader = reinterpret_cast(state.rootReader.get()); + auto rootReader = ku_dynamic_cast(state.rootReader.get()); for (auto colIdx = 0u; colIdx < result.getNumValueVectors(); colIdx++) { auto fileColIdx = colIdx; @@ -168,7 +169,7 @@ void ParquetReader::scan(processor::ParquetReaderScanState& state, DataChunk& re void ParquetReader::initMetadata() { auto fileInfo = context->getVFSUnsafe()->openFile(filePath, O_RDONLY, context); auto proto = createThriftProtocol(fileInfo.get(), false); - auto& transport = reinterpret_cast(*proto->getTransport()); + auto& transport = ku_dynamic_cast(*proto->getTransport()); auto fileSize = transport.GetSize(); // LCOV_EXCL_START if (fileSize < 12) { @@ -564,10 +565,11 @@ ParquetScanSharedState::ParquetScanSharedState( readers.push_back( std::make_unique(this->readerConfig.filePaths[fileIdx], context)); totalRowsGroups = 0; - for (auto i = 0u; i < readerConfig.getNumFiles(); i++) { - auto reader = std::make_unique(readerConfig.filePaths[i], context); + for (auto i = fileIdx; i < this->readerConfig.getNumFiles(); i++) { + auto reader = std::make_unique(this->readerConfig.filePaths[i], context); totalRowsGroups += reader->getNumRowsGroups(); } + numBlocksReadByFiles = 0; } static bool parquetSharedStateNext( @@ -584,6 +586,8 @@ static bool parquetSharedStateNext( sharedState.blockIdx++; return true; } else { + sharedState.numBlocksReadByFiles += + sharedState.readers[sharedState.fileIdx]->getNumRowsGroups(); sharedState.blockIdx = 0; sharedState.fileIdx++; if (sharedState.fileIdx >= sharedState.readerConfig.getNumFiles()) { @@ -601,8 +605,8 @@ static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output if (input.localState == nullptr) { return 0; } - auto parquetScanLocalState = reinterpret_cast(input.localState); - auto parquetScanSharedState = reinterpret_cast(input.sharedState); + auto parquetScanLocalState = ku_dynamic_cast(input.localState); + auto parquetScanSharedState = ku_dynamic_cast(input.sharedState); do { parquetScanLocalState->reader->scan(*parquetScanLocalState->state, outputChunk); if (outputChunk.state->selVector->selectedSize > 0) { @@ -640,7 +644,8 @@ static void bindColumns(const ScanTableFuncBindInput* bindInput, static std::unique_ptr bindFunc( main::ClientContext* /*context*/, function::TableFuncBindInput* input) { - auto scanInput = reinterpret_cast(input); + auto scanInput = + ku_dynamic_cast(input); std::vector detectedColumnNames; std::vector detectedColumnTypes; bindColumns(scanInput, detectedColumnNames, detectedColumnTypes); @@ -658,7 +663,7 @@ static std::unique_ptr bindFunc( static std::unique_ptr initSharedState( TableFunctionInitInput& input) { - auto parquetScanBindData = reinterpret_cast(input.bindData); + auto parquetScanBindData = ku_dynamic_cast(input.bindData); row_idx_t numRows = 0; for (const auto& path : parquetScanBindData->config.filePaths) { auto reader = std::make_unique(path, parquetScanBindData->context); @@ -671,7 +676,8 @@ static std::unique_ptr initSharedState( static std::unique_ptr initLocalState( TableFunctionInitInput& /*input*/, TableFuncSharedState* state, storage::MemoryManager* /*mm*/) { - auto parquetScanSharedState = reinterpret_cast(state); + auto parquetScanSharedState = + ku_dynamic_cast(state); auto localState = std::make_unique(); if (!parquetSharedStateNext(*localState, *parquetScanSharedState)) { return nullptr; @@ -679,16 +685,16 @@ static std::unique_ptr initLocalState( return localState; } -static double progressFunc(TableFuncSharedState* state) { - auto parquetScanSharedState = reinterpret_cast(state); - if (parquetScanSharedState->fileIdx >= parquetScanSharedState->readerConfig.getNumFiles()) { +static double progressFunc(TableFuncSharedState* sharedState) { + auto state = ku_dynamic_cast(sharedState); + if (state->fileIdx >= state->readerConfig.getNumFiles()) { return 1.0; } - if (parquetScanSharedState->totalRowsGroups == 0) { + if (state->totalRowsGroups == 0) { return 0.0; } - return (double)(parquetScanSharedState->blockIdx * (parquetScanSharedState->fileIdx + 1)) / - parquetScanSharedState->totalRowsGroups; + uint64_t totalReadSize = state->numBlocksReadByFiles + state->blockIdx; + return static_cast(totalReadSize) / state->totalRowsGroups; } function_set ParquetScanFunction::getFunctionSet() { diff --git a/tools/python_api/src_cpp/pandas/pandas_scan.cpp b/tools/python_api/src_cpp/pandas/pandas_scan.cpp index 19ce1b374d..db418f1f34 100644 --- a/tools/python_api/src_cpp/pandas/pandas_scan.cpp +++ b/tools/python_api/src_cpp/pandas/pandas_scan.cpp @@ -35,7 +35,7 @@ std::unique_ptr bindFunc( bool sharedStateNext(const TableFuncBindData* /*bindData*/, PandasScanLocalState* localState, TableFuncSharedState* sharedState) { - auto pandasSharedState = reinterpret_cast(sharedState); + auto pandasSharedState = ku_dynamic_cast(sharedState); std::lock_guard lck{pandasSharedState->lock}; if (pandasSharedState->position >= pandasSharedState->numRows) { return false; @@ -63,7 +63,7 @@ std::unique_ptr initSharedState( throw RuntimeException("PandasScan called but GIL was already held!"); } // LCOV_EXCL_STOP - auto scanBindData = reinterpret_cast(input.bindData); + auto scanBindData = ku_dynamic_cast(input.bindData); return std::make_unique(scanBindData->numRows); } @@ -81,9 +81,9 @@ void pandasBackendScanSwitch( offset_t tableFunc( TableFuncInput& input, TableFuncOutput& output) { - auto pandasScanData = reinterpret_cast(input.bindData); - auto pandasLocalState = reinterpret_cast(input.localState); - auto pandasSharedState = reinterpret_cast(input.sharedState); + auto pandasScanData = ku_dynamic_cast(input.bindData); + auto pandasLocalState = ku_dynamic_cast(input.localState); + auto pandasSharedState = ku_dynamic_cast(input.sharedState); if (pandasLocalState->start >= pandasLocalState->end) { if (!sharedStateNext(input.bindData, pandasLocalState, input.sharedState)) { @@ -112,11 +112,12 @@ std::vector> PandasScanFunctionData::copyC } static double progressFunc(TableFuncSharedState* sharedState) { - auto pandasSharedState = reinterpret_cast(sharedState); + auto pandasSharedState = ku_dynamic_cast(sharedState); if (pandasSharedState->numRows == 0) { return 0.0; } - return static_cast(pandasSharedState->numReadRows) / pandasSharedState->numRows; + return static_cast(pandasSharedState->numReadRows) / + pandasSharedState->numRows; } static TableFunction getFunction() { diff --git a/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp b/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp index 520bb719cf..e566c413dd 100644 --- a/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp +++ b/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp @@ -86,12 +86,12 @@ static common::offset_t tableFunc( } double progressFunc(function::TableFuncSharedState* sharedState) { - PyArrowTableScanSharedState* state = - dynamic_cast(sharedState); + PyArrowTableScanSharedState* state = + ku_dynamic_cast(sharedState); if (state->chunks.size() == 0) { return 0.0; } - return static_cast(state->currentChunk) / state->chunks.size(); + return static_cast(state->currentChunk) / state->chunks.size(); } function::function_set PyArrowTableScanFunction::getFunctionSet() { From 76a2ec6a3c4c363b6715941ff80e94b0c6165103 Mon Sep 17 00:00:00 2001 From: MSebanc Date: Thu, 28 Mar 2024 15:41:49 -0700 Subject: [PATCH 8/8] format fixes I have read and agree to the CLA of the Kuzu repository. --- .../persistent/reader/csv/serial_csv_reader.h | 2 +- .../reader/csv/parallel_csv_reader.cpp | 6 ++++-- .../reader/csv/serial_csv_reader.cpp | 3 ++- .../reader/parquet/parquet_reader.cpp | 18 ++++++++++++------ 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h index c914a43454..78f1f26684 100644 --- a/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h @@ -25,7 +25,7 @@ class SerialCSVReader final : public BaseCSVReader { struct SerialCSVScanSharedState final : public function::ScanFileSharedState { std::unique_ptr reader; uint64_t numColumns; - uint64_t totalReadSizeByFile; + uint64_t totalReadSizeByFile; common::CSVReaderConfig csvReaderConfig; SerialCSVScanSharedState(common::ReaderConfig readerConfig, uint64_t numRows, diff --git a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp index 11a8db8f05..1071ad77a4 100644 --- a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp @@ -117,8 +117,10 @@ void ParallelCSVScanSharedState::setFileComplete(uint64_t completedFileIdx) { static offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output) { auto& outputChunk = output.dataChunk; - auto parallelCSVLocalState = ku_dynamic_cast(input.localState); - auto parallelCSVSharedState = ku_dynamic_cast(input.sharedState); + auto parallelCSVLocalState = + ku_dynamic_cast(input.localState); + auto parallelCSVSharedState = + ku_dynamic_cast(input.sharedState); do { if (parallelCSVLocalState->reader != nullptr && parallelCSVLocalState->reader->hasMoreToRead()) { diff --git a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp index eeb5e3b4e3..74cdc5a9fa 100644 --- a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp @@ -69,7 +69,8 @@ void SerialCSVScanSharedState::initReader(main::ClientContext* context) { } static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output) { - auto serialCSVScanSharedState = ku_dynamic_cast(input.sharedState); + auto serialCSVScanSharedState = + ku_dynamic_cast(input.sharedState); serialCSVScanSharedState->read(output.dataChunk); return output.dataChunk.state->selVector->selectedSize; } diff --git a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp index b605e53283..ab92d89394 100644 --- a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp +++ b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp @@ -51,8 +51,9 @@ bool ParquetReader::scanInternal(ParquetReaderScanState& state, DataChunk& resul state.currentGroup++; state.groupOffset = 0; - auto& trans = ku_dynamic_cast( - *state.thriftFileProto->getTransport()); + auto& trans = + ku_dynamic_cast( + *state.thriftFileProto->getTransport()); trans.ClearPrefetch(); state.currentGroupPrefetched = false; @@ -100,7 +101,8 @@ bool ParquetReader::scanInternal(ParquetReaderScanState& state, DataChunk& resul // Prefetch column-wise. for (auto colIdx = 0u; colIdx < result.getNumValueVectors(); colIdx++) { auto fileColIdx = colIdx; - auto rootReader = ku_dynamic_cast(state.rootReader.get()); + auto rootReader = + ku_dynamic_cast(state.rootReader.get()); rootReader->getChildReader(fileColIdx) ->registerPrefetch(trans, true /* lazy fetch */); @@ -169,7 +171,9 @@ void ParquetReader::scan(processor::ParquetReaderScanState& state, DataChunk& re void ParquetReader::initMetadata() { auto fileInfo = context->getVFSUnsafe()->openFile(filePath, O_RDONLY, context); auto proto = createThriftProtocol(fileInfo.get(), false); - auto& transport = ku_dynamic_cast(*proto->getTransport()); + auto& transport = + ku_dynamic_cast( + *proto->getTransport()); auto fileSize = transport.GetSize(); // LCOV_EXCL_START if (fileSize < 12) { @@ -605,8 +609,10 @@ static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output if (input.localState == nullptr) { return 0; } - auto parquetScanLocalState = ku_dynamic_cast(input.localState); - auto parquetScanSharedState = ku_dynamic_cast(input.sharedState); + auto parquetScanLocalState = + ku_dynamic_cast(input.localState); + auto parquetScanSharedState = + ku_dynamic_cast(input.sharedState); do { parquetScanLocalState->reader->scan(*parquetScanLocalState->state, outputChunk); if (outputChunk.state->selVector->selectedSize > 0) {