diff --git a/src/include/processor/operator/copy/copy_node.h b/src/include/processor/operator/copy/copy_node.h index a6f2462ca53..ba765643bfc 100644 --- a/src/include/processor/operator/copy/copy_node.h +++ b/src/include/processor/operator/copy/copy_node.h @@ -33,36 +33,27 @@ class CopyNodeSharedState { bool hasLoggedWAL; }; -struct CopyNodeDataInfo { +struct CopyNodeInfo { DataPos rowIdxVectorPos; DataPos filePathVectorPos; std::vector dataColumnPoses; + const common::CopyDescription& copyDesc; + storage::NodeTable* table; + storage::RelsStore* relsStore; + catalog::Catalog* catalog; + storage::WAL* wal; }; class CopyNode : public Sink { public: - CopyNode(std::shared_ptr sharedState, CopyNodeDataInfo copyNodeDataInfo, - const common::CopyDescription& copyDesc, storage::NodeTable* table, - storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal, + CopyNode(std::shared_ptr sharedState, CopyNodeInfo copyNodeInfo, std::unique_ptr resultSetDescriptor, - std::unique_ptr child, uint32_t id, const std::string& paramsString) - : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child), - id, paramsString}, - sharedState{std::move(sharedState)}, copyNodeDataInfo{std::move(copyNodeDataInfo)}, - copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal}, - rowIdxVector{nullptr}, filePathVector{nullptr} { - auto tableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID()); - copyStates.resize(tableSchema->getNumProperties()); - for (auto i = 0u; i < tableSchema->getNumProperties(); i++) { - auto& property = tableSchema->properties[i]; - copyStates[i] = std::make_unique(property.dataType); - } - } + std::unique_ptr child, uint32_t id, const std::string& paramsString); inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override { - rowIdxVector = resultSet->getValueVector(copyNodeDataInfo.rowIdxVectorPos).get(); - filePathVector = resultSet->getValueVector(copyNodeDataInfo.filePathVectorPos).get(); - for (auto& arrowColumnPos : copyNodeDataInfo.dataColumnPoses) { + rowIdxVector = resultSet->getValueVector(copyNodeInfo.rowIdxVectorPos).get(); + filePathVector = resultSet->getValueVector(copyNodeInfo.filePathVectorPos).get(); + for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) { dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get()); } } @@ -71,9 +62,9 @@ class CopyNode : public Sink { if (!isCopyAllowed()) { throw common::CopyException("COPY commands can only be executed once on a table."); } - auto nodeTableSchema = - catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID()); - sharedState->initialize(nodeTableSchema, wal->getDirectory()); + auto nodeTableSchema = copyNodeInfo.catalog->getReadOnlyVersion()->getNodeTableSchema( + copyNodeInfo.table->getTableID()); + sharedState->initialize(nodeTableSchema, copyNodeInfo.wal->getDirectory()); } void executeInternal(ExecutionContext* context) override; @@ -81,8 +72,8 @@ class CopyNode : public Sink { void finalize(ExecutionContext* context) override; inline std::unique_ptr clone() override { - return std::make_unique(sharedState, copyNodeDataInfo, copyDesc, table, relsStore, - catalog, wal, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString); + return std::make_unique(sharedState, copyNodeInfo, resultSetDescriptor->copy(), + children[0]->clone(), id, paramsString); } protected: @@ -97,9 +88,9 @@ class CopyNode : public Sink { std::pair getFilePathAndRowIdxInFile(); private: - inline bool isCopyAllowed() { - auto nodesStatistics = table->getNodeStatisticsAndDeletedIDs(); - return nodesStatistics->getNodeStatisticsAndDeletedIDs(table->getTableID()) + inline bool isCopyAllowed() const { + auto nodesStatistics = copyNodeInfo.table->getNodeStatisticsAndDeletedIDs(); + return nodesStatistics->getNodeStatisticsAndDeletedIDs(copyNodeInfo.table->getTableID()) ->getNumTuples() == 0; } @@ -116,12 +107,7 @@ class CopyNode : public Sink { protected: std::shared_ptr sharedState; - CopyNodeDataInfo copyNodeDataInfo; - common::CopyDescription copyDesc; - storage::NodeTable* table; - storage::RelsStore* relsStore; - catalog::Catalog* catalog; - storage::WAL* wal; + CopyNodeInfo copyNodeInfo; common::ValueVector* rowIdxVector; common::ValueVector* filePathVector; std::vector dataColumnVectors; diff --git a/src/processor/mapper/map_copy.cpp b/src/processor/mapper/map_copy.cpp index 90cfdec13df..684ba018d74 100644 --- a/src/processor/mapper/map_copy.cpp +++ b/src/processor/mapper/map_copy.cpp @@ -73,10 +73,17 @@ std::unique_ptr PlanMapper::mapLogicalCopyNodeToPhysical(Logic auto copyNodeSharedState = std::make_shared(readFileSharedState->numRows, memoryManager); std::unique_ptr copyNode; - CopyNodeDataInfo copyNodeDataInfo{rowIdxVectorPos, filePathVectorPos, dataColumnPoses}; + CopyNodeInfo copyNodeDataInfo{ + rowIdxVectorPos, + filePathVectorPos, + dataColumnPoses, + copy->getCopyDescription(), + storageManager.getNodesStore().getNodeTable(copy->getTableID()), + &storageManager.getRelsStore(), + catalog, + storageManager.getWAL(), + }; copyNode = std::make_unique(copyNodeSharedState, copyNodeDataInfo, - copy->getCopyDescription(), storageManager.getNodesStore().getNodeTable(copy->getTableID()), - &storageManager.getRelsStore(), catalog, storageManager.getWAL(), std::make_unique(copy->getSchema()), std::move(readFile), getOperatorID(), copy->getExpressionsForPrinting()); auto outputExpressions = binder::expression_vector{copy->getOutputExpression()}; diff --git a/src/processor/operator/copy/copy_node.cpp b/src/processor/operator/copy/copy_node.cpp index b925cd0b78c..c51f04ef9bd 100644 --- a/src/processor/operator/copy/copy_node.cpp +++ b/src/processor/operator/copy/copy_node.cpp @@ -49,6 +49,22 @@ void CopyNodeSharedState::initializeColumns( } } +CopyNode::CopyNode(std::shared_ptr sharedState, CopyNodeInfo copyNodeInfo, + std::unique_ptr resultSetDescriptor, + std::unique_ptr child, uint32_t id, const std::string& paramsString) + : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child), id, + paramsString}, + sharedState{std::move(sharedState)}, copyNodeInfo{std::move(copyNodeInfo)}, + rowIdxVector{nullptr}, filePathVector{nullptr} { + auto tableSchema = this->copyNodeInfo.catalog->getReadOnlyVersion()->getNodeTableSchema( + this->copyNodeInfo.table->getTableID()); + copyStates.resize(tableSchema->getNumProperties()); + for (auto i = 0u; i < tableSchema->getNumProperties(); i++) { + auto& property = tableSchema->properties[i]; + copyStates[i] = std::make_unique(property.dataType); + } +} + std::pair CopyNode::getStartAndEndRowIdx(common::vector_idx_t columnIdx) { auto startRowIdx = rowIdxVector->getValue(rowIdxVector->state->selVector->selectedPositions[0]); @@ -73,8 +89,8 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) { auto [startRowIdx, endRowIdx] = getStartAndEndRowIdx(0 /* columnIdx */); auto [filePath, startRowIdxInFile] = getFilePathAndRowIdxInFile(); for (auto i = 0u; i < sharedState->columns.size(); i++) { - auto columnChunk = - sharedState->columns[i]->createInMemColumnChunk(startRowIdx, endRowIdx, ©Desc); + auto columnChunk = sharedState->columns[i]->createInMemColumnChunk( + startRowIdx, endRowIdx, ©NodeInfo.copyDesc); columnChunk->copyArrowArray( *ArrowColumnVector::getArrowColumn(dataColumnVectors[i]), copyStates[i].get()); columnChunks.push_back(std::move(columnChunk)); @@ -85,20 +101,23 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) { } void CopyNode::finalize(kuzu::processor::ExecutionContext* context) { - auto tableID = table->getTableID(); + auto tableID = copyNodeInfo.table->getTableID(); if (sharedState->pkIndex) { sharedState->pkIndex->flush(); } for (auto& column : sharedState->columns) { column->saveToFile(); } - for (auto& relTableSchema : catalog->getAllRelTableSchemasContainBoundTable(tableID)) { - relsStore->getRelTable(relTableSchema->tableID) + for (auto& relTableSchema : + copyNodeInfo.catalog->getAllRelTableSchemasContainBoundTable(tableID)) { + copyNodeInfo.relsStore->getRelTable(relTableSchema->tableID) ->batchInitEmptyRelsForNewNodes(relTableSchema, sharedState->numRows); } - table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable(tableID, sharedState->numRows); + copyNodeInfo.table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable( + tableID, sharedState->numRows); auto outputMsg = StringUtils::string_format("{} number of tuples has been copied to table: {}.", - sharedState->numRows, catalog->getReadOnlyVersion()->getTableName(tableID).c_str()); + sharedState->numRows, + copyNodeInfo.catalog->getReadOnlyVersion()->getTableName(tableID).c_str()); FactorizedTableUtils::appendStringToTable( sharedState->table.get(), outputMsg, context->memoryManager); } @@ -197,8 +216,8 @@ void CopyNode::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overf void CopyNode::logCopyWALRecord() { std::unique_lock xLck{sharedState->mtx}; if (!sharedState->hasLoggedWAL) { - wal->logCopyNodeRecord(table->getTableID()); - wal->flushAllPages(); + copyNodeInfo.wal->logCopyNodeRecord(copyNodeInfo.table->getTableID()); + copyNodeInfo.wal->flushAllPages(); sharedState->hasLoggedWAL = true; } }