diff --git a/src/common/task_system/task_scheduler.cpp b/src/common/task_system/task_scheduler.cpp index e7725c648d..66fff95455 100644 --- a/src/common/task_system/task_scheduler.cpp +++ b/src/common/task_system/task_scheduler.cpp @@ -69,8 +69,11 @@ void TaskScheduler::scheduleTaskAndWaitOrError( } auto scheduledTask = scheduleTask(task); while (!task->isCompleted()) { - if (context->clientContext->isTimeOutEnabled()) { + if (context != nullptr && context->clientContext->isTimeOutEnabled()) { interruptTaskIfTimeOutNoLock(context); + } else if (task->hasException()) { + // Interrupt tasks that errored, so other threads can stop working on them early. + context->clientContext->interrupt(); } std::this_thread::sleep_for( std::chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS)); diff --git a/src/include/common/constants.h b/src/include/common/constants.h index f67666efe8..1338af8ddd 100644 --- a/src/include/common/constants.h +++ b/src/include/common/constants.h @@ -49,7 +49,7 @@ struct BufferPoolConstants { // `EvictionQueue::removeNonEvictableCandidates()` for more details. static constexpr uint64_t EVICTION_QUEUE_PURGING_INTERVAL = 1024; // The default max size for a VMRegion. - static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 45; // (32TB) + static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 43; // (8TB) static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 26; // (64MB) }; diff --git a/src/include/main/client_context.h b/src/include/main/client_context.h index a344e3f847..e6508e6114 100644 --- a/src/include/main/client_context.h +++ b/src/include/main/client_context.h @@ -40,8 +40,6 @@ class ClientContext { inline bool isTimeOutEnabled() const { return timeoutInMS != 0; } - inline uint64_t getTimeOutMS() const { return timeoutInMS; } - void startTimingIfEnabled(); private: diff --git a/src/include/storage/copier/copy_task.h b/src/include/storage/copier/copy_task.h index d222a1d561..19cfe12eb9 100644 --- a/src/include/storage/copier/copy_task.h +++ b/src/include/storage/copier/copy_task.h @@ -7,14 +7,13 @@ namespace storage { class CopyTask : public common::Task { public: - // CopyCSV tasks are intended to be tasks on which only 1 thread works. - CopyTask() : Task(1){}; + CopyTask(uint64_t numThreads) : Task(numThreads){}; }; template class ParameterizedCopyTask : public CopyTask { public: - ParameterizedCopyTask(F&& func) : f(func){}; + ParameterizedCopyTask(F&& func, uint64_t numThreads = 1) : CopyTask(numThreads), f(func){}; void run() override { f(); } private: @@ -25,15 +24,21 @@ class CopyTaskFactory { private: template - static std::shared_ptr createCopyTaskInternal(F&& f) { - return std::shared_ptr(new ParameterizedCopyTask(std::forward(f))); + static std::shared_ptr createCopyTaskInternal(F&& f, uint64_t numThreads = 1) { + return std::shared_ptr( + new ParameterizedCopyTask(std::forward(f), numThreads)); }; public: template static std::shared_ptr createCopyTask(F function, Args&&... args) { - return std::shared_ptr( - createCopyTaskInternal(std::move(std::bind(function, args...)))); + return createParallelCopyTask(1 /* num threads */, function, args...); + }; + + template + static std::shared_ptr createParallelCopyTask( + uint64_t numThreads, F function, Args&&... args) { + return createCopyTaskInternal(std::move(std::bind(function, args...)), numThreads); }; }; } // namespace storage diff --git a/src/include/storage/copier/node_copier.h b/src/include/storage/copier/node_copier.h index 92a97d3db8..d8a6df67a2 100644 --- a/src/include/storage/copier/node_copier.h +++ b/src/include/storage/copier/node_copier.h @@ -8,9 +8,117 @@ namespace kuzu { namespace storage { +using lock_t = std::unique_lock; + using set_element_func_t = std::function; +template +class NodeCopyMorsel { + +public: + static constexpr common::block_idx_t INVALID_BLOCK_IDX = -1ull; + +public: + NodeCopyMorsel(common::offset_t startOffset, common::block_idx_t blockIdx) + : startOffset{startOffset}, blockIdx{blockIdx} {}; + + virtual ~NodeCopyMorsel() = default; + + virtual const std::vector>& getArrowColumns() = 0; + + bool success() { return blockIdx != INVALID_BLOCK_IDX; } + +public: + common::offset_t startOffset; + common::block_idx_t blockIdx; +}; + +class CSVNodeCopyMorsel : public NodeCopyMorsel { + +public: + CSVNodeCopyMorsel(std::shared_ptr recordBatch, common::offset_t startOffset, + common::block_idx_t blockIdx) + : NodeCopyMorsel{startOffset, blockIdx}, recordBatch{recordBatch} {}; + + const std::vector>& getArrowColumns() override { + return recordBatch->columns(); + } + +private: + std::shared_ptr recordBatch; +}; + +class ParquetNodeCopyMorsel : public NodeCopyMorsel { + +public: + ParquetNodeCopyMorsel(std::shared_ptr currTable, common::offset_t startOffset, + common::block_idx_t blockIdx) + : NodeCopyMorsel{startOffset, blockIdx}, currTable{currTable} {}; + + const std::vector>& getArrowColumns() override { + return currTable->columns(); + } + +private: + std::shared_ptr currTable; +}; + +template +class NodeCopySharedState { + +public: + NodeCopySharedState( + std::string filePath, HashIndexBuilder* pkIndex, common::offset_t startOffset) + : filePath{filePath}, pkIndex{pkIndex}, startOffset{startOffset}, blockIdx{0} {}; + + virtual ~NodeCopySharedState() = default; + + virtual std::unique_ptr> getMorsel() = 0; + +public: + std::string filePath; + HashIndexBuilder* pkIndex; + common::offset_t startOffset; + +protected: + common::block_idx_t blockIdx; + std::mutex mtx; +}; + +template +class CSVNodeCopySharedState : public NodeCopySharedState { + +public: + CSVNodeCopySharedState(std::string filePath, HashIndexBuilder* pkIndex, + common::offset_t startOffset, + std::shared_ptr csvStreamingReader) + : NodeCopySharedState{filePath, pkIndex, startOffset}, + csvStreamingReader{move(csvStreamingReader)} {}; + std::unique_ptr> getMorsel() override; + +private: + std::shared_ptr csvStreamingReader; +}; + +template +class ParquetNodeCopySharedState : public NodeCopySharedState { + +public: + ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder* pkIndex, + common::offset_t startOffset, uint64_t numBlocks, + std::unique_ptr parquetReader) + : NodeCopySharedState{filePath, pkIndex, startOffset}, + numBlocks{numBlocks}, parquetReader{std::move(parquetReader)} {}; + std::unique_ptr> getMorsel() override; + +public: + uint64_t numBlocks; + +private: + std::unique_ptr parquetReader; +}; + class NodeCopier : public TableCopier { public: @@ -23,7 +131,7 @@ class NodeCopier : public TableCopier { protected: void initializeColumnsAndLists() override; - void populateColumnsAndLists() override; + void populateColumnsAndLists(processor::ExecutionContext* executionContext) override; void saveToFile() override; @@ -36,13 +144,15 @@ class NodeCopier : public TableCopier { private: template - arrow::Status populateColumns(); + arrow::Status populateColumns(processor::ExecutionContext* executionContext); template - arrow::Status populateColumnsFromCSV(std::unique_ptr>& pkIndex); + arrow::Status populateColumnsFromCSV(processor::ExecutionContext* executionContext, + std::unique_ptr>& pkIndex); template - arrow::Status populateColumnsFromParquet(std::unique_ptr>& pkIndex); + arrow::Status populateColumnsFromParquet(processor::ExecutionContext* executionContext, + std::unique_ptr>& pkIndex); template static void putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column, @@ -51,21 +161,9 @@ class NodeCopier : public TableCopier { PageByteCursor& overflowCursor); // Concurrent tasks. - // Note that primaryKeyPropertyIdx is *NOT* the property ID of the primary key property. template - static arrow::Status batchPopulateColumnsTask(uint64_t primaryKeyPropertyIdx, uint64_t blockIdx, - uint64_t startOffset, HashIndexBuilder* pkIndex, NodeCopier* copier, - const std::vector>& batchColumns, std::string filePath); - - template - arrow::Status assignCopyCSVTasks(arrow::csv::StreamingReader* csvStreamingReader, - common::offset_t startOffset, std::string filePath, - std::unique_ptr>& pkIndex); - - template - arrow::Status assignCopyParquetTasks(parquet::arrow::FileReader* parquetReader, - common::offset_t startOffset, std::string filePath, - std::unique_ptr>& pkIndex); + static void batchPopulateColumnsTask(NodeCopySharedState* sharedState, + NodeCopier* copier, processor::ExecutionContext* executionContext); template static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile, diff --git a/src/include/storage/copier/npy_node_copier.h b/src/include/storage/copier/npy_node_copier.h index 9c7fae1e9c..edc95d5eef 100644 --- a/src/include/storage/copier/npy_node_copier.h +++ b/src/include/storage/copier/npy_node_copier.h @@ -27,9 +27,9 @@ class NpyNodeCopier : public NodeCopier { ~NpyNodeCopier() override = default; private: - void populateColumnsAndLists() override; + void populateColumnsAndLists(processor::ExecutionContext* executionContext) override; - void populateInMemoryStructures() override; + void populateInMemoryStructures(processor::ExecutionContext* executionContext) override; void initializeNpyReaders(); diff --git a/src/include/storage/copier/rel_copier.h b/src/include/storage/copier/rel_copier.h index de0c448cb0..f8de2a75af 100644 --- a/src/include/storage/copier/rel_copier.h +++ b/src/include/storage/copier/rel_copier.h @@ -25,7 +25,7 @@ class RelCopier : public TableCopier { void initializeColumnsAndLists() override; - void populateColumnsAndLists() override; + void populateColumnsAndLists(processor::ExecutionContext* executionContext) override; void saveToFile() override; diff --git a/src/include/storage/copier/table_copier.h b/src/include/storage/copier/table_copier.h index 48e26a3c25..b7514c310c 100644 --- a/src/include/storage/copier/table_copier.h +++ b/src/include/storage/copier/table_copier.h @@ -41,16 +41,16 @@ class TableCopier { virtual ~TableCopier() = default; - uint64_t copy(); + uint64_t copy(processor::ExecutionContext* executionContext); protected: virtual void initializeColumnsAndLists() = 0; - virtual void populateColumnsAndLists() = 0; + virtual void populateColumnsAndLists(processor::ExecutionContext* executionContext) = 0; virtual void saveToFile() = 0; - virtual void populateInMemoryStructures(); + virtual void populateInMemoryStructures(processor::ExecutionContext* executionContext); inline void updateTableStatistics() { tablesStatistics->setNumTuplesForTable(tableSchema->tableID, numRows); @@ -65,13 +65,8 @@ class TableCopier { arrow::Status initCSVReaderAndCheckStatus( std::shared_ptr& csv_streaming_reader, const std::string& filePath); - arrow::Status initCSVReader(std::shared_ptr& csv_streaming_reader, - const std::string& filePath); - arrow::Status initArrowReaderAndCheckStatus( - std::shared_ptr& ipc_reader, - const std::string& filePath); - arrow::Status initArrowReader(std::shared_ptr& ipc_reader, + arrow::Status initCSVReader(std::shared_ptr& csv_streaming_reader, const std::string& filePath); arrow::Status initParquetReaderAndCheckStatus( diff --git a/src/main/database.cpp b/src/main/database.cpp index 2a9836ce59..baafaf4c42 100644 --- a/src/main/database.cpp +++ b/src/main/database.cpp @@ -27,6 +27,7 @@ SystemConfig::SystemConfig(uint64_t bufferPoolSize_) { (double_t)std::min(systemMemSize, (std::uint64_t)UINTPTR_MAX)); } bufferPoolSize = bufferPoolSize_; + auto maxConcurrency = std::thread::hardware_concurrency(); maxNumThreads = std::thread::hardware_concurrency(); } diff --git a/src/processor/operator/copy/copy_node.cpp b/src/processor/operator/copy/copy_node.cpp index 2d401854a7..652e9042b5 100644 --- a/src/processor/operator/copy/copy_node.cpp +++ b/src/processor/operator/copy/copy_node.cpp @@ -15,11 +15,11 @@ uint64_t CopyNode::executeInternal( if (copyDescription.fileType == common::CopyDescription::FileType::NPY) { auto nodeCopier = std::make_unique(copyDescription, wal->getDirectory(), *taskScheduler, *catalog, tableID, nodesStatistics); - numNodesCopied = nodeCopier->copy(); + numNodesCopied = nodeCopier->copy(executionContext); } else { auto nodeCopier = std::make_unique(copyDescription, wal->getDirectory(), *taskScheduler, *catalog, tableID, nodesStatistics); - numNodesCopied = nodeCopier->copy(); + numNodesCopied = nodeCopier->copy(executionContext); } for (auto& relTableSchema : catalog->getAllRelTableSchemasContainBoundTable(tableID)) { relsStore.getRelTable(relTableSchema->tableID) diff --git a/src/processor/operator/copy/copy_rel.cpp b/src/processor/operator/copy/copy_rel.cpp index 0d35f98984..ea3ad53e09 100644 --- a/src/processor/operator/copy/copy_rel.cpp +++ b/src/processor/operator/copy/copy_rel.cpp @@ -9,9 +9,9 @@ namespace processor { uint64_t CopyRel::executeInternal( kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) { - auto relCSVCopier = make_unique(copyDescription, wal->getDirectory(), *taskScheduler, + auto relCopier = make_unique(copyDescription, wal->getDirectory(), *taskScheduler, *catalog, nodesStore, executionContext->bufferManager, tableID, relsStatistics); - auto numRelsCopied = relCSVCopier->copy(); + auto numRelsCopied = relCopier->copy(executionContext); wal->logCopyRelRecord(tableID); return numRelsCopied; } diff --git a/src/storage/copier/node_copier.cpp b/src/storage/copier/node_copier.cpp index 0dc2c9e826..917f495ba9 100644 --- a/src/storage/copier/node_copier.cpp +++ b/src/storage/copier/node_copier.cpp @@ -10,6 +10,52 @@ using namespace kuzu::common; namespace kuzu { namespace storage { +template +std::unique_ptr> CSVNodeCopySharedState::getMorsel() { + lock_t lck{this->mtx}; + std::shared_ptr recordBatch; + auto result = csvStreamingReader->ReadNext(&recordBatch); + if (!result.ok()) { + throw common::CopyException( + "Error reading a batch of rows from CSV using Arrow CSVStreamingReader."); + } + if (recordBatch == NULL) { + return make_unique(move(recordBatch), INVALID_NODE_OFFSET, + NodeCopyMorsel::INVALID_BLOCK_IDX); + } + auto numRows = recordBatch->num_rows(); + this->startOffset += numRows; + this->blockIdx++; + return make_unique( + move(recordBatch), this->startOffset - numRows, this->blockIdx - 1); +} + +template +std::unique_ptr> ParquetNodeCopySharedState::getMorsel() { + lock_t lck{this->mtx}; + std::shared_ptr currTable; + if (this->blockIdx == numBlocks) { + return make_unique(move(currTable), INVALID_NODE_OFFSET, + NodeCopyMorsel::INVALID_BLOCK_IDX); + } + auto result = parquetReader->RowGroup(this->blockIdx)->ReadTable(&currTable); + if (!result.ok()) { + throw common::CopyException( + "Error reading a batch of rows from CSV using Arrow CSVStreamingReader."); + } + // TODO(Semih): I have not verified that, similar to CSV reading, that if ReadTable runs out of + // blocks to read, then it sets the currTable to NULL. + if (currTable == NULL) { + return make_unique(move(currTable), INVALID_NODE_OFFSET, + NodeCopyMorsel::INVALID_BLOCK_IDX); + } + auto numRows = currTable->num_rows(); + this->startOffset += numRows; + this->blockIdx++; + return make_unique( + move(currTable), this->startOffset - numRows, this->blockIdx - 1); +} + void NodeCopier::initializeColumnsAndLists() { logger->info("Initializing in memory columns."); for (auto& property : tableSchema->properties) { @@ -21,15 +67,15 @@ void NodeCopier::initializeColumnsAndLists() { logger->info("Done initializing in memory columns."); } -void NodeCopier::populateColumnsAndLists() { +void NodeCopier::populateColumnsAndLists(processor::ExecutionContext* executionContext) { arrow::Status status; auto primaryKey = reinterpret_cast(tableSchema)->getPrimaryKey(); switch (primaryKey.dataType.typeID) { case INT64: { - status = populateColumns(); + status = populateColumns(executionContext); } break; case STRING: { - status = populateColumns(); + status = populateColumns(executionContext); } break; default: { throw CopyException(StringUtils::string_format("Unsupported data type {} for the ID index.", @@ -51,7 +97,7 @@ void NodeCopier::saveToFile() { } template -arrow::Status NodeCopier::populateColumns() { +arrow::Status NodeCopier::populateColumns(processor::ExecutionContext* executionContext) { logger->info("Populating properties"); auto pkIndex = std::make_unique>(StorageUtils::getNodeIndexFName(this->outputDirectory, @@ -61,10 +107,10 @@ arrow::Status NodeCopier::populateColumns() { arrow::Status status; switch (copyDescription.fileType) { case CopyDescription::FileType::CSV: - status = populateColumnsFromCSV(pkIndex); + status = populateColumnsFromCSV(executionContext, pkIndex); break; case CopyDescription::FileType::PARQUET: - status = populateColumnsFromParquet(pkIndex); + status = populateColumnsFromParquet(executionContext, pkIndex); break; default: { throw CopyException(StringUtils::string_format("Unsupported file type {}.", @@ -78,26 +124,37 @@ arrow::Status NodeCopier::populateColumns() { } template -arrow::Status NodeCopier::populateColumnsFromCSV(std::unique_ptr>& pkIndex) { +arrow::Status NodeCopier::populateColumnsFromCSV( + processor::ExecutionContext* executionContext, std::unique_ptr>& pkIndex) { for (auto& filePath : copyDescription.filePaths) { std::shared_ptr csvStreamingReader; auto status = initCSVReaderAndCheckStatus(csvStreamingReader, filePath); - status = assignCopyCSVTasks( - csvStreamingReader.get(), fileBlockInfos.at(filePath).startOffset, filePath, pkIndex); throwCopyExceptionIfNotOK(status); + CSVNodeCopySharedState sharedState{ + filePath, pkIndex.get(), fileBlockInfos.at(filePath).startOffset, csvStreamingReader}; + taskScheduler.scheduleTaskAndWaitOrError( + CopyTaskFactory::createParallelCopyTask(executionContext->numThreads, + batchPopulateColumnsTask, &sharedState, this, executionContext), + executionContext); } return arrow::Status::OK(); } template arrow::Status NodeCopier::populateColumnsFromParquet( - std::unique_ptr>& pkIndex) { + processor::ExecutionContext* executionContext, std::unique_ptr>& pkIndex) { for (auto& filePath : copyDescription.filePaths) { - std::unique_ptr reader; - auto status = initParquetReaderAndCheckStatus(reader, filePath); - status = assignCopyParquetTasks( - reader.get(), fileBlockInfos.at(filePath).startOffset, filePath, pkIndex); + std::unique_ptr parquetReader; + auto status = initParquetReaderAndCheckStatus(parquetReader, filePath); throwCopyExceptionIfNotOK(status); + ParquetNodeCopySharedState sharedState{filePath, pkIndex.get(), + fileBlockInfos.at(filePath).startOffset, fileBlockInfos.at(filePath).numBlocks, + std::move(parquetReader)}; + taskScheduler.scheduleTaskAndWaitOrError( + CopyTaskFactory::createParallelCopyTask(executionContext->numThreads, + batchPopulateColumnsTask, &sharedState, this, + executionContext), + executionContext); } return arrow::Status::OK(); } @@ -116,36 +173,45 @@ void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* ove } template -arrow::Status NodeCopier::batchPopulateColumnsTask(uint64_t primaryKeypropertyID, uint64_t blockIdx, - uint64_t startOffset, HashIndexBuilder* pkIndex, NodeCopier* copier, - const std::vector>& batchArrays, std::string filePath) { - copier->logger->trace("Start: path={0} blkIdx={1}", filePath, blockIdx); - auto numLinesInCurBlock = copier->fileBlockInfos.at(filePath).numLinesPerBlock[blockIdx]; +void NodeCopier::batchPopulateColumnsTask(NodeCopySharedState* sharedState, + NodeCopier* copier, processor::ExecutionContext* executionContext) { + while (true) { + if (executionContext->clientContext->isInterrupted()) { + throw common::InterruptException{}; + } + auto result = sharedState->getMorsel(); + if (!result->success()) { + break; + } + copier->logger->trace( + "Start: path={0} blkIdx={1}", sharedState->filePath, result->blockIdx); + auto numLinesInCurBlock = + copier->fileBlockInfos.at(sharedState->filePath).numLinesPerBlock[result->blockIdx]; + // Create a column chunk for tuples within the [StartOffset, endOffset] range. + auto endOffset = result->startOffset + numLinesInCurBlock - 1; + std::unordered_map> chunks; + for (auto& [propertyIdx, column] : copier->columns) { + chunks[propertyIdx] = std::make_unique(result->startOffset, endOffset, + column->getNumBytesForElement(), column->getNumElementsInAPage()); + } + std::vector overflowCursors(copier->tableSchema->getNumProperties()); + for (auto& [propertyIdx, column] : copier->columns) { + putPropsOfLinesIntoColumns(chunks.at(propertyIdx).get(), column.get(), + result->getArrowColumns()[propertyIdx], result->startOffset, numLinesInCurBlock, + copier->copyDescription, overflowCursors[propertyIdx]); + } + // Flush each page within the [StartOffset, endOffset] range. + for (auto& [propertyIdx, column] : copier->columns) { + column->flushChunk(chunks[propertyIdx].get(), result->startOffset, endOffset); + } - // Create a column chunk for tuples within the [StartOffset, endOffset] range. - auto endOffset = startOffset + numLinesInCurBlock - 1; - std::unordered_map> chunks; - for (auto& [propertyID, column] : copier->columns) { - chunks[propertyID] = std::make_unique(startOffset, endOffset, - column->getNumBytesForElement(), column->getNumElementsInAPage()); - } - std::vector overflowCursors(copier->tableSchema->getNumProperties()); - for (auto& [propertyID, column] : copier->columns) { - putPropsOfLinesIntoColumns(chunks.at(propertyID).get(), column.get(), - batchArrays[propertyID], startOffset, numLinesInCurBlock, copier->copyDescription, - overflowCursors[propertyID]); + auto primaryKeyPropertyIdx = + reinterpret_cast(copier->tableSchema)->primaryKeyPropertyID; + auto pkColumn = copier->columns.at(primaryKeyPropertyIdx).get(); + populatePKIndex(chunks[primaryKeyPropertyIdx].get(), pkColumn->getInMemOverflowFile(), + pkColumn->getNullMask(), sharedState->pkIndex, result->startOffset, numLinesInCurBlock); + copier->logger->info("End: path={0} blkIdx={1}", sharedState->filePath, result->blockIdx); } - // Flush each page within the [StartOffset, endOffset] range. - for (auto& [propertyID, column] : copier->columns) { - column->flushChunk(chunks[propertyID].get(), startOffset, endOffset); - } - - auto pkColumn = copier->columns.at(primaryKeypropertyID).get(); - populatePKIndex(chunks[primaryKeypropertyID].get(), pkColumn->getInMemOverflowFile(), - pkColumn->getNullMask(), pkIndex, startOffset, numLinesInCurBlock); - - copier->logger->trace("End: path={0} blkIdx={1}", filePath, blockIdx); - return arrow::Status::OK(); } template @@ -164,61 +230,6 @@ void NodeCopier::putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeI } } -template -arrow::Status NodeCopier::assignCopyCSVTasks(arrow::csv::StreamingReader* csvStreamingReader, - offset_t startOffset, std::string filePath, std::unique_ptr>& pkIndex) { - auto it = csvStreamingReader->begin(); - auto endIt = csvStreamingReader->end(); - std::shared_ptr currBatch; - block_idx_t blockIdx = 0; - while (it != endIt) { - for (int i = 0; i < common::CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) { - if (it == endIt) { - break; - } - ARROW_ASSIGN_OR_RAISE(currBatch, *it); - taskScheduler.scheduleTask( - CopyTaskFactory::createCopyTask(batchPopulateColumnsTask, - reinterpret_cast(tableSchema)->primaryKeyPropertyID, blockIdx, - startOffset, pkIndex.get(), this, currBatch->columns(), filePath)); - startOffset += currBatch->num_rows(); - ++blockIdx; - ++it; - } - taskScheduler.waitUntilEnoughTasksFinish( - CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE); - } - taskScheduler.waitAllTasksToCompleteOrError(); - return arrow::Status::OK(); -} - -template -arrow::Status NodeCopier::assignCopyParquetTasks(parquet::arrow::FileReader* parquetReader, - common::offset_t startOffset, std::string filePath, - std::unique_ptr>& pkIndex) { - auto numBlocks = fileBlockInfos.at(filePath).numBlocks; - auto blockIdx = 0u; - std::shared_ptr currTable; - while (blockIdx < numBlocks) { - for (int i = 0; i < common::CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) { - if (blockIdx == numBlocks) { - break; - } - ARROW_RETURN_NOT_OK(parquetReader->RowGroup(blockIdx)->ReadTable(&currTable)); - taskScheduler.scheduleTask( - CopyTaskFactory::createCopyTask(batchPopulateColumnsTask, - reinterpret_cast(tableSchema)->primaryKeyPropertyID, blockIdx, - startOffset, pkIndex.get(), this, currTable->columns(), filePath)); - startOffset += currTable->num_rows(); - ++blockIdx; - } - taskScheduler.waitUntilEnoughTasksFinish( - common::CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE); - } - taskScheduler.waitAllTasksToCompleteOrError(); - return arrow::Status::OK(); -} - template<> void NodeCopier::appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile, common::offset_t offset, HashIndexBuilder* pkIndex) { diff --git a/src/storage/copier/npy_node_copier.cpp b/src/storage/copier/npy_node_copier.cpp index c7ecb90a1b..98112d6c02 100644 --- a/src/storage/copier/npy_node_copier.cpp +++ b/src/storage/copier/npy_node_copier.cpp @@ -10,11 +10,11 @@ using namespace kuzu::common; namespace kuzu { namespace storage { -void NpyNodeCopier::populateInMemoryStructures() { +void NpyNodeCopier::populateInMemoryStructures(processor::ExecutionContext* executionContext) { initializeNpyReaders(); initializeColumnsAndLists(); validateNpyReaders(); - populateColumnsAndLists(); + populateColumnsAndLists(executionContext); } void NpyNodeCopier::initializeNpyReaders() { @@ -72,7 +72,7 @@ void NpyNodeCopier::validateNpyReaders() { } } -void NpyNodeCopier::populateColumnsAndLists() { +void NpyNodeCopier::populateColumnsAndLists(processor::ExecutionContext* executionContext) { logger->info("Populating properties"); auto primaryKey = reinterpret_cast(tableSchema)->getPrimaryKey(); if (primaryKey.dataType.typeID != INT64) { diff --git a/src/storage/copier/rel_copier.cpp b/src/storage/copier/rel_copier.cpp index 4ffacc0363..97b5cb3cf8 100644 --- a/src/storage/copier/rel_copier.cpp +++ b/src/storage/copier/rel_copier.cpp @@ -56,7 +56,7 @@ void RelCopier::initializeColumnsAndLists() { } } -void RelCopier::populateColumnsAndLists() { +void RelCopier::populateColumnsAndLists(processor::ExecutionContext* executionContext) { populateAdjColumnsAndCountRelsInAdjLists(); if (adjListsPerDirection[FWD] != nullptr || adjListsPerDirection[BWD] != nullptr) { initAdjListsHeaders(); @@ -126,6 +126,7 @@ void RelCopier::initializeLists(RelDirection relDirection) { } void RelCopier::initAdjListsHeaders() { + // TODO(Semih): Schedule one at a time and wait. logger->debug("Initializing AdjListHeaders for rel {}.", tableSchema->tableName); for (auto relDirection : REL_DIRECTIONS) { if (!reinterpret_cast(tableSchema) @@ -143,6 +144,7 @@ void RelCopier::initAdjListsHeaders() { } void RelCopier::initListsMetadata() { + // TODO(Semih): Schedule one at a time and wait. logger->debug( "Initializing adjLists and propertyLists metadata for rel {}.", tableSchema->tableName); for (auto relDirection : REL_DIRECTIONS) { @@ -315,6 +317,7 @@ void RelCopier::sortAndCopyOverflowValues() { 1; auto numBuckets = numNodes / 256; numBuckets += (numNodes % 256 != 0); + // TODO(Semih): Schedule one at a time. for (auto& property : tableSchema->properties) { if (property.dataType.typeID == STRING || property.dataType.typeID == VAR_LIST) { offset_t offsetStart = 0, offsetEnd = 0; diff --git a/src/storage/copier/table_copier.cpp b/src/storage/copier/table_copier.cpp index e93885d6d8..d88f93607a 100644 --- a/src/storage/copier/table_copier.cpp +++ b/src/storage/copier/table_copier.cpp @@ -19,20 +19,20 @@ TableCopier::TableCopier(CopyDescription& copyDescription, std::string outputDir tableSchema{catalog.getReadOnlyVersion()->getTableSchema(tableID)}, tablesStatistics{ tablesStatistics} {} -uint64_t TableCopier::copy() { +uint64_t TableCopier::copy(processor::ExecutionContext* executionContext) { logger->info(StringUtils::string_format("Copying {} file to table {}.", CopyDescription::getFileTypeName(copyDescription.fileType), tableSchema->tableName)); - populateInMemoryStructures(); + populateInMemoryStructures(executionContext); updateTableStatistics(); saveToFile(); logger->info("Done copying file to table {}.", tableSchema->tableName); return numRows; } -void TableCopier::populateInMemoryStructures() { +void TableCopier::populateInMemoryStructures(processor::ExecutionContext* executionContext) { countNumLines(copyDescription.filePaths); initializeColumnsAndLists(); - populateColumnsAndLists(); + populateColumnsAndLists(executionContext); } void TableCopier::countNumLines(const std::vector& filePaths) { @@ -137,24 +137,6 @@ arrow::Status TableCopier::initCSVReader( return arrow::Status::OK(); } -arrow::Status TableCopier::initArrowReaderAndCheckStatus( - std::shared_ptr& ipc_reader, const std::string& filePath) { - auto status = initArrowReader(ipc_reader, filePath); - throwCopyExceptionIfNotOK(status); - return status; -} - -arrow::Status TableCopier::initArrowReader( - std::shared_ptr& ipc_reader, const std::string& filePath) { - std::shared_ptr infile; - - ARROW_ASSIGN_OR_RAISE( - infile, arrow::io::ReadableFile::Open(filePath, arrow::default_memory_pool())); - - ARROW_ASSIGN_OR_RAISE(ipc_reader, arrow::ipc::RecordBatchFileReader::Open(infile)); - return arrow::Status::OK(); -} - arrow::Status TableCopier::initParquetReaderAndCheckStatus( std::unique_ptr& reader, const std::string& filePath) { auto status = initParquetReader(reader, filePath);